diff --git llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java index bc9a209..a4f5d4d 100644 --- llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java +++ llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java @@ -71,6 +71,7 @@ import org.apache.tez.dag.api.event.VertexStateUpdate; import org.apache.tez.dag.app.TezTaskCommunicatorImpl; import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.runtime.api.TaskFailureType; import org.apache.tez.runtime.api.impl.TaskSpec; import org.apache.tez.runtime.api.impl.TezHeartbeatRequest; import org.apache.tez.runtime.api.impl.TezHeartbeatResponse; @@ -114,7 +115,7 @@ public LlapTaskCommunicator( TaskCommunicatorContext taskCommunicatorContext) { super(taskCommunicatorContext); - Credentials credentials = taskCommunicatorContext.getCredentials(); + Credentials credentials = taskCommunicatorContext.getAMCredentials(); if (credentials != null) { @SuppressWarnings("unchecked") Token llapToken = @@ -313,7 +314,7 @@ public void indicateError(Throwable t) { "Failed to run task: " + taskSpec.getTaskAttemptID() + " on containerId: " + containerId, t); getContext() - .taskFailed(taskSpec.getTaskAttemptID(), TaskAttemptEndReason.OTHER, + .taskFailed(taskSpec.getTaskAttemptID(), TaskFailureType.NON_FATAL, TaskAttemptEndReason.OTHER, t.toString()); } else { // Exception from the RPC layer - communication failure, consider as KILLED / service down. @@ -329,7 +330,7 @@ public void indicateError(Throwable t) { "Failed to run task: " + taskSpec.getTaskAttemptID() + " on containerId: " + containerId, t); getContext() - .taskFailed(taskSpec.getTaskAttemptID(), TaskAttemptEndReason.OTHER, + .taskFailed(taskSpec.getTaskAttemptID(), TaskFailureType.NON_FATAL, TaskAttemptEndReason.OTHER, t.getMessage()); } } @@ -534,7 +535,7 @@ private void resetCurrentDag(int newDagId) { currentQueryIdentifierProto = constructQueryIdentifierProto(newDagId); sourceStateTracker.resetState(newDagId); nodesForQuery.clear(); - LOG.info("CurrentDagId set to: " + newDagId + ", name=" + getContext().getCurrentDagName()); + LOG.info("CurrentDagId set to: " + newDagId + ", name=" + getContext().getCurrentDagInfo().getName()); // TODO Is it possible for heartbeats to come in from lost tasks - those should be told to die, which // is likely already happening. } diff --git llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java index 1ee6a50..8e2d0ac 100644 --- llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java +++ llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java @@ -55,6 +55,7 @@ import org.apache.tez.dag.records.TezVertexID; import org.apache.tez.runtime.api.impl.InputSpec; import org.apache.tez.runtime.api.impl.TaskSpec; +import org.apache.tez.serviceplugins.api.DagInfo; import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; import org.apache.tez.serviceplugins.api.TaskCommunicatorContext; import org.junit.Test; @@ -278,9 +279,13 @@ private void signalOpDoneIfBothInvoked() { public LlapTaskCommunicatorWrapperForTest(LlapProtocolClientProxy llapProxy) throws Exception { doReturn(appAttemptId).when(taskCommunicatorContext).getApplicationAttemptId(); - doReturn(new Credentials()).when(taskCommunicatorContext).getCredentials(); + doReturn(new Credentials()).when(taskCommunicatorContext).getAMCredentials(); doReturn(userPayload).when(taskCommunicatorContext).getInitialUserPayload(); doReturn(appId.toString()).when(taskCommunicatorContext).getCurrentAppIdentifier(); + DagInfo dagInfo = mock(DagInfo.class); + doReturn(dagInfo).when(taskCommunicatorContext).getCurrentDagInfo(); + doReturn(DAG_NAME).when(dagInfo).getName(); + doReturn(new Credentials()).when(dagInfo).getCredentials(); doReturn(new LinkedList()).when(taskCommunicatorContext) .getInputVertexNames(any(String.class));