diff --git llap-common/src/java/org/apache/hadoop/hive/llap/AsyncPbRpcProxy.java llap-common/src/java/org/apache/hadoop/hive/llap/AsyncPbRpcProxy.java index ad39963614..0cb1c090e2 100644 --- llap-common/src/java/org/apache/hadoop/hive/llap/AsyncPbRpcProxy.java +++ llap-common/src/java/org/apache/hadoop/hive/llap/AsyncPbRpcProxy.java @@ -171,7 +171,7 @@ public void shutdown() { CallableRequest request, LlapNodeId nodeId) { ListenableFuture future = executor.submit(request); Futures.addCallback(future, new ResponseCallback( - request.getCallback(), nodeId, this)); + request.getCallback(), nodeId, this), executor); } @VisibleForTesting @@ -283,7 +283,7 @@ public void onFailure(Throwable t) { LOG.warn("RequestManager shutdown with error", t); } } - }); + }, requestManagerExecutor); } @Override diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java index 088a5f33c0..c4732088fc 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java @@ -174,7 +174,7 @@ public void onFailure(Throwable t) { Thread.getDefaultUncaughtExceptionHandler().uncaughtException(Thread.currentThread(), t); } } - }); + }, queueLookupExecutor); // TODO: why is this needed? we could just save the host and port? nodeId = LlapNodeId.getInstance(localAddress.get().getHostName(), localAddress.get().getPort()); LOG.info("AMReporter running with DaemonId: {}, NodeId: {}", daemonId, nodeId); @@ -274,7 +274,7 @@ public void onFailure(Throwable t) { LOG.warn("Failed to send taskKilled for {}. The attempt will likely time out.", taskAttemptId); } - }); + }, executor); } public void queryComplete(QueryIdentifier queryIdentifier) { @@ -342,7 +342,7 @@ public void onFailure(Throwable t) { amNodeInfo.amNodeId, currentQueryIdentifier, t); queryFailedHandler.queryFailed(currentQueryIdentifier); } - }); + }, executor); } } } catch (InterruptedException e) { diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java index 33ade55ee1..4050cf9037 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java @@ -128,7 +128,7 @@ public synchronized void registerTask(RuntimeTask task, sendCounterInterval, maxEventsToGet, requestCounter, containerIdStr, initialEvent, fragmentRequestId, wmCounters); ListenableFuture future = heartbeatExecutor.submit(currentCallable); - Futures.addCallback(future, new HeartbeatCallback(errorReporter)); + Futures.addCallback(future, new HeartbeatCallback(errorReporter), heartbeatExecutor); } /** diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java index aaf9674621..84d242f364 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java @@ -175,7 +175,7 @@ public TaskExecutorService(int numExecutors, int waitQueueSize, executionCompletionExecutorService = MoreExecutors.listeningDecorator( executionCompletionExecutorServiceRaw); ListenableFuture future = waitQueueExecutorService.submit(new WaitQueueWorker()); - Futures.addCallback(future, new WaitQueueWorkerCallback()); + Futures.addCallback(future, new WaitQueueWorkerCallback(), waitQueueExecutorService); } private LlapQueueComparatorBase createComparator( diff --git llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java index cdf767f1db..b1f41b589a 100644 --- llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java +++ llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java @@ -791,15 +791,17 @@ public void run() { }, 0, 10000L, TimeUnit.MILLISECONDS); nodeEnablerFuture = nodeEnabledExecutor.submit(nodeEnablerCallable); - Futures.addCallback(nodeEnablerFuture, new LoggingFutureCallback("NodeEnablerThread", LOG)); + Futures.addCallback(nodeEnablerFuture, + new LoggingFutureCallback("NodeEnablerThread", LOG), nodeEnabledExecutor); delayedTaskSchedulerFuture = delayedTaskSchedulerExecutor.submit(delayedTaskSchedulerCallable); Futures.addCallback(delayedTaskSchedulerFuture, - new LoggingFutureCallback("DelayedTaskSchedulerThread", LOG)); + new LoggingFutureCallback("DelayedTaskSchedulerThread", LOG), delayedTaskSchedulerExecutor); schedulerFuture = schedulerExecutor.submit(schedulerCallable); - Futures.addCallback(schedulerFuture, new LoggingFutureCallback("SchedulerThread", LOG)); + Futures.addCallback(schedulerFuture, + new LoggingFutureCallback("SchedulerThread", LOG), schedulerExecutor); registry.start(); registry.registerStateChangeListener(new NodeStateChangeListener()); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java index f8fa0cd1dd..b1ab34750d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java @@ -1099,7 +1099,7 @@ private static int transferSessionsToDestroy(Collection source, } private void failOnFutureFailure(ListenableFuture future) { - Futures.addCallback(future, FATAL_ERROR_CALLBACK); + Futures.addCallback(future, FATAL_ERROR_CALLBACK, workPool); } private void queueGetRequestOnMasterThread( @@ -1933,7 +1933,7 @@ public SessionInitContext(SettableFuture future, public void start() throws Exception { ListenableFuture getFuture = tezAmPool.getSessionAsync(); - Futures.addCallback(getFuture, this); + Futures.addCallback(getFuture, this, workPool); } @Override @@ -1987,7 +1987,7 @@ public void onSuccess(WmTezSession session) { case GETTING: { ListenableFuture waitFuture = session.waitForAmRegistryAsync( amRegistryTimeoutMs, timeoutPool); - Futures.addCallback(waitFuture, this); + Futures.addCallback(waitFuture, this, workPool); break; } case WAITING_FOR_REGISTRY: { diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/tez/SampleTezSessionState.java ql/src/test/org/apache/hadoop/hive/ql/exec/tez/SampleTezSessionState.java index f5ab981f26..d7402380df 100644 --- ql/src/test/org/apache/hadoop/hive/ql/exec/tez/SampleTezSessionState.java +++ ql/src/test/org/apache/hadoop/hive/ql/exec/tez/SampleTezSessionState.java @@ -128,7 +128,7 @@ public void onSuccess(Boolean result) { public void onFailure(Throwable t) { future.setException(t); } - }); + }, timeoutPool); return future; }