diff --git a/hadoop-client-modules/hadoop-client-minicluster/pom.xml b/hadoop-client-modules/hadoop-client-minicluster/pom.xml index 70fca8a89c8..bd81e7d5bc6 100644 --- a/hadoop-client-modules/hadoop-client-minicluster/pom.xml +++ b/hadoop-client-modules/hadoop-client-minicluster/pom.xml @@ -666,6 +666,10 @@ junit:junit com.google.code.findbugs:jsr305 log4j:log4j + org.eclipse.jetty.websocket:* + javax.websocket:javax.websocket-api + javax.annotation:javax.annotation-api + org.eclipse.jetty:jetty-jndi @@ -777,6 +781,12 @@ ehcache-core.xsd + + org.eclipse.jetty.websocket:javax-websocket-server-impl + + * + + diff --git a/hadoop-client-modules/hadoop-client-runtime/pom.xml b/hadoop-client-modules/hadoop-client-runtime/pom.xml index bfa6c152449..47ce9350770 100644 --- a/hadoop-client-modules/hadoop-client-runtime/pom.xml +++ b/hadoop-client-modules/hadoop-client-runtime/pom.xml @@ -158,6 +158,10 @@ com.google.code.findbugs:jsr305 io.dropwizard.metrics:metrics-core + org.eclipse.jetty.websocket:* + org.eclipse.jetty:jetty-servlet + org.eclipse.jetty:jetty-security + org.ow2.asm:* diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml index e7baee003cb..f7e817b811f 100644 --- a/hadoop-project/pom.xml +++ b/hadoop-project/pom.xml @@ -804,6 +804,21 @@ jetty-util-ajax ${jetty.version} + + org.eclipse.jetty.websocket + javax-websocket-server-impl + ${jetty.version} + + + org.ow2.asm + asm + + + org.eclipse.jetty + jetty-webapp + + + javax.servlet.jsp jsp-api diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml index 74be4da8da4..005af0f62bc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml @@ -83,6 +83,10 @@ org.eclipse.jetty jetty-util + + org.eclipse.jetty.websocket + javax-websocket-server-impl + com.google.guava guava diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java index 98cc2a48d20..b3a6df1afec 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java @@ -37,6 +37,8 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; +import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; +import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerExecContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -216,6 +218,16 @@ public abstract boolean signalContainer(ContainerSignalContext ctx) public abstract boolean reapContainer(ContainerReapContext ctx) throws IOException; + /** + * Perform interactive docker command into running container. + * + * @param ctx Encapsulates information necessary for exec containers. + * @return return input/output stream if the operation succeeded. + * @throws ContainerExecutionException if container exec fails. + */ + public abstract IOStreamPair execContainer(ContainerExecContext ctx) + throws ContainerExecutionException; + /** * Delete specified directories as a given user. * diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java index 27224a599e3..c8d3e4af3d4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java @@ -20,6 +20,10 @@ import static org.apache.hadoop.fs.CreateFlag.CREATE; import static org.apache.hadoop.fs.CreateFlag.OVERWRITE; + +import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.runtime.ContainerExecutionException; +import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerExecContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -996,6 +1000,18 @@ public void clearLogDirPermissions() { this.logDirPermissions = null; } + /** + * + * @param ctx Encapsulates information necessary for exec containers. + * @return the input/output stream of interactive docker shell. + * @throws ContainerExecutionException + */ + @Override + public IOStreamPair execContainer(ContainerExecContext ctx) + throws ContainerExecutionException { + throw new ContainerExecutionException("Unsupported operation."); + } + /** * Return the list of paths of given local directories. * diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java index fccf668ca9d..58d3068916d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java @@ -20,6 +20,8 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; +import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; +import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerExecContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -60,9 +62,14 @@ import org.apache.hadoop.yarn.server.nodemanager.util.CgroupsLCEResourcesHandler; import org.apache.hadoop.yarn.server.nodemanager.util.DefaultLCEResourcesHandler; import org.apache.hadoop.yarn.server.nodemanager.util.LCEResourcesHandler; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.InputStream; +import java.io.OutputStream; import java.io.File; import java.io.IOException; import java.net.InetSocketAddress; +import java.nio.charset.Charset; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -779,6 +786,32 @@ public boolean reapContainer(ContainerReapContext ctx) throws IOException { return true; } + /** + * Performs container exec. + * + * @param ctx Encapsulates information necessary for exec container. + * @return stdin and stdout of container exec. + * @throws ContainerExecutionException if container exec fails. + */ + @Override + public IOStreamPair execContainer(ContainerExecContext ctx) + throws ContainerExecutionException { + // TODO: calls PrivilegedOperationExecutor and return IOStream pairs + InputStream in = null; + OutputStream out = null; + int byteSize = 4000; + try { + in = new ByteArrayInputStream( + "This is input command".getBytes(Charset.forName("UTF-8"))); + out = new ByteArrayOutputStream(byteSize); + } catch (IllegalArgumentException e) { + LOG.error("Failed to execute command to container runtime", e); + } + IOStreamPair pair = new IOStreamPair(in, out); + System.out.println(pair); + return new IOStreamPair(in, out); + } + @Override public void deleteAsUser(DeletionAsUserContext ctx) { String user = ctx.getUser(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/executor/ContainerExecContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/executor/ContainerExecContext.java new file mode 100644 index 00000000000..4e6c6ec0467 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/executor/ContainerExecContext.java @@ -0,0 +1,85 @@ +/* + * * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * / + */ + +package org.apache.hadoop.yarn.server.nodemanager.executor; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * Encapsulates information required for starting/launching containers. + */ + +@InterfaceAudience.Private +@InterfaceStability.Unstable +public final class ContainerExecContext { + private final String user; + private final String appId; + private final String container; + + /** + * Builder for ContainerExecContext. + */ + public static final class Builder { + private String user; + private String appId; + private String container; + + public Builder() { + } + + public Builder setContainer(String container) { + this.container = container; + return this; + } + + public Builder setUser(String user) { + this.user = user; + return this; + } + + public Builder setAppId(String appId) { + this.appId = appId; + return this; + } + + public ContainerExecContext build() { + return new ContainerExecContext(this); + } + } + + private ContainerExecContext(Builder builder) { + this.container = builder.container; + this.user = builder.user; + this.appId = builder.appId; + } + + public String getUser() { + return this.user; + } + + public String getAppId() { + return this.appId; + } + + public String getContainerId() { + return this.container; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerShellWebSocket.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerShellWebSocket.java new file mode 100644 index 00000000000..ea61b57ab59 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerShellWebSocket.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.webapp; + +import java.io.IOException; +import java.net.URI; +import java.nio.charset.Charset; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; +import org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor; +import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerExecContext; +import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose; +import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect; +import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage; +import org.eclipse.jetty.websocket.api.annotations.WebSocket; +import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Web socket for establishing interactive command shell connection through + * Node Manage to container executor. + */ +@InterfaceAudience.LimitedPrivate({ "HDFS", "MapReduce", "YARN" }) +@InterfaceStability.Unstable + +@WebSocket +public class ContainerShellWebSocket { + private static final Logger LOG = + LoggerFactory.getLogger(ContainerShellWebSocket.class); + + private final ContainerExecutor exec = new LinuxContainerExecutor(); + + private IOStreamPair pair; + + @OnWebSocketMessage + public void onText(Session session, String message) throws IOException { + LOG.info("Message received: " + message); + + try { + byte[] buffer = new byte[4000]; + if (session.isOpen()) { + int ni = message.length(); + if (ni > 0) { + pair.out.write(message.getBytes(Charset.forName("UTF-8"))); + pair.out.flush(); + } + int no = pair.in.available(); + pair.in.read(buffer, 0, Math.min(no, buffer.length)); + String formatted = new String(buffer, Charset.forName("UTF-8")) + .replaceAll("\n", "\r\n"); + session.getRemote().sendString(formatted); + } + } catch (Exception e) { + LOG.error("Failed to parse WebSocket message from Client", e); + } + + } + + @OnWebSocketConnect + public void onConnect(Session session) { + LOG.info(session.getRemoteAddress().getHostString() + " connected!"); + + try { + URI containerURI = session.getUpgradeRequest().getRequestURI(); + String[] containerPath = containerURI.getPath().split("/"); + String cId = containerPath[2]; + LOG.info( + "Making interactive connection to running docker container with ID: " + + cId); + ContainerExecContext execContext = new ContainerExecContext + .Builder() + .setContainer(cId) + .build(); + pair = exec.execContainer(execContext); + } catch (Exception e) { + LOG.error("Failed to establish WebSocket connection with Client", e); + } + + } + + @OnWebSocketClose + public void onClose(Session session, int status, String reason) { + LOG.info(session.getRemoteAddress().getHostString() + " closed!"); + } + +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerShellWebSocketServlet.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerShellWebSocketServlet.java new file mode 100644 index 00000000000..8a8d6d102ab --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerShellWebSocketServlet.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.webapp; + +import javax.servlet.annotation.WebServlet; + +import org.eclipse.jetty.websocket.servlet.WebSocketServlet; +import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory; + +/** + * Container shell web socket interface. + */ +@WebServlet(urlPatterns="/container/container/*") +public class ContainerShellWebSocketServlet extends WebSocketServlet{ + + @Override + public void configure(WebSocketServletFactory factory) { + factory.register(ContainerShellWebSocket.class); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java index 813ba142111..3476aeb3271 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java @@ -41,7 +41,9 @@ import com.sun.jersey.guice.spi.container.servlet.GuiceContainer; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; public class WebServer extends AbstractService { @@ -64,6 +66,7 @@ public WebServer(Context nmContext, ResourceView resourceView, @Override protected void serviceStart() throws Exception { Configuration conf = getConfig(); + Map params = new HashMap(); String bindAddress = WebAppUtils.getWebAppBindURL(conf, YarnConfiguration.NM_BIND_HOST, WebAppUtils.getNMWebAppURLWithoutScheme(conf)); @@ -102,6 +105,8 @@ protected void serviceStart() throws Exception { WebApps .$for("node", Context.class, this.nmContext, "ws") .at(bindAddress) + .withServlet("ContainerShellWebSocket", "/container/*", + ContainerShellWebSocketServlet.class, params, false) .with(conf) .withHttpSpnegoPrincipalKey( YarnConfiguration.NM_WEBAPP_SPNEGO_USER_NAME_KEY) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerExecutor.java index 2890bb5eadb..aafdc31bb7c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerExecutor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerExecutor.java @@ -34,6 +34,8 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.runtime.ContainerExecutionException; +import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerExecContext; import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerReapContext; import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils; import org.junit.Assert; @@ -177,6 +179,19 @@ public void testReapContainer() throws Exception { assertTrue(containerExecutor.reapContainer(builder.build())); } + @Test + public void testExecContainer() throws Exception { + try { + ContainerExecContext.Builder builder = new ContainerExecContext.Builder(); + builder.setUser("foo").setAppId("app1").setContainer("container1"); + ContainerExecContext ctx = builder.build(); + containerExecutor.execContainer(ctx); + } catch (Exception e) { + // socket exception should be thrown immediately, without RPC retries. + Assert.assertTrue(e instanceof ContainerExecutionException); + } + } + @Test public void testCleanupBeforeLaunch() throws Exception { Container container = mock(Container.class); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutor.java index 6d77fc488da..856d5ff27cb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutor.java @@ -33,6 +33,7 @@ import static org.mockito.Mockito.when; import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntime; +import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerExecContext; import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerReapContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -694,6 +695,17 @@ public void testRelaunchContainer() throws Exception { verify(lce, times(1)).relaunchContainer(ctx); } + @Test + public void testExecContainer() throws Exception { + LinuxContainerExecutor lce = mock(LinuxContainerExecutor.class); + ContainerExecContext.Builder builder = + new ContainerExecContext.Builder(); + builder.setUser("foo").setAppId("app1").setContainer("container1"); + ContainerExecContext ctx = builder.build(); + lce.execContainer(ctx); + verify(lce, times(1)).execContainer(ctx); + } + private static class TestResourceHandler implements LCEResourcesHandler { static Set postExecContainers = new HashSet(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java index 8aee532e414..3d535e993ac 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java @@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentSkipListMap; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -41,6 +42,8 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl.ProcessTreeInfo; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.runtime.ContainerExecutionException; +import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerExecContext; import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerLivenessContext; import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerReapContext; import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext; @@ -101,6 +104,11 @@ public boolean reapContainer(ContainerReapContext ctx) return true; } @Override + public IOStreamPair execContainer(ContainerExecContext ctx) + throws ContainerExecutionException { + return new IOStreamPair(null, null); + } + @Override public void deleteAsUser(DeletionAsUserContext ctx) throws IOException, InterruptedException { } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerShellClientSocketTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerShellClientSocketTest.java new file mode 100644 index 00000000000..e059de90b7f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerShellClientSocketTest.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.webapp; + +import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.api.WebSocketAdapter; +import org.eclipse.jetty.websocket.api.annotations.WebSocket; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.concurrent.CountDownLatch; + +/** + * Container shell client socket interface. + */ +@WebSocket +public class ContainerShellClientSocketTest extends WebSocketAdapter { + private static final Logger LOG = + LoggerFactory.getLogger(ContainerShellClientSocketTest.class); + private Session session; + private CountDownLatch latch = new CountDownLatch(1); + + @Override + public void onWebSocketText(String message) { + LOG.info("Message received from server:" + message); + } + + @Override + public void onWebSocketConnect(Session session) { + LOG.info("Connected to server"); + this.session = session; + latch.countDown(); + } + + @Override + public void onWebSocketClose(int statusCode, String reason) { + session.close(); + } + + @Override + public void onWebSocketError(Throwable cause) { + super.onWebSocketError(cause); + cause.printStackTrace(System.err); + } + + public void sendMessage(String str) { + try { + session.getRemote().sendString(str); + } catch (IOException e) { + // TODO Auto-generated catch block + LOG.error("Failed to sent message to server", e); + } + } + + public CountDownLatch getLatch() { + return latch; + } + + public void setLatch(CountDownLatch latch) { + this.latch = latch; + } +} + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMContainerWebSocket.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMContainerWebSocket.java new file mode 100644 index 00000000000..50042f04ae5 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMContainerWebSocket.java @@ -0,0 +1,149 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.webapp; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.util.NodeHealthScriptRunner; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; +import org.apache.hadoop.yarn.server.nodemanager.NodeHealthCheckerService; +import org.apache.hadoop.yarn.server.nodemanager.NodeManager; +import org.apache.hadoop.yarn.server.nodemanager.ResourceView; +import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; +import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.client.WebSocketClient; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.net.URI; +import java.util.concurrent.Future; + +/** + * Test class for Node Manager Container Web Socket. + */ +public class TestNMContainerWebSocket { + private static final Logger LOG = LoggerFactory.getLogger( + TestNMContainerWebSocket.class); + + private static final File TESTROOTDIR = new File("target", + TestNMWebServer.class.getSimpleName()); + private static File testLogDir = new File("target", + TestNMWebServer.class.getSimpleName() + "LogDir"); + + @Before + public void setup() { + TESTROOTDIR.mkdirs(); + testLogDir.mkdir(); + } + + @After + public void tearDown() { + FileUtil.fullyDelete(TESTROOTDIR); + FileUtil.fullyDelete(testLogDir); + } + + private int startNMWebAppServer(String webAddr) { + Configuration conf = new Configuration(); + Context nmContext = new NodeManager.NMContext(null, null, null, null, null, + false, conf); + ResourceView resourceView = new ResourceView() { + @Override + public long getVmemAllocatedForContainers() { + return 0; + } + + @Override + public long getPmemAllocatedForContainers() { + return 0; + } + + @Override + public long getVCoresAllocatedForContainers() { + return 0; + } + + @Override + public boolean isVmemCheckEnabled() { + return true; + } + + @Override + public boolean isPmemCheckEnabled() { + return true; + } + }; + conf.set(YarnConfiguration.NM_LOCAL_DIRS, TESTROOTDIR.getAbsolutePath()); + conf.set(YarnConfiguration.NM_LOG_DIRS, testLogDir.getAbsolutePath()); + NodeHealthCheckerService healthChecker = createNodeHealthCheckerService( + conf); + healthChecker.init(conf); + LocalDirsHandlerService dirsHandler = healthChecker.getDiskHandler(); + conf.set(YarnConfiguration.NM_WEBAPP_ADDRESS, webAddr); + WebServer server = new WebServer(nmContext, resourceView, + new ApplicationACLsManager(conf), dirsHandler); + try { + server.init(conf); + server.start(); + return server.getPort(); + } finally { + } + } + + private NodeHealthCheckerService createNodeHealthCheckerService( + Configuration conf) { + NodeHealthScriptRunner scriptRunner = NodeManager.getNodeHealthScriptRunner( + conf); + LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService(); + return new NodeHealthCheckerService(scriptRunner, dirsHandler); + } + + @Test + public void testWebServerWithServlet() { + int port = startNMWebAppServer("0.0.0.0"); + LOG.info("bind to port: " + port); + StringBuilder sb = new StringBuilder(); + sb.append("ws://localhost:").append(port).append("/container/abc/"); + String dest = sb.toString(); + WebSocketClient client = new WebSocketClient(); + try { + ContainerShellClientSocketTest socket = new ContainerShellClientSocketTest(); + client.start(); + URI echoUri = new URI(dest); + Future future = client.connect(socket, echoUri); + Session session = future.get(); + session.getRemote().sendString("hello world"); + session.close(); + client.stop(); + } catch (Throwable t) { + LOG.error("Failed to connect WebSocket and send message to server", t); + } finally { + try { + client.stop(); + } catch (Exception e) { + LOG.error("Failed to close client", e); + } + } + } +}