diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java index 81d63db..70e1863 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.io.InputStream; +import java.net.InetAddress; import java.net.InetSocketAddress; import java.security.AccessControlException; import java.text.MessageFormat; @@ -147,8 +148,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemUtil; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppKillByClientEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMoveEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; @@ -772,20 +772,18 @@ public KillApplicationResponse forceKillApplication( } if (application.isAppFinalStateStored()) { - RMAuditLogger.logSuccess(callerUGI.getShortUserName(), - AuditConstants.KILL_APP_REQUEST, "ClientRMService", applicationId, - callerContext); return KillApplicationResponse.newInstance(true); } - String message = "Kill application " + applicationId + - " received from " + callerUGI; - if(null != Server.getRemoteAddress()) { - message += " at " + Server.getRemoteAddress(); + String message = "Kill application " + applicationId + " received from " + + callerUGI; + InetAddress remoteAddress = Server.getRemoteIp(); + if (null != remoteAddress) { + message += " at " + remoteAddress.getHostAddress(); } - this.rmContext.getDispatcher().getEventHandler().handle( - new RMAppEvent(applicationId, RMAppEventType.KILL, - message)); + this.rmContext.getDispatcher().getEventHandler() + .handle(new RMAppKillByClientEvent(applicationId, message, callerUGI, + remoteAddress)); // For UnmanagedAMs, return true so they don't retry return KillApplicationResponse.newInstance( diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java index c5bf000..84c0390 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java @@ -77,12 +77,12 @@ public static final String LIST_RESERVATION_REQUEST = "List " + "Reservation Request"; } - + static String createSuccessLog(String user, String operation, String target, ApplicationId appId, ApplicationAttemptId attemptId, ContainerId containerId, Resource resource) { return createSuccessLog(user, operation, target, appId, attemptId, - containerId, resource, null); + containerId, resource, null, Server.getRemoteIp()); } /** @@ -90,10 +90,13 @@ static String createSuccessLog(String user, String operation, String target, */ static String createSuccessLog(String user, String operation, String target, ApplicationId appId, ApplicationAttemptId attemptId, - ContainerId containerId, Resource resource, CallerContext callerContext) { + ContainerId containerId, Resource resource, CallerContext callerContext, + InetAddress ip) { StringBuilder b = new StringBuilder(); start(Keys.USER, user, b); - addRemoteIP(b); + if (ip != null) { + add(Keys.IP, ip.getHostAddress(), b); + } add(Keys.OPERATION, operation, b); add(Keys.TARGET, target ,b); add(Keys.RESULT, AuditConstants.SUCCESS, b); @@ -183,10 +186,37 @@ public static void logSuccess(String user, String operation, String target, ApplicationId appId, CallerContext callerContext) { if (LOG.isInfoEnabled()) { LOG.info(createSuccessLog(user, operation, target, appId, null, null, - null, callerContext)); + null, callerContext, Server.getRemoteIp())); } } + /** + * Create a readable and parseable audit log string for a successful event. + * + * @param user + * User who made the service request to the ResourceManager. + * @param operation + * Operation requested by the user. + * @param target + * The target on which the operation is being performed. + * @param appId + * Application Id in which operation was performed. + * @param ip + * The ip address of the caller. + * + *
+ *
+ * Note that the {@link RMAuditLogger} uses tabs ('\t') as a key-val + * delimiter and hence the value fields should not contains tabs + * ('\t'). + */ + public static void logSuccess(String user, String operation, String target, + ApplicationId appId, InetAddress ip) { + if (LOG.isInfoEnabled()) { + LOG.info(createSuccessLog(user, operation, target, appId, null, null, + null, null, ip)); + } + } /** * Create a readable and parseable audit log string for a successful event. diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 45ff79c..a178dbc 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmapp; import java.io.IOException; +import java.net.InetAddress; import java.net.URI; import java.net.URISyntaxException; import java.nio.ByteBuffer; @@ -75,6 +76,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService; import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent; import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType; +import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger; +import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils; import org.apache.hadoop.yarn.server.resourcemanager.blacklist.BlacklistManager; @@ -132,8 +135,7 @@ private final int maxAppAttempts; private final ReadLock readLock; private final WriteLock writeLock; - private final Map attempts - = new LinkedHashMap(); + private final Map attempts = new LinkedHashMap(); private final long submitTime; private final Set updatedNodes = new HashSet(); private final String applicationType; @@ -158,22 +160,18 @@ private volatile RMAppAttempt currentAttempt; private String queue; private EventHandler handler; - private static final AppFinishedTransition FINISHED_TRANSITION = - new AppFinishedTransition(); + private static final AppFinishedTransition FINISHED_TRANSITION = new AppFinishedTransition(); private Set ranNodes = new ConcurrentSkipListSet(); private final boolean logAggregationEnabled; private long logAggregationStartTime = 0; private final long logAggregationStatusTimeout; - private final Map logAggregationStatus = - new HashMap(); + private final Map logAggregationStatus = new HashMap(); private LogAggregationStatus logAggregationStatusForAppReport; private int logAggregationSucceed = 0; private int logAggregationFailed = 0; - private Map> logAggregationDiagnosticsForNMs = - new HashMap>(); - private Map> logAggregationFailureMessagesForNMs = - new HashMap>(); + private Map> logAggregationDiagnosticsForNMs = new HashMap>(); + private Map> logAggregationFailureMessagesForNMs = new HashMap>(); private final int maxLogAggregationDiagnosticsInMemory; // These states stored are only valid when app is at killing or final_saving. @@ -183,230 +181,232 @@ private RMAppState targetedFinalState; private RMAppState recoveredFinalState; private ResourceRequest amReq; - + private CallerContext callerContext; Object transitionTodo; - private static final StateMachineFactory stateMachineFactory - = new StateMachineFactory(RMAppState.NEW) - - - // Transitions from NEW state - .addTransition(RMAppState.NEW, RMAppState.NEW, - RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition()) - .addTransition(RMAppState.NEW, RMAppState.NEW, - RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition()) - .addTransition(RMAppState.NEW, RMAppState.NEW_SAVING, - RMAppEventType.START, new RMAppNewlySavingTransition()) - .addTransition(RMAppState.NEW, EnumSet.of(RMAppState.SUBMITTED, - RMAppState.ACCEPTED, RMAppState.FINISHED, RMAppState.FAILED, - RMAppState.KILLED, RMAppState.FINAL_SAVING), - RMAppEventType.RECOVER, new RMAppRecoveredTransition()) - .addTransition(RMAppState.NEW, RMAppState.KILLED, RMAppEventType.KILL, - new AppKilledTransition()) - .addTransition(RMAppState.NEW, RMAppState.FINAL_SAVING, - RMAppEventType.APP_REJECTED, - new FinalSavingTransition(new AppRejectedTransition(), - RMAppState.FAILED)) - - // Transitions from NEW_SAVING state - .addTransition(RMAppState.NEW_SAVING, RMAppState.NEW_SAVING, - RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition()) - .addTransition(RMAppState.NEW_SAVING, RMAppState.NEW_SAVING, - RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition()) - .addTransition(RMAppState.NEW_SAVING, RMAppState.SUBMITTED, - RMAppEventType.APP_NEW_SAVED, new AddApplicationToSchedulerTransition()) - .addTransition(RMAppState.NEW_SAVING, RMAppState.FINAL_SAVING, - RMAppEventType.KILL, - new FinalSavingTransition( - new AppKilledTransition(), RMAppState.KILLED)) - .addTransition(RMAppState.NEW_SAVING, RMAppState.FINAL_SAVING, - RMAppEventType.APP_REJECTED, - new FinalSavingTransition(new AppRejectedTransition(), - RMAppState.FAILED)) - .addTransition(RMAppState.NEW_SAVING, RMAppState.NEW_SAVING, - RMAppEventType.MOVE, new RMAppMoveTransition()) - - // Transitions from SUBMITTED state - .addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED, - RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition()) - .addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED, - RMAppEventType.MOVE, new RMAppMoveTransition()) - .addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED, - RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition()) - .addTransition(RMAppState.SUBMITTED, RMAppState.FINAL_SAVING, - RMAppEventType.APP_REJECTED, - new FinalSavingTransition( - new AppRejectedTransition(), RMAppState.FAILED)) - .addTransition(RMAppState.SUBMITTED, RMAppState.ACCEPTED, - RMAppEventType.APP_ACCEPTED, new StartAppAttemptTransition()) - .addTransition(RMAppState.SUBMITTED, RMAppState.FINAL_SAVING, - RMAppEventType.KILL, - new FinalSavingTransition( - new AppKilledTransition(), RMAppState.KILLED)) - - // Transitions from ACCEPTED state - .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED, - RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition()) - .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED, - RMAppEventType.MOVE, new RMAppMoveTransition()) - .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED, - RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition()) - .addTransition(RMAppState.ACCEPTED, RMAppState.RUNNING, - RMAppEventType.ATTEMPT_REGISTERED, new RMAppStateUpdateTransition( - YarnApplicationState.RUNNING)) - .addTransition(RMAppState.ACCEPTED, - EnumSet.of(RMAppState.ACCEPTED, RMAppState.FINAL_SAVING), - // ACCEPTED state is possible to receive ATTEMPT_FAILED/ATTEMPT_FINISHED - // event because RMAppRecoveredTransition is returning ACCEPTED state - // directly and waiting for the previous AM to exit. - RMAppEventType.ATTEMPT_FAILED, - new AttemptFailedTransition(RMAppState.ACCEPTED)) - .addTransition(RMAppState.ACCEPTED, RMAppState.FINAL_SAVING, - RMAppEventType.ATTEMPT_FINISHED, - new FinalSavingTransition(FINISHED_TRANSITION, RMAppState.FINISHED)) - .addTransition(RMAppState.ACCEPTED, RMAppState.KILLING, - RMAppEventType.KILL, new KillAttemptTransition()) - .addTransition(RMAppState.ACCEPTED, RMAppState.FINAL_SAVING, - RMAppEventType.ATTEMPT_KILLED, - new FinalSavingTransition(new AppKilledTransition(), RMAppState.KILLED)) - .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED, - RMAppEventType.APP_RUNNING_ON_NODE, - new AppRunningOnNodeTransition()) - - // Transitions from RUNNING state - .addTransition(RMAppState.RUNNING, RMAppState.RUNNING, - RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition()) - .addTransition(RMAppState.RUNNING, RMAppState.RUNNING, - RMAppEventType.MOVE, new RMAppMoveTransition()) - .addTransition(RMAppState.RUNNING, RMAppState.RUNNING, - RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition()) - .addTransition(RMAppState.RUNNING, RMAppState.FINAL_SAVING, - RMAppEventType.ATTEMPT_UNREGISTERED, - new FinalSavingTransition( - new AttemptUnregisteredTransition(), - RMAppState.FINISHING, RMAppState.FINISHED)) - .addTransition(RMAppState.RUNNING, RMAppState.FINISHED, - // UnManagedAM directly jumps to finished - RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION) - .addTransition(RMAppState.RUNNING, RMAppState.RUNNING, - RMAppEventType.APP_RUNNING_ON_NODE, - new AppRunningOnNodeTransition()) - .addTransition(RMAppState.RUNNING, - EnumSet.of(RMAppState.ACCEPTED, RMAppState.FINAL_SAVING), - RMAppEventType.ATTEMPT_FAILED, - new AttemptFailedTransition(RMAppState.ACCEPTED)) - .addTransition(RMAppState.RUNNING, RMAppState.KILLING, - RMAppEventType.KILL, new KillAttemptTransition()) - - // Transitions from FINAL_SAVING state - .addTransition(RMAppState.FINAL_SAVING, - EnumSet.of(RMAppState.FINISHING, RMAppState.FAILED, - RMAppState.KILLED, RMAppState.FINISHED), RMAppEventType.APP_UPDATE_SAVED, - new FinalStateSavedTransition()) - .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING, - RMAppEventType.ATTEMPT_FINISHED, - new AttemptFinishedAtFinalSavingTransition()) - .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING, - RMAppEventType.APP_RUNNING_ON_NODE, - new AppRunningOnNodeTransition()) - .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING, - RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition()) - // ignorable transitions - .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING, - EnumSet.of(RMAppEventType.NODE_UPDATE, RMAppEventType.KILL, - RMAppEventType.APP_NEW_SAVED, RMAppEventType.MOVE)) - - // Transitions from FINISHING state - .addTransition(RMAppState.FINISHING, RMAppState.FINISHED, - RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION) - .addTransition(RMAppState.FINISHING, RMAppState.FINISHING, - RMAppEventType.APP_RUNNING_ON_NODE, - new AppRunningOnNodeTransition()) - .addTransition(RMAppState.FINISHING, RMAppState.FINISHING, - RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition()) - // ignorable transitions - .addTransition(RMAppState.FINISHING, RMAppState.FINISHING, - EnumSet.of(RMAppEventType.NODE_UPDATE, - // ignore Kill/Move as we have already saved the final Finished state - // in state store. - RMAppEventType.KILL, RMAppEventType.MOVE)) - - // Transitions from KILLING state - .addTransition(RMAppState.KILLING, RMAppState.KILLING, - RMAppEventType.APP_RUNNING_ON_NODE, - new AppRunningOnNodeTransition()) - .addTransition(RMAppState.KILLING, RMAppState.KILLING, - RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition()) - .addTransition(RMAppState.KILLING, RMAppState.FINAL_SAVING, - RMAppEventType.ATTEMPT_KILLED, - new FinalSavingTransition( - new AppKilledTransition(), RMAppState.KILLED)) - .addTransition(RMAppState.KILLING, RMAppState.FINAL_SAVING, - RMAppEventType.ATTEMPT_UNREGISTERED, - new FinalSavingTransition( - new AttemptUnregisteredTransition(), - RMAppState.FINISHING, RMAppState.FINISHED)) - .addTransition(RMAppState.KILLING, RMAppState.FINISHED, - // UnManagedAM directly jumps to finished - RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION) - .addTransition(RMAppState.KILLING, - EnumSet.of(RMAppState.FINAL_SAVING), - RMAppEventType.ATTEMPT_FAILED, - new AttemptFailedTransition(RMAppState.KILLING)) - - .addTransition(RMAppState.KILLING, RMAppState.KILLING, - EnumSet.of( - RMAppEventType.NODE_UPDATE, - RMAppEventType.ATTEMPT_REGISTERED, - RMAppEventType.APP_UPDATE_SAVED, - RMAppEventType.KILL, RMAppEventType.MOVE)) - - // Transitions from FINISHED state - // ignorable transitions - .addTransition(RMAppState.FINISHED, RMAppState.FINISHED, - RMAppEventType.APP_RUNNING_ON_NODE, - new AppRunningOnNodeTransition()) - .addTransition(RMAppState.FINISHED, RMAppState.FINISHED, - EnumSet.of( - RMAppEventType.NODE_UPDATE, - RMAppEventType.ATTEMPT_UNREGISTERED, - RMAppEventType.ATTEMPT_FINISHED, - RMAppEventType.KILL, RMAppEventType.MOVE)) - - // Transitions from FAILED state - // ignorable transitions - .addTransition(RMAppState.FAILED, RMAppState.FAILED, - RMAppEventType.APP_RUNNING_ON_NODE, - new AppRunningOnNodeTransition()) - .addTransition(RMAppState.FAILED, RMAppState.FAILED, - EnumSet.of(RMAppEventType.KILL, RMAppEventType.NODE_UPDATE, - RMAppEventType.MOVE)) - - // Transitions from KILLED state - // ignorable transitions - .addTransition(RMAppState.KILLED, RMAppState.KILLED, - RMAppEventType.APP_RUNNING_ON_NODE, - new AppRunningOnNodeTransition()) - .addTransition( - RMAppState.KILLED, - RMAppState.KILLED, - EnumSet.of(RMAppEventType.APP_ACCEPTED, - RMAppEventType.APP_REJECTED, RMAppEventType.KILL, - RMAppEventType.ATTEMPT_FINISHED, RMAppEventType.ATTEMPT_FAILED, - RMAppEventType.NODE_UPDATE, RMAppEventType.MOVE)) - - .installTopology(); - - private final StateMachine - stateMachine; + private static final StateMachineFactory stateMachineFactory = new StateMachineFactory( + RMAppState.NEW) + + // Transitions from NEW state + .addTransition(RMAppState.NEW, RMAppState.NEW, + RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition()) + .addTransition(RMAppState.NEW, RMAppState.NEW, + RMAppEventType.COLLECTOR_UPDATE, + new RMAppCollectorUpdateTransition()) + .addTransition(RMAppState.NEW, RMAppState.NEW_SAVING, + RMAppEventType.START, new RMAppNewlySavingTransition()) + .addTransition(RMAppState.NEW, + EnumSet.of(RMAppState.SUBMITTED, RMAppState.ACCEPTED, + RMAppState.FINISHED, RMAppState.FAILED, RMAppState.KILLED, + RMAppState.FINAL_SAVING), + RMAppEventType.RECOVER, new RMAppRecoveredTransition()) + .addTransition(RMAppState.NEW, RMAppState.KILLED, RMAppEventType.KILL, + new AppKilledTransition()) + .addTransition(RMAppState.NEW, RMAppState.FINAL_SAVING, + RMAppEventType.APP_REJECTED, + new FinalSavingTransition(new AppRejectedTransition(), + RMAppState.FAILED)) + + // Transitions from NEW_SAVING state + .addTransition(RMAppState.NEW_SAVING, RMAppState.NEW_SAVING, + RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition()) + .addTransition(RMAppState.NEW_SAVING, RMAppState.NEW_SAVING, + RMAppEventType.COLLECTOR_UPDATE, + new RMAppCollectorUpdateTransition()) + .addTransition(RMAppState.NEW_SAVING, RMAppState.SUBMITTED, + RMAppEventType.APP_NEW_SAVED, + new AddApplicationToSchedulerTransition()) + .addTransition(RMAppState.NEW_SAVING, RMAppState.FINAL_SAVING, + RMAppEventType.KILL, + new FinalSavingTransition(new AppKilledTransition(), + RMAppState.KILLED)) + .addTransition(RMAppState.NEW_SAVING, RMAppState.FINAL_SAVING, + RMAppEventType.APP_REJECTED, + new FinalSavingTransition(new AppRejectedTransition(), + RMAppState.FAILED)) + .addTransition(RMAppState.NEW_SAVING, RMAppState.NEW_SAVING, + RMAppEventType.MOVE, new RMAppMoveTransition()) + + // Transitions from SUBMITTED state + .addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED, + RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition()) + .addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED, + RMAppEventType.MOVE, new RMAppMoveTransition()) + .addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED, + RMAppEventType.COLLECTOR_UPDATE, + new RMAppCollectorUpdateTransition()) + .addTransition(RMAppState.SUBMITTED, RMAppState.FINAL_SAVING, + RMAppEventType.APP_REJECTED, + new FinalSavingTransition(new AppRejectedTransition(), + RMAppState.FAILED)) + .addTransition(RMAppState.SUBMITTED, RMAppState.ACCEPTED, + RMAppEventType.APP_ACCEPTED, new StartAppAttemptTransition()) + .addTransition(RMAppState.SUBMITTED, RMAppState.FINAL_SAVING, + RMAppEventType.KILL, + new FinalSavingTransition(new AppKilledTransition(), + RMAppState.KILLED)) + + // Transitions from ACCEPTED state + .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED, + RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition()) + .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED, + RMAppEventType.MOVE, new RMAppMoveTransition()) + .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED, + RMAppEventType.COLLECTOR_UPDATE, + new RMAppCollectorUpdateTransition()) + .addTransition(RMAppState.ACCEPTED, RMAppState.RUNNING, + RMAppEventType.ATTEMPT_REGISTERED, + new RMAppStateUpdateTransition(YarnApplicationState.RUNNING)) + .addTransition(RMAppState.ACCEPTED, + EnumSet.of(RMAppState.ACCEPTED, RMAppState.FINAL_SAVING), + // ACCEPTED state is possible to receive + // ATTEMPT_FAILED/ATTEMPT_FINISHED + // event because RMAppRecoveredTransition is returning ACCEPTED + // state + // directly and waiting for the previous AM to exit. + RMAppEventType.ATTEMPT_FAILED, + new AttemptFailedTransition(RMAppState.ACCEPTED)) + .addTransition(RMAppState.ACCEPTED, RMAppState.FINAL_SAVING, + RMAppEventType.ATTEMPT_FINISHED, + new FinalSavingTransition(FINISHED_TRANSITION, + RMAppState.FINISHED)) + .addTransition(RMAppState.ACCEPTED, RMAppState.KILLING, + RMAppEventType.KILL, new KillAttemptTransition()) + .addTransition(RMAppState.ACCEPTED, RMAppState.FINAL_SAVING, + RMAppEventType.ATTEMPT_KILLED, + new FinalSavingTransition(new AppKilledTransition(), + RMAppState.KILLED)) + .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED, + RMAppEventType.APP_RUNNING_ON_NODE, + new AppRunningOnNodeTransition()) + + // Transitions from RUNNING state + .addTransition(RMAppState.RUNNING, RMAppState.RUNNING, + RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition()) + .addTransition(RMAppState.RUNNING, RMAppState.RUNNING, + RMAppEventType.MOVE, new RMAppMoveTransition()) + .addTransition(RMAppState.RUNNING, RMAppState.RUNNING, + RMAppEventType.COLLECTOR_UPDATE, + new RMAppCollectorUpdateTransition()) + .addTransition(RMAppState.RUNNING, RMAppState.FINAL_SAVING, + RMAppEventType.ATTEMPT_UNREGISTERED, + new FinalSavingTransition(new AttemptUnregisteredTransition(), + RMAppState.FINISHING, RMAppState.FINISHED)) + .addTransition(RMAppState.RUNNING, RMAppState.FINISHED, + // UnManagedAM directly jumps to finished + RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION) + .addTransition(RMAppState.RUNNING, RMAppState.RUNNING, + RMAppEventType.APP_RUNNING_ON_NODE, + new AppRunningOnNodeTransition()) + .addTransition(RMAppState.RUNNING, + EnumSet.of(RMAppState.ACCEPTED, RMAppState.FINAL_SAVING), + RMAppEventType.ATTEMPT_FAILED, + new AttemptFailedTransition(RMAppState.ACCEPTED)) + .addTransition(RMAppState.RUNNING, RMAppState.KILLING, + RMAppEventType.KILL, new KillAttemptTransition()) + + // Transitions from FINAL_SAVING state + .addTransition(RMAppState.FINAL_SAVING, + EnumSet.of(RMAppState.FINISHING, RMAppState.FAILED, + RMAppState.KILLED, RMAppState.FINISHED), + RMAppEventType.APP_UPDATE_SAVED, new FinalStateSavedTransition()) + .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING, + RMAppEventType.ATTEMPT_FINISHED, + new AttemptFinishedAtFinalSavingTransition()) + .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING, + RMAppEventType.APP_RUNNING_ON_NODE, + new AppRunningOnNodeTransition()) + .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING, + RMAppEventType.COLLECTOR_UPDATE, + new RMAppCollectorUpdateTransition()) + // ignorable transitions + .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING, + EnumSet.of(RMAppEventType.NODE_UPDATE, RMAppEventType.KILL, + RMAppEventType.APP_NEW_SAVED, RMAppEventType.MOVE)) + + // Transitions from FINISHING state + .addTransition(RMAppState.FINISHING, RMAppState.FINISHED, + RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION) + .addTransition(RMAppState.FINISHING, RMAppState.FINISHING, + RMAppEventType.APP_RUNNING_ON_NODE, + new AppRunningOnNodeTransition()) + .addTransition(RMAppState.FINISHING, RMAppState.FINISHING, + RMAppEventType.COLLECTOR_UPDATE, + new RMAppCollectorUpdateTransition()) + // ignorable transitions + .addTransition(RMAppState.FINISHING, RMAppState.FINISHING, + EnumSet.of(RMAppEventType.NODE_UPDATE, + // ignore Kill/Move as we have already saved the final + // Finished state + // in state store. + RMAppEventType.KILL, RMAppEventType.MOVE)) + + // Transitions from KILLING state + .addTransition(RMAppState.KILLING, RMAppState.KILLING, + RMAppEventType.APP_RUNNING_ON_NODE, + new AppRunningOnNodeTransition()) + .addTransition(RMAppState.KILLING, RMAppState.KILLING, + RMAppEventType.COLLECTOR_UPDATE, + new RMAppCollectorUpdateTransition()) + .addTransition(RMAppState.KILLING, RMAppState.FINAL_SAVING, + RMAppEventType.ATTEMPT_KILLED, + new FinalSavingTransition(new AppKilledTransition(), + RMAppState.KILLED)) + .addTransition(RMAppState.KILLING, RMAppState.FINAL_SAVING, + RMAppEventType.ATTEMPT_UNREGISTERED, + new FinalSavingTransition(new AttemptUnregisteredTransition(), + RMAppState.FINISHING, RMAppState.FINISHED)) + .addTransition(RMAppState.KILLING, RMAppState.FINISHED, + // UnManagedAM directly jumps to finished + RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION) + .addTransition(RMAppState.KILLING, + EnumSet.of(RMAppState.FINAL_SAVING), + RMAppEventType.ATTEMPT_FAILED, + new AttemptFailedTransition(RMAppState.KILLING)) + + .addTransition(RMAppState.KILLING, RMAppState.KILLING, + EnumSet.of(RMAppEventType.NODE_UPDATE, + RMAppEventType.ATTEMPT_REGISTERED, + RMAppEventType.APP_UPDATE_SAVED, RMAppEventType.KILL, + RMAppEventType.MOVE)) + + // Transitions from FINISHED state + // ignorable transitions + .addTransition(RMAppState.FINISHED, RMAppState.FINISHED, + RMAppEventType.APP_RUNNING_ON_NODE, + new AppRunningOnNodeTransition()) + .addTransition(RMAppState.FINISHED, RMAppState.FINISHED, + EnumSet.of(RMAppEventType.NODE_UPDATE, + RMAppEventType.ATTEMPT_UNREGISTERED, + RMAppEventType.ATTEMPT_FINISHED, RMAppEventType.KILL, + RMAppEventType.MOVE)) + + // Transitions from FAILED state + // ignorable transitions + .addTransition(RMAppState.FAILED, RMAppState.FAILED, + RMAppEventType.APP_RUNNING_ON_NODE, + new AppRunningOnNodeTransition()) + .addTransition(RMAppState.FAILED, RMAppState.FAILED, + EnumSet.of(RMAppEventType.KILL, RMAppEventType.NODE_UPDATE, + RMAppEventType.MOVE)) + + // Transitions from KILLED state + // ignorable transitions + .addTransition(RMAppState.KILLED, RMAppState.KILLED, + RMAppEventType.APP_RUNNING_ON_NODE, + new AppRunningOnNodeTransition()) + .addTransition(RMAppState.KILLED, RMAppState.KILLED, + EnumSet.of(RMAppEventType.APP_ACCEPTED, + RMAppEventType.APP_REJECTED, RMAppEventType.KILL, + RMAppEventType.ATTEMPT_FINISHED, + RMAppEventType.ATTEMPT_FAILED, RMAppEventType.NODE_UPDATE, + RMAppEventType.MOVE)) + + .installTopology(); + + private final StateMachine stateMachine; private static final int DUMMY_APPLICATION_ATTEMPT_NUMBER = -1; private static final float MINIMUM_AM_BLACKLIST_THRESHOLD_VALUE = 0.0f; @@ -419,8 +419,8 @@ public RMAppImpl(ApplicationId applicationId, RMContext rmContext, String applicationType, Set applicationTags, ResourceRequest amReq) { this(applicationId, rmContext, config, name, user, queue, submissionContext, - scheduler, masterService, submitTime, applicationType, applicationTags, - amReq, -1); + scheduler, masterService, submitTime, applicationType, applicationTags, + amReq, -1); } public RMAppImpl(ApplicationId applicationId, RMContext rmContext, @@ -456,8 +456,8 @@ public RMAppImpl(ApplicationId applicationId, RMContext rmContext, int globalMaxAppAttempts = conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS); int individualMaxAppAttempts = submissionContext.getMaxAppAttempts(); - if (individualMaxAppAttempts <= 0 || - individualMaxAppAttempts > globalMaxAppAttempts) { + if (individualMaxAppAttempts <= 0 + || individualMaxAppAttempts > globalMaxAppAttempts) { this.maxAppAttempts = globalMaxAppAttempts; LOG.warn("The specific max attempts: " + individualMaxAppAttempts + " for application: " + applicationId.getId() @@ -467,8 +467,8 @@ public RMAppImpl(ApplicationId applicationId, RMContext rmContext, this.maxAppAttempts = individualMaxAppAttempts; } - this.attemptFailuresValidityInterval = - submissionContext.getAttemptFailuresValidityInterval(); + this.attemptFailuresValidityInterval = submissionContext + .getAttemptFailuresValidityInterval(); if (this.attemptFailuresValidityInterval > 0) { LOG.info("The attemptFailuresValidityInterval for the application: " + this.applicationId + " is " + this.attemptFailuresValidityInterval @@ -483,18 +483,17 @@ public RMAppImpl(ApplicationId applicationId, RMContext rmContext, this.callerContext = CallerContext.getCurrent(); - long localLogAggregationStatusTimeout = - conf.getLong(YarnConfiguration.LOG_AGGREGATION_STATUS_TIME_OUT_MS, - YarnConfiguration.DEFAULT_LOG_AGGREGATION_STATUS_TIME_OUT_MS); + long localLogAggregationStatusTimeout = conf.getLong( + YarnConfiguration.LOG_AGGREGATION_STATUS_TIME_OUT_MS, + YarnConfiguration.DEFAULT_LOG_AGGREGATION_STATUS_TIME_OUT_MS); if (localLogAggregationStatusTimeout <= 0) { - this.logAggregationStatusTimeout = - YarnConfiguration.DEFAULT_LOG_AGGREGATION_STATUS_TIME_OUT_MS; + this.logAggregationStatusTimeout = YarnConfiguration.DEFAULT_LOG_AGGREGATION_STATUS_TIME_OUT_MS; } else { this.logAggregationStatusTimeout = localLogAggregationStatusTimeout; } - this.logAggregationEnabled = - conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, - YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED); + this.logAggregationEnabled = conf.getBoolean( + YarnConfiguration.LOG_AGGREGATION_ENABLED, + YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED); if (this.logAggregationEnabled) { this.logAggregationStatusForAppReport = LogAggregationStatus.NOT_START; } else { @@ -506,22 +505,19 @@ public RMAppImpl(ApplicationId applicationId, RMContext rmContext, // amBlacklistingEnabled can be configured globally // Just use the global values - amBlacklistingEnabled = - conf.getBoolean( - YarnConfiguration.AM_SCHEDULING_NODE_BLACKLISTING_ENABLED, - YarnConfiguration.DEFAULT_AM_SCHEDULING_NODE_BLACKLISTING_ENABLED); + amBlacklistingEnabled = conf.getBoolean( + YarnConfiguration.AM_SCHEDULING_NODE_BLACKLISTING_ENABLED, + YarnConfiguration.DEFAULT_AM_SCHEDULING_NODE_BLACKLISTING_ENABLED); if (amBlacklistingEnabled) { blacklistDisableThreshold = conf.getFloat( YarnConfiguration.AM_SCHEDULING_NODE_BLACKLISTING_DISABLE_THRESHOLD, - YarnConfiguration. - DEFAULT_AM_SCHEDULING_NODE_BLACKLISTING_DISABLE_THRESHOLD); + YarnConfiguration.DEFAULT_AM_SCHEDULING_NODE_BLACKLISTING_DISABLE_THRESHOLD); // Verify whether blacklistDisableThreshold is valid. And for invalid // threshold, reset to global level blacklistDisableThreshold // configured. - if (blacklistDisableThreshold < MINIMUM_AM_BLACKLIST_THRESHOLD_VALUE || - blacklistDisableThreshold > MAXIMUM_AM_BLACKLIST_THRESHOLD_VALUE) { - blacklistDisableThreshold = YarnConfiguration. - DEFAULT_AM_SCHEDULING_NODE_BLACKLISTING_DISABLE_THRESHOLD; + if (blacklistDisableThreshold < MINIMUM_AM_BLACKLIST_THRESHOLD_VALUE + || blacklistDisableThreshold > MAXIMUM_AM_BLACKLIST_THRESHOLD_VALUE) { + blacklistDisableThreshold = YarnConfiguration.DEFAULT_AM_SCHEDULING_NODE_BLACKLISTING_DISABLE_THRESHOLD; } } } @@ -531,10 +527,10 @@ public RMAppImpl(ApplicationId applicationId, RMContext rmContext, * be used only if the timeline service v.2 is enabled. */ public void startTimelineCollector() { - AppLevelTimelineCollector collector = - new AppLevelTimelineCollector(applicationId); - rmContext.getRMTimelineCollectorManager().putIfAbsent( - applicationId, collector); + AppLevelTimelineCollector collector = new AppLevelTimelineCollector( + applicationId); + rmContext.getRMTimelineCollectorManager().putIfAbsent(applicationId, + collector); } /** @@ -549,7 +545,7 @@ public void stopTimelineCollector() { public ApplicationId getApplicationId() { return this.applicationId; } - + @Override public ApplicationSubmissionContext getApplicationSubmissionContext() { return this.submissionContext; @@ -572,7 +568,7 @@ public FinalApplicationStatus getFinalApplicationStatus() { public RMAppState getState() { this.readLock.lock(); try { - return this.stateMachine.getCurrentState(); + return this.stateMachine.getCurrentState(); } finally { this.readLock.unlock(); } @@ -607,7 +603,7 @@ public RMAppAttempt getRMAppAttempt(ApplicationAttemptId appAttemptId) { public String getQueue() { return this.queue; } - + @Override public void setQueue(String queue) { this.queue = queue; @@ -649,8 +645,9 @@ public RMAppAttempt getCurrentAppAttempt() { } } - private FinalApplicationStatus createFinalApplicationStatus(RMAppState state) { - switch(state) { + private FinalApplicationStatus createFinalApplicationStatus( + RMAppState state) { + switch (state) { case NEW: case NEW_SAVING: case SUBMITTED: @@ -658,8 +655,8 @@ private FinalApplicationStatus createFinalApplicationStatus(RMAppState state) { case RUNNING: case FINAL_SAVING: case KILLING: - return FinalApplicationStatus.UNDEFINED; - // finished without a proper final state is the same as failed + return FinalApplicationStatus.UNDEFINED; + // finished without a proper final state is the same as failed case FINISHING: case FINISHED: case FAILED: @@ -682,7 +679,7 @@ public int pullRMNodeUpdates(Collection updatedNodes) { this.writeLock.unlock(); } } - + @Override public ApplicationReport createAndGetApplicationReport(String clientUserName, boolean allowAccess) { @@ -696,8 +693,7 @@ public ApplicationReport createAndGetApplicationReport(String clientUserName, String origTrackingUrl = UNAVAILABLE; LogAggregationStatus logAggregationStatus = null; int rpcPort = -1; - ApplicationResourceUsageReport appUsageReport = - RMServerUtils.DUMMY_APPLICATION_RESOURCE_USAGE_REPORT; + ApplicationResourceUsageReport appUsageReport = RMServerUtils.DUMMY_APPLICATION_RESOURCE_USAGE_REPORT; FinalApplicationStatus finishState = getFinalApplicationStatus(); String diags = UNAVAILABLE; float progress = 0.0f; @@ -711,8 +707,8 @@ public ApplicationReport createAndGetApplicationReport(String clientUserName, if (UserGroupInformation.isSecurityEnabled()) { // get a token so the client can communicate with the app attempt // NOTE: token may be unavailable if the attempt is not running - Token attemptClientToAMToken = - this.currentAttempt.createClientToken(clientUserName); + Token attemptClientToAMToken = this.currentAttempt + .createClientToken(clientUserName); if (attemptClientToAMToken != null) { clientToAMToken = BuilderUtils.newClientToAMToken( attemptClientToAMToken.getIdentifier(), @@ -727,13 +723,13 @@ public ApplicationReport createAndGetApplicationReport(String clientUserName, progress = currentAttempt.getProgress(); logAggregationStatus = this.getLogAggregationStatusForAppReport(); } - //if the diagnostics is not already set get it from attempt + // if the diagnostics is not already set get it from attempt diags = getDiagnostics().toString(); - if (currentAttempt != null && - currentAttempt.getAppAttemptState() == RMAppAttemptState.LAUNCHED) { - if (getApplicationSubmissionContext().getUnmanagedAM() && - clientUserName != null && getUser().equals(clientUserName)) { + if (currentAttempt != null && currentAttempt + .getAppAttemptState() == RMAppAttemptState.LAUNCHED) { + if (getApplicationSubmissionContext().getUnmanagedAM() + && clientUserName != null && getUser().equals(clientUserName)) { Token token = currentAttempt.getAMRMToken(); if (token != null) { amrmToken = BuilderUtils.newAMRMToken(token.getIdentifier(), @@ -749,9 +745,8 @@ public ApplicationReport createAndGetApplicationReport(String clientUserName, } if (currentApplicationAttemptId == null) { - currentApplicationAttemptId = - BuilderUtils.newApplicationAttemptId(this.applicationId, - DUMMY_APPLICATION_ATTEMPT_NUMBER); + currentApplicationAttemptId = BuilderUtils.newApplicationAttemptId( + this.applicationId, DUMMY_APPLICATION_ATTEMPT_NUMBER); } ApplicationReport report = BuilderUtils.newApplicationReport( @@ -779,8 +774,8 @@ private String getDefaultProxyTrackingUrl() { URI result = ProxyUriUtils.getProxyUri(null, proxyUri, applicationId); return result.toASCIIString(); } catch (URISyntaxException e) { - LOG.warn("Could not generate default proxy tracking URL for " - + applicationId); + LOG.warn( + "Could not generate default proxy tracking URL for " + applicationId); return UNAVAILABLE; } } @@ -858,8 +853,8 @@ public void handle(RMAppEvent event) { try { ApplicationId appID = event.getApplicationId(); - LOG.debug("Processing event for " + appID + " of type " - + event.getType()); + LOG.debug( + "Processing event for " + appID + " of type " + event.getType()); final RMAppState oldState = getState(); try { /* keep the master in sync with the state machine */ @@ -870,8 +865,8 @@ public void handle(RMAppEvent event) { } if (oldState != getState()) { - LOG.info(appID + " State change from " + oldState + " to " - + getState()); + LOG.info(appID + " State change from " + oldState + " to " + getState() + + " on event=" + event.getType()); } } finally { this.writeLock.unlock(); @@ -880,14 +875,14 @@ public void handle(RMAppEvent event) { @Override public void recover(RMState state) { - ApplicationStateData appState = - state.getApplicationState().get(getApplicationId()); + ApplicationStateData appState = state.getApplicationState() + .get(getApplicationId()); this.recoveredFinalState = appState.getState(); - LOG.info("Recovering app: " + getApplicationId() + " with " + - + appState.getAttemptCount() + " attempts and final state = " - + this.recoveredFinalState ); - this.diagnostics.append(null == appState.getDiagnostics() ? "" : appState - .getDiagnostics()); + LOG.info("Recovering app: " + getApplicationId() + " with " + + +appState.getAttemptCount() + " attempts and final state = " + + this.recoveredFinalState); + this.diagnostics.append( + null == appState.getDiagnostics() ? "" : appState.getDiagnostics()); this.storedFinishTime = appState.getFinishTime(); this.startTime = appState.getStartTime(); this.callerContext = appState.getCallerContext(); @@ -896,24 +891,24 @@ public void recover(RMState state) { this.firstAttemptIdInStateStore = appState.getFirstAttemptId(); this.nextAttemptId = firstAttemptIdInStateStore; } - //TODO recover collector address. - //this.collectorAddr = appState.getCollectorAddr(); + // TODO recover collector address. + // this.collectorAddr = appState.getCollectorAddr(); // send the ATS create Event sendATSCreateEvent(this, this.startTime); RMAppAttemptImpl preAttempt = null; - for (ApplicationAttemptId attemptId : - new TreeSet<>(appState.attempts.keySet())) { + for (ApplicationAttemptId attemptId : new TreeSet<>( + appState.attempts.keySet())) { // create attempt createNewAttempt(attemptId); - ((RMAppAttemptImpl)this.currentAttempt).recover(state); + ((RMAppAttemptImpl) this.currentAttempt).recover(state); // If previous attempt is not in final state, it means we failed to store // its final state. We set it to FAILED now because we could not make sure // about its final state. if (preAttempt != null && preAttempt.getRecoveredFinalState() == null) { preAttempt.setRecoveredFinalState(RMAppAttemptState.FAILED); } - preAttempt = (RMAppAttemptImpl)currentAttempt; + preAttempt = (RMAppAttemptImpl) currentAttempt; } if (currentAttempt != null) { nextAttemptId = currentAttempt.getAppAttemptId().getAttemptId() + 1; @@ -921,8 +916,8 @@ public void recover(RMState state) { } private void createNewAttempt() { - ApplicationAttemptId appAttemptId = - ApplicationAttemptId.newInstance(applicationId, nextAttemptId++); + ApplicationAttemptId appAttemptId = ApplicationAttemptId + .newInstance(applicationId, nextAttemptId++); createNewAttempt(appAttemptId); } @@ -939,24 +934,23 @@ private void createNewAttempt(ApplicationAttemptId appAttemptId) { currentAMBlacklistManager = new DisabledBlacklistManager(); } } - RMAppAttempt attempt = - new RMAppAttemptImpl(appAttemptId, rmContext, scheduler, masterService, - submissionContext, conf, - // The newly created attempt maybe last attempt if (number of - // previously failed attempts(which should not include Preempted, - // hardware error and NM resync) + 1) equal to the max-attempt - // limit. - maxAppAttempts == (getNumFailedAppAttempts() + 1), amReq, - currentAMBlacklistManager); + RMAppAttempt attempt = new RMAppAttemptImpl(appAttemptId, rmContext, + scheduler, masterService, submissionContext, conf, + // The newly created attempt maybe last attempt if (number of + // previously failed attempts(which should not include Preempted, + // hardware error and NM resync) + 1) equal to the max-attempt + // limit. + maxAppAttempts == (getNumFailedAppAttempts() + 1), amReq, + currentAMBlacklistManager); attempts.put(appAttemptId, attempt); currentAttempt = attempt; } - - private void - createAndStartNewAttempt(boolean transferStateFromPreviousAttempt) { + + private void createAndStartNewAttempt( + boolean transferStateFromPreviousAttempt) { createNewAttempt(); handler.handle(new RMAppStartAttemptEvent(currentAttempt.getAppAttemptId(), - transferStateFromPreviousAttempt)); + transferStateFromPreviousAttempt)); } private void processNodeUpdate(RMAppNodeUpdateType type, RMNode node) { @@ -966,8 +960,8 @@ private void processNodeUpdate(RMAppNodeUpdateType type, RMNode node) { + " with state:" + nodeState); } - private static class RMAppTransition implements - SingleArcTransition { + private static class RMAppTransition + implements SingleArcTransition { public void transition(RMAppImpl app, RMAppEvent event) { }; } @@ -979,8 +973,7 @@ public void transition(RMAppImpl app, RMAppEvent event) { if (YarnConfiguration.timelineServiceV2Enabled(app.conf)) { LOG.info("Updating collector info for app: " + app.getApplicationId()); - RMAppCollectorUpdateEvent appCollectorUpdateEvent = - (RMAppCollectorUpdateEvent) event; + RMAppCollectorUpdateEvent appCollectorUpdateEvent = (RMAppCollectorUpdateEvent) event; // Update collector address app.setCollectorAddr(appCollectorUpdateEvent.getAppCollectorAddr()); @@ -1007,40 +1000,40 @@ public RMAppStateUpdateTransition(YarnApplicationState state) { } public void transition(RMAppImpl app, RMAppEvent event) { - app.rmContext.getSystemMetricsPublisher().appStateUpdated( - app, stateToATS, app.systemClock.getTime()); + app.rmContext.getSystemMetricsPublisher().appStateUpdated(app, stateToATS, + app.systemClock.getTime()); }; } - private static final class AppRunningOnNodeTransition extends RMAppTransition { + private static final class AppRunningOnNodeTransition + extends RMAppTransition { public void transition(RMAppImpl app, RMAppEvent event) { RMAppRunningOnNodeEvent nodeAddedEvent = (RMAppRunningOnNodeEvent) event; - + // if final state already stored, notify RMNode if (isAppInFinalState(app)) { - app.handler.handle( - new RMNodeCleanAppEvent(nodeAddedEvent.getNodeId(), nodeAddedEvent - .getApplicationId())); + app.handler.handle(new RMNodeCleanAppEvent(nodeAddedEvent.getNodeId(), + nodeAddedEvent.getApplicationId())); return; } - + // otherwise, add it to ranNodes for further process app.ranNodes.add(nodeAddedEvent.getNodeId()); if (!app.logAggregationStatus.containsKey(nodeAddedEvent.getNodeId())) { app.logAggregationStatus.put(nodeAddedEvent.getNodeId(), - LogAggregationReport.newInstance(app.applicationId, - app.logAggregationEnabled ? LogAggregationStatus.NOT_START - : LogAggregationStatus.DISABLED, "")); + LogAggregationReport.newInstance(app.applicationId, + app.logAggregationEnabled ? LogAggregationStatus.NOT_START + : LogAggregationStatus.DISABLED, + "")); } }; } /** - * Move an app to a new queue. - * This transition must set the result on the Future in the RMAppMoveEvent, - * either as an exception for failure or null for success, or the client will - * be left waiting forever. + * Move an app to a new queue. This transition must set the result on the + * Future in the RMAppMoveEvent, either as an exception for failure or null + * for success, or the client will be left waiting forever. */ private static final class RMAppMoveTransition extends RMAppTransition { public void transition(RMAppImpl app, RMAppEvent event) { @@ -1067,12 +1060,12 @@ public void transition(RMAppImpl app, RMAppEvent event) { private void recoverAppAttempts() { for (RMAppAttempt attempt : getAppAttempts().values()) { attempt.handle(new RMAppAttemptEvent(attempt.getAppAttemptId(), - RMAppAttemptEventType.RECOVER)); + RMAppAttemptEventType.RECOVER)); } } - private static final class RMAppRecoveredTransition implements - MultipleArcTransition { + private static final class RMAppRecoveredTransition + implements MultipleArcTransition { @Override public RMAppState transition(RMAppImpl app, RMAppEvent event) { @@ -1105,15 +1098,15 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) { // No existent attempts means the attempt associated with this app was not // started or started but not yet saved. if (app.attempts.isEmpty()) { - app.scheduler.handle(new AppAddedSchedulerEvent(app.user, - app.submissionContext, false)); + app.scheduler.handle( + new AppAddedSchedulerEvent(app.user, app.submissionContext, false)); return RMAppState.SUBMITTED; } // Add application to scheduler synchronously to guarantee scheduler // knows applications before AM or NM re-registers. - app.scheduler.handle(new AppAddedSchedulerEvent(app.user, - app.submissionContext, true)); + app.scheduler.handle( + new AppAddedSchedulerEvent(app.user, app.submissionContext, true)); // recover attempts app.recoverAppAttempts(); @@ -1125,12 +1118,12 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) { } } - private static final class AddApplicationToSchedulerTransition extends - RMAppTransition { + private static final class AddApplicationToSchedulerTransition + extends RMAppTransition { @Override public void transition(RMAppImpl app, RMAppEvent event) { - app.handler.handle(new AppAddedSchedulerEvent(app.user, - app.submissionContext, false)); + app.handler.handle( + new AppAddedSchedulerEvent(app.user, app.submissionContext, false)); } } @@ -1141,25 +1134,25 @@ public void transition(RMAppImpl app, RMAppEvent event) { }; } - private static final class FinalStateSavedTransition implements - MultipleArcTransition { + private static final class FinalStateSavedTransition + implements MultipleArcTransition { @Override public RMAppState transition(RMAppImpl app, RMAppEvent event) { if (app.transitionTodo instanceof SingleArcTransition) { ((SingleArcTransition) app.transitionTodo).transition(app, - app.eventCausingFinalSaving); + app.eventCausingFinalSaving); } else if (app.transitionTodo instanceof MultipleArcTransition) { ((MultipleArcTransition) app.transitionTodo).transition(app, - app.eventCausingFinalSaving); + app.eventCausingFinalSaving); } return app.targetedFinalState; } } - private static class AttemptFailedFinalStateSavedTransition extends - RMAppTransition { + private static class AttemptFailedFinalStateSavedTransition + extends RMAppTransition { @Override public void transition(RMAppImpl app, RMAppEvent event) { String msg = null; @@ -1179,27 +1172,27 @@ private String getAppAttemptFailedDiagnostics(RMAppEvent event) { if (this.submissionContext.getUnmanagedAM()) { // RM does not manage the AM. Do not retry msg = "Unmanaged application " + this.getApplicationId() - + " failed due to " + failedEvent.getDiagnosticMsg() - + ". Failing the application."; + + " failed due to " + failedEvent.getDiagnosticMsg() + + ". Failing the application."; } else if (this.isNumAttemptsBeyondThreshold) { int globalLimit = conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS); msg = String.format( - "Application %s failed %d times%s%s due to %s. Failing the application.", - getApplicationId(), - maxAppAttempts, + "Application %s failed %d times%s%s due to %s. Failing the application.", + getApplicationId(), maxAppAttempts, (attemptFailuresValidityInterval <= 0 ? "" - : (" in previous " + attemptFailuresValidityInterval + : (" in previous " + attemptFailuresValidityInterval + " milliseconds")), (globalLimit == maxAppAttempts) ? "" - : (" (global limit =" + globalLimit - + "; local limit is =" + maxAppAttempts + ")"), + : (" (global limit =" + globalLimit + "; local limit is =" + + maxAppAttempts + ")"), failedEvent.getDiagnosticMsg()); } return msg; } - private static final class RMAppNewlySavingTransition extends RMAppTransition { + private static final class RMAppNewlySavingTransition + extends RMAppTransition { @Override public void transition(RMAppImpl app, RMAppEvent event) { @@ -1250,10 +1243,9 @@ private void rememberTargetTransitionsAndStoreState(RMAppEvent event, break; } - ApplicationStateData appState = - ApplicationStateData.newInstance(this.submitTime, this.startTime, - this.user, this.submissionContext, - stateToBeStored, diags, this.storedFinishTime, this.callerContext); + ApplicationStateData appState = ApplicationStateData.newInstance( + this.submitTime, this.startTime, this.user, this.submissionContext, + stateToBeStored, diags, this.storedFinishTime, this.callerContext); this.rmContext.getStateStore().updateApplicationState(appState); } @@ -1277,7 +1269,7 @@ public FinalSavingTransition(Object transitionToDo, @Override public void transition(RMAppImpl app, RMAppEvent event) { app.rememberTargetTransitionsAndStoreState(event, transitionToDo, - targetedFinalState, stateToBeStored); + targetedFinalState, stateToBeStored); } } @@ -1299,8 +1291,8 @@ public void transition(RMAppImpl app, RMAppEvent event) { }; } - private static class AttemptFinishedAtFinalSavingTransition extends - RMAppTransition { + private static class AttemptFinishedAtFinalSavingTransition + extends RMAppTransition { @Override public void transition(RMAppImpl app, RMAppEvent event) { if (app.targetedFinalState.equals(RMAppState.FAILED) @@ -1313,18 +1305,20 @@ public void transition(RMAppImpl app, RMAppEvent event) { // pass in the earlier attempt_unregistered event, as it is needed in // AppFinishedFinalStateSavedTransition later on app.rememberTargetTransitions(event, - new AppFinishedFinalStateSavedTransition(app.eventCausingFinalSaving), - RMAppState.FINISHED); + new AppFinishedFinalStateSavedTransition(app.eventCausingFinalSaving), + RMAppState.FINISHED); }; } - private static class AppFinishedFinalStateSavedTransition extends - RMAppTransition { + private static class AppFinishedFinalStateSavedTransition + extends RMAppTransition { RMAppEvent attemptUnregistered; - public AppFinishedFinalStateSavedTransition(RMAppEvent attemptUnregistered) { + public AppFinishedFinalStateSavedTransition( + RMAppEvent attemptUnregistered) { this.attemptUnregistered = attemptUnregistered; } + @Override public void transition(RMAppImpl app, RMAppEvent event) { new AttemptUnregisteredTransition().transition(app, attemptUnregistered); @@ -1332,6 +1326,24 @@ public void transition(RMAppImpl app, RMAppEvent event) { }; } + /** + * Log the audit event for kill by client. + * + * @param event The {@link RMAppEvent} to be logged + */ + static void auditLogKillEvent(RMAppEvent event) { + if (event instanceof RMAppKillByClientEvent) { + RMAppKillByClientEvent killEvent = (RMAppKillByClientEvent) event; + UserGroupInformation callerUGI = killEvent.getCallerUGI(); + String userName = null; + if (callerUGI != null) { + userName = callerUGI.getShortUserName(); + } + InetAddress remoteIP = killEvent.getIp(); + RMAuditLogger.logSuccess(userName, AuditConstants.KILL_APP_REQUEST, + "RMAppImpl", event.getApplicationId(), remoteIP); + } + } private static class AppKilledTransition extends FinalTransition { public AppKilledTransition() { @@ -1342,6 +1354,7 @@ public AppKilledTransition() { public void transition(RMAppImpl app, RMAppEvent event) { app.diagnostics.append(event.getDiagnosticMsg()); super.transition(app, event); + RMAppImpl.auditLogKillEvent(event); }; } @@ -1352,14 +1365,14 @@ public void transition(RMAppImpl app, RMAppEvent event) { // Forward app kill diagnostics in the event to kill app attempt. // These diagnostics will be returned back in ATTEMPT_KILLED event sent by // RMAppAttemptImpl. - app.handler.handle( - new RMAppAttemptEvent(app.currentAttempt.getAppAttemptId(), + app.handler + .handle(new RMAppAttemptEvent(app.currentAttempt.getAppAttemptId(), RMAppAttemptEventType.KILL, event.getDiagnosticMsg())); + RMAppImpl.auditLogKillEvent(event); } } - private static final class AppRejectedTransition extends - FinalTransition{ + private static final class AppRejectedTransition extends FinalTransition { public AppRejectedTransition() { super(RMAppState.FAILED); } @@ -1381,27 +1394,25 @@ public FinalTransition(RMAppState finalState) { public void transition(RMAppImpl app, RMAppEvent event) { app.logAggregationStartTime = System.currentTimeMillis(); for (NodeId nodeId : app.getRanNodes()) { - app.handler.handle( - new RMNodeCleanAppEvent(nodeId, app.applicationId)); + app.handler.handle(new RMNodeCleanAppEvent(nodeId, app.applicationId)); } app.finishTime = app.storedFinishTime; - if (app.finishTime == 0 ) { + if (app.finishTime == 0) { app.finishTime = app.systemClock.getTime(); } // Recovered apps that are completed were not added to scheduler, so no // need to remove them from scheduler. if (app.recoveredFinalState == null) { - app.handler.handle(new AppRemovedSchedulerEvent(app.applicationId, - finalState)); + app.handler.handle( + new AppRemovedSchedulerEvent(app.applicationId, finalState)); } - app.handler.handle( - new RMAppManagerEvent(app.applicationId, + app.handler.handle(new RMAppManagerEvent(app.applicationId, RMAppManagerEventType.APP_COMPLETED)); - app.rmContext.getRMApplicationHistoryWriter() - .applicationFinished(app, finalState); - app.rmContext.getSystemMetricsPublisher() - .appFinished(app, finalState, app.finishTime); + app.rmContext.getRMApplicationHistoryWriter().applicationFinished(app, + finalState); + app.rmContext.getSystemMetricsPublisher().appFinished(app, finalState, + app.finishTime); }; } @@ -1422,8 +1433,8 @@ private int getNumFailedAppAttempts() { return completedAttempts; } - private static final class AttemptFailedTransition implements - MultipleArcTransition { + private static final class AttemptFailedTransition + implements MultipleArcTransition { private final RMAppState initialState; @@ -1450,14 +1461,14 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) { // If this is not last attempt, app should be killed instead of // launching a new attempt app.rememberTargetTransitionsAndStoreState(event, - new AppKilledTransition(), RMAppState.KILLED, RMAppState.KILLED); + new AppKilledTransition(), RMAppState.KILLED, RMAppState.KILLED); return RMAppState.FINAL_SAVING; } boolean transferStateFromPreviousAttempt; RMAppFailedAttemptEvent failedEvent = (RMAppFailedAttemptEvent) event; - transferStateFromPreviousAttempt = - failedEvent.getTransferStateFromPreviousAttempt(); + transferStateFromPreviousAttempt = failedEvent + .getTransferStateFromPreviousAttempt(); RMAppAttempt oldAttempt = app.currentAttempt; app.createAndStartNewAttempt(transferStateFromPreviousAttempt); @@ -1468,15 +1479,15 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) { // finished containers so that they can be acked to NM, // but when pulling finished container we will check this flag again. ((RMAppAttemptImpl) app.currentAttempt) - .transferStateFromAttempt(oldAttempt); + .transferStateFromAttempt(oldAttempt); return initialState; } else { if (numberOfFailure >= app.maxAppAttempts) { app.isNumAttemptsBeyondThreshold = true; } app.rememberTargetTransitionsAndStoreState(event, - new AttemptFailedFinalStateSavedTransition(), RMAppState.FAILED, - RMAppState.FAILED); + new AttemptFailedFinalStateSavedTransition(), RMAppState.FAILED, + RMAppState.FAILED); return RMAppState.FINAL_SAVING; } } @@ -1532,7 +1543,7 @@ public YarnApplicationState createApplicationState() { } return RMServerUtils.createApplicationState(rmAppState); } - + public static boolean isAppInFinalState(RMApp rmApp) { RMAppState appState = ((RMAppImpl) rmApp).getRecoveredFinalState(); if (appState == null) { @@ -1541,7 +1552,7 @@ public static boolean isAppInFinalState(RMApp rmApp) { return appState == RMAppState.FAILED || appState == RMAppState.FINISHED || appState == RMAppState.KILLED; } - + public RMAppState getRecoveredFinalState() { return this.recoveredFinalState; } @@ -1550,7 +1561,7 @@ public RMAppState getRecoveredFinalState() { public Set getRanNodes() { return ranNodes; } - + @Override public RMAppMetrics getRMAppMetrics() { Resource resourcePreempted = Resource.newInstance(0, 0); @@ -1560,25 +1571,23 @@ public RMAppMetrics getRMAppMetrics() { long vcoreSeconds = 0; for (RMAppAttempt attempt : attempts.values()) { if (null != attempt) { - RMAppAttemptMetrics attemptMetrics = - attempt.getRMAppAttemptMetrics(); + RMAppAttemptMetrics attemptMetrics = attempt.getRMAppAttemptMetrics(); Resources.addTo(resourcePreempted, attemptMetrics.getResourcePreempted()); numAMContainerPreempted += attemptMetrics.getIsPreempted() ? 1 : 0; - numNonAMContainerPreempted += - attemptMetrics.getNumNonAMContainersPreempted(); + numNonAMContainerPreempted += attemptMetrics + .getNumNonAMContainersPreempted(); // getAggregateAppResourceUsage() will calculate resource usage stats // for both running and finished containers. - AggregateAppResourceUsage resUsage = - attempt.getRMAppAttemptMetrics().getAggregateAppResourceUsage(); + AggregateAppResourceUsage resUsage = attempt.getRMAppAttemptMetrics() + .getAggregateAppResourceUsage(); memorySeconds += resUsage.getMemorySeconds(); vcoreSeconds += resUsage.getVcoreSeconds(); } } - return new RMAppMetrics(resourcePreempted, - numNonAMContainerPreempted, numAMContainerPreempted, - memorySeconds, vcoreSeconds); + return new RMAppMetrics(resourcePreempted, numNonAMContainerPreempted, + numAMContainerPreempted, memorySeconds, vcoreSeconds); } @Private @@ -1591,10 +1600,10 @@ public void setSystemClock(Clock clock) { public ReservationId getReservationId() { return submissionContext.getReservationID(); } - + @Override public ResourceRequest getAMResourceRequest() { - return this.amReq; + return this.amReq; } protected Credentials parseCredentials() throws IOException { @@ -1613,22 +1622,21 @@ protected Credentials parseCredentials() throws IOException { public Map getLogAggregationReportsForApp() { try { this.readLock.lock(); - Map outputs = - new HashMap(); + Map outputs = new HashMap(); outputs.putAll(logAggregationStatus); if (!isLogAggregationFinished()) { for (Entry output : outputs.entrySet()) { if (!output.getValue().getLogAggregationStatus() - .equals(LogAggregationStatus.TIME_OUT) + .equals(LogAggregationStatus.TIME_OUT) && !output.getValue().getLogAggregationStatus() - .equals(LogAggregationStatus.SUCCEEDED) + .equals(LogAggregationStatus.SUCCEEDED) && !output.getValue().getLogAggregationStatus() - .equals(LogAggregationStatus.FAILED) + .equals(LogAggregationStatus.FAILED) && isAppInFinalState(this) && System.currentTimeMillis() > this.logAggregationStartTime + this.logAggregationStatusTimeout) { - output.getValue().setLogAggregationStatus( - LogAggregationStatus.TIME_OUT); + output.getValue() + .setLogAggregationStatus(LogAggregationStatus.TIME_OUT); } } } @@ -1656,25 +1664,25 @@ public void aggregateLogReport(NodeId nodeId, LogAggregationReport report) { } } if (report.getLogAggregationStatus() != LogAggregationStatus.RUNNING - || curReport.getLogAggregationStatus() != - LogAggregationStatus.RUNNING_WITH_FAILURE) { - if (curReport.getLogAggregationStatus() - == LogAggregationStatus.TIME_OUT - && report.getLogAggregationStatus() - == LogAggregationStatus.RUNNING) { - // If the log aggregation status got from latest nm heartbeat - // is Running, and current log aggregation status is TimeOut, - // based on whether there are any failure messages for this NM, - // we will reset the log aggregation status as RUNNING or - // RUNNING_WITH_FAILURE - if (logAggregationFailureMessagesForNMs.get(nodeId) != null && - !logAggregationFailureMessagesForNMs.get(nodeId).isEmpty()) { + || curReport + .getLogAggregationStatus() != LogAggregationStatus.RUNNING_WITH_FAILURE) { + if (curReport + .getLogAggregationStatus() == LogAggregationStatus.TIME_OUT + && report + .getLogAggregationStatus() == LogAggregationStatus.RUNNING) { + // If the log aggregation status got from latest nm heartbeat + // is Running, and current log aggregation status is TimeOut, + // based on whether there are any failure messages for this NM, + // we will reset the log aggregation status as RUNNING or + // RUNNING_WITH_FAILURE + if (logAggregationFailureMessagesForNMs.get(nodeId) != null + && !logAggregationFailureMessagesForNMs.get(nodeId) + .isEmpty()) { report.setLogAggregationStatus( LogAggregationStatus.RUNNING_WITH_FAILURE); } } - curReport.setLogAggregationStatus(report - .getLogAggregationStatus()); + curReport.setLogAggregationStatus(report.getLogAggregationStatus()); } } updateLogAggregationDiagnosticMessages(nodeId, report); @@ -1691,14 +1699,13 @@ public void aggregateLogReport(NodeId nodeId, LogAggregationReport report) { public LogAggregationStatus getLogAggregationStatusForAppReport() { try { this.readLock.lock(); - if (! logAggregationEnabled) { + if (!logAggregationEnabled) { return LogAggregationStatus.DISABLED; } if (isLogAggregationFinished()) { return this.logAggregationStatusForAppReport; } - Map reports = - getLogAggregationReportsForApp(); + Map reports = getLogAggregationReportsForApp(); if (reports.size() == 0) { return this.logAggregationStatusForAppReport; } @@ -1709,31 +1716,32 @@ public LogAggregationStatus getLogAggregationStatusForAppReport() { int logRunningWithFailure = 0; for (Entry report : reports.entrySet()) { switch (report.getValue().getLogAggregationStatus()) { - case NOT_START: - logNotStartCount++; - break; - case RUNNING_WITH_FAILURE: - logRunningWithFailure ++; - break; - case SUCCEEDED: - logCompletedCount++; - break; - case FAILED: - logFailedCount++; - logCompletedCount++; - break; - case TIME_OUT: - logTimeOutCount++; - logCompletedCount++; - break; - default: - break; + case NOT_START: + logNotStartCount++; + break; + case RUNNING_WITH_FAILURE: + logRunningWithFailure++; + break; + case SUCCEEDED: + logCompletedCount++; + break; + case FAILED: + logFailedCount++; + logCompletedCount++; + break; + case TIME_OUT: + logTimeOutCount++; + logCompletedCount++; + break; + default: + break; } } if (logNotStartCount == reports.size()) { return LogAggregationStatus.NOT_START; } else if (logCompletedCount == reports.size()) { - // We should satisfy two condition in order to return SUCCEEDED or FAILED + // We should satisfy two condition in order to return SUCCEEDED or + // FAILED // 1) make sure the application is in final state // 2) logs status from all NMs are SUCCEEDED/FAILED/TIMEOUT // The SUCCEEDED/FAILED status is the final status which means @@ -1758,9 +1766,9 @@ public LogAggregationStatus getLogAggregationStatusForAppReport() { private boolean isLogAggregationFinished() { return this.logAggregationStatusForAppReport - .equals(LogAggregationStatus.SUCCEEDED) + .equals(LogAggregationStatus.SUCCEEDED) || this.logAggregationStatusForAppReport - .equals(LogAggregationStatus.FAILED); + .equals(LogAggregationStatus.FAILED); } @@ -1773,31 +1781,28 @@ private void updateLogAggregationDiagnosticMessages(NodeId nodeId, LogAggregationReport report) { if (report.getDiagnosticMessage() != null && !report.getDiagnosticMessage().isEmpty()) { - if (report.getLogAggregationStatus() - == LogAggregationStatus.RUNNING ) { + if (report.getLogAggregationStatus() == LogAggregationStatus.RUNNING) { List diagnostics = logAggregationDiagnosticsForNMs.get(nodeId); if (diagnostics == null) { diagnostics = new ArrayList(); logAggregationDiagnosticsForNMs.put(nodeId, diagnostics); } else { - if (diagnostics.size() - == maxLogAggregationDiagnosticsInMemory) { + if (diagnostics.size() == maxLogAggregationDiagnosticsInMemory) { diagnostics.remove(0); } } diagnostics.add(report.getDiagnosticMessage()); - this.logAggregationStatus.get(nodeId).setDiagnosticMessage( - StringUtils.join(diagnostics, "\n")); - } else if (report.getLogAggregationStatus() - == LogAggregationStatus.RUNNING_WITH_FAILURE) { - List failureMessages = - logAggregationFailureMessagesForNMs.get(nodeId); + this.logAggregationStatus.get(nodeId) + .setDiagnosticMessage(StringUtils.join(diagnostics, "\n")); + } else if (report + .getLogAggregationStatus() == LogAggregationStatus.RUNNING_WITH_FAILURE) { + List failureMessages = logAggregationFailureMessagesForNMs + .get(nodeId); if (failureMessages == null) { failureMessages = new ArrayList(); logAggregationFailureMessagesForNMs.put(nodeId, failureMessages); } else { - if (failureMessages.size() - == maxLogAggregationDiagnosticsInMemory) { + if (failureMessages.size() == maxLogAggregationDiagnosticsInMemory) { failureMessages.remove(0); } } @@ -1807,35 +1812,34 @@ private void updateLogAggregationDiagnosticMessages(NodeId nodeId, } private void updateLogAggregationStatus(NodeId nodeId) { - LogAggregationStatus status = - this.logAggregationStatus.get(nodeId).getLogAggregationStatus(); + LogAggregationStatus status = this.logAggregationStatus.get(nodeId) + .getLogAggregationStatus(); if (status.equals(LogAggregationStatus.SUCCEEDED)) { this.logAggregationSucceed++; } else if (status.equals(LogAggregationStatus.FAILED)) { this.logAggregationFailed++; } if (this.logAggregationSucceed == this.logAggregationStatus.size()) { - this.logAggregationStatusForAppReport = - LogAggregationStatus.SUCCEEDED; + this.logAggregationStatusForAppReport = LogAggregationStatus.SUCCEEDED; // Since the log aggregation status for this application for all NMs // is SUCCEEDED, it means all logs are aggregated successfully. // We could remove all the cached log aggregation reports this.logAggregationStatus.clear(); this.logAggregationDiagnosticsForNMs.clear(); this.logAggregationFailureMessagesForNMs.clear(); - } else if (this.logAggregationSucceed + this.logAggregationFailed - == this.logAggregationStatus.size()) { + } else if (this.logAggregationSucceed + + this.logAggregationFailed == this.logAggregationStatus.size()) { this.logAggregationStatusForAppReport = LogAggregationStatus.FAILED; // We have collected the log aggregation status for all NMs. // The log aggregation status is FAILED which means the log // aggregation fails in some NMs. We are only interested in the // nodes where the log aggregation is failed. So we could remove // the log aggregation details for those succeeded NMs - for (Iterator> it = - this.logAggregationStatus.entrySet().iterator(); it.hasNext();) { + for (Iterator> it = this.logAggregationStatus + .entrySet().iterator(); it.hasNext();) { Map.Entry entry = it.next(); if (entry.getValue().getLogAggregationStatus() - .equals(LogAggregationStatus.SUCCEEDED)) { + .equals(LogAggregationStatus.SUCCEEDED)) { it.remove(); } } @@ -1848,8 +1852,8 @@ private void updateLogAggregationStatus(NodeId nodeId) { public String getLogAggregationFailureMessagesForNM(NodeId nodeId) { try { this.readLock.lock(); - List failureMessages = - this.logAggregationFailureMessagesForNMs.get(nodeId); + List failureMessages = this.logAggregationFailureMessagesForNMs + .get(nodeId); if (failureMessages == null || failureMessages.isEmpty()) { return StringUtils.EMPTY; } @@ -1861,8 +1865,8 @@ public String getLogAggregationFailureMessagesForNM(NodeId nodeId) { @Override public String getAppNodeLabelExpression() { - String appNodeLabelExpression = - getApplicationSubmissionContext().getNodeLabelExpression(); + String appNodeLabelExpression = getApplicationSubmissionContext() + .getNodeLabelExpression(); appNodeLabelExpression = (appNodeLabelExpression == null) ? NodeLabel.NODE_LABEL_EXPRESSION_NOT_SET : appNodeLabelExpression; appNodeLabelExpression = (appNodeLabelExpression.trim().isEmpty()) diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppKillByClientEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppKillByClientEvent.java new file mode 100644 index 0000000..159585e --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppKillByClientEvent.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.rmapp; + +import java.net.InetAddress; + +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.ApplicationId; + +/** + * An event class that is used to help with logging information + * when an application KILL event is needed. + * + */ +public class RMAppKillByClientEvent extends RMAppEvent { + + private final UserGroupInformation callerUGI; + private final InetAddress ip; + + /** + * constructor to create an event used for logging during user driven kill + * invocations. + * + * @param appId application id + * @param diagnostics message about the kill event + * @param callerUGI caller's user & group information + * @param remoteIP ip address of the caller + */ + public RMAppKillByClientEvent(ApplicationId appId, String diagnostics, + UserGroupInformation callerUGI, InetAddress remoteIP) { + super(appId, RMAppEventType.KILL, diagnostics); + this.callerUGI = callerUGI; + this.ip = remoteIP; + } + + /** + * returns the {@link UserGroupInformation} information. + * @return UserGroupInformation + */ + public final UserGroupInformation getCallerUGI() { + return callerUGI; + } + + /** + * returns the ip address stored in this event. + * @return remoteIP + */ + public final InetAddress getIp() { + return ip; + } +} \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAuditLogger.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAuditLogger.java index acb8e37..3311f92 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAuditLogger.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAuditLogger.java @@ -23,6 +23,7 @@ import java.net.InetAddress; import java.net.InetSocketAddress; +import java.net.UnknownHostException; import com.google.protobuf.BlockingService; import com.google.protobuf.RpcController; @@ -109,18 +110,37 @@ private void testSuccessLogFormatHelper(boolean checkIP, ApplicationId appId, null); } + private void testSuccessLogFormatHelper(boolean checkIP, ApplicationId appId, + ApplicationAttemptId attemptId, ContainerId containerId, + CallerContext callerContext, Resource resource) { + testSuccessLogFormatHelper(checkIP, appId, attemptId, containerId, + callerContext, resource, Server.getRemoteIp()); + } + /** * Test the AuditLog format for successful events. */ private void testSuccessLogFormatHelper(boolean checkIP, ApplicationId appId, ApplicationAttemptId attemptId, ContainerId containerId, - CallerContext callerContext, Resource resource) { - String sLog = RMAuditLogger.createSuccessLog(USER, OPERATION, TARGET, - appId, attemptId, containerId, resource, callerContext); + CallerContext callerContext, Resource resource, InetAddress remoteIp) { + + String sLog; + if (checkIP) { + sLog = RMAuditLogger.createSuccessLog(USER, OPERATION, TARGET, appId, + attemptId, containerId, resource, callerContext, remoteIp); + } else { + sLog = RMAuditLogger.createSuccessLog(USER, OPERATION, TARGET, appId, + attemptId, containerId, resource, callerContext, null); + } StringBuilder expLog = new StringBuilder(); expLog.append("USER=test\t"); if (checkIP) { - InetAddress ip = Server.getRemoteIp(); + InetAddress ip; + if(remoteIp != null) { + ip = remoteIp; + } else { + ip = Server.getRemoteIp(); + } expLog.append(Keys.IP.name() + "=" + ip.getHostAddress() + "\t"); } expLog.append("OPERATION=oper\tTARGET=tgt\tRESULT=SUCCESS"); @@ -148,6 +168,13 @@ private void testSuccessLogFormatHelper(boolean checkIP, ApplicationId appId, assertEquals(expLog.toString(), sLog); } + private void testSuccessLogFormatHelperWithIP(boolean checkIP, + ApplicationId appId, ApplicationAttemptId attemptId, + ContainerId containerId, InetAddress ip) { + testSuccessLogFormatHelper(checkIP, appId, attemptId, containerId, null, + null, ip); + } + /** * Test the AuditLog format for successful events passing nulls. */ @@ -165,6 +192,33 @@ private void testSuccessLogNulls(boolean checkIP) { } /** + * Tests the SuccessLog with two IP addresses. + * + * @param checkIP + * @param appId + * @param attemptId + * @param containerId + */ + private void testSuccessLogFormatHelperWithIP(boolean checkIP, + ApplicationId appId, ApplicationAttemptId attemptId, + ContainerId containerId) { + testSuccessLogFormatHelperWithIP(checkIP, appId, attemptId, containerId, + InetAddress.getLoopbackAddress()); + byte[] ipAddr = new byte[] {100, 10, 10, 1}; + + InetAddress addr = null; + try { + addr = InetAddress.getByAddress(ipAddr); + } catch (UnknownHostException uhe) { + // should not happen as long as IP address format + // stays the same + Assert.fail("Check ip address being constructed"); + } + testSuccessLogFormatHelperWithIP(checkIP, appId, attemptId, containerId, + addr); + } + + /** * Test the AuditLog format for successful events with the various * parameters. */ @@ -187,6 +241,7 @@ private void testSuccessLogFormat(boolean checkIP) { testSuccessLogFormatHelper(checkIP, APPID, ATTEMPTID, CONTAINERID, new CallerContext.Builder(CALLER_CONTEXT).setSignature(CALLER_SIGNATURE) .build(), RESOURCE); + testSuccessLogFormatHelperWithIP(checkIP, APPID, ATTEMPTID, CONTAINERID); testSuccessLogNulls(checkIP); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java index 9131643..dcc5631 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java @@ -37,6 +37,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.ipc.Server; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; @@ -257,7 +258,7 @@ protected RMApp createNewTestApp(ApplicationSubmissionContext submissionContext) ApplicationMasterService masterService = new ApplicationMasterService(rmContext, scheduler); - + if(submissionContext == null) { submissionContext = new ApplicationSubmissionContextPBImpl(); } @@ -305,7 +306,7 @@ private static void testAppStartState(ApplicationId applicationId, private static void assertStartTimeSet(RMApp application) { Assert.assertTrue("application start time is not greater than 0", application.getStartTime() > 0); - Assert.assertTrue("application start time is before currentTime", + Assert.assertTrue("application start time is before currentTime", application.getStartTime() <= System.currentTimeMillis()); } @@ -324,7 +325,7 @@ private void assertTimesAtFinish(RMApp application) { assertStartTimeSet(application); Assert.assertTrue("application finish time is not greater than 0", (application.getFinishTime() > 0)); - Assert.assertTrue("application finish time is not >= than start time", + Assert.assertTrue("application finish time is not >= start time", (application.getFinishTime() >= application.getStartTime())); } @@ -549,11 +550,14 @@ public void testAppRecoverPath() throws IOException { public void testAppNewKill() throws IOException { LOG.info("--- START: testAppNewKill ---"); + UserGroupInformation fooUser = UserGroupInformation.createUserForTesting( + "fooTestAppNewKill", new String[] {"foo_group"}); + RMApp application = createNewTestApp(null); // NEW => KILLED event RMAppEventType.KILL - RMAppEvent event = - new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL, - "Application killed by user."); + RMAppEvent event = new RMAppKillByClientEvent( + application.getApplicationId(), "Application killed by user.", fooUser, + Server.getRemoteIp()); application.handle(event); rmDispatcher.await(); sendAppUpdateSavedEvent(application); @@ -604,9 +608,13 @@ public void testAppNewSavingKill() throws IOException { RMApp application = testCreateAppNewSaving(null); // NEW_SAVING => KILLED event RMAppEventType.KILL - RMAppEvent event = - new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL, - "Application killed by user."); + UserGroupInformation fooUser = UserGroupInformation.createUserForTesting( + "fooTestAppNewSavingKill", new String[] {"foo_group"}); + + RMAppEvent event = new RMAppKillByClientEvent( + application.getApplicationId(), "Application killed by user.", fooUser, + Server.getRemoteIp()); + application.handle(event); rmDispatcher.await(); sendAppUpdateSavedEvent(application); @@ -653,10 +661,15 @@ public void testAppSubmittedRejected() throws IOException { public void testAppSubmittedKill() throws IOException, InterruptedException { LOG.info("--- START: testAppSubmittedKill---"); RMApp application = testCreateAppSubmittedNoRecovery(null); + + UserGroupInformation fooUser = UserGroupInformation.createUserForTesting( + "fooTestAppSubmittedKill", new String[] {"foo_group"}); + // SUBMITTED => KILLED event RMAppEventType.KILL - RMAppEvent event = - new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL, - "Application killed by user."); + RMAppEvent event = new RMAppKillByClientEvent( + application.getApplicationId(), "Application killed by user.", fooUser, + Server.getRemoteIp()); + application.handle(event); rmDispatcher.await(); sendAppUpdateSavedEvent(application); @@ -706,9 +719,13 @@ public void testAppAcceptedKill() throws IOException, InterruptedException { LOG.info("--- START: testAppAcceptedKill ---"); RMApp application = testCreateAppAccepted(null); // ACCEPTED => KILLED event RMAppEventType.KILL - RMAppEvent event = - new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL, - "Application killed by user."); + UserGroupInformation fooUser = UserGroupInformation.createUserForTesting( + "fooTestAppAcceptedKill", new String[] {"foo_group"}); + + RMAppEvent event = new RMAppKillByClientEvent( + application.getApplicationId(), "Application killed by user.", fooUser, + Server.getRemoteIp()); + application.handle(event); rmDispatcher.await(); @@ -754,9 +771,14 @@ public void testAppRunningKill() throws IOException { RMApp application = testCreateAppRunning(null); // RUNNING => KILLED event RMAppEventType.KILL - RMAppEvent event = - new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL, - "Application killed by user."); + UserGroupInformation fooUser = UserGroupInformation.createUserForTesting( + "fooTestAppRunningKill", new String[] {"foo_group"}); + + // SUBMITTED => KILLED event RMAppEventType.KILL + RMAppEvent event = new RMAppKillByClientEvent( + application.getApplicationId(), "Application killed by user.", fooUser, + Server.getRemoteIp()); + application.handle(event); rmDispatcher.await(); @@ -920,9 +942,14 @@ public void testAppKilledKilled() throws IOException { RMApp application = testCreateAppRunning(null); // RUNNING => KILLED event RMAppEventType.KILL - RMAppEvent event = - new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL, - "Application killed by user."); + UserGroupInformation fooUser = UserGroupInformation.createUserForTesting( + "fooTestAppRunningKill", new String[] {"foo_group"}); + + // SUBMITTED => KILLED event RMAppEventType.KILL + RMAppEvent event = new RMAppKillByClientEvent( + application.getApplicationId(), "Application killed by user.", fooUser, + Server.getRemoteIp()); + application.handle(event); rmDispatcher.await(); sendAttemptUpdateSavedEvent(application);