diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptEventType.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptEventType.java index 61de032..d2578ed 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptEventType.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptEventType.java @@ -38,6 +38,7 @@ //Producer:ContainerLauncher TA_CONTAINER_LAUNCHED, TA_CONTAINER_LAUNCH_FAILED, + TA_CONTAINER_LAUNCH_FAILED_DUE_TO_YARN, TA_CONTAINER_CLEANED, //Producer:TaskAttemptListener diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java index 3055a25..129b39ed 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java @@ -212,6 +212,7 @@ // Container launch events can arrive late TaskAttemptEventType.TA_CONTAINER_LAUNCHED, TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED, + TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED_DUE_TO_YARN, TaskAttemptEventType.TA_CONTAINER_CLEANED, TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, @@ -271,11 +272,14 @@ .addTransition(TaskAttemptStateInternal.ASSIGNED, TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED, new DeallocateContainerTransition(TaskAttemptStateInternal.FAILED, false)) + .addTransition(TaskAttemptStateInternal.ASSIGNED, TaskAttemptStateInternal.KILLED, + TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED_DUE_TO_YARN, + new DeallocateContainerTransition(TaskAttemptStateInternal.KILLED, false)) .addTransition(TaskAttemptStateInternal.ASSIGNED, TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_CONTAINER_COMPLETED, FINALIZE_FAILED_TRANSITION) - .addTransition(TaskAttemptStateInternal.ASSIGNED, + .addTransition(TaskAttemptStateInternal.ASSIGNED, TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP, TaskAttemptEventType.TA_KILL, CLEANUP_CONTAINER_TRANSITION) .addTransition(TaskAttemptStateInternal.ASSIGNED, @@ -494,6 +498,7 @@ // Container launch events can arrive late TaskAttemptEventType.TA_CONTAINER_LAUNCHED, TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED, + TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED_DUE_TO_YARN, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILMSG, TaskAttemptEventType.TA_FAILMSG_BY_CLIENT, @@ -517,6 +522,7 @@ TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_CONTAINER_LAUNCHED, TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED, + TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED_DUE_TO_YARN, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILMSG, TaskAttemptEventType.TA_FAILMSG_BY_CLIENT, diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java index a7e966c..76783b2 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java @@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy; import org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy.ContainerManagementProtocolProxyData; +import org.apache.hadoop.yarn.exceptions.NMNotYetReadyException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -132,7 +133,7 @@ public synchronized void launch(ContainerRemoteLaunchEvent event) { if(this.state == ContainerState.KILLED_BEFORE_LAUNCH) { state = ContainerState.DONE; sendContainerLaunchFailedMsg(taskAttemptID, - "Container was killed before it was launched"); + "Container was killed before it was launched", false); return; } @@ -179,11 +180,16 @@ public synchronized void launch(ContainerRemoteLaunchEvent event) { context.getEventHandler().handle( new TaskAttemptContainerLaunchedEvent(taskAttemptID, port)); this.state = ContainerState.RUNNING; + } catch (NMNotYetReadyException e) { + String message = "Container launch failed for " + containerID + " : " + + StringUtils.stringifyException(e); + this.state = ContainerState.FAILED; + sendContainerLaunchFailedMsg(taskAttemptID, message, true); } catch (Throwable t) { String message = "Container launch failed for " + containerID + " : " + StringUtils.stringifyException(t); this.state = ContainerState.FAILED; - sendContainerLaunchFailedMsg(taskAttemptID, message); + sendContainerLaunchFailedMsg(taskAttemptID, message, false); } finally { if (proxy != null) { cmProxy.mayBeCloseProxy(proxy); @@ -394,13 +400,15 @@ public void run() { @SuppressWarnings("unchecked") void sendContainerLaunchFailedMsg(TaskAttemptId taskAttemptID, - String message) { + String message, boolean dueToYarn) { LOG.error(message); context.getEventHandler().handle( new TaskAttemptDiagnosticsUpdateEvent(taskAttemptID, message)); + TaskAttemptEventType type = dueToYarn ? + TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED_DUE_TO_YARN : + TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED; context.getEventHandler().handle( - new TaskAttemptEvent(taskAttemptID, - TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED)); + new TaskAttemptEvent(taskAttemptID, type)); } @Override diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java index 41ee65d..aeb5301 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java @@ -25,11 +25,16 @@ import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.hadoop.mapred.ShuffleHandler; +import org.apache.hadoop.yarn.exceptions.NMNotYetReadyException; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.junit.Assert; import org.apache.commons.logging.Log; @@ -263,7 +268,7 @@ public void testSlowNM() throws Exception { "token"); server = rpc.getServer(ContainerManagementProtocol.class, - new DummyContainerManager(), addr, conf, tokenSecretManager, 1); + new SlowContainerManager(), addr, conf, tokenSecretManager, 1); server.start(); MRApp app = new MRAppWithSlowNM(tokenSecretManager); @@ -304,6 +309,54 @@ public void testSlowNM() throws Exception { } } + @Test(timeout = 15000) + public void testUnregisteredNM() throws Exception { + + conf = new Configuration(); + int maxAttempts = 2; + conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, maxAttempts); + conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false); + conf.set(YarnConfiguration.IPC_RPC_IMPL, HadoopYarnProtoRPC.class.getName()); + YarnRPC rpc = YarnRPC.create(conf); + String bindAddr = "localhost:0"; + InetSocketAddress addr = NetUtils.createSocketAddr(bindAddr); + NMTokenSecretManagerInNM tokenSecretManager = + new NMTokenSecretManagerInNM(); + MasterKey masterKey = Records.newRecord(MasterKey.class); + masterKey.setBytes(ByteBuffer.wrap("key".getBytes())); + tokenSecretManager.setMasterKey(masterKey); + conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, + "token"); + UnregisteredContainerManager conMan = new UnregisteredContainerManager(); + server = rpc.getServer(ContainerManagementProtocol.class, conMan, addr, + conf, tokenSecretManager, 1); + server.start(); + + MRApp app = new MRAppWithSlowNM(tokenSecretManager); + + try { + Job job = app.submit(conf); + app.waitForState(job, JobState.RUNNING); + + Map tasks = job.getTasks(); + Assert.assertEquals("Num tasks is not correct", 1, tasks.size()); + + Task task = tasks.values().iterator().next(); + app.waitForState(task, TaskState.SCHEDULED); + int numTaskAttempts; + do { + numTaskAttempts = task.getAttempts().size(); + } while (numTaskAttempts <= maxAttempts); + app.waitForState(task, TaskState.SCHEDULED); + + conMan.throwNMNotYetReadyException.set(false); + app.waitForState(task, TaskState.RUNNING); + } finally { + server.stop(); + app.stop(); + } + } + private final class CustomContainerLauncher extends ContainerLauncherImpl { private volatile int expectedCorePoolSize = 0; @@ -402,7 +455,7 @@ public ContainerManagementProtocolProxyData getCMProxy( }; } - public class DummyContainerManager implements ContainerManagementProtocol { + public class SlowContainerManager implements ContainerManagementProtocol { private ContainerStatus status = null; @@ -450,4 +503,56 @@ public StopContainersResponse stopContainers(StopContainersRequest request) throw new IOException(e); } } + + public class UnregisteredContainerManager implements ContainerManagementProtocol { + + public AtomicBoolean throwNMNotYetReadyException = new AtomicBoolean(true); + + private ContainerStatus status = null; + + @Override + public GetContainerStatusesResponse getContainerStatuses( + GetContainerStatusesRequest request) throws IOException { + List statuses = new ArrayList(); + statuses.add(status); + return GetContainerStatusesResponse.newInstance(statuses, null); + } + + @Override + public StartContainersResponse startContainers(StartContainersRequest requests) + throws YarnException, IOException { + if (throwNMNotYetReadyException.get()) { + throw new NMNotYetReadyException("msg"); + } + + StartContainerRequest request = requests.getStartContainerRequests().get(0); + ContainerTokenIdentifier containerTokenIdentifier = + MRApp.newContainerTokenIdentifier(request.getContainerToken()); + + // Validate that the container is what RM is giving. + Assert.assertEquals(MRApp.NM_HOST + ":" + MRApp.NM_PORT, + containerTokenIdentifier.getNmHostAddress()); + + StartContainersResponse response = recordFactory + .newRecordInstance(StartContainersResponse.class); + status = recordFactory.newRecordInstance(ContainerStatus.class); + status.setState(ContainerState.RUNNING); + status.setContainerId(containerTokenIdentifier.getContainerID()); + status.setExitStatus(0); + Map serviceResponse = + new HashMap(); + serviceResponse.put(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID, + ShuffleHandler.serializeMetaData(80)); + response.setAllServicesMetaData(serviceResponse); + return response; + } + + @Override + public StopContainersResponse stopContainers(StopContainersRequest request) + throws IOException { + Exception e = new Exception("Dummy function", new Exception( + "Dummy function cause")); + throw new IOException(e); + } + } }