diff --git llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java index b168f7648e..36a2d6bf49 100644 --- llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java +++ llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java @@ -94,12 +94,14 @@ import org.apache.tez.dag.api.UserPayload; import org.apache.tez.dag.api.event.VertexStateUpdate; import org.apache.tez.dag.app.TezTaskCommunicatorImpl; +import org.apache.tez.dag.app.dag.DAG; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.runtime.api.TaskFailureType; import org.apache.tez.runtime.api.impl.TaskSpec; import org.apache.tez.runtime.api.impl.TezHeartbeatRequest; import org.apache.tez.runtime.api.impl.TezHeartbeatResponse; import org.apache.tez.serviceplugins.api.ContainerEndReason; +import org.apache.tez.serviceplugins.api.DagInfo; import org.apache.tez.serviceplugins.api.ServicePluginErrorDefaults; import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; import org.apache.tez.serviceplugins.api.TaskCommunicatorContext; @@ -398,11 +400,9 @@ public void registerRunningTaskAttempt(final ContainerId containerId, final Task credentialsChanged, priority); int dagId = taskSpec.getTaskAttemptID().getTaskID().getVertexID().getDAGId().getId(); if (currentQueryIdentifierProto == null || (dagId != currentQueryIdentifierProto.getDagIndex())) { - // TODO HiveQueryId extraction by parsing the Processor payload is ugly. This can be improved - // once TEZ-2672 is fixed. - String hiveQueryId; + String hiveQueryId = extractQueryIdFromContext(); try { - hiveQueryId = extractQueryId(taskSpec); + hiveQueryId = (hiveQueryId == null) ? extractQueryId(taskSpec) : hiveQueryId; } catch (IOException e) { throw new RuntimeException("Failed to extract query id from task spec: " + taskSpec, e); } @@ -820,12 +820,22 @@ private void resetCurrentDag(int newDagId, String hiveQueryId) { // is likely already happening. } + // Needed for GenericUDTFGetSplits, where TaskSpecs are generated private String extractQueryId(TaskSpec taskSpec) throws IOException { UserPayload processorPayload = taskSpec.getProcessorDescriptor().getUserPayload(); Configuration conf = TezUtils.createConfFromUserPayload(processorPayload); return HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYID); } + private String extractQueryIdFromContext() { + //TODO: Remove following instance of check, When TEZ-2672 exposes getConf from DagInfo + DagInfo dagInfo = getContext().getCurrentDagInfo(); + if (dagInfo instanceof DAG) { + return ((DAG)dagInfo).getConf().get(ConfVars.HIVEQUERYID.varname); + } + return null; + } + private SubmitWorkRequestProto constructSubmitWorkRequest(ContainerId containerId, TaskSpec taskSpec, FragmentRuntimeInfo fragmentRuntimeInfo, diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java index 854bc89e9c..b1bf2f8903 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java @@ -424,6 +424,9 @@ DAG build(JobConf conf, TezWork tezWork, Path scratchDir, Context ctx, .put("description", ctx.getCmd()); String dagInfo = json.toString(); + String queryId = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYID); + dag.setConf(HiveConf.ConfVars.HIVEQUERYID.varname, queryId); + if (LOG.isDebugEnabled()) { LOG.debug("DagInfo: " + dagInfo); }