diff --git llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java index 3c7c5d2..893b7d9 100644 --- llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java +++ llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java @@ -321,8 +321,22 @@ public void registerRunningTaskAttempt(final ContainerId containerId, final Task nodesForQuery.add(nodeId); sourceStateTracker.registerTaskForStateUpdates(host, port, taskSpec.getInputs()); - FragmentRuntimeInfo fragmentRuntimeInfo = sourceStateTracker.getFragmentRuntimeInfo( - taskSpec.getVertexName(), taskSpec.getTaskAttemptID().getTaskID().getId(), priority); + FragmentRuntimeInfo fragmentRuntimeInfo; + try { + fragmentRuntimeInfo = sourceStateTracker.getFragmentRuntimeInfo( + taskSpec.getVertexName(), + taskSpec.getTaskAttemptID().getTaskID().getId(), priority); + } catch (Exception e) { + LOG.error( + "Error while trying to get runtimeFragmentInfo for fragmentId={}, containerId={}, currentQI={}, currentQueryId={}", + taskSpec.getTaskAttemptID(), containerId, currentQueryIdentifierProto, + currentHiveQueryId, e); + if (e instanceof RuntimeException) { + throw (RuntimeException) e; + } else { + throw new RuntimeException(e); + } + } SubmitWorkRequestProto requestProto; try { diff --git llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java index dc594a2..97191f8 100644 --- llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java +++ llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java @@ -529,6 +529,22 @@ public void dagComplete() { try { dagRunning = false; dagStats = new StatsPerDag(); + int pendingCount = 0; + for (Entry> entry : pendingTasks.entrySet()) { + if (entry.getValue() != null) { + pendingCount += entry.getValue().size(); + } + } + int runningCount = 0; + for (Entry> entry : runningTasks.entrySet()) { + if (entry.getValue() != null) { + runningCount += entry.getValue().size(); + } + } + + LOG.info( + "DAG reset. Current knownTaskCount={}, pendingTaskCount={}, runningTaskCount={}", + knownTasks.size(), pendingCount, runningCount); } finally { writeLock.unlock(); } diff --git llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java index 2a66e4d..9589a3a 100644 --- llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java +++ llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/SourceStateTracker.java @@ -120,8 +120,8 @@ public synchronized void sourceStateUpdated(String sourceName, VertexState sourc SourceInfo sourceInfo = getSourceInfo(sourceName); // Update source info if the state is SUCCEEDED if (sourceState == VertexState.SUCCEEDED) { - sourceInfo.numCompletedTasks = taskCommunicatorContext.getVertexCompletedTaskCount(sourceName); - sourceInfo.numTasks = taskCommunicatorContext.getVertexTotalTaskCount(sourceName); + sourceInfo.numCompletedTasks = getVertexCompletedTaskCount(sourceName); + sourceInfo.numTasks = getVertexTotalTaskCount(sourceName); } sourceInfo.lastKnownState = sourceState; // Checking state per node for future failure handling scenarios, where an update @@ -172,8 +172,9 @@ private void computeUpstreamTaskCounts(MutableInt completedTaskCount, MutableInt completedTaskCount.add(sourceInfo.numCompletedTasks); totalTaskCount.add(sourceInfo.numTasks); } else { - completedTaskCount.add(taskCommunicatorContext.getVertexCompletedTaskCount(sourceName)); - int totalCount =taskCommunicatorContext.getVertexTotalTaskCount(sourceName); + completedTaskCount.add(getVertexCompletedTaskCount(sourceName)); + int totalCount = getVertexTotalTaskCount(sourceName); + // Uninitialized vertices will report count as 0. totalCount = totalCount == -1 ? 0 : totalCount; totalTaskCount.add(totalCount); @@ -272,6 +273,41 @@ private void maybeRegisterForVertexUpdates(String sourceName) { } } + private int getVertexCompletedTaskCount(String vname) { + int completedTaskCount; + try { + completedTaskCount = + taskCommunicatorContext.getVertexCompletedTaskCount(vname); + return completedTaskCount; + } catch (Exception e) { + LOG.error("Failed to get vertex completed task count for sourceName={}", + vname); + if (e instanceof RuntimeException) { + throw (RuntimeException) e; + } else { + throw new RuntimeException(e); + } + } + } + + private int getVertexTotalTaskCount(String vname) { + int totalCount; + try { + totalCount = + taskCommunicatorContext.getVertexTotalTaskCount(vname); + return totalCount; + } catch (Exception e) { + LOG.error("Failed to get total task count for sourceName={}", vname); + if (e instanceof RuntimeException) { + throw (RuntimeException)e; + } else { + throw new RuntimeException(e); + } + } + } + + + void sendStateUpdateToNode(LlapNodeId nodeId, String sourceName, VertexState state) {