diff --git a/.gitignore b/.gitignore index 15c040c..a49ad4b 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ +.DS_Store *.iml *.ipr *.iws diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DockerContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DockerContainerExecutor.java index c854173..a638d1d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DockerContainerExecutor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DockerContainerExecutor.java @@ -22,10 +22,11 @@ import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.base.Strings; - +import com.google.common.eventbus.EventBus; import org.apache.commons.lang.math.RandomUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.Path; @@ -51,16 +52,17 @@ import java.io.IOException; import java.io.OutputStream; import java.io.PrintStream; +import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Collections; import java.util.EnumSet; import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Random; import java.util.Set; +import java.util.concurrent.ExecutorService; import java.util.regex.Pattern; -import java.net.InetSocketAddress; + import static org.apache.hadoop.fs.CreateFlag.CREATE; import static org.apache.hadoop.fs.CreateFlag.OVERWRITE; @@ -68,29 +70,32 @@ * This executor will launch a docker container and run the task inside the container. */ public class DockerContainerExecutor extends ContainerExecutor { - + private static final String LINE_SEPARATOR = + System.getProperty("line.separator"); private static final Log LOG = LogFactory .getLog(DockerContainerExecutor.class); public static final String DOCKER_CONTAINER_EXECUTOR_SCRIPT = "docker_container_executor"; - public static final String DOCKER_CONTAINER_EXECUTOR_SESSION_SCRIPT = "docker_container_executor_session"; - + public static final String DOCKER_CONTAINER_EXECUTOR_SESSION_SCRIPT_CREATE = "docker_container_executor_session_create"; + public static final String DOCKER_CONTAINER_EXECUTOR_SESSION_SCRIPT_START = "docker_container_executor_session_start"; // This validates that the image is a proper docker image and would not crash docker. public static final String DOCKER_IMAGE_PATTERN = "^(([\\w\\.-]+)(:\\d+)*\\/)?[\\w\\.:-]+$"; private final FileContext lfs; private final Pattern dockerImagePattern; +private EventBus eventBus; - public DockerContainerExecutor() { +public DockerContainerExecutor() { try { this.lfs = FileContext.getLocalFSFileContext(); this.dockerImagePattern = Pattern.compile(DOCKER_IMAGE_PATTERN); + this.eventBus = new EventBus(); } catch (UnsupportedFileSystemException e) { throw new RuntimeException(e); } } - protected void copyFile(Path src, Path dst, String owner) throws IOException { + private void copyFile(Path src, Path dst, String owner) throws IOException { lfs.util().copy(src, dst); } @@ -102,10 +107,18 @@ public void init() throws IOException { } String dockerExecutor = getConf().get(YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_EXEC_NAME, YarnConfiguration.NM_DEFAULT_DOCKER_CONTAINER_EXECUTOR_EXEC_NAME); - if (!new File(dockerExecutor).exists()) { + String[] arr = dockerExecutor.split("\\s"); + if (LOG.isDebugEnabled()) { + LOG.debug("dockerExecutor: " + dockerExecutor); + } + if (!new File(arr[0]).exists()) { throw new IllegalStateException("Invalid docker exec path: " + dockerExecutor); } - } + + DockerEventBroadcastService dockerEventBroadcastService = new DockerEventBroadcastService(dockerExecutor, eventBus); + dockerEventBroadcastService.init(getConf()); + dockerEventBroadcastService.start(); + } @Override public synchronized void startLocalizer(Path nmPrivateContainerTokensPath, @@ -140,7 +153,7 @@ public synchronized void startLocalizer(Path nmPrivateContainerTokensPath, @Override - public int launchContainer(Container container, + public int launchContainer(final Container container, Path nmPrivateContainerScriptPath, Path nmPrivateTokensPath, String userName, String appId, Path containerWorkDir, List localDirs, List logDirs) throws IOException { @@ -197,60 +210,81 @@ public int launchContainer(Container container, String logDirMount = toMount(logDirs); String containerWorkDirMount = toMount(Collections.singletonList(containerWorkDir.toUri().getPath())); StringBuilder commands = new StringBuilder(); + Path pidFile = getPidFilePath(containerId); String commandStr = commands.append(dockerExecutor) .append(" ") - .append("run") + .append("create") .append(" ") - .append("--rm --net=host") + .append("--net=host") .append(" ") - .append(" --name " + containerIdStr) + .append("--name " + containerIdStr) .append(localDirMount) .append(logDirMount) .append(containerWorkDirMount) .append(" ") .append(containerImageName) + .append(" ") + .append("bash ") + .append(launchDst.toUri().getPath().toString()) .toString(); - String dockerPidScript = "`" + dockerExecutor + " inspect --format {{.State.Pid}} " + containerIdStr + "`"; + + String dockerRmScript = dockerExecutor + " rm " + containerIdStr; + // Create new local launch wrapper script - LocalWrapperScriptBuilder sb = - new UnixLocalWrapperScriptBuilder(containerWorkDir, commandStr, dockerPidScript); - Path pidFile = getPidFilePath(containerId); - if (pidFile != null) { - sb.writeLocalWrapperScript(launchDst, pidFile); - } else { - LOG.info("Container " + containerIdStr - + " was marked as inactive. Returning terminated error"); - return ExitCode.TERMINATED.getExitCode(); - } - ShellCommandExecutor shExec = null; + ExecutorService executorService = null; try { - lfs.setPermission(launchDst, - ContainerExecutor.TASK_LAUNCH_SCRIPT_PERMISSION); - lfs.setPermission(sb.getWrapperScriptPath(), - ContainerExecutor.TASK_LAUNCH_SCRIPT_PERMISSION); - - // Setup command to run - String[] command = getRunCommand(sb.getWrapperScriptPath().toString(), - containerIdStr, userName, pidFile, this.getConf()); - if (LOG.isDebugEnabled()) { - LOG.debug("launchContainer: " + commandStr + " " + Joiner.on(" ").join(command)); + LocalScriptExecutor localScriptExecutor = new LocalScriptExecutorBuilder() + .setContainer(container) + .setUserName(userName) + .setContainerWorkDir(containerWorkDir) + .setContainerId(containerId) + .setContainerIdStr(containerIdStr) + .setLaunchDst(launchDst) + .setPidFile(pidFile) + .setCommandStr(commandStr) + .setConf(getConf()) + .setSessionScript(DOCKER_CONTAINER_EXECUTOR_SESSION_SCRIPT_CREATE) + .createLocalScriptExecutor() + .invoke(); + if (localScriptExecutor.is()) { + return ExitCode.TERMINATED.getExitCode(); } - shExec = new ShellCommandExecutor( - command, - new File(containerWorkDir.toUri().getPath()), - container.getLaunchContext().getEnvironment()); // sanitized env - if (isContainerActive(containerId)) { - shExec.execute(); - } else { - LOG.info("Container " + containerIdStr + - " was marked as inactive. Returning terminated error"); + + shExec = localScriptExecutor.getShExec(); + String cid = shExec.getOutput(); + if (cid.length() > 1) { + cid = cid.substring(0, cid.length() - 1); + } + DockerEventSubscriber subscriber = new DockerEventSubscriber(dockerExecutor, pidFile, cid); + eventBus.register(subscriber); + //now run the job + commandStr = dockerExecutor + " start -a " + containerIdStr; + localScriptExecutor = new LocalScriptExecutorBuilder() + .setContainer(container) + .setUserName(userName) + .setContainerWorkDir(containerWorkDir) + .setContainerId(containerId) + .setContainerIdStr(containerIdStr) + .setLaunchDst(launchDst) + .setPidFile(pidFile) + .setCommandStr(commandStr) + .setConf(getConf()) + .setSessionScript(DOCKER_CONTAINER_EXECUTOR_SESSION_SCRIPT_START) + .setDockerRmCmd(dockerRmScript) + .createLocalScriptExecutor() + .invoke(); + if (localScriptExecutor.is()) { return ExitCode.TERMINATED.getExitCode(); } + shExec = localScriptExecutor.getShExec(); + } catch (IOException e) { + LOG.debug("exception, ", e); if (null == shExec) { return -1; } + int exitCode = shExec.getExitCode(); LOG.warn("Exit code from container " + containerId + " is : " + exitCode); // 143 (SIGTERM) and 137 (SIGKILL) exit codes means the container was @@ -274,11 +308,15 @@ public int launchContainer(Container container, if (shExec != null) { shExec.close(); } + if (executorService != null) { + executorService.shutdown(); + } } return 0; } - @Override + +@Override public void writeLaunchEnv(OutputStream out, Map environment, Map> resources, List command) throws IOException { ContainerLaunch.ShellScriptBuilder sb = ContainerLaunch.ShellScriptBuilder.create(); @@ -327,7 +365,7 @@ public void writeLaunchEnv(OutputStream out, Map environment, Ma } } if (LOG.isDebugEnabled()) { - LOG.debug("Script: " + baos.toString("UTF-8")); + LOG.debug("Launch Script: " + baos.toString("UTF-8")); } } @@ -389,7 +427,7 @@ public static boolean containerIsAlive(String pid) throws IOException { * @param signal signal to send * (for logging). */ - protected void killContainer(String pid, Signal signal) throws IOException { + private void killContainer(String pid, Signal signal) throws IOException { new ShellCommandExecutor(Shell.getSignalKillCommand(signal.getValue(), pid)) .execute(); } @@ -461,25 +499,53 @@ protected LocalWrapperScriptBuilder(Path containerWorkDir) { extends LocalWrapperScriptBuilder { private final Path sessionScriptPath; private final String dockerCommand; - private final String dockerPidScript; - - public UnixLocalWrapperScriptBuilder(Path containerWorkDir, String dockerCommand, String dockerPidScript) { + private final String dockerRmCmd; + public UnixLocalWrapperScriptBuilder(Path containerWorkDir, String dockerCommand, String sessionScript, String dockerRmCmd) { super(containerWorkDir); this.dockerCommand = dockerCommand; - this.dockerPidScript = dockerPidScript; this.sessionScriptPath = new Path(containerWorkDir, - Shell.appendScriptExtension(DOCKER_CONTAINER_EXECUTOR_SESSION_SCRIPT)); + Shell.appendScriptExtension(sessionScript)); + this.dockerRmCmd = dockerRmCmd; } @Override public void writeLocalWrapperScript(Path launchDst, Path pidFile) throws IOException { - writeSessionScript(launchDst, pidFile); + DataOutputStream out = null; + PrintStream pout = null; + PrintStream ps = null; + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + if (LOG.isDebugEnabled()) { + ps = new PrintStream(baos, false, "UTF-8"); + } + try { + out = lfs.create(sessionScriptPath, EnumSet.of(CREATE, OVERWRITE)); + pout = new PrintStream(out, false, "UTF-8"); + pout.println("#!/usr/bin/env bash"); + pout.println(); + pout.println(dockerCommand); + pout.println(); + if (!Strings.isNullOrEmpty(dockerRmCmd)) { + pout.println(dockerRmCmd); + pout.println(); + } + if (LOG.isDebugEnabled()) { + ps.println("#!/usr/bin/env bash"); + ps.println(); + ps.println(dockerCommand); + ps.println(); + LOG.debug("Session Script: " + baos.toString("UTF-8")); + } + } finally { + IOUtils.cleanup(LOG, pout, out); + } + lfs.setPermission(sessionScriptPath, + ContainerExecutor.TASK_LAUNCH_SCRIPT_PERMISSION); super.writeLocalWrapperScript(launchDst, pidFile); } @Override - public void writeLocalWrapperScript(Path launchDst, Path pidFile, + protected void writeLocalWrapperScript(Path launchDst, Path pidFile, PrintStream pout) { String exitCodeFile = ContainerLaunch.getExitCodeFile( @@ -493,31 +559,9 @@ public void writeLocalWrapperScript(Path launchDst, Path pidFile, pout.println("exit $rc"); } - private void writeSessionScript(Path launchDst, Path pidFile) - throws IOException { - DataOutputStream out = null; - PrintStream pout = null; - try { - out = lfs.create(sessionScriptPath, EnumSet.of(CREATE, OVERWRITE)); - pout = new PrintStream(out, false, "UTF-8"); - // We need to do a move as writing to a file is not atomic - // Process reading a file being written to may get garbled data - // hence write pid to tmp file first followed by a mv - pout.println("#!/usr/bin/env bash"); - pout.println(); - pout.println("echo "+ dockerPidScript +" > " + pidFile.toString() + ".tmp"); - pout.println("/bin/mv -f " + pidFile.toString() + ".tmp " + pidFile); - pout.println(dockerCommand + " bash \"" + - launchDst.toUri().getPath().toString() + "\""); - } finally { - IOUtils.cleanup(LOG, pout, out); - } - lfs.setPermission(sessionScriptPath, - ContainerExecutor.TASK_LAUNCH_SCRIPT_PERMISSION); - } } - protected void createDir(Path dirPath, FsPermission perms, + private void createDir(Path dirPath, FsPermission perms, boolean createParent, String user) throws IOException { lfs.mkdir(dirPath, perms, createParent); if (!perms.equals(perms.applyUMask(lfs.getUMask()))) { @@ -706,7 +750,7 @@ private Path getFileCacheDir(Path base, String user) { ContainerLocalizer.FILECACHE); } - protected Path getWorkingDir(List localDirs, String user, + private Path getWorkingDir(List localDirs, String user, String appId) throws IOException { Path appStorageDir = null; long totalAvailable = 0L; @@ -791,4 +835,152 @@ void createAppLogDirs(String appId, List logDirs, String user) return paths; } +private class LocalScriptExecutor { + private boolean inActive; + private Container container; + private String userName; + private Path containerWorkDir; + private ContainerId containerId; + private String containerIdStr; + private Path launchDst; + private Path pidFile; + private String commandStr; + private ShellCommandExecutor shExec; + private String sessionScript; + private Configuration conf; + private String dockerRmCmd; + + public LocalScriptExecutor(Container container, String userName, Path containerWorkDir, ContainerId containerId, String containerIdStr, Path launchDst, Path pidFile, String commandStr, String sessionScript, Configuration conf, String dockerRmCmd) { + this.container = container; + this.userName = userName; + this.containerWorkDir = containerWorkDir; + this.containerId = containerId; + this.containerIdStr = containerIdStr; + this.launchDst = launchDst; + this.pidFile = pidFile; + this.commandStr = commandStr; + this.sessionScript = sessionScript; + this.conf = conf; + this.dockerRmCmd = dockerRmCmd; + } + + boolean is() { + return inActive; + } + + public ShellCommandExecutor getShExec() { + return shExec; + } + + public LocalScriptExecutor invoke() throws IOException { + LocalWrapperScriptBuilder sb = + new UnixLocalWrapperScriptBuilder(containerWorkDir, commandStr, sessionScript, dockerRmCmd); + + sb.writeLocalWrapperScript(launchDst, pidFile); + + shExec = null; + + + lfs.setPermission(launchDst, + ContainerExecutor.TASK_LAUNCH_SCRIPT_PERMISSION); + lfs.setPermission(sb.getWrapperScriptPath(), + ContainerExecutor.TASK_LAUNCH_SCRIPT_PERMISSION); + + // Setup command to run + String[] command = getRunCommand(sb.getWrapperScriptPath().toString(), + containerIdStr, userName, pidFile, this.conf); + if (LOG.isDebugEnabled()) { + LOG.debug("launchContainer: " + commandStr + " " + Joiner.on(", ").join(command)); + } + shExec = new ShellCommandExecutor( + command, + new File(containerWorkDir.toUri().getPath()), + container.getLaunchContext().getEnvironment()); // sanitized env + + if (isContainerActive(containerId)) { + shExec.execute(); + } else { + LOG.info("Container " + containerIdStr + + " was marked as inactive. Returning terminated error"); + inActive = true; + return this; + } + inActive = false; + return this; + } +} + +private class LocalScriptExecutorBuilder { +private Container container; +private String userName; +private Path containerWorkDir; +private ContainerId containerId; +private String containerIdStr; +private Path launchDst; +private Path pidFile; +private String commandStr; +private Configuration conf; +private String sessionScript; +private String dockerRmCmd; + +public LocalScriptExecutorBuilder setContainer(Container container) { + this.container = container; + return this; +} + +public LocalScriptExecutorBuilder setUserName(String userName) { + this.userName = userName; + return this; +} + +public LocalScriptExecutorBuilder setContainerWorkDir(Path containerWorkDir) { + this.containerWorkDir = containerWorkDir; + return this; +} + +public LocalScriptExecutorBuilder setContainerId(ContainerId containerId) { + this.containerId = containerId; + return this; +} + +public LocalScriptExecutorBuilder setContainerIdStr(String containerIdStr) { + this.containerIdStr = containerIdStr; + return this; +} + +public LocalScriptExecutorBuilder setLaunchDst(Path launchDst) { + this.launchDst = launchDst; + return this; +} + +public LocalScriptExecutorBuilder setPidFile(Path pidFile) { + this.pidFile = pidFile; + return this; +} + +public LocalScriptExecutorBuilder setCommandStr(String commandStr) { + this.commandStr = commandStr; + return this; +} + + +public LocalScriptExecutorBuilder setSessionScript(String rmCmd) { + this.sessionScript = rmCmd; + return this; +} + +public LocalScriptExecutorBuilder setDockerRmCmd(String rmCmd) { + this.dockerRmCmd = rmCmd; + return this; +} + +public LocalScriptExecutorBuilder setConf(Configuration conf) { + this.conf = conf; + return this; +} + +public LocalScriptExecutor createLocalScriptExecutor() { + return new LocalScriptExecutor(container, userName, containerWorkDir, containerId, containerIdStr, launchDst, pidFile, commandStr, sessionScript, conf, dockerRmCmd); +} +} } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DockerEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DockerEvent.java new file mode 100644 index 0000000..6850e22 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DockerEvent.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager; + +import org.apache.hadoop.yarn.event.AbstractEvent; + +import java.io.Serializable; + +abstract class DockerEvent extends AbstractEvent implements Serializable{ +final String dockerContainerId; +public DockerEvent(String dockerContainerId, DockerEventType dockerEventType) { + super(dockerEventType); + this.dockerContainerId = dockerContainerId; +} +static class DockerContainerStartedEvent extends DockerEvent { + public DockerContainerStartedEvent(String dockerContainerId) { + super(dockerContainerId, DockerEventType.STARTED); + } +} +static class DockerContainerCreatedEvent extends DockerEvent { + public DockerContainerCreatedEvent(String dockerContainerId) { + super(dockerContainerId, DockerEventType.CREATED); + } +} +static class DockerContainerKilledEvent extends DockerEvent { + public DockerContainerKilledEvent(String dockerContainerId) { + super(dockerContainerId, DockerEventType.KILLED); + } +} +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DockerEventBroadcastService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DockerEventBroadcastService.java new file mode 100644 index 0000000..edb451c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DockerEventBroadcastService.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager; + +import com.google.common.eventbus.EventBus; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.service.AbstractService; + +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + + +class DockerEventBroadcastService extends AbstractService { +private static final Log LOG = LogFactory + .getLog(DockerEventBroadcastService.class); + private String dockerExecutor; + private final ExecutorService executorService; + private final EventBus eventBus; + + public DockerEventBroadcastService(String dockerExecutor, EventBus eventBus) { + super(DockerEventBroadcastService.class.getName()); + this.dockerExecutor = dockerExecutor; + this.eventBus = eventBus; + this.executorService = Executors.newSingleThreadExecutor(); + } + +@Override +protected void serviceStart() throws Exception { + final String dockerEventsCmd = dockerExecutor + " events"; + + executorService.submit(new Callable() { + @Override + public Void call() throws Exception { + if (LOG.isDebugEnabled()) { + LOG.debug("started call " + dockerEventsCmd); + } + ProcessBuilder pb = new ProcessBuilder(dockerEventsCmd.split(" ")); + pb.redirectErrorStream(true); + + final Process process = pb.start(); + try (BufferedReader br = new BufferedReader(new InputStreamReader(process.getInputStream()))) { + String line = br.readLine(); + while (line != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("line " + line); + } + if (line != null) { + String[] words = line.split(" "); + String cid = words[1].substring(0, words[1].length() - 1); + switch (words[4]) { + case "create": + eventBus.post(new DockerEvent.DockerContainerCreatedEvent(cid)); + case "start": + LOG.debug("Launched an event " + cid); + eventBus.post(new DockerEvent.DockerContainerStartedEvent(cid)); + break; + case "die": + eventBus.post(new DockerEvent.DockerContainerKilledEvent(cid)); + break; + default: + LOG.warn("unknown event: " + words[4]); + } + } + line = br.readLine(); + } + + } + return null; + + } + }); + +} + +@Override +protected void serviceStop() throws Exception { + if(executorService != null) { + executorService.shutdown(); + } +} + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DockerEventSubscriber.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DockerEventSubscriber.java new file mode 100644 index 0000000..e23402d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DockerEventSubscriber.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager; + +import com.google.common.eventbus.Subscribe; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.util.Shell; + +import java.io.BufferedReader; +import java.io.FileWriter; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.charset.Charset; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +class DockerEventSubscriber { + private static final Log LOG = LogFactory + .getLog(DockerEventBroadcastService.class); + private final String dockerExecutor; + private final Path pidFile; + private final String cid; + + public DockerEventSubscriber(String dockerExecutor, Path pidFile, String cid) { + this.dockerExecutor = dockerExecutor; + this.pidFile = pidFile; + this.cid = cid; + } + + @Subscribe + public void handleStartEvent(DockerEvent.DockerContainerStartedEvent event) throws IOException, InterruptedException { + if (!event.dockerContainerId.equals(cid)) { + LOG.debug("Not equal: " + event.dockerContainerId.equals(cid)); + return; + } + final String dockerPidScript = dockerExecutor + " inspect --format {{.State.Pid}} " + cid; + LOG.debug("pid script: " + dockerPidScript); + ProcessBuilder processBuilder = new ProcessBuilder(dockerPidScript.split(" ")); + + Process process = processBuilder.start(); + final BufferedReader errReader = + new BufferedReader(new InputStreamReader( + process.getErrorStream(), Charset.defaultCharset())); + final StringBuffer errMsg = new StringBuffer(); + ExecutorService service = Executors.newSingleThreadExecutor(); + Future f = service.submit(new Callable() { + @Override + public Void call() throws Exception { + + String line = errReader.readLine(); + while (line != null) { + errMsg.append(line); + errMsg.append(System.getProperty("line.separator")); + line = errReader.readLine(); + } + return null; + } + }); + + try { + f.get(); + } catch (ExecutionException e) { + LOG.error("Error getting error out: ", e); + } finally { + if (errReader != null) { + errReader.close(); + } + } + try(BufferedReader inReader = + new BufferedReader(new InputStreamReader( + process.getInputStream(), Charset.defaultCharset()))) { + String line = inReader.readLine(); + + int exitCode = process.waitFor(); + + if (exitCode != 0) { + throw new Shell.ExitCodeException(exitCode, "Error: " + dockerPidScript + " error msg: " + errMsg); + } + + try (FileWriter fw = new FileWriter(pidFile.toString())) { + fw.write(line); + if (LOG.isDebugEnabled()) { + LOG.debug("wrote pid: " + line + " file: " + pidFile); + } + } + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DockerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DockerEventType.java new file mode 100644 index 0000000..630cea9 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DockerEventType.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager; + +enum DockerEventType { + + // Producer: ContainerManager + CREATED, + STARTED, + KILLED +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDockerContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDockerContainerExecutor.java index e43ac2e..16e01ec 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDockerContainerExecutor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDockerContainerExecutor.java @@ -31,7 +31,6 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; -import org.apache.hadoop.yarn.util.ConverterUtils; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -42,14 +41,13 @@ import java.io.IOException; import java.io.LineNumberReader; import java.io.PrintWriter; -import java.util.ArrayList; import java.util.HashMap; -import java.util.List; +import java.util.HashSet; import java.util.Map; +import java.util.Set; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -import static org.junit.Assume.assumeTrue; import static org.mockito.Mockito.RETURNS_DEEP_STUBS; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -61,7 +59,7 @@ *
  • Install docker, and Compile the code with docker-service-url set to the host and port * where docker service is running. *
    
    - * > mvn clean install -Ddocker-service-url=tcp://0.0.0.0:4243
    + * > mvn clean install -Ddocker-exec=/usr/bin/docker -H tcp://0.0.0.0:4243
      *                          -DskipTests
      * 
    */ @@ -77,7 +75,6 @@ private int id = 0; private String appSubmitter; - private String dockerUrl; private String testImage = "centos"; private String dockerExec; private String containerIdStr; @@ -106,13 +103,11 @@ public void setup() { conf.set(YarnConfiguration.NM_LOCAL_DIRS, "/tmp/nm-local-dir" + time); conf.set(YarnConfiguration.NM_LOG_DIRS, "/tmp/userlogs" + time); - dockerUrl = System.getProperty("docker-service-url"); - LOG.info("dockerUrl: " + dockerUrl); - if (Strings.isNullOrEmpty(dockerUrl)) { + dockerExec = System.getProperty("docker-exec"); + LOG.info("dockerExec: " + dockerExec); + if (Strings.isNullOrEmpty(dockerExec)) { return; } - dockerUrl = " -H " + dockerUrl; - dockerExec = "docker " + dockerUrl; conf.set(YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_IMAGE_NAME, yarnImage); conf.set(YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_EXEC_NAME, dockerExec); exec = new DockerContainerExecutor(); @@ -146,6 +141,7 @@ private boolean shouldRun() { } private int runAndBlock(ContainerId cId, Map launchCtxEnv, String... cmd) throws IOException { + exec.init(); String appId = "APP_" + System.currentTimeMillis(); Container container = mock(Container.class); ContainerLaunchContext context = mock(ContainerLaunchContext.class); @@ -172,8 +168,17 @@ private String writeScriptFile(Map launchCtxEnv, String... cmd) File f = File.createTempFile("TestDockerContainerExecutor", ".sh"); f.deleteOnExit(); PrintWriter p = new PrintWriter(new FileOutputStream(f)); + Set exclusionSet = new HashSet(); + exclusionSet.add(YarnConfiguration.NM_DOCKER_CONTAINER_EXECUTOR_IMAGE_NAME); + exclusionSet.add(ApplicationConstants.Environment.HADOOP_YARN_HOME.name()); + exclusionSet.add(ApplicationConstants.Environment.HADOOP_COMMON_HOME.name()); + exclusionSet.add(ApplicationConstants.Environment.HADOOP_HDFS_HOME.name()); + exclusionSet.add(ApplicationConstants.Environment.HADOOP_CONF_DIR.name()); + exclusionSet.add(ApplicationConstants.Environment.JAVA_HOME.name()); for(Map.Entry entry: launchCtxEnv.entrySet()) { - p.println("export " + entry.getKey() + "=\"" + entry.getValue() + "\""); + if (!exclusionSet.contains(entry.getKey())) { + p.println("export " + entry.getKey() + "=\"" + entry.getValue() + "\""); + } } for (String part : cmd) { p.print(part.replace("\\", "\\\\").replace("'", "\\'")); @@ -207,7 +212,13 @@ public void testLaunchContainer() throws IOException { ContainerId cId = getNextContainerId(); int ret = runAndBlock( cId, env, "touch", touchFile.getAbsolutePath(), "&&", "cp", touchFile.getAbsolutePath(), "/"); - + Path workDir = new Path(workSpace.getAbsolutePath()); + Path pidFile = new Path(workDir, "pid.txt"); + LineNumberReader lnr = new LineNumberReader(new FileReader(pidFile.toString())); assertEquals(0, ret); + while(lnr.ready()) { + String line = lnr.readLine(); + assertTrue("Not a valid pid in " + pidFile + " pid: " + line,line.matches("[1-9]\\d*")); + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDockerContainerExecutorWithMocks.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDockerContainerExecutorWithMocks.java index 3584fed..3c9003f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDockerContainerExecutorWithMocks.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDockerContainerExecutorWithMocks.java @@ -18,7 +18,7 @@ package org.apache.hadoop.yarn.server.nodemanager; -import com.google.common.base.Strings; +import com.google.common.base.Joiner; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -49,9 +49,9 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assume.assumeTrue; +import static org.mockito.Mockito.RETURNS_DEEP_STUBS; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import static org.mockito.Mockito.RETURNS_DEEP_STUBS; /** * Mock tests for docker container executor @@ -212,42 +212,30 @@ public void testContainerLaunch() throws IOException { //get the script Path sessionScriptPath = new Path(workDir, Shell.appendScriptExtension( - DockerContainerExecutor.DOCKER_CONTAINER_EXECUTOR_SESSION_SCRIPT)); + DockerContainerExecutor.DOCKER_CONTAINER_EXECUTOR_SESSION_SCRIPT_CREATE)); LineNumberReader lnr = new LineNumberReader(new FileReader(sessionScriptPath.toString())); boolean cmdFound = false; List localDirs = dirsToMount(dirsHandler.getLocalDirs()); List logDirs = dirsToMount(dirsHandler.getLogDirs()); List workDirMount = dirsToMount(Collections.singletonList(workDir.toUri().getPath())); List expectedCommands = new ArrayList( - Arrays.asList(DOCKER_LAUNCH_COMMAND, "run", "--rm", "--net=host", "--name", containerId)); + Arrays.asList(DOCKER_LAUNCH_COMMAND, "create", "--net=host", "--name", containerId)); expectedCommands.addAll(localDirs); expectedCommands.addAll(logDirs); expectedCommands.addAll(workDirMount); - String shellScript = workDir + "/launch_container.sh"; + String shellScript = "bash " + workDir + "/launch_container.sh"; - expectedCommands.addAll(Arrays.asList(testImage.replaceAll("['\"]", ""), "bash","\"" + shellScript + "\"")); - - String expectedPidString = "echo `/bin/true inspect --format {{.State.Pid}} " + containerId+"` > "+ pidFile.toString() + ".tmp"; - boolean pidSetterFound = false; + expectedCommands.addAll(Arrays.asList(testImage.replaceAll("['\"]", ""), shellScript)); while(lnr.ready()){ String line = lnr.readLine(); LOG.debug("line: " + line); - if (line.startsWith(DOCKER_LAUNCH_COMMAND)){ - List command = new ArrayList(); - for( String s :line.split("\\s+")){ - command.add(s.trim()); - } - - assertEquals(expectedCommands, command); + if (line.startsWith(DOCKER_LAUNCH_COMMAND + " create ")){ + assertEquals(Joiner.on(" ").join(expectedCommands), line); cmdFound = true; - } else if (line.startsWith("echo")) { - assertEquals(expectedPidString, line); - pidSetterFound = true; } } assertTrue(cmdFound); - assertTrue(pidSetterFound); } private List dirsToMount(List dirs) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/resources/log4j.properties b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/resources/log4j.properties index 531b68b..63353bd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/resources/log4j.properties +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/resources/log4j.properties @@ -12,7 +12,7 @@ # log4j configuration used during build and unit tests -log4j.rootLogger=info,stdout +log4j.rootLogger=debug,stdout log4j.threshhold=ALL log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout