diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
index 81d63db..70e1863 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.io.InputStream;
+import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.security.AccessControlException;
import java.text.MessageFormat;
@@ -147,8 +148,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemUtil;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppKillByClientEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMoveEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@@ -772,20 +772,18 @@ public KillApplicationResponse forceKillApplication(
}
if (application.isAppFinalStateStored()) {
- RMAuditLogger.logSuccess(callerUGI.getShortUserName(),
- AuditConstants.KILL_APP_REQUEST, "ClientRMService", applicationId,
- callerContext);
return KillApplicationResponse.newInstance(true);
}
- String message = "Kill application " + applicationId +
- " received from " + callerUGI;
- if(null != Server.getRemoteAddress()) {
- message += " at " + Server.getRemoteAddress();
+ String message = "Kill application " + applicationId + " received from "
+ + callerUGI;
+ InetAddress remoteAddress = Server.getRemoteIp();
+ if (null != remoteAddress) {
+ message += " at " + remoteAddress.getHostAddress();
}
- this.rmContext.getDispatcher().getEventHandler().handle(
- new RMAppEvent(applicationId, RMAppEventType.KILL,
- message));
+ this.rmContext.getDispatcher().getEventHandler()
+ .handle(new RMAppKillByClientEvent(applicationId, message, callerUGI,
+ remoteAddress));
// For UnmanagedAMs, return true so they don't retry
return KillApplicationResponse.newInstance(
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java
index c5bf000..84c0390 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java
@@ -77,12 +77,12 @@
public static final String LIST_RESERVATION_REQUEST = "List " +
"Reservation Request";
}
-
+
static String createSuccessLog(String user, String operation, String target,
ApplicationId appId, ApplicationAttemptId attemptId,
ContainerId containerId, Resource resource) {
return createSuccessLog(user, operation, target, appId, attemptId,
- containerId, resource, null);
+ containerId, resource, null, Server.getRemoteIp());
}
/**
@@ -90,10 +90,13 @@ static String createSuccessLog(String user, String operation, String target,
*/
static String createSuccessLog(String user, String operation, String target,
ApplicationId appId, ApplicationAttemptId attemptId,
- ContainerId containerId, Resource resource, CallerContext callerContext) {
+ ContainerId containerId, Resource resource, CallerContext callerContext,
+ InetAddress ip) {
StringBuilder b = new StringBuilder();
start(Keys.USER, user, b);
- addRemoteIP(b);
+ if (ip != null) {
+ add(Keys.IP, ip.getHostAddress(), b);
+ }
add(Keys.OPERATION, operation, b);
add(Keys.TARGET, target ,b);
add(Keys.RESULT, AuditConstants.SUCCESS, b);
@@ -183,10 +186,37 @@ public static void logSuccess(String user, String operation, String target,
ApplicationId appId, CallerContext callerContext) {
if (LOG.isInfoEnabled()) {
LOG.info(createSuccessLog(user, operation, target, appId, null, null,
- null, callerContext));
+ null, callerContext, Server.getRemoteIp()));
}
}
+ /**
+ * Log a readable and parseable audit entry for a successful event, using
+ * a caller address supplied by the invoker. The entry is written at INFO
+ * level and is skipped when INFO logging is disabled.
+ *
+ * Note that the {@link RMAuditLogger} uses tabs ('\t') as a key-val
+ * delimiter and hence the value fields should not contain tabs ('\t').
+ *
+ * @param user
+ * User who made the service request to the ResourceManager.
+ * @param operation
+ * Operation requested by the user.
+ * @param target
+ * The target on which the operation is being performed.
+ * @param appId
+ * Application Id in which operation was performed.
+ * @param ip
+ * The IP address of the caller; when null, no IP field is added
+ * to the entry.
+ */
+ public static void logSuccess(String user, String operation, String target,
+ ApplicationId appId, InetAddress ip) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info(createSuccessLog(user, operation, target, appId, null, null,
+ null, null, ip));
+ }
+ }
/**
* Create a readable and parseable audit log string for a successful event.
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index 45ff79c..a178dbc 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
import java.io.IOException;
+import java.net.InetAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
@@ -75,6 +76,8 @@
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
+import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.blacklist.BlacklistManager;
@@ -132,8 +135,7 @@
private final int maxAppAttempts;
private final ReadLock readLock;
private final WriteLock writeLock;
- private final Map attempts
- = new LinkedHashMap();
+ private final Map attempts = new LinkedHashMap();
private final long submitTime;
private final Set updatedNodes = new HashSet();
private final String applicationType;
@@ -158,22 +160,18 @@
private volatile RMAppAttempt currentAttempt;
private String queue;
private EventHandler handler;
- private static final AppFinishedTransition FINISHED_TRANSITION =
- new AppFinishedTransition();
+ private static final AppFinishedTransition FINISHED_TRANSITION = new AppFinishedTransition();
private Set ranNodes = new ConcurrentSkipListSet();
private final boolean logAggregationEnabled;
private long logAggregationStartTime = 0;
private final long logAggregationStatusTimeout;
- private final Map logAggregationStatus =
- new HashMap();
+ private final Map logAggregationStatus = new HashMap();
private LogAggregationStatus logAggregationStatusForAppReport;
private int logAggregationSucceed = 0;
private int logAggregationFailed = 0;
- private Map> logAggregationDiagnosticsForNMs =
- new HashMap>();
- private Map> logAggregationFailureMessagesForNMs =
- new HashMap>();
+ private Map> logAggregationDiagnosticsForNMs = new HashMap>();
+ private Map> logAggregationFailureMessagesForNMs = new HashMap>();
private final int maxLogAggregationDiagnosticsInMemory;
// These states stored are only valid when app is at killing or final_saving.
@@ -183,230 +181,232 @@
private RMAppState targetedFinalState;
private RMAppState recoveredFinalState;
private ResourceRequest amReq;
-
+
private CallerContext callerContext;
Object transitionTodo;
- private static final StateMachineFactory stateMachineFactory
- = new StateMachineFactory(RMAppState.NEW)
-
-
- // Transitions from NEW state
- .addTransition(RMAppState.NEW, RMAppState.NEW,
- RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
- .addTransition(RMAppState.NEW, RMAppState.NEW,
- RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
- .addTransition(RMAppState.NEW, RMAppState.NEW_SAVING,
- RMAppEventType.START, new RMAppNewlySavingTransition())
- .addTransition(RMAppState.NEW, EnumSet.of(RMAppState.SUBMITTED,
- RMAppState.ACCEPTED, RMAppState.FINISHED, RMAppState.FAILED,
- RMAppState.KILLED, RMAppState.FINAL_SAVING),
- RMAppEventType.RECOVER, new RMAppRecoveredTransition())
- .addTransition(RMAppState.NEW, RMAppState.KILLED, RMAppEventType.KILL,
- new AppKilledTransition())
- .addTransition(RMAppState.NEW, RMAppState.FINAL_SAVING,
- RMAppEventType.APP_REJECTED,
- new FinalSavingTransition(new AppRejectedTransition(),
- RMAppState.FAILED))
-
- // Transitions from NEW_SAVING state
- .addTransition(RMAppState.NEW_SAVING, RMAppState.NEW_SAVING,
- RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
- .addTransition(RMAppState.NEW_SAVING, RMAppState.NEW_SAVING,
- RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
- .addTransition(RMAppState.NEW_SAVING, RMAppState.SUBMITTED,
- RMAppEventType.APP_NEW_SAVED, new AddApplicationToSchedulerTransition())
- .addTransition(RMAppState.NEW_SAVING, RMAppState.FINAL_SAVING,
- RMAppEventType.KILL,
- new FinalSavingTransition(
- new AppKilledTransition(), RMAppState.KILLED))
- .addTransition(RMAppState.NEW_SAVING, RMAppState.FINAL_SAVING,
- RMAppEventType.APP_REJECTED,
- new FinalSavingTransition(new AppRejectedTransition(),
- RMAppState.FAILED))
- .addTransition(RMAppState.NEW_SAVING, RMAppState.NEW_SAVING,
- RMAppEventType.MOVE, new RMAppMoveTransition())
-
- // Transitions from SUBMITTED state
- .addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED,
- RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
- .addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED,
- RMAppEventType.MOVE, new RMAppMoveTransition())
- .addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED,
- RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
- .addTransition(RMAppState.SUBMITTED, RMAppState.FINAL_SAVING,
- RMAppEventType.APP_REJECTED,
- new FinalSavingTransition(
- new AppRejectedTransition(), RMAppState.FAILED))
- .addTransition(RMAppState.SUBMITTED, RMAppState.ACCEPTED,
- RMAppEventType.APP_ACCEPTED, new StartAppAttemptTransition())
- .addTransition(RMAppState.SUBMITTED, RMAppState.FINAL_SAVING,
- RMAppEventType.KILL,
- new FinalSavingTransition(
- new AppKilledTransition(), RMAppState.KILLED))
-
- // Transitions from ACCEPTED state
- .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
- RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
- .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
- RMAppEventType.MOVE, new RMAppMoveTransition())
- .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
- RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
- .addTransition(RMAppState.ACCEPTED, RMAppState.RUNNING,
- RMAppEventType.ATTEMPT_REGISTERED, new RMAppStateUpdateTransition(
- YarnApplicationState.RUNNING))
- .addTransition(RMAppState.ACCEPTED,
- EnumSet.of(RMAppState.ACCEPTED, RMAppState.FINAL_SAVING),
- // ACCEPTED state is possible to receive ATTEMPT_FAILED/ATTEMPT_FINISHED
- // event because RMAppRecoveredTransition is returning ACCEPTED state
- // directly and waiting for the previous AM to exit.
- RMAppEventType.ATTEMPT_FAILED,
- new AttemptFailedTransition(RMAppState.ACCEPTED))
- .addTransition(RMAppState.ACCEPTED, RMAppState.FINAL_SAVING,
- RMAppEventType.ATTEMPT_FINISHED,
- new FinalSavingTransition(FINISHED_TRANSITION, RMAppState.FINISHED))
- .addTransition(RMAppState.ACCEPTED, RMAppState.KILLING,
- RMAppEventType.KILL, new KillAttemptTransition())
- .addTransition(RMAppState.ACCEPTED, RMAppState.FINAL_SAVING,
- RMAppEventType.ATTEMPT_KILLED,
- new FinalSavingTransition(new AppKilledTransition(), RMAppState.KILLED))
- .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
- RMAppEventType.APP_RUNNING_ON_NODE,
- new AppRunningOnNodeTransition())
-
- // Transitions from RUNNING state
- .addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
- RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
- .addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
- RMAppEventType.MOVE, new RMAppMoveTransition())
- .addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
- RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
- .addTransition(RMAppState.RUNNING, RMAppState.FINAL_SAVING,
- RMAppEventType.ATTEMPT_UNREGISTERED,
- new FinalSavingTransition(
- new AttemptUnregisteredTransition(),
- RMAppState.FINISHING, RMAppState.FINISHED))
- .addTransition(RMAppState.RUNNING, RMAppState.FINISHED,
- // UnManagedAM directly jumps to finished
- RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION)
- .addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
- RMAppEventType.APP_RUNNING_ON_NODE,
- new AppRunningOnNodeTransition())
- .addTransition(RMAppState.RUNNING,
- EnumSet.of(RMAppState.ACCEPTED, RMAppState.FINAL_SAVING),
- RMAppEventType.ATTEMPT_FAILED,
- new AttemptFailedTransition(RMAppState.ACCEPTED))
- .addTransition(RMAppState.RUNNING, RMAppState.KILLING,
- RMAppEventType.KILL, new KillAttemptTransition())
-
- // Transitions from FINAL_SAVING state
- .addTransition(RMAppState.FINAL_SAVING,
- EnumSet.of(RMAppState.FINISHING, RMAppState.FAILED,
- RMAppState.KILLED, RMAppState.FINISHED), RMAppEventType.APP_UPDATE_SAVED,
- new FinalStateSavedTransition())
- .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING,
- RMAppEventType.ATTEMPT_FINISHED,
- new AttemptFinishedAtFinalSavingTransition())
- .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING,
- RMAppEventType.APP_RUNNING_ON_NODE,
- new AppRunningOnNodeTransition())
- .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING,
- RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
- // ignorable transitions
- .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING,
- EnumSet.of(RMAppEventType.NODE_UPDATE, RMAppEventType.KILL,
- RMAppEventType.APP_NEW_SAVED, RMAppEventType.MOVE))
-
- // Transitions from FINISHING state
- .addTransition(RMAppState.FINISHING, RMAppState.FINISHED,
- RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION)
- .addTransition(RMAppState.FINISHING, RMAppState.FINISHING,
- RMAppEventType.APP_RUNNING_ON_NODE,
- new AppRunningOnNodeTransition())
- .addTransition(RMAppState.FINISHING, RMAppState.FINISHING,
- RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
- // ignorable transitions
- .addTransition(RMAppState.FINISHING, RMAppState.FINISHING,
- EnumSet.of(RMAppEventType.NODE_UPDATE,
- // ignore Kill/Move as we have already saved the final Finished state
- // in state store.
- RMAppEventType.KILL, RMAppEventType.MOVE))
-
- // Transitions from KILLING state
- .addTransition(RMAppState.KILLING, RMAppState.KILLING,
- RMAppEventType.APP_RUNNING_ON_NODE,
- new AppRunningOnNodeTransition())
- .addTransition(RMAppState.KILLING, RMAppState.KILLING,
- RMAppEventType.COLLECTOR_UPDATE, new RMAppCollectorUpdateTransition())
- .addTransition(RMAppState.KILLING, RMAppState.FINAL_SAVING,
- RMAppEventType.ATTEMPT_KILLED,
- new FinalSavingTransition(
- new AppKilledTransition(), RMAppState.KILLED))
- .addTransition(RMAppState.KILLING, RMAppState.FINAL_SAVING,
- RMAppEventType.ATTEMPT_UNREGISTERED,
- new FinalSavingTransition(
- new AttemptUnregisteredTransition(),
- RMAppState.FINISHING, RMAppState.FINISHED))
- .addTransition(RMAppState.KILLING, RMAppState.FINISHED,
- // UnManagedAM directly jumps to finished
- RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION)
- .addTransition(RMAppState.KILLING,
- EnumSet.of(RMAppState.FINAL_SAVING),
- RMAppEventType.ATTEMPT_FAILED,
- new AttemptFailedTransition(RMAppState.KILLING))
-
- .addTransition(RMAppState.KILLING, RMAppState.KILLING,
- EnumSet.of(
- RMAppEventType.NODE_UPDATE,
- RMAppEventType.ATTEMPT_REGISTERED,
- RMAppEventType.APP_UPDATE_SAVED,
- RMAppEventType.KILL, RMAppEventType.MOVE))
-
- // Transitions from FINISHED state
- // ignorable transitions
- .addTransition(RMAppState.FINISHED, RMAppState.FINISHED,
- RMAppEventType.APP_RUNNING_ON_NODE,
- new AppRunningOnNodeTransition())
- .addTransition(RMAppState.FINISHED, RMAppState.FINISHED,
- EnumSet.of(
- RMAppEventType.NODE_UPDATE,
- RMAppEventType.ATTEMPT_UNREGISTERED,
- RMAppEventType.ATTEMPT_FINISHED,
- RMAppEventType.KILL, RMAppEventType.MOVE))
-
- // Transitions from FAILED state
- // ignorable transitions
- .addTransition(RMAppState.FAILED, RMAppState.FAILED,
- RMAppEventType.APP_RUNNING_ON_NODE,
- new AppRunningOnNodeTransition())
- .addTransition(RMAppState.FAILED, RMAppState.FAILED,
- EnumSet.of(RMAppEventType.KILL, RMAppEventType.NODE_UPDATE,
- RMAppEventType.MOVE))
-
- // Transitions from KILLED state
- // ignorable transitions
- .addTransition(RMAppState.KILLED, RMAppState.KILLED,
- RMAppEventType.APP_RUNNING_ON_NODE,
- new AppRunningOnNodeTransition())
- .addTransition(
- RMAppState.KILLED,
- RMAppState.KILLED,
- EnumSet.of(RMAppEventType.APP_ACCEPTED,
- RMAppEventType.APP_REJECTED, RMAppEventType.KILL,
- RMAppEventType.ATTEMPT_FINISHED, RMAppEventType.ATTEMPT_FAILED,
- RMAppEventType.NODE_UPDATE, RMAppEventType.MOVE))
-
- .installTopology();
-
- private final StateMachine
- stateMachine;
+ private static final StateMachineFactory stateMachineFactory = new StateMachineFactory(
+ RMAppState.NEW)
+
+ // Transitions from NEW state
+ .addTransition(RMAppState.NEW, RMAppState.NEW,
+ RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
+ .addTransition(RMAppState.NEW, RMAppState.NEW,
+ RMAppEventType.COLLECTOR_UPDATE,
+ new RMAppCollectorUpdateTransition())
+ .addTransition(RMAppState.NEW, RMAppState.NEW_SAVING,
+ RMAppEventType.START, new RMAppNewlySavingTransition())
+ .addTransition(RMAppState.NEW,
+ EnumSet.of(RMAppState.SUBMITTED, RMAppState.ACCEPTED,
+ RMAppState.FINISHED, RMAppState.FAILED, RMAppState.KILLED,
+ RMAppState.FINAL_SAVING),
+ RMAppEventType.RECOVER, new RMAppRecoveredTransition())
+ .addTransition(RMAppState.NEW, RMAppState.KILLED, RMAppEventType.KILL,
+ new AppKilledTransition())
+ .addTransition(RMAppState.NEW, RMAppState.FINAL_SAVING,
+ RMAppEventType.APP_REJECTED,
+ new FinalSavingTransition(new AppRejectedTransition(),
+ RMAppState.FAILED))
+
+ // Transitions from NEW_SAVING state
+ .addTransition(RMAppState.NEW_SAVING, RMAppState.NEW_SAVING,
+ RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
+ .addTransition(RMAppState.NEW_SAVING, RMAppState.NEW_SAVING,
+ RMAppEventType.COLLECTOR_UPDATE,
+ new RMAppCollectorUpdateTransition())
+ .addTransition(RMAppState.NEW_SAVING, RMAppState.SUBMITTED,
+ RMAppEventType.APP_NEW_SAVED,
+ new AddApplicationToSchedulerTransition())
+ .addTransition(RMAppState.NEW_SAVING, RMAppState.FINAL_SAVING,
+ RMAppEventType.KILL,
+ new FinalSavingTransition(new AppKilledTransition(),
+ RMAppState.KILLED))
+ .addTransition(RMAppState.NEW_SAVING, RMAppState.FINAL_SAVING,
+ RMAppEventType.APP_REJECTED,
+ new FinalSavingTransition(new AppRejectedTransition(),
+ RMAppState.FAILED))
+ .addTransition(RMAppState.NEW_SAVING, RMAppState.NEW_SAVING,
+ RMAppEventType.MOVE, new RMAppMoveTransition())
+
+ // Transitions from SUBMITTED state
+ .addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED,
+ RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
+ .addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED,
+ RMAppEventType.MOVE, new RMAppMoveTransition())
+ .addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED,
+ RMAppEventType.COLLECTOR_UPDATE,
+ new RMAppCollectorUpdateTransition())
+ .addTransition(RMAppState.SUBMITTED, RMAppState.FINAL_SAVING,
+ RMAppEventType.APP_REJECTED,
+ new FinalSavingTransition(new AppRejectedTransition(),
+ RMAppState.FAILED))
+ .addTransition(RMAppState.SUBMITTED, RMAppState.ACCEPTED,
+ RMAppEventType.APP_ACCEPTED, new StartAppAttemptTransition())
+ .addTransition(RMAppState.SUBMITTED, RMAppState.FINAL_SAVING,
+ RMAppEventType.KILL,
+ new FinalSavingTransition(new AppKilledTransition(),
+ RMAppState.KILLED))
+
+ // Transitions from ACCEPTED state
+ .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
+ RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
+ .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
+ RMAppEventType.MOVE, new RMAppMoveTransition())
+ .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
+ RMAppEventType.COLLECTOR_UPDATE,
+ new RMAppCollectorUpdateTransition())
+ .addTransition(RMAppState.ACCEPTED, RMAppState.RUNNING,
+ RMAppEventType.ATTEMPT_REGISTERED,
+ new RMAppStateUpdateTransition(YarnApplicationState.RUNNING))
+ .addTransition(RMAppState.ACCEPTED,
+ EnumSet.of(RMAppState.ACCEPTED, RMAppState.FINAL_SAVING),
+ // An app in the ACCEPTED state can still receive
+ // ATTEMPT_FAILED/ATTEMPT_FINISHED events:
+ // RMAppRecoveredTransition returns the ACCEPTED state
+ // directly, and the app then waits for the previous
+ // AM to exit.
+ RMAppEventType.ATTEMPT_FAILED,
+ new AttemptFailedTransition(RMAppState.ACCEPTED))
+ .addTransition(RMAppState.ACCEPTED, RMAppState.FINAL_SAVING,
+ RMAppEventType.ATTEMPT_FINISHED,
+ new FinalSavingTransition(FINISHED_TRANSITION,
+ RMAppState.FINISHED))
+ .addTransition(RMAppState.ACCEPTED, RMAppState.KILLING,
+ RMAppEventType.KILL, new KillAttemptTransition())
+ .addTransition(RMAppState.ACCEPTED, RMAppState.FINAL_SAVING,
+ RMAppEventType.ATTEMPT_KILLED,
+ new FinalSavingTransition(new AppKilledTransition(),
+ RMAppState.KILLED))
+ .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
+ RMAppEventType.APP_RUNNING_ON_NODE,
+ new AppRunningOnNodeTransition())
+
+ // Transitions from RUNNING state
+ .addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
+ RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
+ .addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
+ RMAppEventType.MOVE, new RMAppMoveTransition())
+ .addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
+ RMAppEventType.COLLECTOR_UPDATE,
+ new RMAppCollectorUpdateTransition())
+ .addTransition(RMAppState.RUNNING, RMAppState.FINAL_SAVING,
+ RMAppEventType.ATTEMPT_UNREGISTERED,
+ new FinalSavingTransition(new AttemptUnregisteredTransition(),
+ RMAppState.FINISHING, RMAppState.FINISHED))
+ .addTransition(RMAppState.RUNNING, RMAppState.FINISHED,
+ // UnManagedAM directly jumps to finished
+ RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION)
+ .addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
+ RMAppEventType.APP_RUNNING_ON_NODE,
+ new AppRunningOnNodeTransition())
+ .addTransition(RMAppState.RUNNING,
+ EnumSet.of(RMAppState.ACCEPTED, RMAppState.FINAL_SAVING),
+ RMAppEventType.ATTEMPT_FAILED,
+ new AttemptFailedTransition(RMAppState.ACCEPTED))
+ .addTransition(RMAppState.RUNNING, RMAppState.KILLING,
+ RMAppEventType.KILL, new KillAttemptTransition())
+
+ // Transitions from FINAL_SAVING state
+ .addTransition(RMAppState.FINAL_SAVING,
+ EnumSet.of(RMAppState.FINISHING, RMAppState.FAILED,
+ RMAppState.KILLED, RMAppState.FINISHED),
+ RMAppEventType.APP_UPDATE_SAVED, new FinalStateSavedTransition())
+ .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING,
+ RMAppEventType.ATTEMPT_FINISHED,
+ new AttemptFinishedAtFinalSavingTransition())
+ .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING,
+ RMAppEventType.APP_RUNNING_ON_NODE,
+ new AppRunningOnNodeTransition())
+ .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING,
+ RMAppEventType.COLLECTOR_UPDATE,
+ new RMAppCollectorUpdateTransition())
+ // ignorable transitions
+ .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING,
+ EnumSet.of(RMAppEventType.NODE_UPDATE, RMAppEventType.KILL,
+ RMAppEventType.APP_NEW_SAVED, RMAppEventType.MOVE))
+
+ // Transitions from FINISHING state
+ .addTransition(RMAppState.FINISHING, RMAppState.FINISHED,
+ RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION)
+ .addTransition(RMAppState.FINISHING, RMAppState.FINISHING,
+ RMAppEventType.APP_RUNNING_ON_NODE,
+ new AppRunningOnNodeTransition())
+ .addTransition(RMAppState.FINISHING, RMAppState.FINISHING,
+ RMAppEventType.COLLECTOR_UPDATE,
+ new RMAppCollectorUpdateTransition())
+ // ignorable transitions
+ .addTransition(RMAppState.FINISHING, RMAppState.FINISHING,
+ EnumSet.of(RMAppEventType.NODE_UPDATE,
+ // Ignore KILL and MOVE in this state: the final
+ // Finished state has already been saved in the
+ // state store.
+ RMAppEventType.KILL, RMAppEventType.MOVE))
+
+ // Transitions from KILLING state
+ .addTransition(RMAppState.KILLING, RMAppState.KILLING,
+ RMAppEventType.APP_RUNNING_ON_NODE,
+ new AppRunningOnNodeTransition())
+ .addTransition(RMAppState.KILLING, RMAppState.KILLING,
+ RMAppEventType.COLLECTOR_UPDATE,
+ new RMAppCollectorUpdateTransition())
+ .addTransition(RMAppState.KILLING, RMAppState.FINAL_SAVING,
+ RMAppEventType.ATTEMPT_KILLED,
+ new FinalSavingTransition(new AppKilledTransition(),
+ RMAppState.KILLED))
+ .addTransition(RMAppState.KILLING, RMAppState.FINAL_SAVING,
+ RMAppEventType.ATTEMPT_UNREGISTERED,
+ new FinalSavingTransition(new AttemptUnregisteredTransition(),
+ RMAppState.FINISHING, RMAppState.FINISHED))
+ .addTransition(RMAppState.KILLING, RMAppState.FINISHED,
+ // UnManagedAM directly jumps to finished
+ RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION)
+ .addTransition(RMAppState.KILLING,
+ EnumSet.of(RMAppState.FINAL_SAVING),
+ RMAppEventType.ATTEMPT_FAILED,
+ new AttemptFailedTransition(RMAppState.KILLING))
+
+ .addTransition(RMAppState.KILLING, RMAppState.KILLING,
+ EnumSet.of(RMAppEventType.NODE_UPDATE,
+ RMAppEventType.ATTEMPT_REGISTERED,
+ RMAppEventType.APP_UPDATE_SAVED, RMAppEventType.KILL,
+ RMAppEventType.MOVE))
+
+ // Transitions from FINISHED state
+ // ignorable transitions
+ .addTransition(RMAppState.FINISHED, RMAppState.FINISHED,
+ RMAppEventType.APP_RUNNING_ON_NODE,
+ new AppRunningOnNodeTransition())
+ .addTransition(RMAppState.FINISHED, RMAppState.FINISHED,
+ EnumSet.of(RMAppEventType.NODE_UPDATE,
+ RMAppEventType.ATTEMPT_UNREGISTERED,
+ RMAppEventType.ATTEMPT_FINISHED, RMAppEventType.KILL,
+ RMAppEventType.MOVE))
+
+ // Transitions from FAILED state
+ // ignorable transitions
+ .addTransition(RMAppState.FAILED, RMAppState.FAILED,
+ RMAppEventType.APP_RUNNING_ON_NODE,
+ new AppRunningOnNodeTransition())
+ .addTransition(RMAppState.FAILED, RMAppState.FAILED,
+ EnumSet.of(RMAppEventType.KILL, RMAppEventType.NODE_UPDATE,
+ RMAppEventType.MOVE))
+
+ // Transitions from KILLED state
+ // ignorable transitions
+ .addTransition(RMAppState.KILLED, RMAppState.KILLED,
+ RMAppEventType.APP_RUNNING_ON_NODE,
+ new AppRunningOnNodeTransition())
+ .addTransition(RMAppState.KILLED, RMAppState.KILLED,
+ EnumSet.of(RMAppEventType.APP_ACCEPTED,
+ RMAppEventType.APP_REJECTED, RMAppEventType.KILL,
+ RMAppEventType.ATTEMPT_FINISHED,
+ RMAppEventType.ATTEMPT_FAILED, RMAppEventType.NODE_UPDATE,
+ RMAppEventType.MOVE))
+
+ .installTopology();
+
+ private final StateMachine stateMachine;
private static final int DUMMY_APPLICATION_ATTEMPT_NUMBER = -1;
private static final float MINIMUM_AM_BLACKLIST_THRESHOLD_VALUE = 0.0f;
@@ -419,8 +419,8 @@ public RMAppImpl(ApplicationId applicationId, RMContext rmContext,
String applicationType, Set applicationTags,
ResourceRequest amReq) {
this(applicationId, rmContext, config, name, user, queue, submissionContext,
- scheduler, masterService, submitTime, applicationType, applicationTags,
- amReq, -1);
+ scheduler, masterService, submitTime, applicationType, applicationTags,
+ amReq, -1);
}
public RMAppImpl(ApplicationId applicationId, RMContext rmContext,
@@ -456,8 +456,8 @@ public RMAppImpl(ApplicationId applicationId, RMContext rmContext,
int globalMaxAppAttempts = conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
int individualMaxAppAttempts = submissionContext.getMaxAppAttempts();
- if (individualMaxAppAttempts <= 0 ||
- individualMaxAppAttempts > globalMaxAppAttempts) {
+ if (individualMaxAppAttempts <= 0
+ || individualMaxAppAttempts > globalMaxAppAttempts) {
this.maxAppAttempts = globalMaxAppAttempts;
LOG.warn("The specific max attempts: " + individualMaxAppAttempts
+ " for application: " + applicationId.getId()
@@ -467,8 +467,8 @@ public RMAppImpl(ApplicationId applicationId, RMContext rmContext,
this.maxAppAttempts = individualMaxAppAttempts;
}
- this.attemptFailuresValidityInterval =
- submissionContext.getAttemptFailuresValidityInterval();
+ this.attemptFailuresValidityInterval = submissionContext
+ .getAttemptFailuresValidityInterval();
if (this.attemptFailuresValidityInterval > 0) {
LOG.info("The attemptFailuresValidityInterval for the application: "
+ this.applicationId + " is " + this.attemptFailuresValidityInterval
@@ -483,18 +483,17 @@ public RMAppImpl(ApplicationId applicationId, RMContext rmContext,
this.callerContext = CallerContext.getCurrent();
- long localLogAggregationStatusTimeout =
- conf.getLong(YarnConfiguration.LOG_AGGREGATION_STATUS_TIME_OUT_MS,
- YarnConfiguration.DEFAULT_LOG_AGGREGATION_STATUS_TIME_OUT_MS);
+ long localLogAggregationStatusTimeout = conf.getLong(
+ YarnConfiguration.LOG_AGGREGATION_STATUS_TIME_OUT_MS,
+ YarnConfiguration.DEFAULT_LOG_AGGREGATION_STATUS_TIME_OUT_MS);
if (localLogAggregationStatusTimeout <= 0) {
- this.logAggregationStatusTimeout =
- YarnConfiguration.DEFAULT_LOG_AGGREGATION_STATUS_TIME_OUT_MS;
+ this.logAggregationStatusTimeout = YarnConfiguration.DEFAULT_LOG_AGGREGATION_STATUS_TIME_OUT_MS;
} else {
this.logAggregationStatusTimeout = localLogAggregationStatusTimeout;
}
- this.logAggregationEnabled =
- conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
- YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED);
+ this.logAggregationEnabled = conf.getBoolean(
+ YarnConfiguration.LOG_AGGREGATION_ENABLED,
+ YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED);
if (this.logAggregationEnabled) {
this.logAggregationStatusForAppReport = LogAggregationStatus.NOT_START;
} else {
@@ -506,22 +505,19 @@ public RMAppImpl(ApplicationId applicationId, RMContext rmContext,
// amBlacklistingEnabled can be configured globally
// Just use the global values
- amBlacklistingEnabled =
- conf.getBoolean(
- YarnConfiguration.AM_SCHEDULING_NODE_BLACKLISTING_ENABLED,
- YarnConfiguration.DEFAULT_AM_SCHEDULING_NODE_BLACKLISTING_ENABLED);
+ amBlacklistingEnabled = conf.getBoolean(
+ YarnConfiguration.AM_SCHEDULING_NODE_BLACKLISTING_ENABLED,
+ YarnConfiguration.DEFAULT_AM_SCHEDULING_NODE_BLACKLISTING_ENABLED);
if (amBlacklistingEnabled) {
blacklistDisableThreshold = conf.getFloat(
YarnConfiguration.AM_SCHEDULING_NODE_BLACKLISTING_DISABLE_THRESHOLD,
- YarnConfiguration.
- DEFAULT_AM_SCHEDULING_NODE_BLACKLISTING_DISABLE_THRESHOLD);
+ YarnConfiguration.DEFAULT_AM_SCHEDULING_NODE_BLACKLISTING_DISABLE_THRESHOLD);
// Verify whether blacklistDisableThreshold is valid. And for invalid
// threshold, reset to global level blacklistDisableThreshold
// configured.
- if (blacklistDisableThreshold < MINIMUM_AM_BLACKLIST_THRESHOLD_VALUE ||
- blacklistDisableThreshold > MAXIMUM_AM_BLACKLIST_THRESHOLD_VALUE) {
- blacklistDisableThreshold = YarnConfiguration.
- DEFAULT_AM_SCHEDULING_NODE_BLACKLISTING_DISABLE_THRESHOLD;
+ if (blacklistDisableThreshold < MINIMUM_AM_BLACKLIST_THRESHOLD_VALUE
+ || blacklistDisableThreshold > MAXIMUM_AM_BLACKLIST_THRESHOLD_VALUE) {
+ blacklistDisableThreshold = YarnConfiguration.DEFAULT_AM_SCHEDULING_NODE_BLACKLISTING_DISABLE_THRESHOLD;
}
}
}
@@ -531,10 +527,10 @@ public RMAppImpl(ApplicationId applicationId, RMContext rmContext,
* be used only if the timeline service v.2 is enabled.
*/
public void startTimelineCollector() {
- AppLevelTimelineCollector collector =
- new AppLevelTimelineCollector(applicationId);
- rmContext.getRMTimelineCollectorManager().putIfAbsent(
- applicationId, collector);
+ AppLevelTimelineCollector collector = new AppLevelTimelineCollector(
+ applicationId);
+ rmContext.getRMTimelineCollectorManager().putIfAbsent(applicationId,
+ collector);
}
/**
@@ -549,7 +545,7 @@ public void stopTimelineCollector() {
public ApplicationId getApplicationId() {
return this.applicationId;
}
-
+
@Override
public ApplicationSubmissionContext getApplicationSubmissionContext() {
return this.submissionContext;
@@ -572,7 +568,7 @@ public FinalApplicationStatus getFinalApplicationStatus() {
public RMAppState getState() {
this.readLock.lock();
try {
- return this.stateMachine.getCurrentState();
+ return this.stateMachine.getCurrentState();
} finally {
this.readLock.unlock();
}
@@ -607,7 +603,7 @@ public RMAppAttempt getRMAppAttempt(ApplicationAttemptId appAttemptId) {
public String getQueue() {
return this.queue;
}
-
+
@Override
public void setQueue(String queue) {
this.queue = queue;
@@ -649,8 +645,9 @@ public RMAppAttempt getCurrentAppAttempt() {
}
}
- private FinalApplicationStatus createFinalApplicationStatus(RMAppState state) {
- switch(state) {
+ private FinalApplicationStatus createFinalApplicationStatus(
+ RMAppState state) {
+ switch (state) {
case NEW:
case NEW_SAVING:
case SUBMITTED:
@@ -658,8 +655,8 @@ private FinalApplicationStatus createFinalApplicationStatus(RMAppState state) {
case RUNNING:
case FINAL_SAVING:
case KILLING:
- return FinalApplicationStatus.UNDEFINED;
- // finished without a proper final state is the same as failed
+ return FinalApplicationStatus.UNDEFINED;
+ // finished without a proper final state is the same as failed
case FINISHING:
case FINISHED:
case FAILED:
@@ -682,7 +679,7 @@ public int pullRMNodeUpdates(Collection updatedNodes) {
this.writeLock.unlock();
}
}
-
+
@Override
public ApplicationReport createAndGetApplicationReport(String clientUserName,
boolean allowAccess) {
@@ -696,8 +693,7 @@ public ApplicationReport createAndGetApplicationReport(String clientUserName,
String origTrackingUrl = UNAVAILABLE;
LogAggregationStatus logAggregationStatus = null;
int rpcPort = -1;
- ApplicationResourceUsageReport appUsageReport =
- RMServerUtils.DUMMY_APPLICATION_RESOURCE_USAGE_REPORT;
+ ApplicationResourceUsageReport appUsageReport = RMServerUtils.DUMMY_APPLICATION_RESOURCE_USAGE_REPORT;
FinalApplicationStatus finishState = getFinalApplicationStatus();
String diags = UNAVAILABLE;
float progress = 0.0f;
@@ -711,8 +707,8 @@ public ApplicationReport createAndGetApplicationReport(String clientUserName,
if (UserGroupInformation.isSecurityEnabled()) {
// get a token so the client can communicate with the app attempt
// NOTE: token may be unavailable if the attempt is not running
- Token attemptClientToAMToken =
- this.currentAttempt.createClientToken(clientUserName);
+ Token attemptClientToAMToken = this.currentAttempt
+ .createClientToken(clientUserName);
if (attemptClientToAMToken != null) {
clientToAMToken = BuilderUtils.newClientToAMToken(
attemptClientToAMToken.getIdentifier(),
@@ -727,13 +723,13 @@ public ApplicationReport createAndGetApplicationReport(String clientUserName,
progress = currentAttempt.getProgress();
logAggregationStatus = this.getLogAggregationStatusForAppReport();
}
- //if the diagnostics is not already set get it from attempt
+ // if the diagnostics is not already set get it from attempt
diags = getDiagnostics().toString();
- if (currentAttempt != null &&
- currentAttempt.getAppAttemptState() == RMAppAttemptState.LAUNCHED) {
- if (getApplicationSubmissionContext().getUnmanagedAM() &&
- clientUserName != null && getUser().equals(clientUserName)) {
+ if (currentAttempt != null && currentAttempt
+ .getAppAttemptState() == RMAppAttemptState.LAUNCHED) {
+ if (getApplicationSubmissionContext().getUnmanagedAM()
+ && clientUserName != null && getUser().equals(clientUserName)) {
Token token = currentAttempt.getAMRMToken();
if (token != null) {
amrmToken = BuilderUtils.newAMRMToken(token.getIdentifier(),
@@ -749,9 +745,8 @@ public ApplicationReport createAndGetApplicationReport(String clientUserName,
}
if (currentApplicationAttemptId == null) {
- currentApplicationAttemptId =
- BuilderUtils.newApplicationAttemptId(this.applicationId,
- DUMMY_APPLICATION_ATTEMPT_NUMBER);
+ currentApplicationAttemptId = BuilderUtils.newApplicationAttemptId(
+ this.applicationId, DUMMY_APPLICATION_ATTEMPT_NUMBER);
}
ApplicationReport report = BuilderUtils.newApplicationReport(
@@ -779,8 +774,8 @@ private String getDefaultProxyTrackingUrl() {
URI result = ProxyUriUtils.getProxyUri(null, proxyUri, applicationId);
return result.toASCIIString();
} catch (URISyntaxException e) {
- LOG.warn("Could not generate default proxy tracking URL for "
- + applicationId);
+ LOG.warn(
+ "Could not generate default proxy tracking URL for " + applicationId);
return UNAVAILABLE;
}
}
@@ -858,8 +853,8 @@ public void handle(RMAppEvent event) {
try {
ApplicationId appID = event.getApplicationId();
- LOG.debug("Processing event for " + appID + " of type "
- + event.getType());
+ LOG.debug(
+ "Processing event for " + appID + " of type " + event.getType());
final RMAppState oldState = getState();
try {
/* keep the master in sync with the state machine */
@@ -870,8 +865,8 @@ public void handle(RMAppEvent event) {
}
if (oldState != getState()) {
- LOG.info(appID + " State change from " + oldState + " to "
- + getState());
+ LOG.info(appID + " State change from " + oldState + " to " + getState()
+ + " on event=" + event.getType());
}
} finally {
this.writeLock.unlock();
@@ -880,14 +875,14 @@ public void handle(RMAppEvent event) {
@Override
public void recover(RMState state) {
- ApplicationStateData appState =
- state.getApplicationState().get(getApplicationId());
+ ApplicationStateData appState = state.getApplicationState()
+ .get(getApplicationId());
this.recoveredFinalState = appState.getState();
- LOG.info("Recovering app: " + getApplicationId() + " with " +
- + appState.getAttemptCount() + " attempts and final state = "
- + this.recoveredFinalState );
- this.diagnostics.append(null == appState.getDiagnostics() ? "" : appState
- .getDiagnostics());
+ LOG.info("Recovering app: " + getApplicationId() + " with "
+ + +appState.getAttemptCount() + " attempts and final state = "
+ + this.recoveredFinalState);
+ this.diagnostics.append(
+ null == appState.getDiagnostics() ? "" : appState.getDiagnostics());
this.storedFinishTime = appState.getFinishTime();
this.startTime = appState.getStartTime();
this.callerContext = appState.getCallerContext();
@@ -896,24 +891,24 @@ public void recover(RMState state) {
this.firstAttemptIdInStateStore = appState.getFirstAttemptId();
this.nextAttemptId = firstAttemptIdInStateStore;
}
- //TODO recover collector address.
- //this.collectorAddr = appState.getCollectorAddr();
+ // TODO recover collector address.
+ // this.collectorAddr = appState.getCollectorAddr();
// send the ATS create Event
sendATSCreateEvent(this, this.startTime);
RMAppAttemptImpl preAttempt = null;
- for (ApplicationAttemptId attemptId :
- new TreeSet<>(appState.attempts.keySet())) {
+ for (ApplicationAttemptId attemptId : new TreeSet<>(
+ appState.attempts.keySet())) {
// create attempt
createNewAttempt(attemptId);
- ((RMAppAttemptImpl)this.currentAttempt).recover(state);
+ ((RMAppAttemptImpl) this.currentAttempt).recover(state);
// If previous attempt is not in final state, it means we failed to store
// its final state. We set it to FAILED now because we could not make sure
// about its final state.
if (preAttempt != null && preAttempt.getRecoveredFinalState() == null) {
preAttempt.setRecoveredFinalState(RMAppAttemptState.FAILED);
}
- preAttempt = (RMAppAttemptImpl)currentAttempt;
+ preAttempt = (RMAppAttemptImpl) currentAttempt;
}
if (currentAttempt != null) {
nextAttemptId = currentAttempt.getAppAttemptId().getAttemptId() + 1;
@@ -921,8 +916,8 @@ public void recover(RMState state) {
}
private void createNewAttempt() {
- ApplicationAttemptId appAttemptId =
- ApplicationAttemptId.newInstance(applicationId, nextAttemptId++);
+ ApplicationAttemptId appAttemptId = ApplicationAttemptId
+ .newInstance(applicationId, nextAttemptId++);
createNewAttempt(appAttemptId);
}
@@ -939,24 +934,23 @@ private void createNewAttempt(ApplicationAttemptId appAttemptId) {
currentAMBlacklistManager = new DisabledBlacklistManager();
}
}
- RMAppAttempt attempt =
- new RMAppAttemptImpl(appAttemptId, rmContext, scheduler, masterService,
- submissionContext, conf,
- // The newly created attempt maybe last attempt if (number of
- // previously failed attempts(which should not include Preempted,
- // hardware error and NM resync) + 1) equal to the max-attempt
- // limit.
- maxAppAttempts == (getNumFailedAppAttempts() + 1), amReq,
- currentAMBlacklistManager);
+ RMAppAttempt attempt = new RMAppAttemptImpl(appAttemptId, rmContext,
+ scheduler, masterService, submissionContext, conf,
+ // The newly created attempt may be the last attempt if (number of
+ // previously failed attempts (which should not include Preempted,
+ // hardware error and NM resync) + 1) equals the max-attempt
+ // limit.
+ maxAppAttempts == (getNumFailedAppAttempts() + 1), amReq,
+ currentAMBlacklistManager);
attempts.put(appAttemptId, attempt);
currentAttempt = attempt;
}
-
- private void
- createAndStartNewAttempt(boolean transferStateFromPreviousAttempt) {
+
+ private void createAndStartNewAttempt(
+ boolean transferStateFromPreviousAttempt) {
createNewAttempt();
handler.handle(new RMAppStartAttemptEvent(currentAttempt.getAppAttemptId(),
- transferStateFromPreviousAttempt));
+ transferStateFromPreviousAttempt));
}
private void processNodeUpdate(RMAppNodeUpdateType type, RMNode node) {
@@ -966,8 +960,8 @@ private void processNodeUpdate(RMAppNodeUpdateType type, RMNode node) {
+ " with state:" + nodeState);
}
- private static class RMAppTransition implements
- SingleArcTransition {
+ private static class RMAppTransition
+ implements SingleArcTransition {
public void transition(RMAppImpl app, RMAppEvent event) {
};
}
@@ -979,8 +973,7 @@ public void transition(RMAppImpl app, RMAppEvent event) {
if (YarnConfiguration.timelineServiceV2Enabled(app.conf)) {
LOG.info("Updating collector info for app: " + app.getApplicationId());
- RMAppCollectorUpdateEvent appCollectorUpdateEvent =
- (RMAppCollectorUpdateEvent) event;
+ RMAppCollectorUpdateEvent appCollectorUpdateEvent = (RMAppCollectorUpdateEvent) event;
// Update collector address
app.setCollectorAddr(appCollectorUpdateEvent.getAppCollectorAddr());
@@ -1007,40 +1000,40 @@ public RMAppStateUpdateTransition(YarnApplicationState state) {
}
public void transition(RMAppImpl app, RMAppEvent event) {
- app.rmContext.getSystemMetricsPublisher().appStateUpdated(
- app, stateToATS, app.systemClock.getTime());
+ app.rmContext.getSystemMetricsPublisher().appStateUpdated(app, stateToATS,
+ app.systemClock.getTime());
};
}
- private static final class AppRunningOnNodeTransition extends RMAppTransition {
+ private static final class AppRunningOnNodeTransition
+ extends RMAppTransition {
public void transition(RMAppImpl app, RMAppEvent event) {
RMAppRunningOnNodeEvent nodeAddedEvent = (RMAppRunningOnNodeEvent) event;
-
+
// if final state already stored, notify RMNode
if (isAppInFinalState(app)) {
- app.handler.handle(
- new RMNodeCleanAppEvent(nodeAddedEvent.getNodeId(), nodeAddedEvent
- .getApplicationId()));
+ app.handler.handle(new RMNodeCleanAppEvent(nodeAddedEvent.getNodeId(),
+ nodeAddedEvent.getApplicationId()));
return;
}
-
+
// otherwise, add it to ranNodes for further process
app.ranNodes.add(nodeAddedEvent.getNodeId());
if (!app.logAggregationStatus.containsKey(nodeAddedEvent.getNodeId())) {
app.logAggregationStatus.put(nodeAddedEvent.getNodeId(),
- LogAggregationReport.newInstance(app.applicationId,
- app.logAggregationEnabled ? LogAggregationStatus.NOT_START
- : LogAggregationStatus.DISABLED, ""));
+ LogAggregationReport.newInstance(app.applicationId,
+ app.logAggregationEnabled ? LogAggregationStatus.NOT_START
+ : LogAggregationStatus.DISABLED,
+ ""));
}
};
}
/**
- * Move an app to a new queue.
- * This transition must set the result on the Future in the RMAppMoveEvent,
- * either as an exception for failure or null for success, or the client will
- * be left waiting forever.
+ * Move an app to a new queue. This transition must set the result on the
+ * Future in the RMAppMoveEvent, either as an exception for failure or null
+ * for success, or the client will be left waiting forever.
*/
private static final class RMAppMoveTransition extends RMAppTransition {
public void transition(RMAppImpl app, RMAppEvent event) {
@@ -1067,12 +1060,12 @@ public void transition(RMAppImpl app, RMAppEvent event) {
private void recoverAppAttempts() {
for (RMAppAttempt attempt : getAppAttempts().values()) {
attempt.handle(new RMAppAttemptEvent(attempt.getAppAttemptId(),
- RMAppAttemptEventType.RECOVER));
+ RMAppAttemptEventType.RECOVER));
}
}
- private static final class RMAppRecoveredTransition implements
- MultipleArcTransition {
+ private static final class RMAppRecoveredTransition
+ implements MultipleArcTransition {
@Override
public RMAppState transition(RMAppImpl app, RMAppEvent event) {
@@ -1105,15 +1098,15 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) {
// No existent attempts means the attempt associated with this app was not
// started or started but not yet saved.
if (app.attempts.isEmpty()) {
- app.scheduler.handle(new AppAddedSchedulerEvent(app.user,
- app.submissionContext, false));
+ app.scheduler.handle(
+ new AppAddedSchedulerEvent(app.user, app.submissionContext, false));
return RMAppState.SUBMITTED;
}
// Add application to scheduler synchronously to guarantee scheduler
// knows applications before AM or NM re-registers.
- app.scheduler.handle(new AppAddedSchedulerEvent(app.user,
- app.submissionContext, true));
+ app.scheduler.handle(
+ new AppAddedSchedulerEvent(app.user, app.submissionContext, true));
// recover attempts
app.recoverAppAttempts();
@@ -1125,12 +1118,12 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) {
}
}
- private static final class AddApplicationToSchedulerTransition extends
- RMAppTransition {
+ private static final class AddApplicationToSchedulerTransition
+ extends RMAppTransition {
@Override
public void transition(RMAppImpl app, RMAppEvent event) {
- app.handler.handle(new AppAddedSchedulerEvent(app.user,
- app.submissionContext, false));
+ app.handler.handle(
+ new AppAddedSchedulerEvent(app.user, app.submissionContext, false));
}
}
@@ -1141,25 +1134,25 @@ public void transition(RMAppImpl app, RMAppEvent event) {
};
}
- private static final class FinalStateSavedTransition implements
- MultipleArcTransition {
+ private static final class FinalStateSavedTransition
+ implements MultipleArcTransition {
@Override
public RMAppState transition(RMAppImpl app, RMAppEvent event) {
if (app.transitionTodo instanceof SingleArcTransition) {
((SingleArcTransition) app.transitionTodo).transition(app,
- app.eventCausingFinalSaving);
+ app.eventCausingFinalSaving);
} else if (app.transitionTodo instanceof MultipleArcTransition) {
((MultipleArcTransition) app.transitionTodo).transition(app,
- app.eventCausingFinalSaving);
+ app.eventCausingFinalSaving);
}
return app.targetedFinalState;
}
}
- private static class AttemptFailedFinalStateSavedTransition extends
- RMAppTransition {
+ private static class AttemptFailedFinalStateSavedTransition
+ extends RMAppTransition {
@Override
public void transition(RMAppImpl app, RMAppEvent event) {
String msg = null;
@@ -1179,27 +1172,27 @@ private String getAppAttemptFailedDiagnostics(RMAppEvent event) {
if (this.submissionContext.getUnmanagedAM()) {
// RM does not manage the AM. Do not retry
msg = "Unmanaged application " + this.getApplicationId()
- + " failed due to " + failedEvent.getDiagnosticMsg()
- + ". Failing the application.";
+ + " failed due to " + failedEvent.getDiagnosticMsg()
+ + ". Failing the application.";
} else if (this.isNumAttemptsBeyondThreshold) {
int globalLimit = conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
msg = String.format(
- "Application %s failed %d times%s%s due to %s. Failing the application.",
- getApplicationId(),
- maxAppAttempts,
+ "Application %s failed %d times%s%s due to %s. Failing the application.",
+ getApplicationId(), maxAppAttempts,
(attemptFailuresValidityInterval <= 0 ? ""
- : (" in previous " + attemptFailuresValidityInterval
+ : (" in previous " + attemptFailuresValidityInterval
+ " milliseconds")),
(globalLimit == maxAppAttempts) ? ""
- : (" (global limit =" + globalLimit
- + "; local limit is =" + maxAppAttempts + ")"),
+ : (" (global limit =" + globalLimit + "; local limit is ="
+ + maxAppAttempts + ")"),
failedEvent.getDiagnosticMsg());
}
return msg;
}
- private static final class RMAppNewlySavingTransition extends RMAppTransition {
+ private static final class RMAppNewlySavingTransition
+ extends RMAppTransition {
@Override
public void transition(RMAppImpl app, RMAppEvent event) {
@@ -1250,10 +1243,9 @@ private void rememberTargetTransitionsAndStoreState(RMAppEvent event,
break;
}
- ApplicationStateData appState =
- ApplicationStateData.newInstance(this.submitTime, this.startTime,
- this.user, this.submissionContext,
- stateToBeStored, diags, this.storedFinishTime, this.callerContext);
+ ApplicationStateData appState = ApplicationStateData.newInstance(
+ this.submitTime, this.startTime, this.user, this.submissionContext,
+ stateToBeStored, diags, this.storedFinishTime, this.callerContext);
this.rmContext.getStateStore().updateApplicationState(appState);
}
@@ -1277,7 +1269,7 @@ public FinalSavingTransition(Object transitionToDo,
@Override
public void transition(RMAppImpl app, RMAppEvent event) {
app.rememberTargetTransitionsAndStoreState(event, transitionToDo,
- targetedFinalState, stateToBeStored);
+ targetedFinalState, stateToBeStored);
}
}
@@ -1299,8 +1291,8 @@ public void transition(RMAppImpl app, RMAppEvent event) {
};
}
- private static class AttemptFinishedAtFinalSavingTransition extends
- RMAppTransition {
+ private static class AttemptFinishedAtFinalSavingTransition
+ extends RMAppTransition {
@Override
public void transition(RMAppImpl app, RMAppEvent event) {
if (app.targetedFinalState.equals(RMAppState.FAILED)
@@ -1313,18 +1305,20 @@ public void transition(RMAppImpl app, RMAppEvent event) {
// pass in the earlier attempt_unregistered event, as it is needed in
// AppFinishedFinalStateSavedTransition later on
app.rememberTargetTransitions(event,
- new AppFinishedFinalStateSavedTransition(app.eventCausingFinalSaving),
- RMAppState.FINISHED);
+ new AppFinishedFinalStateSavedTransition(app.eventCausingFinalSaving),
+ RMAppState.FINISHED);
};
}
- private static class AppFinishedFinalStateSavedTransition extends
- RMAppTransition {
+ private static class AppFinishedFinalStateSavedTransition
+ extends RMAppTransition {
RMAppEvent attemptUnregistered;
- public AppFinishedFinalStateSavedTransition(RMAppEvent attemptUnregistered) {
+ public AppFinishedFinalStateSavedTransition(
+ RMAppEvent attemptUnregistered) {
this.attemptUnregistered = attemptUnregistered;
}
+
@Override
public void transition(RMAppImpl app, RMAppEvent event) {
new AttemptUnregisteredTransition().transition(app, attemptUnregistered);
@@ -1332,6 +1326,24 @@ public void transition(RMAppImpl app, RMAppEvent event) {
};
}
+ /**
+  * Audit a client-initiated kill; events of any other type are ignored.
+  *
+  * @param event The {@link RMAppEvent} to be logged
+  */
+ static void auditLogKillEvent(RMAppEvent event) {
+   if (!(event instanceof RMAppKillByClientEvent)) {
+     return;
+   }
+   RMAppKillByClientEvent clientKill = (RMAppKillByClientEvent) event;
+   UserGroupInformation ugi = clientKill.getCallerUGI();
+   // The user name is passed as null when the event has no caller UGI.
+   String caller = (ugi == null) ? null : ugi.getShortUserName();
+   InetAddress clientIp = clientKill.getIp();
+   // Logged as a success for KILL_APP_REQUEST with target "RMAppImpl".
+   RMAuditLogger.logSuccess(caller, AuditConstants.KILL_APP_REQUEST,
+       "RMAppImpl", event.getApplicationId(), clientIp);
+ }
private static class AppKilledTransition extends FinalTransition {
public AppKilledTransition() {
@@ -1342,6 +1354,7 @@ public AppKilledTransition() {
public void transition(RMAppImpl app, RMAppEvent event) {
app.diagnostics.append(event.getDiagnosticMsg());
super.transition(app, event);
+ RMAppImpl.auditLogKillEvent(event);
};
}
@@ -1352,14 +1365,14 @@ public void transition(RMAppImpl app, RMAppEvent event) {
// Forward app kill diagnostics in the event to kill app attempt.
// These diagnostics will be returned back in ATTEMPT_KILLED event sent by
// RMAppAttemptImpl.
- app.handler.handle(
- new RMAppAttemptEvent(app.currentAttempt.getAppAttemptId(),
+ app.handler
+ .handle(new RMAppAttemptEvent(app.currentAttempt.getAppAttemptId(),
RMAppAttemptEventType.KILL, event.getDiagnosticMsg()));
+ RMAppImpl.auditLogKillEvent(event);
}
}
- private static final class AppRejectedTransition extends
- FinalTransition{
+ private static final class AppRejectedTransition extends FinalTransition {
public AppRejectedTransition() {
super(RMAppState.FAILED);
}
@@ -1381,27 +1394,25 @@ public FinalTransition(RMAppState finalState) {
public void transition(RMAppImpl app, RMAppEvent event) {
app.logAggregationStartTime = System.currentTimeMillis();
for (NodeId nodeId : app.getRanNodes()) {
- app.handler.handle(
- new RMNodeCleanAppEvent(nodeId, app.applicationId));
+ app.handler.handle(new RMNodeCleanAppEvent(nodeId, app.applicationId));
}
app.finishTime = app.storedFinishTime;
- if (app.finishTime == 0 ) {
+ if (app.finishTime == 0) {
app.finishTime = app.systemClock.getTime();
}
// Recovered apps that are completed were not added to scheduler, so no
// need to remove them from scheduler.
if (app.recoveredFinalState == null) {
- app.handler.handle(new AppRemovedSchedulerEvent(app.applicationId,
- finalState));
+ app.handler.handle(
+ new AppRemovedSchedulerEvent(app.applicationId, finalState));
}
- app.handler.handle(
- new RMAppManagerEvent(app.applicationId,
+ app.handler.handle(new RMAppManagerEvent(app.applicationId,
RMAppManagerEventType.APP_COMPLETED));
- app.rmContext.getRMApplicationHistoryWriter()
- .applicationFinished(app, finalState);
- app.rmContext.getSystemMetricsPublisher()
- .appFinished(app, finalState, app.finishTime);
+ app.rmContext.getRMApplicationHistoryWriter().applicationFinished(app,
+ finalState);
+ app.rmContext.getSystemMetricsPublisher().appFinished(app, finalState,
+ app.finishTime);
};
}
@@ -1422,8 +1433,8 @@ private int getNumFailedAppAttempts() {
return completedAttempts;
}
- private static final class AttemptFailedTransition implements
- MultipleArcTransition {
+ private static final class AttemptFailedTransition
+ implements MultipleArcTransition {
private final RMAppState initialState;
@@ -1450,14 +1461,14 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) {
// If this is not last attempt, app should be killed instead of
// launching a new attempt
app.rememberTargetTransitionsAndStoreState(event,
- new AppKilledTransition(), RMAppState.KILLED, RMAppState.KILLED);
+ new AppKilledTransition(), RMAppState.KILLED, RMAppState.KILLED);
return RMAppState.FINAL_SAVING;
}
boolean transferStateFromPreviousAttempt;
RMAppFailedAttemptEvent failedEvent = (RMAppFailedAttemptEvent) event;
- transferStateFromPreviousAttempt =
- failedEvent.getTransferStateFromPreviousAttempt();
+ transferStateFromPreviousAttempt = failedEvent
+ .getTransferStateFromPreviousAttempt();
RMAppAttempt oldAttempt = app.currentAttempt;
app.createAndStartNewAttempt(transferStateFromPreviousAttempt);
@@ -1468,15 +1479,15 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) {
// finished containers so that they can be acked to NM,
// but when pulling finished container we will check this flag again.
((RMAppAttemptImpl) app.currentAttempt)
- .transferStateFromAttempt(oldAttempt);
+ .transferStateFromAttempt(oldAttempt);
return initialState;
} else {
if (numberOfFailure >= app.maxAppAttempts) {
app.isNumAttemptsBeyondThreshold = true;
}
app.rememberTargetTransitionsAndStoreState(event,
- new AttemptFailedFinalStateSavedTransition(), RMAppState.FAILED,
- RMAppState.FAILED);
+ new AttemptFailedFinalStateSavedTransition(), RMAppState.FAILED,
+ RMAppState.FAILED);
return RMAppState.FINAL_SAVING;
}
}
@@ -1532,7 +1543,7 @@ public YarnApplicationState createApplicationState() {
}
return RMServerUtils.createApplicationState(rmAppState);
}
-
+
public static boolean isAppInFinalState(RMApp rmApp) {
RMAppState appState = ((RMAppImpl) rmApp).getRecoveredFinalState();
if (appState == null) {
@@ -1541,7 +1552,7 @@ public static boolean isAppInFinalState(RMApp rmApp) {
return appState == RMAppState.FAILED || appState == RMAppState.FINISHED
|| appState == RMAppState.KILLED;
}
-
+
public RMAppState getRecoveredFinalState() {
return this.recoveredFinalState;
}
@@ -1550,7 +1561,7 @@ public RMAppState getRecoveredFinalState() {
public Set getRanNodes() {
return ranNodes;
}
-
+
@Override
public RMAppMetrics getRMAppMetrics() {
Resource resourcePreempted = Resource.newInstance(0, 0);
@@ -1560,25 +1571,23 @@ public RMAppMetrics getRMAppMetrics() {
long vcoreSeconds = 0;
for (RMAppAttempt attempt : attempts.values()) {
if (null != attempt) {
- RMAppAttemptMetrics attemptMetrics =
- attempt.getRMAppAttemptMetrics();
+ RMAppAttemptMetrics attemptMetrics = attempt.getRMAppAttemptMetrics();
Resources.addTo(resourcePreempted,
attemptMetrics.getResourcePreempted());
numAMContainerPreempted += attemptMetrics.getIsPreempted() ? 1 : 0;
- numNonAMContainerPreempted +=
- attemptMetrics.getNumNonAMContainersPreempted();
+ numNonAMContainerPreempted += attemptMetrics
+ .getNumNonAMContainersPreempted();
// getAggregateAppResourceUsage() will calculate resource usage stats
// for both running and finished containers.
- AggregateAppResourceUsage resUsage =
- attempt.getRMAppAttemptMetrics().getAggregateAppResourceUsage();
+ AggregateAppResourceUsage resUsage = attempt.getRMAppAttemptMetrics()
+ .getAggregateAppResourceUsage();
memorySeconds += resUsage.getMemorySeconds();
vcoreSeconds += resUsage.getVcoreSeconds();
}
}
- return new RMAppMetrics(resourcePreempted,
- numNonAMContainerPreempted, numAMContainerPreempted,
- memorySeconds, vcoreSeconds);
+ return new RMAppMetrics(resourcePreempted, numNonAMContainerPreempted,
+ numAMContainerPreempted, memorySeconds, vcoreSeconds);
}
@Private
@@ -1591,10 +1600,10 @@ public void setSystemClock(Clock clock) {
public ReservationId getReservationId() {
return submissionContext.getReservationID();
}
-
+
@Override
public ResourceRequest getAMResourceRequest() {
- return this.amReq;
+ return this.amReq;
}
protected Credentials parseCredentials() throws IOException {
@@ -1613,22 +1622,21 @@ protected Credentials parseCredentials() throws IOException {
public Map getLogAggregationReportsForApp() {
try {
this.readLock.lock();
- Map outputs =
- new HashMap();
+ Map outputs = new HashMap();
outputs.putAll(logAggregationStatus);
if (!isLogAggregationFinished()) {
for (Entry output : outputs.entrySet()) {
if (!output.getValue().getLogAggregationStatus()
- .equals(LogAggregationStatus.TIME_OUT)
+ .equals(LogAggregationStatus.TIME_OUT)
&& !output.getValue().getLogAggregationStatus()
- .equals(LogAggregationStatus.SUCCEEDED)
+ .equals(LogAggregationStatus.SUCCEEDED)
&& !output.getValue().getLogAggregationStatus()
- .equals(LogAggregationStatus.FAILED)
+ .equals(LogAggregationStatus.FAILED)
&& isAppInFinalState(this)
&& System.currentTimeMillis() > this.logAggregationStartTime
+ this.logAggregationStatusTimeout) {
- output.getValue().setLogAggregationStatus(
- LogAggregationStatus.TIME_OUT);
+ output.getValue()
+ .setLogAggregationStatus(LogAggregationStatus.TIME_OUT);
}
}
}
@@ -1656,25 +1664,25 @@ public void aggregateLogReport(NodeId nodeId, LogAggregationReport report) {
}
}
if (report.getLogAggregationStatus() != LogAggregationStatus.RUNNING
- || curReport.getLogAggregationStatus() !=
- LogAggregationStatus.RUNNING_WITH_FAILURE) {
- if (curReport.getLogAggregationStatus()
- == LogAggregationStatus.TIME_OUT
- && report.getLogAggregationStatus()
- == LogAggregationStatus.RUNNING) {
- // If the log aggregation status got from latest nm heartbeat
- // is Running, and current log aggregation status is TimeOut,
- // based on whether there are any failure messages for this NM,
- // we will reset the log aggregation status as RUNNING or
- // RUNNING_WITH_FAILURE
- if (logAggregationFailureMessagesForNMs.get(nodeId) != null &&
- !logAggregationFailureMessagesForNMs.get(nodeId).isEmpty()) {
+ || curReport
+ .getLogAggregationStatus() != LogAggregationStatus.RUNNING_WITH_FAILURE) {
+ if (curReport
+ .getLogAggregationStatus() == LogAggregationStatus.TIME_OUT
+ && report
+ .getLogAggregationStatus() == LogAggregationStatus.RUNNING) {
+ // If the log aggregation status got from latest nm heartbeat
+ // is Running, and current log aggregation status is TimeOut,
+ // based on whether there are any failure messages for this NM,
+ // we will reset the log aggregation status as RUNNING or
+ // RUNNING_WITH_FAILURE
+ if (logAggregationFailureMessagesForNMs.get(nodeId) != null
+ && !logAggregationFailureMessagesForNMs.get(nodeId)
+ .isEmpty()) {
report.setLogAggregationStatus(
LogAggregationStatus.RUNNING_WITH_FAILURE);
}
}
- curReport.setLogAggregationStatus(report
- .getLogAggregationStatus());
+ curReport.setLogAggregationStatus(report.getLogAggregationStatus());
}
}
updateLogAggregationDiagnosticMessages(nodeId, report);
@@ -1691,14 +1699,13 @@ public void aggregateLogReport(NodeId nodeId, LogAggregationReport report) {
public LogAggregationStatus getLogAggregationStatusForAppReport() {
try {
this.readLock.lock();
- if (! logAggregationEnabled) {
+ if (!logAggregationEnabled) {
return LogAggregationStatus.DISABLED;
}
if (isLogAggregationFinished()) {
return this.logAggregationStatusForAppReport;
}
- Map reports =
- getLogAggregationReportsForApp();
+ Map reports = getLogAggregationReportsForApp();
if (reports.size() == 0) {
return this.logAggregationStatusForAppReport;
}
@@ -1709,31 +1716,32 @@ public LogAggregationStatus getLogAggregationStatusForAppReport() {
int logRunningWithFailure = 0;
for (Entry report : reports.entrySet()) {
switch (report.getValue().getLogAggregationStatus()) {
- case NOT_START:
- logNotStartCount++;
- break;
- case RUNNING_WITH_FAILURE:
- logRunningWithFailure ++;
- break;
- case SUCCEEDED:
- logCompletedCount++;
- break;
- case FAILED:
- logFailedCount++;
- logCompletedCount++;
- break;
- case TIME_OUT:
- logTimeOutCount++;
- logCompletedCount++;
- break;
- default:
- break;
+ case NOT_START:
+ logNotStartCount++;
+ break;
+ case RUNNING_WITH_FAILURE:
+ logRunningWithFailure++;
+ break;
+ case SUCCEEDED:
+ logCompletedCount++;
+ break;
+ case FAILED:
+ logFailedCount++;
+ logCompletedCount++;
+ break;
+ case TIME_OUT:
+ logTimeOutCount++;
+ logCompletedCount++;
+ break;
+ default:
+ break;
}
}
if (logNotStartCount == reports.size()) {
return LogAggregationStatus.NOT_START;
} else if (logCompletedCount == reports.size()) {
- // We should satisfy two condition in order to return SUCCEEDED or FAILED
+ // We should satisfy two conditions in order to return SUCCEEDED or
+ // FAILED
// 1) make sure the application is in final state
// 2) logs status from all NMs are SUCCEEDED/FAILED/TIMEOUT
// The SUCCEEDED/FAILED status is the final status which means
@@ -1758,9 +1766,9 @@ public LogAggregationStatus getLogAggregationStatusForAppReport() {
private boolean isLogAggregationFinished() {
return this.logAggregationStatusForAppReport
- .equals(LogAggregationStatus.SUCCEEDED)
+ .equals(LogAggregationStatus.SUCCEEDED)
|| this.logAggregationStatusForAppReport
- .equals(LogAggregationStatus.FAILED);
+ .equals(LogAggregationStatus.FAILED);
}
@@ -1773,31 +1781,28 @@ private void updateLogAggregationDiagnosticMessages(NodeId nodeId,
LogAggregationReport report) {
if (report.getDiagnosticMessage() != null
&& !report.getDiagnosticMessage().isEmpty()) {
- if (report.getLogAggregationStatus()
- == LogAggregationStatus.RUNNING ) {
+ if (report.getLogAggregationStatus() == LogAggregationStatus.RUNNING) {
List diagnostics = logAggregationDiagnosticsForNMs.get(nodeId);
if (diagnostics == null) {
diagnostics = new ArrayList();
logAggregationDiagnosticsForNMs.put(nodeId, diagnostics);
} else {
- if (diagnostics.size()
- == maxLogAggregationDiagnosticsInMemory) {
+ if (diagnostics.size() == maxLogAggregationDiagnosticsInMemory) {
diagnostics.remove(0);
}
}
diagnostics.add(report.getDiagnosticMessage());
- this.logAggregationStatus.get(nodeId).setDiagnosticMessage(
- StringUtils.join(diagnostics, "\n"));
- } else if (report.getLogAggregationStatus()
- == LogAggregationStatus.RUNNING_WITH_FAILURE) {
- List failureMessages =
- logAggregationFailureMessagesForNMs.get(nodeId);
+ this.logAggregationStatus.get(nodeId)
+ .setDiagnosticMessage(StringUtils.join(diagnostics, "\n"));
+ } else if (report
+ .getLogAggregationStatus() == LogAggregationStatus.RUNNING_WITH_FAILURE) {
+ List failureMessages = logAggregationFailureMessagesForNMs
+ .get(nodeId);
if (failureMessages == null) {
failureMessages = new ArrayList();
logAggregationFailureMessagesForNMs.put(nodeId, failureMessages);
} else {
- if (failureMessages.size()
- == maxLogAggregationDiagnosticsInMemory) {
+ if (failureMessages.size() == maxLogAggregationDiagnosticsInMemory) {
failureMessages.remove(0);
}
}
@@ -1807,35 +1812,34 @@ private void updateLogAggregationDiagnosticMessages(NodeId nodeId,
}
private void updateLogAggregationStatus(NodeId nodeId) {
- LogAggregationStatus status =
- this.logAggregationStatus.get(nodeId).getLogAggregationStatus();
+ LogAggregationStatus status = this.logAggregationStatus.get(nodeId)
+ .getLogAggregationStatus();
if (status.equals(LogAggregationStatus.SUCCEEDED)) {
this.logAggregationSucceed++;
} else if (status.equals(LogAggregationStatus.FAILED)) {
this.logAggregationFailed++;
}
if (this.logAggregationSucceed == this.logAggregationStatus.size()) {
- this.logAggregationStatusForAppReport =
- LogAggregationStatus.SUCCEEDED;
+ this.logAggregationStatusForAppReport = LogAggregationStatus.SUCCEEDED;
// Since the log aggregation status for this application for all NMs
// is SUCCEEDED, it means all logs are aggregated successfully.
// We could remove all the cached log aggregation reports
this.logAggregationStatus.clear();
this.logAggregationDiagnosticsForNMs.clear();
this.logAggregationFailureMessagesForNMs.clear();
- } else if (this.logAggregationSucceed + this.logAggregationFailed
- == this.logAggregationStatus.size()) {
+ } else if (this.logAggregationSucceed
+ + this.logAggregationFailed == this.logAggregationStatus.size()) {
this.logAggregationStatusForAppReport = LogAggregationStatus.FAILED;
// We have collected the log aggregation status for all NMs.
// The log aggregation status is FAILED which means the log
// aggregation fails in some NMs. We are only interested in the
// nodes where the log aggregation is failed. So we could remove
// the log aggregation details for those succeeded NMs
- for (Iterator> it =
- this.logAggregationStatus.entrySet().iterator(); it.hasNext();) {
+ for (Iterator> it = this.logAggregationStatus
+ .entrySet().iterator(); it.hasNext();) {
Map.Entry entry = it.next();
if (entry.getValue().getLogAggregationStatus()
- .equals(LogAggregationStatus.SUCCEEDED)) {
+ .equals(LogAggregationStatus.SUCCEEDED)) {
it.remove();
}
}
@@ -1848,8 +1852,8 @@ private void updateLogAggregationStatus(NodeId nodeId) {
public String getLogAggregationFailureMessagesForNM(NodeId nodeId) {
try {
this.readLock.lock();
- List failureMessages =
- this.logAggregationFailureMessagesForNMs.get(nodeId);
+ List failureMessages = this.logAggregationFailureMessagesForNMs
+ .get(nodeId);
if (failureMessages == null || failureMessages.isEmpty()) {
return StringUtils.EMPTY;
}
@@ -1861,8 +1865,8 @@ public String getLogAggregationFailureMessagesForNM(NodeId nodeId) {
@Override
public String getAppNodeLabelExpression() {
- String appNodeLabelExpression =
- getApplicationSubmissionContext().getNodeLabelExpression();
+ String appNodeLabelExpression = getApplicationSubmissionContext()
+ .getNodeLabelExpression();
appNodeLabelExpression = (appNodeLabelExpression == null)
? NodeLabel.NODE_LABEL_EXPRESSION_NOT_SET : appNodeLabelExpression;
appNodeLabelExpression = (appNodeLabelExpression.trim().isEmpty())
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppKillByClientEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppKillByClientEvent.java
new file mode 100644
index 0000000..159585e
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppKillByClientEvent.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
+
+import java.net.InetAddress;
+
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+
+/**
+ * A KILL event that also carries the caller's identity and IP address so
+ * that a client-driven application kill can be logged.
+ *
+ */
+public class RMAppKillByClientEvent extends RMAppEvent {
+
+ private final UserGroupInformation callerUGI;
+ private final InetAddress ip;
+
+ /**
+ * Constructor to create an event used for logging during user-driven kill
+ * invocations.
+ *
+ * @param appId application id
+ * @param diagnostics message about the kill event
+ * @param callerUGI caller's user and group information
+ * @param remoteIP ip address of the caller
+ */
+ public RMAppKillByClientEvent(ApplicationId appId, String diagnostics,
+ UserGroupInformation callerUGI, InetAddress remoteIP) {
+ super(appId, RMAppEventType.KILL, diagnostics);
+ this.callerUGI = callerUGI;
+ this.ip = remoteIP;
+ }
+
+ /**
+ * Returns the {@link UserGroupInformation} of the caller.
+ * @return UserGroupInformation
+ */
+ public final UserGroupInformation getCallerUGI() {
+ return callerUGI;
+ }
+
+ /**
+ * Returns the IP address stored in this event.
+ * @return remoteIP
+ */
+ public final InetAddress getIp() {
+ return ip;
+ }
+}
\ No newline at end of file
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAuditLogger.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAuditLogger.java
index acb8e37..3311f92 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAuditLogger.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAuditLogger.java
@@ -23,6 +23,7 @@
import java.net.InetAddress;
import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
import com.google.protobuf.BlockingService;
import com.google.protobuf.RpcController;
@@ -109,18 +110,37 @@ private void testSuccessLogFormatHelper(boolean checkIP, ApplicationId appId,
null);
}
+ private void testSuccessLogFormatHelper(boolean checkIP, ApplicationId appId,
+ ApplicationAttemptId attemptId, ContainerId containerId,
+ CallerContext callerContext, Resource resource) {
+ testSuccessLogFormatHelper(checkIP, appId, attemptId, containerId,
+ callerContext, resource, Server.getRemoteIp());
+ }
+
/**
* Test the AuditLog format for successful events.
*/
private void testSuccessLogFormatHelper(boolean checkIP, ApplicationId appId,
ApplicationAttemptId attemptId, ContainerId containerId,
- CallerContext callerContext, Resource resource) {
- String sLog = RMAuditLogger.createSuccessLog(USER, OPERATION, TARGET,
- appId, attemptId, containerId, resource, callerContext);
+ CallerContext callerContext, Resource resource, InetAddress remoteIp) {
+
+ String sLog;
+ if (checkIP) {
+ sLog = RMAuditLogger.createSuccessLog(USER, OPERATION, TARGET, appId,
+ attemptId, containerId, resource, callerContext, remoteIp);
+ } else {
+ sLog = RMAuditLogger.createSuccessLog(USER, OPERATION, TARGET, appId,
+ attemptId, containerId, resource, callerContext, null);
+ }
StringBuilder expLog = new StringBuilder();
expLog.append("USER=test\t");
if (checkIP) {
- InetAddress ip = Server.getRemoteIp();
+ InetAddress ip;
+ if(remoteIp != null) {
+ ip = remoteIp;
+ } else {
+ ip = Server.getRemoteIp();
+ }
expLog.append(Keys.IP.name() + "=" + ip.getHostAddress() + "\t");
}
expLog.append("OPERATION=oper\tTARGET=tgt\tRESULT=SUCCESS");
@@ -148,6 +168,13 @@ private void testSuccessLogFormatHelper(boolean checkIP, ApplicationId appId,
assertEquals(expLog.toString(), sLog);
}
+ private void testSuccessLogFormatHelperWithIP(boolean checkIP,
+ ApplicationId appId, ApplicationAttemptId attemptId,
+ ContainerId containerId, InetAddress ip) {
+ testSuccessLogFormatHelper(checkIP, appId, attemptId, containerId, null,
+ null, ip);
+ }
+
/**
* Test the AuditLog format for successful events passing nulls.
*/
@@ -165,6 +192,33 @@ private void testSuccessLogNulls(boolean checkIP) {
}
/**
+ * Tests the SuccessLog with two IP addresses.
+ *
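+ * @param checkIP whether the IP address is expected in the audit log
+ * @param appId application id
+ * @param attemptId application attempt id
+ * @param containerId container id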
+ */
+ private void testSuccessLogFormatHelperWithIP(boolean checkIP,
+ ApplicationId appId, ApplicationAttemptId attemptId,
+ ContainerId containerId) {
+ testSuccessLogFormatHelperWithIP(checkIP, appId, attemptId, containerId,
+ InetAddress.getLoopbackAddress());
+ byte[] ipAddr = new byte[] {100, 10, 10, 1};
+
+ InetAddress addr = null;
+ try {
+ addr = InetAddress.getByAddress(ipAddr);
+ } catch (UnknownHostException uhe) {
+ // should not happen as long as IP address format
+ // stays the same
+ Assert.fail("Check ip address being constructed");
+ }
+ testSuccessLogFormatHelperWithIP(checkIP, appId, attemptId, containerId,
+ addr);
+ }
+
+ /**
* Test the AuditLog format for successful events with the various
* parameters.
*/
@@ -187,6 +241,7 @@ private void testSuccessLogFormat(boolean checkIP) {
testSuccessLogFormatHelper(checkIP, APPID, ATTEMPTID, CONTAINERID,
new CallerContext.Builder(CALLER_CONTEXT).setSignature(CALLER_SIGNATURE)
.build(), RESOURCE);
+ testSuccessLogFormatHelperWithIP(checkIP, APPID, ATTEMPTID, CONTAINERID);
testSuccessLogNulls(checkIP);
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
index 9131643..dcc5631 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java
@@ -37,6 +37,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
@@ -257,7 +258,7 @@ protected RMApp createNewTestApp(ApplicationSubmissionContext submissionContext)
ApplicationMasterService masterService =
new ApplicationMasterService(rmContext, scheduler);
-
+
if(submissionContext == null) {
submissionContext = new ApplicationSubmissionContextPBImpl();
}
@@ -305,7 +306,7 @@ private static void testAppStartState(ApplicationId applicationId,
private static void assertStartTimeSet(RMApp application) {
Assert.assertTrue("application start time is not greater than 0",
application.getStartTime() > 0);
- Assert.assertTrue("application start time is before currentTime",
+ Assert.assertTrue("application start time is before currentTime",
application.getStartTime() <= System.currentTimeMillis());
}
@@ -324,7 +325,7 @@ private void assertTimesAtFinish(RMApp application) {
assertStartTimeSet(application);
Assert.assertTrue("application finish time is not greater than 0",
(application.getFinishTime() > 0));
- Assert.assertTrue("application finish time is not >= than start time",
+ Assert.assertTrue("application finish time is not >= start time",
(application.getFinishTime() >= application.getStartTime()));
}
@@ -549,11 +550,14 @@ public void testAppRecoverPath() throws IOException {
public void testAppNewKill() throws IOException {
LOG.info("--- START: testAppNewKill ---");
+ UserGroupInformation fooUser = UserGroupInformation.createUserForTesting(
+ "fooTestAppNewKill", new String[] {"foo_group"});
+
RMApp application = createNewTestApp(null);
// NEW => KILLED event RMAppEventType.KILL
- RMAppEvent event =
- new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL,
- "Application killed by user.");
+ RMAppEvent event = new RMAppKillByClientEvent(
+ application.getApplicationId(), "Application killed by user.", fooUser,
+ Server.getRemoteIp());
application.handle(event);
rmDispatcher.await();
sendAppUpdateSavedEvent(application);
@@ -604,9 +608,13 @@ public void testAppNewSavingKill() throws IOException {
RMApp application = testCreateAppNewSaving(null);
// NEW_SAVING => KILLED event RMAppEventType.KILL
- RMAppEvent event =
- new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL,
- "Application killed by user.");
+ UserGroupInformation fooUser = UserGroupInformation.createUserForTesting(
+ "fooTestAppNewSavingKill", new String[] {"foo_group"});
+
+ RMAppEvent event = new RMAppKillByClientEvent(
+ application.getApplicationId(), "Application killed by user.", fooUser,
+ Server.getRemoteIp());
+
application.handle(event);
rmDispatcher.await();
sendAppUpdateSavedEvent(application);
@@ -653,10 +661,15 @@ public void testAppSubmittedRejected() throws IOException {
public void testAppSubmittedKill() throws IOException, InterruptedException {
LOG.info("--- START: testAppSubmittedKill---");
RMApp application = testCreateAppSubmittedNoRecovery(null);
+
+ UserGroupInformation fooUser = UserGroupInformation.createUserForTesting(
+ "fooTestAppSubmittedKill", new String[] {"foo_group"});
+
// SUBMITTED => KILLED event RMAppEventType.KILL
- RMAppEvent event =
- new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL,
- "Application killed by user.");
+ RMAppEvent event = new RMAppKillByClientEvent(
+ application.getApplicationId(), "Application killed by user.", fooUser,
+ Server.getRemoteIp());
+
application.handle(event);
rmDispatcher.await();
sendAppUpdateSavedEvent(application);
@@ -706,9 +719,13 @@ public void testAppAcceptedKill() throws IOException, InterruptedException {
LOG.info("--- START: testAppAcceptedKill ---");
RMApp application = testCreateAppAccepted(null);
// ACCEPTED => KILLED event RMAppEventType.KILL
- RMAppEvent event =
- new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL,
- "Application killed by user.");
+ UserGroupInformation fooUser = UserGroupInformation.createUserForTesting(
+ "fooTestAppAcceptedKill", new String[] {"foo_group"});
+
+ RMAppEvent event = new RMAppKillByClientEvent(
+ application.getApplicationId(), "Application killed by user.", fooUser,
+ Server.getRemoteIp());
+
application.handle(event);
rmDispatcher.await();
@@ -754,9 +771,14 @@ public void testAppRunningKill() throws IOException {
RMApp application = testCreateAppRunning(null);
// RUNNING => KILLED event RMAppEventType.KILL
- RMAppEvent event =
- new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL,
- "Application killed by user.");
+ UserGroupInformation fooUser = UserGroupInformation.createUserForTesting(
+ "fooTestAppRunningKill", new String[] {"foo_group"});
+
+ // RUNNING => KILLED event RMAppEventType.KILL
+ RMAppEvent event = new RMAppKillByClientEvent(
+ application.getApplicationId(), "Application killed by user.", fooUser,
+ Server.getRemoteIp());
+
application.handle(event);
rmDispatcher.await();
@@ -920,9 +942,14 @@ public void testAppKilledKilled() throws IOException {
RMApp application = testCreateAppRunning(null);
// RUNNING => KILLED event RMAppEventType.KILL
- RMAppEvent event =
- new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL,
- "Application killed by user.");
+ UserGroupInformation fooUser = UserGroupInformation.createUserForTesting(
+ "fooTestAppRunningKill", new String[] {"foo_group"});
+
+ // RUNNING => KILLED event RMAppEventType.KILL
+ RMAppEvent event = new RMAppKillByClientEvent(
+ application.getApplicationId(), "Application killed by user.", fooUser,
+ Server.getRemoteIp());
+
application.handle(event);
rmDispatcher.await();
sendAttemptUpdateSavedEvent(application);