diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java index 4dd7cb1..1a3b8d9 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java @@ -16,7 +16,10 @@ import java.io.File; import java.io.IOException; +import java.net.InetAddress; import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.security.PrivilegedExceptionAction; import java.util.Arrays; @@ -27,8 +30,12 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import javax.net.SocketFactory; + import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimap; + +import org.apache.commons.net.DefaultSocketFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -76,6 +83,7 @@ import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; + import org.apache.tez.runtime.task.TezTaskRunner; public class ContainerRunnerImpl extends AbstractService implements ContainerRunner { @@ -294,11 +302,49 @@ protected ContainerExecutionResult callInternal() throws Exception { NetUtils.createSocketAddrForHost(request.getAmHost(), request.getAmPort()); SecurityUtil.setTokenService(jobToken, address); taskOwner.addToken(jobToken); + + final SocketFactory sock = new SocketFactory() { + + private final SocketFactory ff = SocketFactory.getDefault(); + + @Override + public Socket createSocket(InetAddress arg0, int arg1, InetAddress arg2, int arg3) + throws IOException { + LOG.warn("Using custom socket factory"); + return ff.createSocket(arg0, arg1, arg2, arg3); + } + + @Override + public Socket createSocket(String arg0, int arg1, InetAddress arg2, int arg3) throws IOException, + UnknownHostException { + LOG.warn("Using custom socket factory"); + return ff.createSocket(arg0, arg1, arg2, arg3); + } + + @Override + public Socket createSocket(InetAddress arg0, int arg1) throws IOException { + LOG.warn("Using custom socket factory"); + return ff.createSocket(arg0, arg1); + } + + @Override + public Socket createSocket(String arg0, int arg1) throws IOException, UnknownHostException { + LOG.warn("Using custom socket factory"); + return ff.createSocket(arg0, arg1); + } + + @Override + public Socket createSocket() throws IOException, UnknownHostException { + LOG.warn("Using unconnected sockets"); + return ff.createSocket(); + } + }; + umbilical = taskOwner.doAs(new PrivilegedExceptionAction() { @Override public TezTaskUmbilicalProtocol run() throws Exception { return RPC.getProxy(TezTaskUmbilicalProtocol.class, - TezTaskUmbilicalProtocol.versionID, address, conf); + TezTaskUmbilicalProtocol.versionID, address, conf, sock); } });