diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java index 6d7d4de..8c33fa2 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java @@ -55,7 +55,6 @@ import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.VertexOrBinary; import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics; import org.apache.hadoop.hive.llap.security.LlapSignerImpl; -import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier; import org.apache.hadoop.hive.llap.tez.Converters; import org.apache.hadoop.hive.llap.tezplugins.LlapTezUtils; import org.apache.hadoop.io.DataInputBuffer; @@ -183,14 +182,16 @@ public SubmitWorkResponseProto submitWork(SubmitWorkRequestProto request) throws SignableVertexSpec vertex = extractVertexSpec(request, tokenInfo); TezEvent initialEvent = extractInitialEvent(request, tokenInfo); - if (LOG.isInfoEnabled()) { - LOG.info("Queueing container for execution: " + stringifySubmitRequest(request, vertex)); - } - QueryIdentifierProto qIdProto = vertex.getQueryIdentifier(); TezTaskAttemptID attemptId = Converters.createTaskAttemptId(vertex.getQueryIdentifier(), vertex.getVertexIndex(), request.getFragmentNumber(), request.getAttemptNumber()); String fragmentIdString = attemptId.toString(); + if (LOG.isInfoEnabled()) { + LOG.info("Queueing container for execution: fragemendId={}, {}", + fragmentIdString, stringifySubmitRequest(request, vertex)); + } + QueryIdentifierProto qIdProto = vertex.getQueryIdentifier(); + HistoryLogger.logFragmentStart(qIdProto.getApplicationIdString(), request.getContainerIdString(), localAddress.get().getHostName(), vertex.getDagName(), qIdProto.getDagIndex(), vertex.getVertexName(), request.getFragmentNumber(), request.getAttemptNumber()); @@ -478,11 +479,7 @@ public void fragmentComplete(QueryFragmentInfo fragmentInfo) { public void queryFailed(QueryIdentifier queryIdentifier) { LOG.info("Processing query failed notification for {}", queryIdentifier); List knownFragments; - try { - knownFragments = queryTracker.queryComplete(queryIdentifier, -1, true); - } catch (IOException e) { - throw new RuntimeException(e); // Should never happen here, no permission check. - } + knownFragments = queryTracker.getRegisteredFragments(queryIdentifier); LOG.info("DBG: Pending fragment count for failed query {} = {}", queryIdentifier, knownFragments.size()); for (QueryFragmentInfo fragmentInfo : knownFragments) { diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java index a7d7981..9eaced6 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hive.llap.log.LogHelpers; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.util.StringUtils; import org.apache.log4j.MDC; import org.apache.logging.slf4j.Log4jMarker; import org.apache.tez.common.CallableWithNdc; @@ -146,9 +147,10 @@ QueryFragmentInfo registerFragment(QueryIdentifier queryIdentifier, String appId if (completedDagMap.contains(queryIdentifier)) { // Cleanup the dag lock here, since it may have been created after the query completed dagSpecificLocks.remove(queryIdentifier); - throw new RuntimeException( - "Dag " + dagName + " already complete. Rejecting fragment [" - + vertexName + ", " + fragmentNumber + ", " + attemptNumber + "]"); + String message = "Dag " + dagName + " already complete. Rejecting fragment [" + + vertexName + ", " + fragmentNumber + ", " + attemptNumber + "]"; + LOG.info(message); + throw new RuntimeException(message); } // TODO: for now, we get the secure username out of UGI... after signing, we can take it // out of the request provided that it's signed. @@ -211,6 +213,22 @@ void fragmentComplete(QueryFragmentInfo fragmentInfo) { } } + List getRegisteredFragments(QueryIdentifier queryIdentifier) { + ReadWriteLock dagLock = getDagLock(queryIdentifier); + dagLock.writeLock().lock(); + try { + QueryInfo queryInfo = queryInfoMap.get(queryIdentifier); + if (queryInfo == null) { + // Race with queryComplete + LOG.warn("Unknown query: Returning an empty list of fragments"); + return Collections.emptyList(); + } + return queryInfo.getRegisteredFragments(); + } finally { + dagLock.writeLock().unlock(); + } + } + /** * Register completion for a query * @param queryIdentifier @@ -231,8 +249,7 @@ void fragmentComplete(QueryFragmentInfo fragmentInfo) { deleteDelay); queryInfoMap.remove(queryIdentifier); if (queryInfo == null) { - // One case where this happens is when a query is killed via an explicit signal, and then - // another message is received from teh AMHeartbeater. + // Should not happen. LOG.warn("Ignoring query complete for unknown dag: {}", queryIdentifier); return Collections.emptyList(); }