diff --git llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java index f6ec119..2b128f4 100644 --- llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java +++ llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java @@ -79,7 +79,7 @@ public LlapConfiguration() { public static final int LLAP_DAEMON_SERVICE_REFRESH_INTERVAL_DEFAULT = 60; // seconds public static final String LLAP_DAEMON_COMMUNICATOR_NUM_THREADS = LLAP_DAEMON_PREFIX + "communicator.num.threads"; - public static final int LLAP_DAEMON_COMMUNICATOR_NUM_THREADS_DEFAULT = 5; + public static final int LLAP_DAEMON_COMMUNICATOR_NUM_THREADS_DEFAULT = 10; /** * Minimum time after which a previously disabled node will be re-enabled for scheduling. This may @@ -122,4 +122,15 @@ public LlapConfiguration() { public static final String LLAP_DAEMON_TASK_SCHEDULER_ENABLE_PREEMPTION = LLAP_DAEMON_PREFIX + "task.scheduler.enable.preemption"; public static final boolean LLAP_DAEMON_TASK_SCHEDULER_ENABLE_PREEMPTION_DEFAULT = false; + + + /** Maximum time, in milliseconds, to keep retrying a failed connection to an LLAP daemon */ + public static final String LLAP_TASK_COMMUNICATOR_CONNECTION_TIMEOUT_MILLIS = + LLAP_PREFIX + "task.communicator.connection.timeout-millis"; + public static final long LLAP_TASK_COMMUNICATOR_CONNECTION_TIMEOUT_MILLIS_DEFAULT = 16000L; + + /** Time, in milliseconds, to sleep between successive connection retries to an LLAP daemon */ + public static final String LLAP_TASK_COMMUNICATOR_CONNECTION_SLEEP_BETWEEN_RETRIES_MILLIS = + LLAP_PREFIX + "task.communicator.connection.sleep-between-retries-millis"; + public static final long LLAP_TASK_COMMUNICATOR_CONNECTION_SLEEP_BETWEEN_RETRIES_MILLIS_DEFAULT = 2000L; } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolClientImpl.java 
llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolClientImpl.java index 9f161fe..5805869 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolClientImpl.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolClientImpl.java @@ -14,11 +14,13 @@ package org.apache.hadoop.hive.llap.daemon.impl; +import javax.annotation.Nullable; +import javax.net.SocketFactory; import java.io.IOException; import java.net.InetSocketAddress; import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteRequestProto; @@ -29,10 +31,13 @@ import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentRequestProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentResponseProto; +import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.ipc.ProtobufRpcEngine; +import org.apache.hadoop.ipc.ProtocolProxy; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.hive.llap.daemon.LlapDaemonProtocolBlockingPB; +import org.apache.hadoop.security.UserGroupInformation; // TODO Change all this to be based on a regular interface instead of relying on the Proto service - Exception signatures cannot be controlled without this for the moment. 
@@ -41,12 +46,22 @@ private final Configuration conf; private final InetSocketAddress serverAddr; + private final RetryPolicy retryPolicy; + private final SocketFactory socketFactory; LlapDaemonProtocolBlockingPB proxy; - public LlapDaemonProtocolClientImpl(Configuration conf, String hostname, int port) { + public LlapDaemonProtocolClientImpl(Configuration conf, String hostname, int port, + @Nullable RetryPolicy retryPolicy, + @Nullable SocketFactory socketFactory) { this.conf = conf; this.serverAddr = NetUtils.createSocketAddr(hostname, port); + this.retryPolicy = retryPolicy; + if (socketFactory == null) { + this.socketFactory = NetUtils.getDefaultSocketFactory(conf); + } else { + this.socketFactory = socketFactory; + } } @Override @@ -101,10 +116,13 @@ public LlapDaemonProtocolBlockingPB getProxy() throws IOException { } public LlapDaemonProtocolBlockingPB createProxy() throws IOException { - LlapDaemonProtocolBlockingPB p; // TODO Fix security RPC.setProtocolEngine(conf, LlapDaemonProtocolBlockingPB.class, ProtobufRpcEngine.class); - p = RPC.getProxy(LlapDaemonProtocolBlockingPB.class, 0, serverAddr, conf); - return p; + // Connect with the constructor-supplied socket factory and retry policy (null policy = Hadoop IPC default). + ProtocolProxy<LlapDaemonProtocolBlockingPB> protocolProxy = + RPC.getProtocolProxy(LlapDaemonProtocolBlockingPB.class, 0, serverAddr, + UserGroupInformation.getCurrentUser(), conf, socketFactory, 0, + retryPolicy); + return protocolProxy.getProxy(); } } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java index d614548..6abd706 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java @@ -111,7 +111,7 @@ public void serviceInit(Configuration conf) throws Exception { super.serviceInit(conf); int numThreads = conf.getInt(LlapConfiguration.LLAP_DAEMON_COMMUNICATOR_NUM_THREADS, 
LlapConfiguration.LLAP_DAEMON_COMMUNICATOR_NUM_THREADS_DEFAULT); - this.communicator = new TaskCommunicator(numThreads); + this.communicator = new TaskCommunicator(numThreads, conf); this.deleteDelayOnDagComplete = conf.getLong(LlapConfiguration.LLAP_FILE_CLEANUP_DELAY_SECONDS, LlapConfiguration.LLAP_FILE_CLEANUP_DELAY_SECONDS_DEFAULT); LOG.info("Running LlapTaskCommunicator with " diff --git llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/TaskCommunicator.java llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/TaskCommunicator.java index d536eb2..cec17f9 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/TaskCommunicator.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/TaskCommunicator.java @@ -14,11 +14,13 @@ package org.apache.hadoop.hive.llap.tezplugins; +import javax.net.SocketFactory; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; @@ -29,6 +31,8 @@ import com.google.protobuf.Message; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.llap.configuration.LlapConfiguration; import org.apache.hadoop.hive.llap.daemon.LlapDaemonProtocolBlockingPB; import org.apache.hadoop.hive.llap.daemon.impl.LlapDaemonProtocolClientImpl; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteRequestProto; @@ -37,6 +41,9 @@ import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto; import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto; +import org.apache.hadoop.io.retry.RetryPolicies; +import org.apache.hadoop.io.retry.RetryPolicy; +import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.service.AbstractService; public class TaskCommunicator extends AbstractService { @@ -46,12 +53,30 @@ private final ConcurrentMap hostProxies; private ListeningExecutorService executor; - public TaskCommunicator(int numThreads) { + private final RetryPolicy retryPolicy; + private final SocketFactory socketFactory; + + public TaskCommunicator(int numThreads, Configuration conf) { super(TaskCommunicator.class.getSimpleName()); ExecutorService localExecutor = Executors.newFixedThreadPool(numThreads, new ThreadFactoryBuilder().setNameFormat("TaskCommunicator #%2d").build()); this.hostProxies = new ConcurrentHashMap<>(); executor = MoreExecutors.listeningDecorator(localExecutor); + + this.socketFactory = NetUtils.getDefaultSocketFactory(conf); + + long connectionTimeout = + conf.getLong(LlapConfiguration.LLAP_TASK_COMMUNICATOR_CONNECTION_TIMEOUT_MILLIS, + LlapConfiguration.LLAP_TASK_COMMUNICATOR_CONNECTION_TIMEOUT_MILLIS_DEFAULT); + long retrySleep = conf.getLong( + LlapConfiguration.LLAP_TASK_COMMUNICATOR_CONNECTION_SLEEP_BETWEEN_RETRIES_MILLIS, + LlapConfiguration.LLAP_TASK_COMMUNICATOR_CONNECTION_SLEEP_BETWEEN_RETRIES_MILLIS_DEFAULT); + this.retryPolicy = RetryPolicies.retryUpToMaximumTimeWithFixedSleep(connectionTimeout, retrySleep, + TimeUnit.MILLISECONDS); + LOG.info("Setting up taskCommunicator with " + + "numThreads=" + numThreads + + ", retryTime(millis)=" + connectionTimeout + + ", retrySleep(millis)=" + retrySleep); } @Override @@ -180,7 +205,7 @@ private LlapDaemonProtocolBlockingPB getProxy(String hostname, int port) { LlapDaemonProtocolBlockingPB proxy = hostProxies.get(hostId); if (proxy == null) { - proxy = new LlapDaemonProtocolClientImpl(getConfig(), hostname, port); + proxy = new 
LlapDaemonProtocolClientImpl(getConfig(), hostname, port, retryPolicy, socketFactory); LlapDaemonProtocolBlockingPB proxyOld = hostProxies.putIfAbsent(hostId, proxy); if (proxyOld != null) { // TODO Shutdown the new proxy. diff --git llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java index b2ac632..8d45c95 100644 --- llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java +++ llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java @@ -48,7 +48,7 @@ public void test() throws ServiceException { LlapDaemonProtocolBlockingPB client = new LlapDaemonProtocolClientImpl(new Configuration(), serverAddr.getHostName(), - serverAddr.getPort()); + serverAddr.getPort(), null, null); client.submitWork(null, SubmitWorkRequestProto.newBuilder() .setAmHost("amhost")