diff --git llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java index 799367b..c385d07 100644 --- llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java +++ llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java @@ -71,6 +71,7 @@ import org.apache.tez.dag.api.event.VertexStateUpdate; import org.apache.tez.dag.app.TezTaskCommunicatorImpl; import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.runtime.api.TaskFailureType; import org.apache.tez.runtime.api.impl.TaskSpec; import org.apache.tez.runtime.api.impl.TezHeartbeatRequest; import org.apache.tez.runtime.api.impl.TezHeartbeatResponse; @@ -114,7 +115,7 @@ public LlapTaskCommunicator( TaskCommunicatorContext taskCommunicatorContext) { super(taskCommunicatorContext); - Credentials credentials = taskCommunicatorContext.getCredentials(); + Credentials credentials = taskCommunicatorContext.getAMCredentials(); if (credentials != null) { @SuppressWarnings("unchecked") Token llapToken = @@ -313,7 +314,7 @@ public void indicateError(Throwable t) { "Failed to run task: " + taskSpec.getTaskAttemptID() + " on containerId: " + containerId, t); getContext() - .taskFailed(taskSpec.getTaskAttemptID(), TaskAttemptEndReason.OTHER, + .taskFailed(taskSpec.getTaskAttemptID(), TaskFailureType.NON_FATAL, TaskAttemptEndReason.OTHER, t.toString()); } else { // Exception from the RPC layer - communication failure, consider as KILLED / service down. @@ -329,7 +330,7 @@ public void indicateError(Throwable t) { "Failed to run task: " + taskSpec.getTaskAttemptID() + " on containerId: " + containerId, t); getContext() - .taskFailed(taskSpec.getTaskAttemptID(), TaskAttemptEndReason.OTHER, + .taskFailed(taskSpec.getTaskAttemptID(), TaskFailureType.NON_FATAL, TaskAttemptEndReason.OTHER, t.getMessage()); } } @@ -534,7 +535,7 @@ private void resetCurrentDag(int newDagId) { currentQueryIdentifierProto = constructQueryIdentifierProto(newDagId); sourceStateTracker.resetState(newDagId); nodesForQuery.clear(); - LOG.info("CurrentDagId set to: " + newDagId + ", name=" + getContext().getCurrentDagName()); + LOG.info("CurrentDagId set to: " + newDagId + ", name=" + getContext().getCurrentDagInfo().getName()); // TODO Is it possible for heartbeats to come in from lost tasks - those should be told to die, which // is likely already happening. } @@ -550,13 +551,13 @@ private SubmitWorkRequestProto constructSubmitWorkRequest(ContainerId containerI builder.setAmPort(getAddress().getPort()); Credentials taskCredentials = new Credentials(); // Credentials can change across DAGs. Ideally construct only once per DAG. - taskCredentials.addAll(getContext().getCredentials()); + taskCredentials.addAll(getContext().getAMCredentials()); Preconditions.checkState(currentQueryIdentifierProto.getDagIdentifier() == taskSpec.getTaskAttemptID().getTaskID().getVertexID().getDAGId().getId()); ByteBuffer credentialsBinary = credentialMap.get(currentQueryIdentifierProto); if (credentialsBinary == null) { - credentialsBinary = serializeCredentials(getContext().getCredentials()); + credentialsBinary = serializeCredentials(getContext().getAMCredentials()); credentialMap.putIfAbsent(currentQueryIdentifierProto, credentialsBinary.duplicate()); } else { credentialsBinary = credentialsBinary.duplicate(); diff --git llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java index 1ee6a50..29fc426 100644 --- llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java +++ llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java @@ -278,7 +278,7 @@ private void signalOpDoneIfBothInvoked() { public LlapTaskCommunicatorWrapperForTest(LlapProtocolClientProxy llapProxy) throws Exception { doReturn(appAttemptId).when(taskCommunicatorContext).getApplicationAttemptId(); - doReturn(new Credentials()).when(taskCommunicatorContext).getCredentials(); + doReturn(new Credentials()).when(taskCommunicatorContext).getAMCredentials(); doReturn(userPayload).when(taskCommunicatorContext).getInitialUserPayload(); doReturn(appId.toString()).when(taskCommunicatorContext).getCurrentAppIdentifier(); doReturn(new LinkedList()).when(taskCommunicatorContext)