diff --git a/hadoop-client-modules/hadoop-client-minicluster/pom.xml b/hadoop-client-modules/hadoop-client-minicluster/pom.xml index 70fca8a89c8..bd81e7d5bc6 100644 --- a/hadoop-client-modules/hadoop-client-minicluster/pom.xml +++ b/hadoop-client-modules/hadoop-client-minicluster/pom.xml @@ -666,6 +666,10 @@ junit:junit com.google.code.findbugs:jsr305 log4j:log4j + org.eclipse.jetty.websocket:* + javax.websocket:javax.websocket-api + javax.annotation:javax.annotation-api + org.eclipse.jetty:jetty-jndi @@ -777,6 +781,12 @@ ehcache-core.xsd + + org.eclipse.jetty.websocket:javax-websocket-server-impl + + * + + diff --git a/hadoop-client-modules/hadoop-client-runtime/pom.xml b/hadoop-client-modules/hadoop-client-runtime/pom.xml index bfa6c152449..47ce9350770 100644 --- a/hadoop-client-modules/hadoop-client-runtime/pom.xml +++ b/hadoop-client-modules/hadoop-client-runtime/pom.xml @@ -158,6 +158,10 @@ com.google.code.findbugs:jsr305 io.dropwizard.metrics:metrics-core + org.eclipse.jetty.websocket:* + org.eclipse.jetty:jetty-servlet + org.eclipse.jetty:jetty-security + org.ow2.asm:* diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml index e7baee003cb..f7e817b811f 100644 --- a/hadoop-project/pom.xml +++ b/hadoop-project/pom.xml @@ -804,6 +804,21 @@ jetty-util-ajax ${jetty.version} + + org.eclipse.jetty.websocket + javax-websocket-server-impl + ${jetty.version} + + + org.ow2.asm + asm + + + org.eclipse.jetty + jetty-webapp + + + javax.servlet.jsp jsp-api diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml index 74be4da8da4..005af0f62bc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml @@ -83,6 +83,10 @@ org.eclipse.jetty jetty-util + + org.eclipse.jetty.websocket + javax-websocket-server-impl + com.google.guava guava diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java index 98cc2a48d20..355dea5d02a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java @@ -37,6 +37,8 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; +import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; +import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerExecContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -216,6 +218,16 @@ public abstract boolean signalContainer(ContainerSignalContext ctx) public abstract boolean reapContainer(ContainerReapContext ctx) throws IOException; + /** + * Perform interactive docker command into running container + * + * @param ctx Encapsulates information necessary for exec containers. + * @return return input/output stream if the operation succeeded. + * @throws ContainerExecutionException if container exec fails + */ + public abstract IOStreamPair execContainer(ContainerExecContext ctx) + throws ContainerExecutionException; + /** * Delete specified directories as a given user. * diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java index 27224a599e3..c8d3e4af3d4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java @@ -20,6 +20,10 @@ import static org.apache.hadoop.fs.CreateFlag.CREATE; import static org.apache.hadoop.fs.CreateFlag.OVERWRITE; + +import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.runtime.ContainerExecutionException; +import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerExecContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -996,6 +1000,18 @@ public void clearLogDirPermissions() { this.logDirPermissions = null; } + /** + * + * @param ctx Encapsulates information necessary for exec containers. + * @return the input/output stream of interactive docker shell. + * @throws ContainerExecutionException + */ + @Override + public IOStreamPair execContainer(ContainerExecContext ctx) + throws ContainerExecutionException { + throw new ContainerExecutionException("Unsupported operation."); + } + /** * Return the list of paths of given local directories. * diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java index fccf668ca9d..2ebc00720e0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java @@ -20,6 +20,9 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; +import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; +import org.apache.hadoop.ipc.Client; +import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerExecContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -60,6 +63,12 @@ import org.apache.hadoop.yarn.server.nodemanager.util.CgroupsLCEResourcesHandler; import org.apache.hadoop.yarn.server.nodemanager.util.DefaultLCEResourcesHandler; import org.apache.hadoop.yarn.server.nodemanager.util.LCEResourcesHandler; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.FileNotFoundException; +import java.io.InputStream; +import java.io.OutputStream; import java.io.File; import java.io.IOException; import java.net.InetSocketAddress; @@ -779,6 +788,31 @@ public boolean reapContainer(ContainerReapContext ctx) throws IOException { return true; } + /** + * Performs container exec. + * + * @param ctx Encapsulates information necessary for exec container. + * @return stdin and stdout of container exec + * @throws ContainerExecutionException if container exec fails + */ + @Override + public IOStreamPair execContainer(ContainerExecContext ctx) + throws ContainerExecutionException { + // TODO: calls PrivilegedOperationExecutor and return IOStream pairs + InputStream in = null; + OutputStream out = null; + int byteSize = 4000; + try { + in = new ByteArrayInputStream("This is input command".getBytes()); + out = new ByteArrayOutputStream(byteSize); + } catch (IllegalArgumentException e) { + LOG.error("Failed to execute command to container runtime", e); + } + IOStreamPair pair = new IOStreamPair(in, out); + System.out.println(pair); + return new IOStreamPair(in, out); + } + @Override public void deleteAsUser(DeletionAsUserContext ctx) { String user = ctx.getUser(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/executor/ContainerExecContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/executor/ContainerExecContext.java new file mode 100644 index 00000000000..bb4f57ce72e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/executor/ContainerExecContext.java @@ -0,0 +1,82 @@ +/* + * * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * / + */ + +package org.apache.hadoop.yarn.server.nodemanager.executor; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * Encapsulates information required for starting/launching containers. + */ + +@InterfaceAudience.Private +@InterfaceStability.Unstable +public final class ContainerExecContext { + private final String user; + private final String appId; + private final String container; + + public static final class Builder { + private String user; + private String appId; + private String container; + + public Builder() { + } + + public Builder setContainer(String container) { + this.container = container; + return this; + } + + public Builder setUser(String user) { + this.user = user; + return this; + } + + public Builder setAppId(String appId) { + this.appId = appId; + return this; + } + + public ContainerExecContext build() { + return new ContainerExecContext(this); + } + } + + private ContainerExecContext(Builder builder) { + this.container = builder.container; + this.user = builder.user; + this.appId = builder.appId; + } + + public String getUser() { + return this.user; + } + + public String getAppId() { + return this.appId; + } + + public String getContainerId() { + return this.container; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerShellWebSocket.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerShellWebSocket.java new file mode 100644 index 00000000000..3e06b4bd86a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerShellWebSocket.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.webapp; + +import java.io.IOException; +import java.net.URI; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; +import org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor; +import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerExecContext; +import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose; +import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect; +import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage; +import org.eclipse.jetty.websocket.api.annotations.WebSocket; +import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Web socket for establishing interactive command shell connection through + * Node Manage to container executor + */ +@InterfaceAudience.LimitedPrivate({ "HDFS", "MapReduce", "YARN" }) +@InterfaceStability.Unstable + +@WebSocket +public class ContainerShellWebSocket { + private static final Logger LOG = + LoggerFactory.getLogger(ContainerShellWebSocket.class); + + private final ContainerExecutor exec = new LinuxContainerExecutor(); + + private IOStreamPair pair; + + @OnWebSocketMessage + public void onText(Session session, String message) throws IOException { + LOG.info("Message received: " + message); + + try { + byte[] buffer = new byte[4000]; + if (session.isOpen()) { + int ni = message.length(); + if (ni > 0) { + pair.out.write(message.getBytes()); + pair.out.flush(); + } + int no = pair.in.available(); + int n = pair.in.read(buffer, 0, Math.min(no, buffer.length)); + String formatted = new String(buffer).replaceAll("\n", "\r\n"); + session.getRemote().sendString(formatted); + } + } catch (Exception e) { + LOG.error("Failed to parse WebSocket message from Client", e); + } + + } + + @OnWebSocketConnect + public void onConnect(Session session) { + LOG.info(session.getRemoteAddress().getHostString() + " connected!"); + + try { + URI containerURI = session.getUpgradeRequest().getRequestURI(); + String[] containerPath = containerURI.getPath().split("/"); + String cId = containerPath[2]; + LOG.info( + "Making interactive connection to running docker container with ID: " + + cId); + ContainerExecContext execContext = new ContainerExecContext + .Builder() + .setContainer(cId) + .build(); + pair = exec.execContainer(execContext); + } catch (Exception e) { + LOG.error("Failed to establish WebSocket connection with Client", e); + } + + } + + @OnWebSocketClose + public void onClose(Session session, int status, String reason) { + LOG.info(session.getRemoteAddress().getHostString() + " closed!"); + } + +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerShellWebSocketServlet.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerShellWebSocketServlet.java new file mode 100644 index 00000000000..31fc97bc7c4 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerShellWebSocketServlet.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.webapp; + +import javax.servlet.annotation.WebServlet; + +import org.eclipse.jetty.websocket.servlet.WebSocketServlet; +import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory; + +/** + * Container shell web socket interface + */ +@WebServlet(urlPatterns="/container/$containerId") +public class ContainerShellWebSocketServlet extends WebSocketServlet{ + + @Override + public void configure(WebSocketServletFactory factory) { + factory.register(ContainerShellWebSocket.class); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java index 813ba142111..3476aeb3271 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java @@ -41,7 +41,9 @@ import com.sun.jersey.guice.spi.container.servlet.GuiceContainer; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; public class WebServer extends AbstractService { @@ -64,6 +66,7 @@ public WebServer(Context nmContext, ResourceView resourceView, @Override protected void serviceStart() throws Exception { Configuration conf = getConfig(); + Map params = new HashMap(); String bindAddress = WebAppUtils.getWebAppBindURL(conf, YarnConfiguration.NM_BIND_HOST, WebAppUtils.getNMWebAppURLWithoutScheme(conf)); @@ -102,6 +105,8 @@ protected void serviceStart() throws Exception { WebApps .$for("node", Context.class, this.nmContext, "ws") .at(bindAddress) + .withServlet("ContainerShellWebSocket", "/container/*", + ContainerShellWebSocketServlet.class, params, false) .with(conf) .withHttpSpnegoPrincipalKey( YarnConfiguration.NM_WEBAPP_SPNEGO_USER_NAME_KEY) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerExecutor.java index 2890bb5eadb..8f38b552bf0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerExecutor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerExecutor.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.nodemanager; +import java.io.IOException; import java.nio.file.Files; import java.nio.file.Paths; import java.util.Arrays; @@ -29,11 +30,14 @@ import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; import org.apache.hadoop.util.Shell; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.runtime.ContainerExecutionException; +import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerExecContext; import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerReapContext; import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils; import org.junit.Assert; @@ -42,6 +46,8 @@ import static org.apache.hadoop.test.PlatformAssumptions.assumeWindows; import static org.junit.Assert.*; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @SuppressWarnings("deprecation") @@ -177,6 +183,19 @@ public void testReapContainer() throws Exception { assertTrue(containerExecutor.reapContainer(builder.build())); } + @Test + public void testExecContainer() throws Exception { + try { + ContainerExecContext.Builder builder = new ContainerExecContext.Builder(); + builder.setUser("foo").setAppId("app1").setContainer("container1"); + ContainerExecContext ctx = builder.build(); + containerExecutor.execContainer(ctx); + } catch (Exception e) { + // socket exception should be thrown immediately, without RPC retries. + Assert.assertTrue(e instanceof ContainerExecutionException); + } + } + @Test public void testCleanupBeforeLaunch() throws Exception { Container container = mock(Container.class); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutor.java index 6d77fc488da..856d5ff27cb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutor.java @@ -33,6 +33,7 @@ import static org.mockito.Mockito.when; import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntime; +import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerExecContext; import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerReapContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -694,6 +695,17 @@ public void testRelaunchContainer() throws Exception { verify(lce, times(1)).relaunchContainer(ctx); } + @Test + public void testExecContainer() throws Exception { + LinuxContainerExecutor lce = mock(LinuxContainerExecutor.class); + ContainerExecContext.Builder builder = + new ContainerExecContext.Builder(); + builder.setUser("foo").setAppId("app1").setContainer("container1"); + ContainerExecContext ctx = builder.build(); + lce.execContainer(ctx); + verify(lce, times(1)).execContainer(ctx); + } + private static class TestResourceHandler implements LCEResourcesHandler { static Set postExecContainers = new HashSet(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java index 8aee532e414..3d535e993ac 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java @@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentSkipListMap; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -41,6 +42,8 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl.ProcessTreeInfo; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.runtime.ContainerExecutionException; +import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerExecContext; import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerLivenessContext; import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerReapContext; import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext; @@ -101,6 +104,11 @@ public boolean reapContainer(ContainerReapContext ctx) return true; } @Override + public IOStreamPair execContainer(ContainerExecContext ctx) + throws ContainerExecutionException { + return new IOStreamPair(null, null); + } + @Override public void deleteAsUser(DeletionAsUserContext ctx) throws IOException, InterruptedException { } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerShellClientSocket.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerShellClientSocket.java new file mode 100644 index 00000000000..311f88cddc0 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerShellClientSocket.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.webapp; + +import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.api.WebSocketAdapter; +import org.eclipse.jetty.websocket.api.annotations.WebSocket; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.concurrent.CountDownLatch; + +@WebSocket +public class ContainerShellClientSocket extends WebSocketAdapter { + private static final Logger LOG = + LoggerFactory.getLogger(ContainerShellClientSocket.class); + private Session session; + + CountDownLatch latch = new CountDownLatch(1); + + @Override + public void onWebSocketText(String message) { + LOG.info("Message received from server:" + message); + } + + @Override + public void onWebSocketConnect(Session session) { + LOG.info("Connected to server"); + this.session = session; + latch.countDown(); + } + + @Override + public void onWebSocketClose(int statusCode, String reason) { + session.close(); + } + + @Override + public void onWebSocketError(Throwable cause) { + super.onWebSocketError(cause); + cause.printStackTrace(System.err); + } + + public void sendMessage(String str) { + try { + session.getRemote().sendString(str); + } catch (IOException e) { + // TODO Auto-generated catch block + LOG.error("Failed to sent message to server", e); + } + } + + public CountDownLatch getLatch() { + return latch; + } +} + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMContainerWebSocket.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMContainerWebSocket.java new file mode 100644 index 00000000000..fd57eebbc91 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMContainerWebSocket.java @@ -0,0 +1,141 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.webapp; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.util.NodeHealthScriptRunner; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; +import org.apache.hadoop.yarn.server.nodemanager.NodeHealthCheckerService; +import org.apache.hadoop.yarn.server.nodemanager.NodeManager; +import org.apache.hadoop.yarn.server.nodemanager.ResourceView; +import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; +import org.eclipse.jetty.util.component.LifeCycle; +import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.client.ClientUpgradeRequest; +import org.eclipse.jetty.websocket.client.WebSocketClient; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.net.URI; +import java.util.concurrent.Future; + +public class TestNMContainerWebSocket { + private static final Logger LOG = + LoggerFactory.getLogger(TestNMContainerWebSocket.class); + + private static final File testRootDir = new File("target", + TestNMWebServer.class.getSimpleName()); + private static File testLogDir = new File("target", + TestNMWebServer.class.getSimpleName() + "LogDir"); + + @Before + public void setup() { + testRootDir.mkdirs(); + testLogDir.mkdir(); + } + + @After + public void tearDown() { + FileUtil.fullyDelete(testRootDir); + FileUtil.fullyDelete(testLogDir); + } + + private int startNMWebAppServer(String webAddr) { + Configuration conf = new Configuration(); + Context nmContext = new NodeManager.NMContext(null, null, null, null, + null, false, conf); + ResourceView resourceView = new ResourceView() { + @Override + public long getVmemAllocatedForContainers() { + return 0; + } + @Override + public long getPmemAllocatedForContainers() { + return 0; + } + @Override + public long getVCoresAllocatedForContainers() { + return 0; + } + @Override + public boolean isVmemCheckEnabled() { + return true; + } + @Override + public boolean isPmemCheckEnabled() { + return true; + } + }; + conf.set(YarnConfiguration.NM_LOCAL_DIRS, testRootDir.getAbsolutePath()); + conf.set(YarnConfiguration.NM_LOG_DIRS, testLogDir.getAbsolutePath()); + NodeHealthCheckerService healthChecker = createNodeHealthCheckerService(conf); + healthChecker.init(conf); + LocalDirsHandlerService dirsHandler = healthChecker.getDiskHandler(); + conf.set(YarnConfiguration.NM_WEBAPP_ADDRESS, webAddr); + WebServer server = new WebServer(nmContext, resourceView, + new ApplicationACLsManager(conf), dirsHandler); + try { + server.init(conf); + server.start(); + return server.getPort(); + } finally { + } + } + + private NodeHealthCheckerService createNodeHealthCheckerService(Configuration conf) { + NodeHealthScriptRunner scriptRunner = NodeManager.getNodeHealthScriptRunner(conf); + LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService(); + return new NodeHealthCheckerService(scriptRunner, dirsHandler); + } + + @Test + public void testWebServerWithServlet() { + int port = startNMWebAppServer("0.0.0.0"); + LOG.info("bind to port: " + port); + StringBuilder sb = new StringBuilder(); + sb.append("ws://localhost:").append(port).append("/container/abc/"); + String dest = sb.toString(); + WebSocketClient client = new WebSocketClient(); + try { + ContainerShellClientSocket socket = new ContainerShellClientSocket(); + client.start(); + URI echoUri = new URI(dest); + Future future = client.connect(socket, echoUri); + Session session = future.get(); + session.getRemote().sendString("hello world"); + session.close(); + client.stop(); + } catch (Throwable t) { + LOG.error("Failed to connect WebSocket and send message to server", t); + } finally { + try { + client.stop(); + } catch (Exception e) { + LOG.error("Failed to close client", e); + } + } + } +}