diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java index 2bdfee5..97a8b78 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java @@ -33,6 +33,8 @@ import org.apache.hadoop.hive.common.CallableWithNdc; import org.apache.hadoop.hive.llap.daemon.HistoryLogger; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.IOSpecProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto; import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics; import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol; @@ -55,6 +57,7 @@ import org.apache.tez.runtime.api.impl.TaskSpec; import org.apache.tez.runtime.common.objectregistry.ObjectRegistryImpl; import org.apache.tez.runtime.internals.api.TaskReporterInterface; +import org.apache.tez.runtime.library.input.UnorderedKVInput; import org.apache.tez.runtime.task.TezChild; import org.apache.tez.runtime.task.TezTaskRunner; @@ -150,7 +153,7 @@ Map serviceConsumerMetadata = new HashMap<>(); serviceConsumerMetadata.put(TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID, TezCommonUtils.convertJobTokenToBytes(jobToken)); - Multimap startedInputsMap = HashMultimap.create(); + Multimap startedInputsMap = createStartedInputMap(request.getFragmentSpec()); UserGroupInformation taskOwner = UserGroupInformation.createRemoteUser(request.getTokenIdentifier()); @@ -187,7 +190,7 @@ public LlapTaskUmbilicalProtocol run() throws Exception { if (shouldDie) { LOG.info("Got a shouldDie notification via heartbeats. 
Shutting down"); return new TezChild.ContainerExecutionResult( - TezChild.ContainerExecutionResult.ExitStatus.SUCCESS, null, + TezChild.ContainerExecutionResult.ExitStatus.ASKED_TO_DIE, null, "Asked to die by the AM"); } } catch (IOException e) { @@ -252,6 +255,20 @@ private boolean isSourceOfInterest(InputSpec inputSpec) { return !inputClassName.equals(MRInputLegacy.class.getName()); } + private Multimap createStartedInputMap(FragmentSpecProto fragmentSpec) { + Multimap startedInputMap = HashMultimap.create(); + // Let the Processor control start for Broadcast inputs: every input whose descriptor class is + // UnorderedKVInput is recorded as (this fragment's vertex name -> connected vertex name). + // TODO: For now, this affects non-broadcast unsorted cases as well. Make use of the edge + // property when it's available. + for (IOSpecProto inputSpec : fragmentSpec.getInputSpecsList()) { + if (inputSpec.getIoDescriptor().getClassName().equals(UnorderedKVInput.class.getName())) { + startedInputMap.put(fragmentSpec.getVertexName(), inputSpec.getConnectedVertexName()); + } + } + return startedInputMap; + } + public void shutdown() { if (executor != null) { executor.shutdownNow();