diff --git llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapDaemonProtocolClientProxy.java llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapDaemonProtocolClientProxy.java
index e69de29..2884e40 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapDaemonProtocolClientProxy.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapDaemonProtocolClientProxy.java
@@ -0,0 +1,509 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.tezplugins;
+
+import javax.net.SocketFactory;
+
+import java.io.IOException;
+import java.security.PrivilegedAction;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.protobuf.Message;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.LlapNodeId;
+import org.apache.hadoop.hive.llap.daemon.LlapDaemonProtocolBlockingPB;
+import org.apache.hadoop.hive.llap.daemon.impl.LlapDaemonProtocolClientImpl;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteRequestProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteResponseProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentRequestProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentResponseProto;
+import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.service.AbstractService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LlapDaemonProtocolClientProxy extends AbstractService {
+
+ private static final Logger LOG = LoggerFactory.getLogger(LlapDaemonProtocolClientProxy.class);
+
+ private final ConcurrentMap hostProxies;
+
+ private final RequestManager requestManager;
+ private final RetryPolicy retryPolicy;
+ private final SocketFactory socketFactory;
+
+ private final ListeningExecutorService requestManagerExecutor;
+ private volatile ListenableFuture requestManagerFuture;
+ private final Token llapToken;
+
+ public LlapDaemonProtocolClientProxy(
+ int numThreads, Configuration conf, Token llapToken) {
+ super(LlapDaemonProtocolClientProxy.class.getSimpleName());
+ this.hostProxies = new ConcurrentHashMap<>();
+ this.socketFactory = NetUtils.getDefaultSocketFactory(conf);
+ this.llapToken = llapToken;
+
+ long connectionTimeout = HiveConf.getTimeVar(conf,
+ ConfVars.LLAP_TASK_COMMUNICATOR_CONNECTION_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+ long retrySleep = HiveConf.getTimeVar(conf,
+ ConfVars.LLAP_TASK_COMMUNICATOR_CONNECTION_SLEEP_BETWEEN_RETRIES_MS,
+ TimeUnit.MILLISECONDS);
+ this.retryPolicy = RetryPolicies.retryUpToMaximumTimeWithFixedSleep(
+ connectionTimeout, retrySleep, TimeUnit.MILLISECONDS);
+
+ this.requestManager = new RequestManager(numThreads);
+ ExecutorService localExecutor = Executors.newFixedThreadPool(1,
+ new ThreadFactoryBuilder().setNameFormat("RequestManagerExecutor").build());
+ this.requestManagerExecutor = MoreExecutors.listeningDecorator(localExecutor);
+
+ LOG.info("Setting up taskCommunicator with" +
+ "numThreads=" + numThreads +
+ "retryTime(millis)=" + connectionTimeout +
+ "retrySleep(millis)=" + retrySleep);
+ }
+
+ @Override
+ public void serviceStart() {
+ requestManagerFuture = requestManagerExecutor.submit(requestManager);
+ Futures.addCallback(requestManagerFuture, new FutureCallback() {
+ @Override
+ public void onSuccess(Void result) {
+ LOG.info("RequestManager shutdown");
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ LOG.warn("RequestManager shutdown with error", t);
+ }
+ });
+ }
+
+ @Override
+ public void serviceStop() {
+ if (requestManagerFuture != null) {
+ requestManager.shutdown();
+ requestManagerFuture.cancel(true);
+ }
+ requestManagerExecutor.shutdown();
+ }
+
+ public void sendSubmitWork(SubmitWorkRequestProto request, String host, int port,
+ final ExecuteRequestCallback callback) {
+ LlapNodeId nodeId = LlapNodeId.getInstance(host, port);
+ requestManager.queueRequest(new SubmitWorkCallable(nodeId, request, callback));
+ }
+
+ public void sendSourceStateUpdate(final SourceStateUpdatedRequestProto request, final String host,
+ final int port,
+ final ExecuteRequestCallback callback) {
+ LlapNodeId nodeId = LlapNodeId.getInstance(host, port);
+ requestManager.queueRequest(
+ new SendSourceStateUpdateCallable(nodeId, request, callback));
+ }
+
+ public void sendQueryComplete(final QueryCompleteRequestProto request, final String host,
+ final int port,
+ final ExecuteRequestCallback callback) {
+ LlapNodeId nodeId = LlapNodeId.getInstance(host, port);
+ requestManager.queueRequest(new SendQueryCompleteCallable(nodeId, request, callback));
+ }
+
+ public void sendTerminateFragment(final TerminateFragmentRequestProto request, final String host,
+ final int port,
+ final ExecuteRequestCallback callback) {
+ LlapNodeId nodeId = LlapNodeId.getInstance(host, port);
+ requestManager.queueRequest(new SendTerminateFragmentCallable(nodeId, request, callback));
+ }
+
+ @VisibleForTesting
+ static class RequestManager implements Callable {
+
+ private final Lock lock = new ReentrantLock();
+ private final AtomicBoolean isShutdown = new AtomicBoolean(false);
+ private final Condition queueCondition = lock.newCondition();
+ private final AtomicBoolean shouldRun = new AtomicBoolean(false);
+
+ private final int maxConcurrentRequestsPerNode = 1;
+ private final ListeningExecutorService executor;
+
+
+ // Tracks new additions via add, while the loop is processing existing ones.
+ private final LinkedList newRequestList = new LinkedList<>();
+
+ // Tracks existing requests which are cycled through.
+ private final LinkedList pendingRequests = new LinkedList<>();
+
+ // Tracks requests executing per node
+ private final ConcurrentMap runningRequests = new ConcurrentHashMap<>();
+
+ // Tracks completed requests pre node
+ private final LinkedList completedNodes = new LinkedList<>();
+
+ public RequestManager(int numThreads) {
+ ExecutorService localExecutor = Executors.newFixedThreadPool(numThreads,
+ new ThreadFactoryBuilder().setNameFormat("TaskCommunicator #%2d").build());
+ executor = MoreExecutors.listeningDecorator(localExecutor);
+ }
+
+
+ @VisibleForTesting
+ Set currentLoopDisabledNodes = new HashSet<>();
+ @VisibleForTesting
+ List currentLoopSkippedRequests = new LinkedList<>();
+ @Override
+ public Void call() {
+ // Caches disabled nodes for quicker lookups and ensures a request on a node which was skipped
+ // does not go out of order.
+ while (!isShutdown.get()) {
+ lock.lock();
+ try {
+ while (!shouldRun.get()) {
+ queueCondition.await();
+ break; // Break out and try executing.
+ }
+ boolean shouldBreak = process();
+ if (shouldBreak) {
+ break;
+ }
+ } catch (InterruptedException e) {
+ if (isShutdown.get()) {
+ break;
+ } else {
+ LOG.warn("RunLoop interrupted without being shutdown first");
+ throw new RuntimeException(e);
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+ LOG.info("CallScheduler loop exiting");
+ return null;
+ }
+
+ /* Add a new request to be executed */
+ public void queueRequest(CallableRequest request) {
+ synchronized (newRequestList) {
+ newRequestList.add(request);
+ shouldRun.set(true);
+ }
+ notifyRunLoop();
+ }
+
+ /* Indicates a request has completed on a node */
+ public void requestFinished(LlapNodeId nodeId) {
+ synchronized (completedNodes) {
+ completedNodes.add(nodeId);
+ shouldRun.set(true);
+ }
+ notifyRunLoop();
+ }
+
+ public void shutdown() {
+ if (!isShutdown.getAndSet(true)) {
+ executor.shutdownNow();
+ notifyRunLoop();
+ }
+ }
+
+ @VisibleForTesting
+ void submitToExecutor(CallableRequest request, LlapNodeId nodeId) {
+ ListenableFuture future =
+ executor.submit(request);
+ Futures.addCallback(future, new ResponseCallback(request.getCallback(), nodeId, this));
+ }
+
+ @VisibleForTesting
+ boolean process() {
+ if (isShutdown.get()) {
+ return true;
+ }
+ currentLoopDisabledNodes.clear();
+ currentLoopSkippedRequests.clear();
+
+ // Set to false to block the next loop. This must be called before draining the lists,
+ // otherwise an add/completion after draining the lists but before setting it to false,
+ // will not trigger a run. May cause one unnecessary run if an add comes in before drain.
+ // drain list. add request (setTrue). setFalse needs to be avoided.
+ shouldRun.compareAndSet(true, false);
+ // Drain any calls which may have come in during the last execution of the loop.
+ drainNewRequestList(); // Locks newRequestList
+ drainCompletedNodes(); // Locks completedNodes
+
+
+ Iterator iterator = pendingRequests.iterator();
+ while (iterator.hasNext()) {
+ CallableRequest request = iterator.next();
+ iterator.remove();
+ LlapNodeId nodeId = request.getNodeId();
+ if (canRunForNode(nodeId, currentLoopDisabledNodes)) {
+ submitToExecutor(request, nodeId);
+ } else {
+ currentLoopDisabledNodes.add(nodeId);
+ currentLoopSkippedRequests.add(request);
+ }
+ }
+ // Tried scheduling everything that could be scheduled in this loop.
+ pendingRequests.addAll(0, currentLoopSkippedRequests);
+ return false;
+ }
+
+ private void drainNewRequestList() {
+ synchronized (newRequestList) {
+ if (!newRequestList.isEmpty()) {
+ pendingRequests.addAll(newRequestList);
+ newRequestList.clear();
+ }
+ }
+ }
+
+ private void drainCompletedNodes() {
+ synchronized (completedNodes) {
+ if (!completedNodes.isEmpty()) {
+ for (LlapNodeId nodeId : completedNodes) {
+ runningRequests.get(nodeId).decrementAndGet();
+ }
+ }
+ completedNodes.clear();
+ }
+ }
+
+ private boolean canRunForNode(LlapNodeId nodeId, Set currentRunDisabledNodes) {
+ if (currentRunDisabledNodes.contains(nodeId)) {
+ return false;
+ } else {
+ AtomicInteger count = runningRequests.get(nodeId);
+ if (count == null) {
+ count = new AtomicInteger(0);
+ AtomicInteger old = runningRequests.putIfAbsent(nodeId, count);
+ count = old != null ? old : count;
+ }
+ if (count.incrementAndGet() <= maxConcurrentRequestsPerNode) {
+ return true;
+ } else {
+ count.decrementAndGet();
+ return false;
+ }
+ }
+ }
+
+ private void notifyRunLoop() {
+ lock.lock();
+ try {
+ queueCondition.signal();
+ } finally {
+ lock.unlock();
+ }
+ }
+ }
+
+
+ private static final class ResponseCallback
+ implements FutureCallback {
+
+ private final ExecuteRequestCallback callback;
+ private final LlapNodeId nodeId;
+ private final RequestManager requestManager;
+
+ public ResponseCallback(ExecuteRequestCallback callback, LlapNodeId nodeId,
+ RequestManager requestManager) {
+ this.callback = callback;
+ this.nodeId = nodeId;
+ this.requestManager = requestManager;
+ }
+
+ @Override
+ public void onSuccess(TYPE result) {
+ try {
+ callback.setResponse(result);
+ } finally {
+ requestManager.requestFinished(nodeId);
+ }
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ try {
+ callback.indicateError(t);
+ } finally {
+ requestManager.requestFinished(nodeId);
+ }
+ }
+ }
+
+ @VisibleForTesting
+ static abstract class CallableRequest
+ implements Callable {
+
+ final LlapNodeId nodeId;
+ final ExecuteRequestCallback callback;
+ final REQUEST request;
+
+
+ protected CallableRequest(LlapNodeId nodeId, REQUEST request, ExecuteRequestCallback callback) {
+ this.nodeId = nodeId;
+ this.request = request;
+ this.callback = callback;
+ }
+
+ public LlapNodeId getNodeId() {
+ return nodeId;
+ }
+
+ public ExecuteRequestCallback getCallback() {
+ return callback;
+ }
+
+ public abstract RESPONSE call() throws Exception;
+ }
+
+ private class SubmitWorkCallable extends CallableRequest {
+
+ protected SubmitWorkCallable(LlapNodeId nodeId,
+ SubmitWorkRequestProto submitWorkRequestProto,
+ ExecuteRequestCallback callback) {
+ super(nodeId, submitWorkRequestProto, callback);
+ }
+
+ @Override
+ public SubmitWorkResponseProto call() throws Exception {
+ return getProxy(nodeId).submitWork(null, request);
+ }
+ }
+
+ private class SendSourceStateUpdateCallable
+ extends CallableRequest {
+
+ public SendSourceStateUpdateCallable(LlapNodeId nodeId,
+ SourceStateUpdatedRequestProto request,
+ ExecuteRequestCallback callback) {
+ super(nodeId, request, callback);
+ }
+
+ @Override
+ public SourceStateUpdatedResponseProto call() throws Exception {
+ return getProxy(nodeId).sourceStateUpdated(null, request);
+ }
+ }
+
+ private class SendQueryCompleteCallable
+ extends CallableRequest {
+
+ protected SendQueryCompleteCallable(LlapNodeId nodeId,
+ QueryCompleteRequestProto queryCompleteRequestProto,
+ ExecuteRequestCallback callback) {
+ super(nodeId, queryCompleteRequestProto, callback);
+ }
+
+ @Override
+ public QueryCompleteResponseProto call() throws Exception {
+ return getProxy(nodeId).queryComplete(null, request);
+ }
+ }
+
+ private class SendTerminateFragmentCallable
+ extends CallableRequest {
+
+ protected SendTerminateFragmentCallable(LlapNodeId nodeId,
+ TerminateFragmentRequestProto terminateFragmentRequestProto,
+ ExecuteRequestCallback callback) {
+ super(nodeId, terminateFragmentRequestProto, callback);
+ }
+
+ @Override
+ public TerminateFragmentResponseProto call() throws Exception {
+ return getProxy(nodeId).terminateFragment(null, request);
+ }
+ }
+
+ public interface ExecuteRequestCallback {
+ void setResponse(T response);
+ void indicateError(Throwable t);
+ }
+
+ private LlapDaemonProtocolBlockingPB getProxy(final LlapNodeId nodeId) {
+ String hostId = getHostIdentifier(nodeId.getHostname(), nodeId.getPort());
+
+ LlapDaemonProtocolBlockingPB proxy = hostProxies.get(hostId);
+ if (proxy == null) {
+ if (llapToken == null) {
+ proxy = new LlapDaemonProtocolClientImpl(getConfig(), nodeId.getHostname(),
+ nodeId.getPort(), retryPolicy, socketFactory);
+ } else {
+ UserGroupInformation ugi;
+ try {
+ ugi = UserGroupInformation.getCurrentUser();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ Token nodeToken = new Token(llapToken);
+ SecurityUtil.setTokenService(nodeToken, NetUtils.createSocketAddrForHost(
+ nodeId.getHostname(), nodeId.getPort()));
+ ugi.addToken(nodeToken);
+ proxy = ugi.doAs(new PrivilegedAction() {
+ @Override
+ public LlapDaemonProtocolBlockingPB run() {
+ return new LlapDaemonProtocolClientImpl(getConfig(), nodeId.getHostname(),
+ nodeId.getPort(), retryPolicy, socketFactory);
+ }
+ });
+ }
+
+ LlapDaemonProtocolBlockingPB proxyOld = hostProxies.putIfAbsent(hostId, proxy);
+ if (proxyOld != null) {
+ // TODO Shutdown the new proxy.
+ proxy = proxyOld;
+ }
+ }
+ return proxy;
+ }
+
+ private String getHostIdentifier(String hostname, int port) {
+ return hostname + ":" + port;
+ }
+}
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
index 9d47940..5c370ee 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
@@ -22,8 +22,10 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
@@ -79,6 +81,9 @@
private static final Logger LOG = LoggerFactory.getLogger(LlapTaskCommunicator.class);
+ private static final boolean isInfoEnabled = LOG.isInfoEnabled();
+ private static final boolean isDebugEnabed = LOG.isDebugEnabled();
+
private final SubmitWorkRequestProto BASE_SUBMIT_WORK_REQUEST;
private final ConcurrentMap credentialMap;
@@ -88,11 +93,17 @@
private final SourceStateTracker sourceStateTracker;
private final Set nodesForQuery = new HashSet<>();
- private TaskCommunicator communicator;
+ private LlapDaemonProtocolClientProxy communicator;
private long deleteDelayOnDagComplete;
private final LlapTaskUmbilicalProtocol umbilical;
private final Token token;
+ // These two structures track the list of known nodes, and the list of nodes which are sending in keep-alive heartbeats.
+ // Primarily for debugging purposes a.t.m, since there's some unexplained TASK_TIMEOUTS which are currently being observed.
+ private final ConcurrentMap knownNodeMap = new ConcurrentHashMap<>();
+ private final ConcurrentMap pingedNodeMap = new ConcurrentHashMap<>();
+
+
private volatile String currentDagName;
public LlapTaskCommunicator(
@@ -131,7 +142,7 @@ public void initialize() throws Exception {
super.initialize();
Configuration conf = getConf();
int numThreads = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_COMMUNICATOR_NUM_THREADS);
- this.communicator = new TaskCommunicator(numThreads, conf, token);
+ this.communicator = new LlapDaemonProtocolClientProxy(numThreads, conf, token);
this.deleteDelayOnDagComplete = HiveConf.getTimeVar(
conf, ConfVars.LLAP_FILE_CLEANUP_DELAY_SECONDS, TimeUnit.SECONDS);
LOG.info("Running LlapTaskCommunicator with "
@@ -235,6 +246,7 @@ public void registerRunningTaskAttempt(final ContainerId containerId, final Task
}
LlapNodeId nodeId = LlapNodeId.getInstance(host, port);
+ registerKnownNode(nodeId);
entityTracker.registerTaskAttempt(containerId, taskSpec.getTaskAttemptID(), host, port);
nodesForQuery.add(nodeId);
@@ -254,7 +266,7 @@ public void registerRunningTaskAttempt(final ContainerId containerId, final Task
getContext()
.taskStartedRemotely(taskSpec.getTaskAttemptID(), containerId);
communicator.sendSubmitWork(requestProto, host, port,
- new TaskCommunicator.ExecuteRequestCallback() {
+ new LlapDaemonProtocolClientProxy.ExecuteRequestCallback() {
@Override
public void setResponse(SubmitWorkResponseProto response) {
if (response.hasSubmissionState()) {
@@ -333,14 +345,14 @@ private void sendTaskTerminated(final TezTaskAttemptID taskAttemptId,
LOG.info(
"DBG: Attempting to send terminateRequest for fragment {} due to internal preemption invoked by {}",
taskAttemptId.toString(), invokedByContainerEnd ? "containerEnd" : "taskEnd");
- LlapNodeId nodeId = entityTracker.getNodeIfForTaskAttempt(taskAttemptId);
+ LlapNodeId nodeId = entityTracker.getNodeIdForTaskAttempt(taskAttemptId);
// NodeId can be null if the task gets unregistered due to failure / being killed by the daemon itself
if (nodeId != null) {
TerminateFragmentRequestProto request =
TerminateFragmentRequestProto.newBuilder().setDagName(currentDagName)
.setFragmentIdentifierString(taskAttemptId.toString()).build();
communicator.sendTerminateFragment(request, nodeId.getHostname(), nodeId.getPort(),
- new TaskCommunicator.ExecuteRequestCallback() {
+ new LlapDaemonProtocolClientProxy.ExecuteRequestCallback() {
@Override
public void setResponse(TerminateFragmentResponseProto response) {
}
@@ -365,7 +377,7 @@ public void dagComplete(final String dagName) {
for (final LlapNodeId llapNodeId : nodesForQuery) {
LOG.info("Sending dagComplete message for {}, to {}", dagName, llapNodeId);
communicator.sendQueryComplete(request, llapNodeId.getHostname(), llapNodeId.getPort(),
- new TaskCommunicator.ExecuteRequestCallback() {
+ new LlapDaemonProtocolClientProxy.ExecuteRequestCallback() {
@Override
public void setResponse(LlapDaemonProtocolProtos.QueryCompleteResponseProto response) {
}
@@ -391,7 +403,7 @@ public void onVertexStateUpdated(VertexStateUpdate vertexStateUpdate) {
public void sendStateUpdate(final String host, final int port,
final SourceStateUpdatedRequestProto request) {
communicator.sendSourceStateUpdate(request, host, port,
- new TaskCommunicator.ExecuteRequestCallback() {
+ new LlapDaemonProtocolClientProxy.ExecuteRequestCallback() {
@Override
public void setResponse(SourceStateUpdatedResponseProto response) {
}
@@ -409,6 +421,79 @@ public void indicateError(Throwable t) {
}
+ private static class PingingNodeInfo {
+ final AtomicLong logTimestamp;
+ final AtomicInteger pingCount;
+
+ PingingNodeInfo(long currentTs) {
+ logTimestamp = new AtomicLong(currentTs);
+ pingCount = new AtomicInteger(1);
+ }
+ }
+
+ public void registerKnownNode(LlapNodeId nodeId) {
+ Long old = knownNodeMap.putIfAbsent(nodeId,
+ TimeUnit.MILLISECONDS.convert(System.nanoTime(), TimeUnit.NANOSECONDS));
+ if (old == null) {
+ if (isInfoEnabled) {
+ LOG.info("Added new known node: {}", nodeId);
+ }
+ }
+ }
+
+ public void registerPingingNode(LlapNodeId nodeId) {
+ long currentTs = TimeUnit.MILLISECONDS.convert(System.nanoTime(), TimeUnit.NANOSECONDS);
+ PingingNodeInfo ni = new PingingNodeInfo(currentTs);
+ PingingNodeInfo old = pingedNodeMap.put(nodeId, ni);
+ if (old == null) {
+ if (isInfoEnabled) {
+ LOG.info("Added new pinging node: [{}]", nodeId);
+ }
+ } else {
+ old.pingCount.incrementAndGet();
+ }
+ // The node should always be known by this point. Log occasionally if it is not known.
+ if (!knownNodeMap.containsKey(nodeId)) {
+ if (old == null) {
+ // First time this is seen. Log it.
+ LOG.warn("Received ping from unknownNode: [{}], count={}", nodeId, ni.pingCount.get());
+ } else {
+ // Pinged before. Log only occasionally.
+ if (currentTs > old.logTimestamp.get() + 5000l) { // 5 seconds elapsed. Log again.
+ LOG.warn("Received ping from unknownNode: [{}], count={}", nodeId, old.pingCount.get());
+ old.logTimestamp.set(currentTs);
+ }
+ }
+
+ }
+ }
+
+
+ private final AtomicLong nodeNotFoundLogTime = new AtomicLong(0);
+
+ void nodePinged(String hostname, int port) {
+ LlapNodeId nodeId = LlapNodeId.getInstance(hostname, port);
+ registerPingingNode(nodeId);
+ BiMap biMap =
+ entityTracker.getContainerAttemptMapForNode(nodeId);
+ if (biMap != null) {
+ synchronized (biMap) {
+ for (Map.Entry entry : biMap.entrySet()) {
+ getContext().taskAlive(entry.getValue());
+ getContext().containerAlive(entry.getKey());
+ }
+ }
+ } else {
+ long currentTs = TimeUnit.MILLISECONDS.convert(System.nanoTime(), TimeUnit.NANOSECONDS);
+ if (currentTs > nodeNotFoundLogTime.get() + 5000l) {
+ LOG.warn("Received ping from node without any registered tasks or containers: " + hostname +
+ ":" + port +
+ ". Could be caused by pre-emption by the AM," +
+ " or a mismatched hostname. Enable debug logging for mismatched host names");
+ nodeNotFoundLogTime.set(currentTs);
+ }
+ }
+ }
private void resetCurrentDag(String newDagName) {
// Working on the assumption that a single DAG runs at a time per AM.
@@ -454,6 +539,8 @@ private ByteBuffer serializeCredentials(Credentials credentials) throws IOExcept
return ByteBuffer.wrap(containerTokens_dob.getData(), 0, containerTokens_dob.getLength());
}
+
+
protected class LlapTaskUmbilicalProtocolImpl implements LlapTaskUmbilicalProtocol {
private final TezTaskUmbilicalProtocol tezUmbilical;
@@ -475,7 +562,7 @@ public TezHeartbeatResponse heartbeat(TezHeartbeatRequest request) throws IOExce
@Override
public void nodeHeartbeat(Text hostname, int port) throws IOException {
- entityTracker.nodePinged(hostname.toString(), port);
+ nodePinged(hostname.toString(), port);
if (LOG.isDebugEnabled()) {
LOG.debug("Received heartbeat from [" + hostname + ":" + port +"]");
}
@@ -502,10 +589,17 @@ public ProtocolSignature getProtocolSignature(String protocol, long clientVersio
}
}
- private final class EntityTracker {
- private final ConcurrentMap attemptToNodeMap = new ConcurrentHashMap<>();
- private final ConcurrentMap containerToNodeMap = new ConcurrentHashMap<>();
- private final ConcurrentMap> nodeMap = new ConcurrentHashMap<>();
+ /**
+ * Track the association between known containers and taskAttempts, along with the nodes they are assigned to.
+ */
+ @VisibleForTesting
+ static final class EntityTracker {
+ @VisibleForTesting
+ final ConcurrentMap attemptToNodeMap = new ConcurrentHashMap<>();
+ @VisibleForTesting
+ final ConcurrentMap containerToNodeMap = new ConcurrentHashMap<>();
+ @VisibleForTesting
+ final ConcurrentMap> nodeMap = new ConcurrentHashMap<>();
void registerTaskAttempt(ContainerId containerId, TezTaskAttemptID taskAttemptId, String host, int port) {
if (LOG.isDebugEnabled()) {
@@ -513,6 +607,10 @@ void registerTaskAttempt(ContainerId containerId, TezTaskAttemptID taskAttemptId
}
LlapNodeId llapNodeId = LlapNodeId.getInstance(host, port);
attemptToNodeMap.putIfAbsent(taskAttemptId, llapNodeId);
+
+ registerContainer(containerId, host, port);
+
+ // nodeMap registration.
BiMap tmpMap = HashBiMap.create();
BiMap old = nodeMap.putIfAbsent(llapNodeId, tmpMap);
BiMap usedInstance;
@@ -538,10 +636,9 @@ void unregisterTaskAttempt(TezTaskAttemptID attemptId) {
synchronized(bMap) {
matched = bMap.inverse().remove(attemptId);
}
- }
- // Removing here. Registration into the map has to make sure to put
- if (bMap.isEmpty()) {
- nodeMap.remove(llapNodeId);
+ if (bMap.isEmpty()) {
+ nodeMap.remove(llapNodeId);
+ }
}
// Remove the container mapping
@@ -552,23 +649,29 @@ void unregisterTaskAttempt(TezTaskAttemptID attemptId) {
}
void registerContainer(ContainerId containerId, String hostname, int port) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Registering " + containerId + " for node: " + hostname + ":" + port);
+ }
containerToNodeMap.putIfAbsent(containerId, LlapNodeId.getInstance(hostname, port));
+ // nodeMap registration is not required, since there's no taskId association.
}
LlapNodeId getNodeIdForContainer(ContainerId containerId) {
return containerToNodeMap.get(containerId);
}
- LlapNodeId getNodeIfForTaskAttempt(TezTaskAttemptID taskAttemptId) {
+ LlapNodeId getNodeIdForTaskAttempt(TezTaskAttemptID taskAttemptId) {
return attemptToNodeMap.get(taskAttemptId);
}
ContainerId getContainerIdForAttempt(TezTaskAttemptID taskAttemptId) {
- LlapNodeId llapNodeId = getNodeIfForTaskAttempt(taskAttemptId);
+ LlapNodeId llapNodeId = getNodeIdForTaskAttempt(taskAttemptId);
if (llapNodeId != null) {
BiMap bMap = nodeMap.get(llapNodeId).inverse();
if (bMap != null) {
- return bMap.get(taskAttemptId);
+ synchronized (bMap) {
+ return bMap.get(taskAttemptId);
+ }
} else {
return null;
}
@@ -582,7 +685,9 @@ TezTaskAttemptID getTaskAttemptIdForContainer(ContainerId containerId) {
if (llapNodeId != null) {
BiMap bMap = nodeMap.get(llapNodeId);
if (bMap != null) {
- return bMap.get(containerId);
+ synchronized (bMap) {
+ return bMap.get(containerId);
+ }
} else {
return null;
}
@@ -604,10 +709,9 @@ void unregisterContainer(ContainerId containerId) {
synchronized(bMap) {
matched = bMap.remove(containerId);
}
- }
- // Removing here. Registration into the map has to make sure to put
- if (bMap.isEmpty()) {
- nodeMap.remove(llapNodeId);
+ if (bMap.isEmpty()) {
+ nodeMap.remove(llapNodeId);
+ }
}
// Remove the container mapping
@@ -616,25 +720,20 @@ void unregisterContainer(ContainerId containerId) {
}
}
- private final AtomicLong nodeNotFoundLogTime = new AtomicLong(0);
- void nodePinged(String hostname, int port) {
- LlapNodeId nodeId = LlapNodeId.getInstance(hostname, port);
- BiMap biMap = nodeMap.get(nodeId);
- if (biMap != null) {
- synchronized(biMap) {
- for (Map.Entry entry : biMap.entrySet()) {
- getContext().taskAlive(entry.getValue());
- getContext().containerAlive(entry.getKey());
- }
- }
- } else {
- if (System.currentTimeMillis() > nodeNotFoundLogTime.get() + 5000l) {
- LOG.warn("Received ping from unknown node: " + hostname + ":" + port +
- ". Could be caused by pre-emption by the AM," +
- " or a mismatched hostname. Enable debug logging for mismatched host names");
- nodeNotFoundLogTime.set(System.currentTimeMillis());
- }
- }
+ /**
+ * Return a {@link BiMap} containing container->taskAttemptId mapping for the host specified.
+ *
+ *
+ * This method return the internal structure used by the EntityTracker. Users must synchronize
+ * on the structure to ensure correct usage.
+ *
+ * @param llapNodeId
+ * @return
+ */
+ BiMap getContainerAttemptMapForNode(LlapNodeId llapNodeId) {
+ BiMap biMap = nodeMap.get(llapNodeId);
+ return biMap;
}
+
}
-}
\ No newline at end of file
+}
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/TaskCommunicator.java llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/TaskCommunicator.java
deleted file mode 100644
index f9ca677..0000000
--- llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/TaskCommunicator.java
+++ /dev/null
@@ -1,512 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.llap.tezplugins;
-
-import javax.net.SocketFactory;
-
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.security.PrivilegedAction;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.google.protobuf.Message;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.llap.LlapNodeId;
-import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
-import org.apache.hadoop.hive.llap.daemon.LlapDaemonProtocolBlockingPB;
-import org.apache.hadoop.hive.llap.daemon.impl.LlapDaemonProtocolClientImpl;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteRequestProto;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteResponseProto;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentRequestProto;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentResponseProto;
-import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
-import org.apache.hadoop.io.retry.RetryPolicies;
-import org.apache.hadoop.io.retry.RetryPolicy;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.service.AbstractService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class TaskCommunicator extends AbstractService {
-
- private static final Logger LOG = LoggerFactory.getLogger(TaskCommunicator.class);
-
- private final ConcurrentMap hostProxies;
-
- private final RequestManager requestManager;
- private final RetryPolicy retryPolicy;
- private final SocketFactory socketFactory;
-
- private final ListeningExecutorService requestManagerExecutor;
- private volatile ListenableFuture requestManagerFuture;
- private final Token llapToken;
-
- public TaskCommunicator(
- int numThreads, Configuration conf, Token llapToken) {
- super(TaskCommunicator.class.getSimpleName());
- this.hostProxies = new ConcurrentHashMap<>();
- this.socketFactory = NetUtils.getDefaultSocketFactory(conf);
- this.llapToken = llapToken;
-
- long connectionTimeout = HiveConf.getTimeVar(conf,
- ConfVars.LLAP_TASK_COMMUNICATOR_CONNECTION_TIMEOUT_MS, TimeUnit.MILLISECONDS);
- long retrySleep = HiveConf.getTimeVar(conf,
- ConfVars.LLAP_TASK_COMMUNICATOR_CONNECTION_SLEEP_BETWEEN_RETRIES_MS,
- TimeUnit.MILLISECONDS);
- this.retryPolicy = RetryPolicies.retryUpToMaximumTimeWithFixedSleep(
- connectionTimeout, retrySleep, TimeUnit.MILLISECONDS);
-
- this.requestManager = new RequestManager(numThreads);
- ExecutorService localExecutor = Executors.newFixedThreadPool(1,
- new ThreadFactoryBuilder().setNameFormat("RequestManagerExecutor").build());
- this.requestManagerExecutor = MoreExecutors.listeningDecorator(localExecutor);
-
- LOG.info("Setting up taskCommunicator with" +
- "numThreads=" + numThreads +
- "retryTime(millis)=" + connectionTimeout +
- "retrySleep(millis)=" + retrySleep);
- }
-
- @Override
- public void serviceStart() {
- requestManagerFuture = requestManagerExecutor.submit(requestManager);
- Futures.addCallback(requestManagerFuture, new FutureCallback() {
- @Override
- public void onSuccess(Void result) {
- LOG.info("RequestManager shutdown");
- }
-
- @Override
- public void onFailure(Throwable t) {
- LOG.warn("RequestManager shutdown with error", t);
- }
- });
- }
-
- @Override
- public void serviceStop() {
- if (requestManagerFuture != null) {
- requestManager.shutdown();
- requestManagerFuture.cancel(true);
- }
- requestManagerExecutor.shutdown();
- }
-
- public void sendSubmitWork(SubmitWorkRequestProto request, String host, int port,
- final ExecuteRequestCallback callback) {
- LlapNodeId nodeId = LlapNodeId.getInstance(host, port);
- requestManager.queueRequest(new SubmitWorkCallable(nodeId, request, callback));
- }
-
- public void sendSourceStateUpdate(final SourceStateUpdatedRequestProto request, final String host,
- final int port,
- final ExecuteRequestCallback callback) {
- LlapNodeId nodeId = LlapNodeId.getInstance(host, port);
- requestManager.queueRequest(
- new SendSourceStateUpdateCallable(nodeId, request, callback));
- }
-
- public void sendQueryComplete(final QueryCompleteRequestProto request, final String host,
- final int port,
- final ExecuteRequestCallback callback) {
- LlapNodeId nodeId = LlapNodeId.getInstance(host, port);
- requestManager.queueRequest(new SendQueryCompleteCallable(nodeId, request, callback));
- }
-
- public void sendTerminateFragment(final TerminateFragmentRequestProto request, final String host,
- final int port,
- final ExecuteRequestCallback callback) {
- LlapNodeId nodeId = LlapNodeId.getInstance(host, port);
- requestManager.queueRequest(new SendTerminateFragmentCallable(nodeId, request, callback));
- }
-
- @VisibleForTesting
- static class RequestManager implements Callable {
-
- private final Lock lock = new ReentrantLock();
- private final AtomicBoolean isShutdown = new AtomicBoolean(false);
- private final Condition queueCondition = lock.newCondition();
- private final AtomicBoolean shouldRun = new AtomicBoolean(false);
-
- private final int maxConcurrentRequestsPerNode = 1;
- private final ListeningExecutorService executor;
-
-
- // Tracks new additions via add, while the loop is processing existing ones.
- private final LinkedList newRequestList = new LinkedList<>();
-
- // Tracks existing requests which are cycled through.
- private final LinkedList pendingRequests = new LinkedList<>();
-
- // Tracks requests executing per node
- private final ConcurrentMap runningRequests = new ConcurrentHashMap<>();
-
- // Tracks completed requests pre node
- private final LinkedList completedNodes = new LinkedList<>();
-
- public RequestManager(int numThreads) {
- ExecutorService localExecutor = Executors.newFixedThreadPool(numThreads,
- new ThreadFactoryBuilder().setNameFormat("TaskCommunicator #%2d").build());
- executor = MoreExecutors.listeningDecorator(localExecutor);
- }
-
-
- @VisibleForTesting
- Set currentLoopDisabledNodes = new HashSet<>();
- @VisibleForTesting
- List currentLoopSkippedRequests = new LinkedList<>();
- @Override
- public Void call() {
- // Caches disabled nodes for quicker lookups and ensures a request on a node which was skipped
- // does not go out of order.
- while (!isShutdown.get()) {
- lock.lock();
- try {
- while (!shouldRun.get()) {
- queueCondition.await();
- break; // Break out and try executing.
- }
- boolean shouldBreak = process();
- if (shouldBreak) {
- break;
- }
- } catch (InterruptedException e) {
- if (isShutdown.get()) {
- break;
- } else {
- LOG.warn("RunLoop interrupted without being shutdown first");
- throw new RuntimeException(e);
- }
- } finally {
- lock.unlock();
- }
- }
- LOG.info("CallScheduler loop exiting");
- return null;
- }
-
- /* Add a new request to be executed */
- public void queueRequest(CallableRequest request) {
- synchronized (newRequestList) {
- newRequestList.add(request);
- shouldRun.set(true);
- }
- notifyRunLoop();
- }
-
- /* Indicates a request has completed on a node */
- public void requestFinished(LlapNodeId nodeId) {
- synchronized (completedNodes) {
- completedNodes.add(nodeId);
- shouldRun.set(true);
- }
- notifyRunLoop();
- }
-
- public void shutdown() {
- if (!isShutdown.getAndSet(true)) {
- executor.shutdownNow();
- notifyRunLoop();
- }
- }
-
- @VisibleForTesting
- void submitToExecutor(CallableRequest request, LlapNodeId nodeId) {
- ListenableFuture future =
- executor.submit(request);
- Futures.addCallback(future, new ResponseCallback(request.getCallback(), nodeId, this));
- }
-
- @VisibleForTesting
- boolean process() {
- if (isShutdown.get()) {
- return true;
- }
- currentLoopDisabledNodes.clear();
- currentLoopSkippedRequests.clear();
-
- // Set to false to block the next loop. This must be called before draining the lists,
- // otherwise an add/completion after draining the lists but before setting it to false,
- // will not trigger a run. May cause one unnecessary run if an add comes in before drain.
- // drain list. add request (setTrue). setFalse needs to be avoided.
- shouldRun.compareAndSet(true, false);
- // Drain any calls which may have come in during the last execution of the loop.
- drainNewRequestList(); // Locks newRequestList
- drainCompletedNodes(); // Locks completedNodes
-
-
- Iterator iterator = pendingRequests.iterator();
- while (iterator.hasNext()) {
- CallableRequest request = iterator.next();
- iterator.remove();
- LlapNodeId nodeId = request.getNodeId();
- if (canRunForNode(nodeId, currentLoopDisabledNodes)) {
- submitToExecutor(request, nodeId);
- } else {
- currentLoopDisabledNodes.add(nodeId);
- currentLoopSkippedRequests.add(request);
- }
- }
- // Tried scheduling everything that could be scheduled in this loop.
- pendingRequests.addAll(0, currentLoopSkippedRequests);
- return false;
- }
-
- private void drainNewRequestList() {
- synchronized (newRequestList) {
- if (!newRequestList.isEmpty()) {
- pendingRequests.addAll(newRequestList);
- newRequestList.clear();
- }
- }
- }
-
- private void drainCompletedNodes() {
- synchronized (completedNodes) {
- if (!completedNodes.isEmpty()) {
- for (LlapNodeId nodeId : completedNodes) {
- runningRequests.get(nodeId).decrementAndGet();
- }
- }
- completedNodes.clear();
- }
- }
-
- private boolean canRunForNode(LlapNodeId nodeId, Set currentRunDisabledNodes) {
- if (currentRunDisabledNodes.contains(nodeId)) {
- return false;
- } else {
- AtomicInteger count = runningRequests.get(nodeId);
- if (count == null) {
- count = new AtomicInteger(0);
- AtomicInteger old = runningRequests.putIfAbsent(nodeId, count);
- count = old != null ? old : count;
- }
- if (count.incrementAndGet() <= maxConcurrentRequestsPerNode) {
- return true;
- } else {
- count.decrementAndGet();
- return false;
- }
- }
- }
-
- private void notifyRunLoop() {
- lock.lock();
- try {
- queueCondition.signal();
- } finally {
- lock.unlock();
- }
- }
- }
-
-
- private static final class ResponseCallback
- implements FutureCallback {
-
- private final ExecuteRequestCallback callback;
- private final LlapNodeId nodeId;
- private final RequestManager requestManager;
-
- public ResponseCallback(ExecuteRequestCallback callback, LlapNodeId nodeId,
- RequestManager requestManager) {
- this.callback = callback;
- this.nodeId = nodeId;
- this.requestManager = requestManager;
- }
-
- @Override
- public void onSuccess(TYPE result) {
- try {
- callback.setResponse(result);
- } finally {
- requestManager.requestFinished(nodeId);
- }
- }
-
- @Override
- public void onFailure(Throwable t) {
- try {
- callback.indicateError(t);
- } finally {
- requestManager.requestFinished(nodeId);
- }
- }
- }
-
- @VisibleForTesting
- static abstract class CallableRequest
- implements Callable {
-
- final LlapNodeId nodeId;
- final ExecuteRequestCallback callback;
- final REQUEST request;
-
-
- protected CallableRequest(LlapNodeId nodeId, REQUEST request, ExecuteRequestCallback callback) {
- this.nodeId = nodeId;
- this.request = request;
- this.callback = callback;
- }
-
- public LlapNodeId getNodeId() {
- return nodeId;
- }
-
- public ExecuteRequestCallback getCallback() {
- return callback;
- }
-
- public abstract RESPONSE call() throws Exception;
- }
-
- private class SubmitWorkCallable extends CallableRequest {
-
- protected SubmitWorkCallable(LlapNodeId nodeId,
- SubmitWorkRequestProto submitWorkRequestProto,
- ExecuteRequestCallback callback) {
- super(nodeId, submitWorkRequestProto, callback);
- }
-
- @Override
- public SubmitWorkResponseProto call() throws Exception {
- return getProxy(nodeId).submitWork(null, request);
- }
- }
-
- private class SendSourceStateUpdateCallable
- extends CallableRequest {
-
- public SendSourceStateUpdateCallable(LlapNodeId nodeId,
- SourceStateUpdatedRequestProto request,
- ExecuteRequestCallback callback) {
- super(nodeId, request, callback);
- }
-
- @Override
- public SourceStateUpdatedResponseProto call() throws Exception {
- return getProxy(nodeId).sourceStateUpdated(null, request);
- }
- }
-
- private class SendQueryCompleteCallable
- extends CallableRequest {
-
- protected SendQueryCompleteCallable(LlapNodeId nodeId,
- QueryCompleteRequestProto queryCompleteRequestProto,
- ExecuteRequestCallback callback) {
- super(nodeId, queryCompleteRequestProto, callback);
- }
-
- @Override
- public QueryCompleteResponseProto call() throws Exception {
- return getProxy(nodeId).queryComplete(null, request);
- }
- }
-
- private class SendTerminateFragmentCallable
- extends CallableRequest {
-
- protected SendTerminateFragmentCallable(LlapNodeId nodeId,
- TerminateFragmentRequestProto terminateFragmentRequestProto,
- ExecuteRequestCallback callback) {
- super(nodeId, terminateFragmentRequestProto, callback);
- }
-
- @Override
- public TerminateFragmentResponseProto call() throws Exception {
- return getProxy(nodeId).terminateFragment(null, request);
- }
- }
-
- public interface ExecuteRequestCallback {
- void setResponse(T response);
- void indicateError(Throwable t);
- }
-
- private LlapDaemonProtocolBlockingPB getProxy(final LlapNodeId nodeId) {
- String hostId = getHostIdentifier(nodeId.getHostname(), nodeId.getPort());
-
- LlapDaemonProtocolBlockingPB proxy = hostProxies.get(hostId);
- if (proxy == null) {
- if (llapToken == null) {
- proxy = new LlapDaemonProtocolClientImpl(getConfig(), nodeId.getHostname(),
- nodeId.getPort(), retryPolicy, socketFactory);
- } else {
- UserGroupInformation ugi;
- try {
- ugi = UserGroupInformation.getCurrentUser();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- Token nodeToken = new Token(llapToken);
- SecurityUtil.setTokenService(nodeToken, NetUtils.createSocketAddrForHost(
- nodeId.getHostname(), nodeId.getPort()));
- ugi.addToken(nodeToken);
- proxy = ugi.doAs(new PrivilegedAction() {
- @Override
- public LlapDaemonProtocolBlockingPB run() {
- return new LlapDaemonProtocolClientImpl(getConfig(), nodeId.getHostname(),
- nodeId.getPort(), retryPolicy, socketFactory);
- }
- });
- }
-
- LlapDaemonProtocolBlockingPB proxyOld = hostProxies.putIfAbsent(hostId, proxy);
- if (proxyOld != null) {
- // TODO Shutdown the new proxy.
- proxy = proxyOld;
- }
- }
- return proxy;
- }
-
- private String getHostIdentifier(String hostname, int port) {
- return hostname + ":" + port;
- }
-}
diff --git llap-server/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapDaemonProtocolClientProxy.java llap-server/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapDaemonProtocolClientProxy.java
index e69de29..a6af8c2 100644
--- llap-server/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapDaemonProtocolClientProxy.java
+++ llap-server/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapDaemonProtocolClientProxy.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.tezplugins;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import com.google.protobuf.Message;
+import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.hadoop.hive.llap.LlapNodeId;
+import org.junit.Test;
+
+public class TestLlapDaemonProtocolClientProxy {
+
+ @Test (timeout = 5000)
+ public void testMultipleNodes() {
+ RequestManagerForTest requestManager = new RequestManagerForTest(1);
+
+ LlapNodeId nodeId1 = LlapNodeId.getInstance("host1", 1025);
+ LlapNodeId nodeId2 = LlapNodeId.getInstance("host2", 1025);
+
+ Message mockMessage = mock(Message.class);
+ LlapDaemonProtocolClientProxy.ExecuteRequestCallback mockExecuteRequestCallback = mock(
+ LlapDaemonProtocolClientProxy.ExecuteRequestCallback.class);
+
+ // Request two messages
+ requestManager.queueRequest(
+ new CallableRequestForTest(nodeId1, mockMessage, mockExecuteRequestCallback));
+ requestManager.queueRequest(
+ new CallableRequestForTest(nodeId2, mockMessage, mockExecuteRequestCallback));
+
+ // Should go through in a single process call
+ requestManager.process();
+ assertEquals(2, requestManager.numSubmissionsCounters);
+ assertNotNull(requestManager.numInvocationsPerNode.get(nodeId1));
+ assertNotNull(requestManager.numInvocationsPerNode.get(nodeId2));
+ assertEquals(1, requestManager.numInvocationsPerNode.get(nodeId1).getValue().intValue());
+ assertEquals(1, requestManager.numInvocationsPerNode.get(nodeId2).getValue().intValue());
+ assertEquals(0, requestManager.currentLoopSkippedRequests.size());
+ assertEquals(0, requestManager.currentLoopSkippedRequests.size());
+ assertEquals(0, requestManager.currentLoopDisabledNodes.size());
+ }
+
+ @Test(timeout = 5000)
+ public void testSingleInvocationPerNode() {
+ RequestManagerForTest requestManager = new RequestManagerForTest(1);
+
+ LlapNodeId nodeId1 = LlapNodeId.getInstance("host1", 1025);
+
+ Message mockMessage = mock(Message.class);
+ LlapDaemonProtocolClientProxy.ExecuteRequestCallback mockExecuteRequestCallback = mock(
+ LlapDaemonProtocolClientProxy.ExecuteRequestCallback.class);
+
+ // First request for host.
+ requestManager.queueRequest(
+ new CallableRequestForTest(nodeId1, mockMessage, mockExecuteRequestCallback));
+ requestManager.process();
+ assertEquals(1, requestManager.numSubmissionsCounters);
+ assertNotNull(requestManager.numInvocationsPerNode.get(nodeId1));
+ assertEquals(1, requestManager.numInvocationsPerNode.get(nodeId1).getValue().intValue());
+ assertEquals(0, requestManager.currentLoopSkippedRequests.size());
+
+ // Second request for host. Single invocation since the last has not completed.
+ requestManager.queueRequest(
+ new CallableRequestForTest(nodeId1, mockMessage, mockExecuteRequestCallback));
+ requestManager.process();
+ assertEquals(1, requestManager.numSubmissionsCounters);
+ assertNotNull(requestManager.numInvocationsPerNode.get(nodeId1));
+ assertEquals(1, requestManager.numInvocationsPerNode.get(nodeId1).getValue().intValue());
+ assertEquals(1, requestManager.currentLoopSkippedRequests.size());
+ assertEquals(1, requestManager.currentLoopDisabledNodes.size());
+ assertTrue(requestManager.currentLoopDisabledNodes.contains(nodeId1));
+
+ // Complete first request. Second pending request should go through.
+ requestManager.requestFinished(nodeId1);
+ requestManager.process();
+ assertEquals(2, requestManager.numSubmissionsCounters);
+ assertNotNull(requestManager.numInvocationsPerNode.get(nodeId1));
+ assertEquals(2, requestManager.numInvocationsPerNode.get(nodeId1).getValue().intValue());
+ assertEquals(0, requestManager.currentLoopSkippedRequests.size());
+ assertEquals(0, requestManager.currentLoopDisabledNodes.size());
+ assertFalse(requestManager.currentLoopDisabledNodes.contains(nodeId1));
+ }
+
+
+ static class RequestManagerForTest extends LlapDaemonProtocolClientProxy.RequestManager {
+
+ int numSubmissionsCounters = 0;
+ private Map numInvocationsPerNode = new HashMap<>();
+
+ public RequestManagerForTest(int numThreads) {
+ super(numThreads);
+ }
+
+ protected void submitToExecutor(LlapDaemonProtocolClientProxy.CallableRequest request, LlapNodeId nodeId) {
+ numSubmissionsCounters++;
+ MutableInt nodeCount = numInvocationsPerNode.get(nodeId);
+ if (nodeCount == null) {
+ nodeCount = new MutableInt(0);
+ numInvocationsPerNode.put(nodeId, nodeCount);
+ }
+ nodeCount.increment();
+ }
+
+ void reset() {
+ numSubmissionsCounters = 0;
+ numInvocationsPerNode.clear();
+ }
+
+ }
+
+ static class CallableRequestForTest extends LlapDaemonProtocolClientProxy.CallableRequest {
+
+ protected CallableRequestForTest(LlapNodeId nodeId, Message message,
+ LlapDaemonProtocolClientProxy.ExecuteRequestCallback callback) {
+ super(nodeId, message, callback);
+ }
+
+ @Override
+ public Message call() throws Exception {
+ return null;
+ }
+ }
+
+}
diff --git llap-server/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java llap-server/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java
index e69de29..8f3d104 100644
--- llap-server/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java
+++ llap-server/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.tezplugins;
+
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
+import org.apache.hadoop.hive.llap.LlapNodeId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.junit.Test;
+
+public class TestLlapTaskCommunicator {
+
+ @Test (timeout = 5000)
+ public void testEntityTracker1() {
+ LlapTaskCommunicator.EntityTracker entityTracker = new LlapTaskCommunicator.EntityTracker();
+
+ String host1 = "host1";
+ String host2 = "host2";
+ String host3 = "host3";
+ int port = 1451;
+
+
+ // Simple container registration and un-registration without any task attempt being involved.
+ ContainerId containerId101 = constructContainerId(101);
+ entityTracker.registerContainer(containerId101, host1, port);
+ assertEquals(LlapNodeId.getInstance(host1, port), entityTracker.getNodeIdForContainer(containerId101));
+
+ entityTracker.unregisterContainer(containerId101);
+ assertNull(entityTracker.getContainerAttemptMapForNode(LlapNodeId.getInstance(host1, port)));
+ assertNull(entityTracker.getNodeIdForContainer(containerId101));
+ assertEquals(0, entityTracker.nodeMap.size());
+ assertEquals(0, entityTracker.attemptToNodeMap.size());
+ assertEquals(0, entityTracker.containerToNodeMap.size());
+
+
+ // Simple task registration and un-registration.
+ ContainerId containerId1 = constructContainerId(1);
+ TezTaskAttemptID taskAttemptId1 = constructTaskAttemptId(1);
+ entityTracker.registerTaskAttempt(containerId1, taskAttemptId1, host1, port);
+ assertEquals(LlapNodeId.getInstance(host1, port), entityTracker.getNodeIdForContainer(containerId1));
+ assertEquals(LlapNodeId.getInstance(host1, port), entityTracker.getNodeIdForTaskAttempt(taskAttemptId1));
+
+ entityTracker.unregisterTaskAttempt(taskAttemptId1);
+ assertNull(entityTracker.getContainerAttemptMapForNode(LlapNodeId.getInstance(host1, port)));
+ assertNull(entityTracker.getNodeIdForContainer(containerId1));
+ assertNull(entityTracker.getNodeIdForTaskAttempt(taskAttemptId1));
+ assertEquals(0, entityTracker.nodeMap.size());
+ assertEquals(0, entityTracker.attemptToNodeMap.size());
+ assertEquals(0, entityTracker.containerToNodeMap.size());
+
+ // Register taskAttempt, unregister container. TaskAttempt should also be unregistered
+ ContainerId containerId201 = constructContainerId(201);
+ TezTaskAttemptID taskAttemptId201 = constructTaskAttemptId(201);
+ entityTracker.registerTaskAttempt(containerId201, taskAttemptId201, host1, port);
+ assertEquals(LlapNodeId.getInstance(host1, port), entityTracker.getNodeIdForContainer(containerId201));
+ assertEquals(LlapNodeId.getInstance(host1, port), entityTracker.getNodeIdForTaskAttempt(taskAttemptId201));
+
+ entityTracker.unregisterContainer(containerId201);
+ assertNull(entityTracker.getContainerAttemptMapForNode(LlapNodeId.getInstance(host1, port)));
+ assertNull(entityTracker.getNodeIdForContainer(containerId201));
+ assertNull(entityTracker.getNodeIdForTaskAttempt(taskAttemptId201));
+ assertEquals(0, entityTracker.nodeMap.size());
+ assertEquals(0, entityTracker.attemptToNodeMap.size());
+ assertEquals(0, entityTracker.containerToNodeMap.size());
+
+ entityTracker.unregisterTaskAttempt(taskAttemptId201); // No errors
+ }
+
+
+ private ContainerId constructContainerId(int id) {
+ ContainerId containerId = mock(ContainerId.class);
+ doReturn(id).when(containerId).getId();
+ doReturn((long)id).when(containerId).getContainerId();
+ return containerId;
+ }
+
+ private TezTaskAttemptID constructTaskAttemptId(int id) {
+ TezTaskAttemptID taskAttemptId = mock(TezTaskAttemptID.class);
+ doReturn(id).when(taskAttemptId).getId();
+ return taskAttemptId;
+ }
+
+}
diff --git llap-server/src/test/org/apache/hadoop/hive/llap/tezplugins/TestTaskCommunicator.java llap-server/src/test/org/apache/hadoop/hive/llap/tezplugins/TestTaskCommunicator.java
deleted file mode 100644
index 2aef4ed..0000000
--- llap-server/src/test/org/apache/hadoop/hive/llap/tezplugins/TestTaskCommunicator.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.llap.tezplugins;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import com.google.protobuf.Message;
-import org.apache.commons.lang3.mutable.MutableInt;
-import org.apache.hadoop.hive.llap.LlapNodeId;
-import org.junit.Test;
-
-public class TestTaskCommunicator {
-
- @Test (timeout = 5000)
- public void testMultipleNodes() {
- RequestManagerForTest requestManager = new RequestManagerForTest(1);
-
- LlapNodeId nodeId1 = LlapNodeId.getInstance("host1", 1025);
- LlapNodeId nodeId2 = LlapNodeId.getInstance("host2", 1025);
-
- Message mockMessage = mock(Message.class);
- TaskCommunicator.ExecuteRequestCallback mockExecuteRequestCallback = mock(
- TaskCommunicator.ExecuteRequestCallback.class);
-
- // Request two messages
- requestManager.queueRequest(
- new CallableRequestForTest(nodeId1, mockMessage, mockExecuteRequestCallback));
- requestManager.queueRequest(
- new CallableRequestForTest(nodeId2, mockMessage, mockExecuteRequestCallback));
-
- // Should go through in a single process call
- requestManager.process();
- assertEquals(2, requestManager.numSubmissionsCounters);
- assertNotNull(requestManager.numInvocationsPerNode.get(nodeId1));
- assertNotNull(requestManager.numInvocationsPerNode.get(nodeId2));
- assertEquals(1, requestManager.numInvocationsPerNode.get(nodeId1).getValue().intValue());
- assertEquals(1, requestManager.numInvocationsPerNode.get(nodeId2).getValue().intValue());
- assertEquals(0, requestManager.currentLoopSkippedRequests.size());
- assertEquals(0, requestManager.currentLoopSkippedRequests.size());
- assertEquals(0, requestManager.currentLoopDisabledNodes.size());
- }
-
- @Test(timeout = 5000)
- public void testSingleInvocationPerNode() {
- RequestManagerForTest requestManager = new RequestManagerForTest(1);
-
- LlapNodeId nodeId1 = LlapNodeId.getInstance("host1", 1025);
-
- Message mockMessage = mock(Message.class);
- TaskCommunicator.ExecuteRequestCallback mockExecuteRequestCallback = mock(
- TaskCommunicator.ExecuteRequestCallback.class);
-
- // First request for host.
- requestManager.queueRequest(
- new CallableRequestForTest(nodeId1, mockMessage, mockExecuteRequestCallback));
- requestManager.process();
- assertEquals(1, requestManager.numSubmissionsCounters);
- assertNotNull(requestManager.numInvocationsPerNode.get(nodeId1));
- assertEquals(1, requestManager.numInvocationsPerNode.get(nodeId1).getValue().intValue());
- assertEquals(0, requestManager.currentLoopSkippedRequests.size());
-
- // Second request for host. Single invocation since the last has not completed.
- requestManager.queueRequest(
- new CallableRequestForTest(nodeId1, mockMessage, mockExecuteRequestCallback));
- requestManager.process();
- assertEquals(1, requestManager.numSubmissionsCounters);
- assertNotNull(requestManager.numInvocationsPerNode.get(nodeId1));
- assertEquals(1, requestManager.numInvocationsPerNode.get(nodeId1).getValue().intValue());
- assertEquals(1, requestManager.currentLoopSkippedRequests.size());
- assertEquals(1, requestManager.currentLoopDisabledNodes.size());
- assertTrue(requestManager.currentLoopDisabledNodes.contains(nodeId1));
-
- // Complete first request. Second pending request should go through.
- requestManager.requestFinished(nodeId1);
- requestManager.process();
- assertEquals(2, requestManager.numSubmissionsCounters);
- assertNotNull(requestManager.numInvocationsPerNode.get(nodeId1));
- assertEquals(2, requestManager.numInvocationsPerNode.get(nodeId1).getValue().intValue());
- assertEquals(0, requestManager.currentLoopSkippedRequests.size());
- assertEquals(0, requestManager.currentLoopDisabledNodes.size());
- assertFalse(requestManager.currentLoopDisabledNodes.contains(nodeId1));
- }
-
-
- static class RequestManagerForTest extends TaskCommunicator.RequestManager {
-
- int numSubmissionsCounters = 0;
- private Map numInvocationsPerNode = new HashMap<>();
-
- public RequestManagerForTest(int numThreads) {
- super(numThreads);
- }
-
- protected void submitToExecutor(TaskCommunicator.CallableRequest request, LlapNodeId nodeId) {
- numSubmissionsCounters++;
- MutableInt nodeCount = numInvocationsPerNode.get(nodeId);
- if (nodeCount == null) {
- nodeCount = new MutableInt(0);
- numInvocationsPerNode.put(nodeId, nodeCount);
- }
- nodeCount.increment();
- }
-
- void reset() {
- numSubmissionsCounters = 0;
- numInvocationsPerNode.clear();
- }
-
- }
-
- static class CallableRequestForTest extends TaskCommunicator.CallableRequest {
-
- protected CallableRequestForTest(LlapNodeId nodeId, Message message,
- TaskCommunicator.ExecuteRequestCallback callback) {
- super(nodeId, message, callback);
- }
-
- @Override
- public Message call() throws Exception {
- return null;
- }
- }
-
-}