diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java
index 98cc2a48d20..355dea5d02a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java
@@ -37,6 +37,8 @@
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
+import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerExecContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -216,6 +218,16 @@ public abstract boolean signalContainer(ContainerSignalContext ctx)
public abstract boolean reapContainer(ContainerReapContext ctx)
throws IOException;
+ /**
+ * Performs an interactive docker command in a running container.
+ *
+ * @param ctx Encapsulates information necessary for exec containers.
+ * @return the input/output streams if the operation succeeds.
+ * @throws ContainerExecutionException if container exec fails
+ */
+ public abstract IOStreamPair execContainer(ContainerExecContext ctx)
+ throws ContainerExecutionException;
+
/**
* Delete specified directories as a given user.
*
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java
index 27224a599e3..c8d3e4af3d4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java
@@ -20,6 +20,10 @@
import static org.apache.hadoop.fs.CreateFlag.CREATE;
import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
+
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.runtime.ContainerExecutionException;
+import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerExecContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -996,6 +1000,18 @@ public void clearLogDirPermissions() {
this.logDirPermissions = null;
}
+ /**
+ * Container exec is not supported by the DefaultContainerExecutor.
+ *
+ * @param ctx Encapsulates information necessary for exec containers.
+ * @return the input/output streams of the interactive docker shell.
+ * @throws ContainerExecutionException always, since the operation is
+ * unsupported by this executor.
+ */
+ @Override
+ public IOStreamPair execContainer(ContainerExecContext ctx)
+ throws ContainerExecutionException {
+ throw new ContainerExecutionException("Unsupported operation.");
+ }
+
/**
* Return the list of paths of given local directories.
*
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java
index fccf668ca9d..2ebc00720e0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java
@@ -20,6 +20,9 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
+import org.apache.hadoop.ipc.Client;
+import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerExecContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
@@ -60,6 +63,12 @@
import org.apache.hadoop.yarn.server.nodemanager.util.CgroupsLCEResourcesHandler;
import org.apache.hadoop.yarn.server.nodemanager.util.DefaultLCEResourcesHandler;
import org.apache.hadoop.yarn.server.nodemanager.util.LCEResourcesHandler;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.FileNotFoundException;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -779,6 +788,31 @@ public boolean reapContainer(ContainerReapContext ctx) throws IOException {
return true;
}
+ /**
+ * Performs container exec.
+ *
+ * @param ctx Encapsulates information necessary for exec container.
+ * @return stdin and stdout of container exec
+ * @throws ContainerExecutionException if container exec fails
+ */
+ @Override
+ public IOStreamPair execContainer(ContainerExecContext ctx)
+ throws ContainerExecutionException {
+ // TODO: call PrivilegedOperationExecutor and return the real stream pair
+ InputStream in = new ByteArrayInputStream("This is input command".getBytes());
+ OutputStream out = new ByteArrayOutputStream(4000);
+ return new IOStreamPair(in, out);
+ }
+
@Override
public void deleteAsUser(DeletionAsUserContext ctx) {
String user = ctx.getUser();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/executor/ContainerExecContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/executor/ContainerExecContext.java
new file mode 100644
index 00000000000..bb4f57ce72e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/executor/ContainerExecContext.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.executor;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Encapsulates information required for executing a command inside a
+ * running container.
+ */
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public final class ContainerExecContext {
+ private final String user;
+ private final String appId;
+ private final String container;
+
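+  /**
+   * Builder for {@link ContainerExecContext}.
+   */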
+ public static final class Builder {
+ private String user;
+ private String appId;
+ private String container;
+
+ public Builder() {
+ }
+
+ public Builder setContainer(String container) {
+ this.container = container;
+ return this;
+ }
+
+ public Builder setUser(String user) {
+ this.user = user;
+ return this;
+ }
+
+ public Builder setAppId(String appId) {
+ this.appId = appId;
+ return this;
+ }
+
+ public ContainerExecContext build() {
+ return new ContainerExecContext(this);
+ }
+ }
+
+ private ContainerExecContext(Builder builder) {
+ this.container = builder.container;
+ this.user = builder.user;
+ this.appId = builder.appId;
+ }
+
+ public String getUser() {
+ return this.user;
+ }
+
+ public String getAppId() {
+ return this.appId;
+ }
+
+ public String getContainerId() {
+ return this.container;
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerShellWebSocket.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerShellWebSocket.java
new file mode 100644
index 00000000000..3e06b4bd86a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerShellWebSocket.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.webapp;
+
+import java.io.IOException;
+import java.net.URI;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerExecContext;
+import org.eclipse.jetty.websocket.api.Session;
+import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
+import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
+import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
+import org.eclipse.jetty.websocket.api.annotations.WebSocket;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Web socket for establishing an interactive command shell connection through
+ * the NodeManager to the container executor.
+ */
+@InterfaceAudience.LimitedPrivate({ "HDFS", "MapReduce", "YARN" })
+@InterfaceStability.Unstable
+
+@WebSocket
+public class ContainerShellWebSocket {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ContainerShellWebSocket.class);
+
+ private final ContainerExecutor exec = new LinuxContainerExecutor();
+
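+  // stdin/stdout streams of the container exec session, set in onConnect().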
+ private IOStreamPair pair;
+
+ @OnWebSocketMessage
+ public void onText(Session session, String message) throws IOException {
+ LOG.info("Message received: " + message);
+
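+    // Forward the client's message to the container's stdin, then read any
+    // available output from the container and send it back to the client.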
+ try {
+ byte[] buffer = new byte[4000];
+ if (session.isOpen()) {
+ int ni = message.length();
+ if (ni > 0) {
+ pair.out.write(message.getBytes());
+ pair.out.flush();
+ }
+ int no = pair.in.available();
+ int n = pair.in.read(buffer, 0, Math.min(no, buffer.length));
+        String formatted = new String(buffer, 0, Math.max(n, 0))
+            .replaceAll("\n", "\r\n");
+ session.getRemote().sendString(formatted);
+ }
+ } catch (Exception e) {
+ LOG.error("Failed to parse WebSocket message from Client", e);
+ }
+
+ }
+
+ @OnWebSocketConnect
+ public void onConnect(Session session) {
+ LOG.info(session.getRemoteAddress().getHostString() + " connected!");
+
+ try {
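+      // The request path is /container/{containerId}/...; extract the ID and
+      // open an exec session through the container executor.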
+ URI containerURI = session.getUpgradeRequest().getRequestURI();
+ String[] containerPath = containerURI.getPath().split("/");
+ String cId = containerPath[2];
+ LOG.info(
+ "Making interactive connection to running docker container with ID: "
+ + cId);
+ ContainerExecContext execContext = new ContainerExecContext
+ .Builder()
+ .setContainer(cId)
+ .build();
+ pair = exec.execContainer(execContext);
+ } catch (Exception e) {
+ LOG.error("Failed to establish WebSocket connection with Client", e);
+ }
+
+ }
+
+ @OnWebSocketClose
+ public void onClose(Session session, int status, String reason) {
+ LOG.info(session.getRemoteAddress().getHostString() + " closed!");
+ }
+
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerShellWebSocketServlet.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerShellWebSocketServlet.java
new file mode 100644
index 00000000000..31fc97bc7c4
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerShellWebSocketServlet.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.webapp;
+
+import javax.servlet.annotation.WebServlet;
+
+import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
+import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
+
+/**
+ * Container shell WebSocket servlet.
+ */
+@WebServlet(urlPatterns = "/container/*")
+public class ContainerShellWebSocketServlet extends WebSocketServlet {
+
+ @Override
+ public void configure(WebSocketServletFactory factory) {
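+    // Serve incoming WebSocket upgrade requests with ContainerShellWebSocket.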
+ factory.register(ContainerShellWebSocket.class);
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java
index 813ba142111..3476aeb3271 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java
@@ -41,7 +41,9 @@
import com.sun.jersey.guice.spi.container.servlet.GuiceContainer;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
public class WebServer extends AbstractService {
@@ -64,6 +66,7 @@ public WebServer(Context nmContext, ResourceView resourceView,
@Override
protected void serviceStart() throws Exception {
Configuration conf = getConfig();
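+ // Init parameters for the container shell WebSocket servlet (currently empty).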
+ Map<String, String> params = new HashMap<>();
String bindAddress = WebAppUtils.getWebAppBindURL(conf,
YarnConfiguration.NM_BIND_HOST,
WebAppUtils.getNMWebAppURLWithoutScheme(conf));
@@ -102,6 +105,8 @@ protected void serviceStart() throws Exception {
WebApps
.$for("node", Context.class, this.nmContext, "ws")
.at(bindAddress)
+ .withServlet("ContainerShellWebSocket", "/container/*",
+ ContainerShellWebSocketServlet.class, params, false)
.with(conf)
.withHttpSpnegoPrincipalKey(
YarnConfiguration.NM_WEBAPP_SPNEGO_USER_NAME_KEY)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerExecutor.java
index 2890bb5eadb..8f38b552bf0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerExecutor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerExecutor.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.nodemanager;
+import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;
@@ -29,11 +30,14 @@
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.runtime.ContainerExecutionException;
+import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerExecContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerReapContext;
import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils;
import org.junit.Assert;
@@ -42,6 +46,8 @@
import static org.apache.hadoop.test.PlatformAssumptions.assumeWindows;
import static org.junit.Assert.*;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@SuppressWarnings("deprecation")
@@ -177,6 +183,19 @@ public void testReapContainer() throws Exception {
assertTrue(containerExecutor.reapContainer(builder.build()));
}
+ @Test
+ public void testExecContainer() throws Exception {
+ try {
+ ContainerExecContext.Builder builder = new ContainerExecContext.Builder();
+ builder.setUser("foo").setAppId("app1").setContainer("container1");
+ ContainerExecContext ctx = builder.build();
+ containerExecutor.execContainer(ctx);
+ Assert.fail("execContainer should have thrown ContainerExecutionException");
+ } catch (Exception e) {
+ // execContainer is not supported by this executor, so the call must fail.
+ Assert.assertTrue(e instanceof ContainerExecutionException);
+ }
+ }
+
@Test
public void testCleanupBeforeLaunch() throws Exception {
Container container = mock(Container.class);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutor.java
index 6d77fc488da..856d5ff27cb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutor.java
@@ -33,6 +33,7 @@
import static org.mockito.Mockito.when;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.runtime.LinuxContainerRuntime;
+import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerExecContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerReapContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -694,6 +695,17 @@ public void testRelaunchContainer() throws Exception {
verify(lce, times(1)).relaunchContainer(ctx);
}
+ @Test
+ public void testExecContainer() throws Exception {
+ LinuxContainerExecutor lce = mock(LinuxContainerExecutor.class);
+ ContainerExecContext.Builder builder =
+ new ContainerExecContext.Builder();
+ builder.setUser("foo").setAppId("app1").setContainer("container1");
+ ContainerExecContext ctx = builder.build();
+ lce.execContainer(ctx);
+ verify(lce, times(1)).execContainer(ctx);
+ }
+
private static class TestResourceHandler implements LCEResourcesHandler {
static Set postExecContainers = new HashSet();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java
index 8aee532e414..3d535e993ac 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitorResourceChange.java
@@ -25,6 +25,7 @@
import java.util.concurrent.ConcurrentSkipListMap;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -41,6 +42,8 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl.ProcessTreeInfo;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.runtime.ContainerExecutionException;
+import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerExecContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerLivenessContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerReapContext;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext;
@@ -101,6 +104,11 @@ public boolean reapContainer(ContainerReapContext ctx)
return true;
}
@Override
+ public IOStreamPair execContainer(ContainerExecContext ctx)
+ throws ContainerExecutionException {
+ return new IOStreamPair(null, null);
+ }
+ @Override
public void deleteAsUser(DeletionAsUserContext ctx)
throws IOException, InterruptedException {
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerShellClientSocket.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerShellClientSocket.java
new file mode 100644
index 00000000000..311f88cddc0
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerShellClientSocket.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.webapp;
+
+import org.eclipse.jetty.websocket.api.Session;
+import org.eclipse.jetty.websocket.api.WebSocketAdapter;
+import org.eclipse.jetty.websocket.api.annotations.WebSocket;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+
+@WebSocket
+public class ContainerShellClientSocket extends WebSocketAdapter {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ContainerShellClientSocket.class);
+ private Session session;
+
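+  // Released once the WebSocket connection to the server is established.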
+ CountDownLatch latch = new CountDownLatch(1);
+
+ @Override
+ public void onWebSocketText(String message) {
+ LOG.info("Message received from server:" + message);
+ }
+
+ @Override
+ public void onWebSocketConnect(Session session) {
+ LOG.info("Connected to server");
+ this.session = session;
+ latch.countDown();
+ }
+
+ @Override
+ public void onWebSocketClose(int statusCode, String reason) {
+ session.close();
+ }
+
+ @Override
+ public void onWebSocketError(Throwable cause) {
+ super.onWebSocketError(cause);
+ cause.printStackTrace(System.err);
+ }
+
+ public void sendMessage(String str) {
+ try {
+ session.getRemote().sendString(str);
+ } catch (IOException e) {
+      LOG.error("Failed to send message to server", e);
+ }
+ }
+
+ public CountDownLatch getLatch() {
+ return latch;
+ }
+}
+
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMContainerWebSocket.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMContainerWebSocket.java
new file mode 100644
index 00000000000..fd57eebbc91
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMContainerWebSocket.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.webapp;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.util.NodeHealthScriptRunner;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
+import org.apache.hadoop.yarn.server.nodemanager.NodeHealthCheckerService;
+import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
+import org.apache.hadoop.yarn.server.nodemanager.ResourceView;
+import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
+import org.eclipse.jetty.util.component.LifeCycle;
+import org.eclipse.jetty.websocket.api.Session;
+import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
+import org.eclipse.jetty.websocket.client.WebSocketClient;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.net.URI;
+import java.util.concurrent.Future;
+
+public class TestNMContainerWebSocket {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestNMContainerWebSocket.class);
+
+ private static final File testRootDir = new File("target",
+ TestNMWebServer.class.getSimpleName());
+ private static File testLogDir = new File("target",
+ TestNMWebServer.class.getSimpleName() + "LogDir");
+
+ @Before
+ public void setup() {
+ testRootDir.mkdirs();
+ testLogDir.mkdir();
+ }
+
+ @After
+ public void tearDown() {
+ FileUtil.fullyDelete(testRootDir);
+ FileUtil.fullyDelete(testLogDir);
+ }
+
+ private int startNMWebAppServer(String webAddr) {
+ Configuration conf = new Configuration();
+ Context nmContext = new NodeManager.NMContext(null, null, null, null,
+ null, false, conf);
+ ResourceView resourceView = new ResourceView() {
+ @Override
+ public long getVmemAllocatedForContainers() {
+ return 0;
+ }
+ @Override
+ public long getPmemAllocatedForContainers() {
+ return 0;
+ }
+ @Override
+ public long getVCoresAllocatedForContainers() {
+ return 0;
+ }
+ @Override
+ public boolean isVmemCheckEnabled() {
+ return true;
+ }
+ @Override
+ public boolean isPmemCheckEnabled() {
+ return true;
+ }
+ };
+ conf.set(YarnConfiguration.NM_LOCAL_DIRS, testRootDir.getAbsolutePath());
+ conf.set(YarnConfiguration.NM_LOG_DIRS, testLogDir.getAbsolutePath());
+ NodeHealthCheckerService healthChecker = createNodeHealthCheckerService(conf);
+ healthChecker.init(conf);
+ LocalDirsHandlerService dirsHandler = healthChecker.getDiskHandler();
+ conf.set(YarnConfiguration.NM_WEBAPP_ADDRESS, webAddr);
+ WebServer server = new WebServer(nmContext, resourceView,
+ new ApplicationACLsManager(conf), dirsHandler);
+    server.init(conf);
+    server.start();
+    return server.getPort();
+ }
+
+ private NodeHealthCheckerService createNodeHealthCheckerService(Configuration conf) {
+ NodeHealthScriptRunner scriptRunner = NodeManager.getNodeHealthScriptRunner(conf);
+ LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService();
+ return new NodeHealthCheckerService(scriptRunner, dirsHandler);
+ }
+
+ @Test
+ public void testWebServerWithServlet() {
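+    // Smoke test: start the NM web app, open a WebSocket connection to the
+    // container shell endpoint and send a single message.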
+ int port = startNMWebAppServer("0.0.0.0");
+ LOG.info("bind to port: " + port);
+ StringBuilder sb = new StringBuilder();
+ sb.append("ws://localhost:").append(port).append("/container/abc/");
+ String dest = sb.toString();
+ WebSocketClient client = new WebSocketClient();
+ try {
+ ContainerShellClientSocket socket = new ContainerShellClientSocket();
+ client.start();
+ URI echoUri = new URI(dest);
+      Future<Session> future = client.connect(socket, echoUri);
+ Session session = future.get();
+ session.getRemote().sendString("hello world");
+ session.close();
+ client.stop();
+ } catch (Throwable t) {
+ LOG.error("Failed to connect WebSocket and send message to server", t);
+ } finally {
+ try {
+ client.stop();
+ } catch (Exception e) {
+ LOG.error("Failed to close client", e);
+ }
+ }
+ }
+}