diff --git bin/ext/cli.sh bin/ext/cli.sh index 87329f3..c837508 100644 --- bin/ext/cli.sh +++ bin/ext/cli.sh @@ -24,10 +24,11 @@ fi updateCli() { if [ "$USE_DEPRECATED_CLI" == "true" ]; then + export HADOOP_CLIENT_OPTS=" -Dproc_hivecli $HADOOP_CLIENT_OPTS " CLASS=org.apache.hadoop.hive.cli.CliDriver JAR=hive-cli-*.jar else - export HADOOP_CLIENT_OPTS="$HADOOP_CLIENT_OPTS -Dlog4j.configurationFile=beeline-log4j2.properties" + export HADOOP_CLIENT_OPTS=" -Dproc_beeline $HADOOP_CLIENT_OPTS -Dlog4j.configurationFile=beeline-log4j2.properties" CLASS=org.apache.hive.beeline.cli.HiveCli JAR=hive-beeline-*.jar fi diff --git bin/ext/hiveserver2.sh bin/ext/hiveserver2.sh index c17452c..b86a897 100644 --- bin/ext/hiveserver2.sh +++ bin/ext/hiveserver2.sh @@ -24,6 +24,7 @@ hiveserver2() { fi JAR=${HIVE_LIB}/hive-service-[0-9].*.jar + export HADOOP_CLIENT_OPTS=" -Dproc_hiveserver2 $HADOOP_CLIENT_OPTS " exec $HADOOP jar $JAR $CLASS $HIVE_OPTS "$@" } diff --git bin/ext/llap.sh bin/ext/llap.sh index 838bb3a..0462d26 100644 --- bin/ext/llap.sh +++ bin/ext/llap.sh @@ -30,7 +30,7 @@ llap () { set -e; - export HADOOP_CLIENT_OPTS="$HADOOP_CLIENT_OPTS -Dlog4j.configurationFile=llap-cli-log4j2.properties " + export HADOOP_CLIENT_OPTS=" -Dproc_llapcli $HADOOP_CLIENT_OPTS -Dlog4j.configurationFile=llap-cli-log4j2.properties " # hadoop 20 or newer - skip the aux_jars option. picked up from hiveconf $HADOOP $CLASS $HIVE_OPTS -directory $TMPDIR "$@" diff --git bin/ext/llapstatus.sh bin/ext/llapstatus.sh index 96edda2..2d2c8f4 100644 --- bin/ext/llapstatus.sh +++ bin/ext/llapstatus.sh @@ -29,7 +29,7 @@ llapstatus () { set -e; - export HADOOP_CLIENT_OPTS="$HADOOP_CLIENT_OPTS -Dlog4j.configurationFile=llap-cli-log4j2.properties " + export HADOOP_CLIENT_OPTS=" -Dproc_llapstatuscli $HADOOP_CLIENT_OPTS -Dlog4j.configurationFile=llap-cli-log4j2.properties " # hadoop 20 or newer - skip the aux_jars option. 
picked up from hiveconf $HADOOP $CLASS $HIVE_OPTS "$@" diff --git bin/ext/metastore.sh bin/ext/metastore.sh index 8001e87..9748f8a 100644 --- bin/ext/metastore.sh +++ bin/ext/metastore.sh @@ -26,6 +26,7 @@ metastore() { # hadoop 20 or newer - skip the aux_jars option and hiveconf + export HADOOP_CLIENT_OPTS=" -Dproc_metastore $HADOOP_CLIENT_OPTS " export HADOOP_OPTS="$HIVE_METASTORE_HADOOP_OPTS $HADOOP_OPTS" exec $HADOOP jar $JAR $CLASS "$@" } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/HistoryLogger.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/HistoryLogger.java index f1fc285..3da8faa 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/HistoryLogger.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/HistoryLogger.java @@ -26,7 +26,7 @@ private static final String HISTORY_SUBMIT_TIME = "SubmitTime"; private static final String HISTORY_START_TIME = "StartTime"; private static final String HISTORY_END_TIME = "EndTime"; - private static final String HISTORY_DAG_NAME = "DagName"; + private static final String HISTORY_QUERY_ID = "QueryId"; private static final String HISTORY_DAG_ID = "DagId"; private static final String HISTORY_VERTEX_NAME = "VertexName"; private static final String HISTORY_TASK_ID = "TaskId"; @@ -42,29 +42,29 @@ public static void logFragmentStart(String applicationIdStr, String containerIdStr, String hostname, - String dagName, int dagIdentifier, String vertexName, int taskId, + String queryId, int dagIdentifier, String vertexName, int taskId, int attemptId) { HISTORY_LOGGER.info( - constructFragmentStartString(applicationIdStr, containerIdStr, hostname, dagName, dagIdentifier, + constructFragmentStartString(applicationIdStr, containerIdStr, hostname, queryId, dagIdentifier, vertexName, taskId, attemptId)); } public static void logFragmentEnd(String applicationIdStr, String containerIdStr, String hostname, - String dagName, int dagIdentifier, String vertexName, int taskId, int
attemptId, + String queryId, int dagIdentifier, String vertexName, int taskId, int attemptId, String threadName, long startTime, boolean failed) { HISTORY_LOGGER.info(constructFragmentEndString(applicationIdStr, containerIdStr, hostname, - dagName, dagIdentifier, vertexName, taskId, attemptId, threadName, startTime, failed)); + queryId, dagIdentifier, vertexName, taskId, attemptId, threadName, startTime, failed)); } private static String constructFragmentStartString(String applicationIdStr, String containerIdStr, - String hostname, String dagName, int dagIdentifier, + String hostname, String queryId, int dagIdentifier, String vertexName, int taskId, int attemptId) { HistoryLineBuilder lb = new HistoryLineBuilder(EVENT_TYPE_FRAGMENT_START); lb.addHostName(hostname); lb.addAppid(applicationIdStr); lb.addContainerId(containerIdStr); - lb.addDagName(dagName); + lb.addQueryId(queryId); lb.addDagId(dagIdentifier); lb.addVertexName(vertexName); lb.addTaskId(taskId); @@ -74,14 +74,14 @@ private static String constructFragmentStartString(String applicationIdStr, Stri } private static String constructFragmentEndString(String applicationIdStr, String containerIdStr, - String hostname, String dagName, int dagIdentifier, + String hostname, String queryId, int dagIdentifier, String vertexName, int taskId, int attemptId, String threadName, long startTime, boolean succeeded) { HistoryLineBuilder lb = new HistoryLineBuilder(EVENT_TYPE_FRAGMENT_END); lb.addHostName(hostname); lb.addAppid(applicationIdStr); lb.addContainerId(containerIdStr); - lb.addDagName(dagName); + lb.addQueryId(queryId); lb.addDagId(dagIdentifier); lb.addVertexName(vertexName); lb.addTaskId(taskId); @@ -112,8 +112,8 @@ HistoryLineBuilder addContainerId(String containerId) { return setKeyValue(HISTORY_CONTAINER_ID, containerId); } - HistoryLineBuilder addDagName(String dagName) { - return setKeyValue(HISTORY_DAG_NAME, dagName); + HistoryLineBuilder addQueryId(String queryId) { + return 
setKeyValue(HISTORY_QUERY_ID, queryId); } HistoryLineBuilder addDagId(int dagId) { diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java index 6908138..0298f2c 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java @@ -196,7 +196,9 @@ public SubmitWorkResponseProto submitWork(SubmitWorkRequestProto request) throws QueryIdentifierProto qIdProto = vertex.getQueryIdentifier(); HistoryLogger.logFragmentStart(qIdProto.getApplicationIdString(), request.getContainerIdString(), - localAddress.get().getHostName(), vertex.getDagName(), qIdProto.getDagIndex(), + localAddress.get().getHostName(), + constructUniqueQueryId(vertex.getHiveQueryId(), qIdProto.getDagIndex()), + qIdProto.getDagIndex(), vertex.getVertexName(), request.getFragmentNumber(), request.getAttemptNumber()); // This is the start of container-annotated logging. @@ -506,4 +508,10 @@ public void taskKilled(String amLocation, int port, String user, public Set getExecutorStatus() { return executorService.getExecutorsStatus(); } + + public static String constructUniqueQueryId(String queryId, int dagIndex) { + // Hive QueryId is not always unique. 
+ return queryId + "-" + dagIndex; + } + } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java index dcad3d8..c1f6c96 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java @@ -22,7 +22,7 @@ import java.text.SimpleDateFormat; import java.util.Comparator; import java.util.Date; -import java.util.HashSet; +import java.util.LinkedHashSet; import java.util.Map; import java.util.Set; import java.util.concurrent.BlockingQueue; @@ -199,9 +199,12 @@ protected SimpleDateFormat initialValue() { @Override public Set getExecutorsStatus() { // TODO Change this method to make the output easier to parse (parse programmatically) - Set result = new HashSet<>(); + Set result = new LinkedHashSet<>(); + Set running = new LinkedHashSet<>(); + Set waiting = new LinkedHashSet<>(); StringBuilder value = new StringBuilder(); for (Map.Entry e : knownTasks.entrySet()) { + boolean isWaiting; value.setLength(0); value.append(e.getKey()); TaskWrapper task = e.getValue(); @@ -209,29 +212,39 @@ protected SimpleDateFormat initialValue() { TaskRunnerCallable c = task.getTaskRunnerCallable(); if (c != null && c.getVertexSpec() != null) { SignableVertexSpec fs = c.getVertexSpec(); - value.append(isFirst ? " (" : ", ").append(fs.getDagName()) + value.append(isFirst ? " (" : ", ").append(c.getQueryId()) .append("/").append(fs.getVertexName()); isFirst = false; } value.append(isFirst ? 
" (" : ", "); if (task.isInWaitQueue()) { + isWaiting = true; value.append("in queue"); } else if (c != null) { long startTime = c.getStartTime(); if (startTime != 0) { + isWaiting = false; value.append("started at ").append(sdf.get().format(new Date(startTime))); } else { + isWaiting = false; value.append("not started"); } } else { + isWaiting = true; value.append("has no callable"); } if (task.isInPreemptionQueue()) { value.append(", ").append("preemptable"); } value.append(")"); - result.add(value.toString()); + if (isWaiting) { + waiting.add(value.toString()); + } else { + running.add(value.toString()); + } } + result.addAll(waiting); + result.addAll(running); return result; } @@ -737,7 +750,7 @@ private void updatePreemptionListAndNotify(EndReason reason) { if (removed && isInfoEnabled) { TaskRunnerCallable trc = taskWrapper.getTaskRunnerCallable(); LOG.info(TaskRunnerCallable.getTaskIdentifierString(trc.getRequest(), - trc.getVertexSpec()) + " request " + state + "! Removed from preemption list."); + trc.getVertexSpec(), trc.getQueryId()) + " request " + state + "! 
Removed from preemption list."); } } diff --git llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java index 4b677aa..0c54525 100644 --- llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java +++ llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java @@ -44,6 +44,7 @@ import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.log4j.MDC; import org.apache.log4j.NDC; import org.apache.tez.common.CallableWithNdc; @@ -51,7 +52,10 @@ import org.apache.tez.common.security.JobTokenIdentifier; import org.apache.tez.common.security.TokenCache; import org.apache.tez.dag.api.TezConstants; +import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.dag.records.TezTaskID; +import org.apache.tez.dag.records.TezVertexID; import org.apache.tez.hadoop.shim.HadoopShim; import org.apache.tez.runtime.api.ExecutionContext; import org.apache.tez.runtime.api.impl.TaskSpec; @@ -105,6 +109,7 @@ private volatile String threadName; private final LlapDaemonExecutorMetrics metrics; private final String requestId; + private final String threadNameSuffix; private final String queryId; private final HadoopShim tezHadoopShim; private boolean shouldRunTask = true; @@ -147,8 +152,11 @@ public TaskRunnerCallable(SubmitWorkRequestProto request, QueryFragmentInfo frag } this.metrics = metrics; this.requestId = taskSpec.getTaskAttemptID().toString(); - // TODO Change this to the queryId/Name when that's available. 
- this.queryId = vertex.getDagName(); + threadNameSuffix = constructThreadNameSuffix(taskSpec.getTaskAttemptID()); + + this.queryId = ContainerRunnerImpl + .constructUniqueQueryId(vertex.getHiveQueryId(), + fragmentInfo.getQueryInfo().getDagIdentifier()); this.killedTaskHandler = killedTaskHandler; this.fragmentCompletionHanler = fragmentCompleteHandler; this.tezHadoopShim = tezHadoopShim; @@ -161,14 +169,15 @@ public long getStartTime() { return startTime; } + @Override protected TaskRunner2Result callInternal() throws Exception { setMDCFromNDC(); try { isStarted.set(true); this.startTime = System.currentTimeMillis(); this.threadName = Thread.currentThread().getName(); if (LOG.isDebugEnabled()) { LOG.debug("canFinish: " + taskSpec.getTaskAttemptID() + ": " + canFinish()); @@ -191,7 +200,7 @@ protected TaskRunner2Result callInternal() throws Exception { new LinkedBlockingQueue(), new ThreadFactoryBuilder() .setDaemon(true) - .setNameFormat("TezTaskRunner") + .setNameFormat("TezTR-" + threadNameSuffix) .build()); // TODO Consolidate this code with TezChild. @@ -286,6 +295,24 @@ private void setMDCFromNDC() { MDC.put("fragmentId", fragId); } + private String constructThreadNameSuffix(TezTaskAttemptID taskAttemptId) { + StringBuilder sb = new StringBuilder(); + TezTaskID taskId = taskAttemptId.getTaskID(); + TezVertexID vertexId = taskId.getVertexID(); + TezDAGID dagId = vertexId.getDAGId(); + ApplicationId appId = dagId.getApplicationId(); + long clusterTs = appId.getClusterTimestamp(); + long clusterTsShort = clusterTs % 1_000_000L; + + sb.append(clusterTsShort).append("_"); + sb.append(appId.getId()).append("_"); + sb.append(dagId.getId()).append("_"); + sb.append(vertexId.getId()).append("_"); + sb.append(taskId.getId()).append("_"); + sb.append(taskAttemptId.getId()); + return sb.toString(); + } + /** * Attempt to kill a running task. If the task has not started running, it will not start.
* If it's already running, a kill request will be sent to it. @@ -493,7 +520,7 @@ public void onSuccess(TaskRunner2Result result) { @Override public void onFailure(Throwable t) { LOG.error("TezTaskRunner execution failed for : " - + getTaskIdentifierString(request, vertex), t); + + getTaskIdentifierString(request, vertex, queryId), t); isCompleted.set(true); fragmentCompletionHanler.fragmentComplete(fragmentInfo); // TODO HIVE-10236 Report a fatal error over the umbilical @@ -503,7 +530,7 @@ public void onFailure(Throwable t) { protected void logFragmentEnd(boolean success) { HistoryLogger.logFragmentEnd(vertex.getQueryIdentifier().getApplicationIdString(), - request.getContainerIdString(), executionContext.getHostName(), vertex.getDagName(), + request.getContainerIdString(), executionContext.getHostName(), queryId, fragmentInfo.getQueryInfo().getDagIdentifier(), vertex.getVertexName(), request.getFragmentNumber(), request.getAttemptNumber(), taskRunnerCallable.threadName, taskRunnerCallable.startTime, success); @@ -524,11 +551,11 @@ public ConfParams(int amHeartbeatIntervalMsMax, long amCounterHeartbeatInterval, } public static String getTaskIdentifierString( - SubmitWorkRequestProto request, SignableVertexSpec vertex) { + SubmitWorkRequestProto request, SignableVertexSpec vertex, String queryId) { StringBuilder sb = new StringBuilder(); sb.append("AppId=").append(vertex.getQueryIdentifier().getApplicationIdString()) .append(", containerId=").append(request.getContainerIdString()) - .append(", Dag=").append(vertex.getDagName()) + .append(", QueryId=").append(queryId) .append(", Vertex=").append(vertex.getVertexName()) .append(", FragmentNum=").append(request.getFragmentNumber()) .append(", Attempt=").append(request.getAttemptNumber()); diff --git llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java index 5dc1be5..8f056f5 100644 --- 
llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java +++ llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java @@ -132,6 +132,7 @@ public static SubmitWorkRequestProto createSubmitWorkRequestProto( VertexOrBinary.newBuilder().setVertex( SignableVertexSpec.newBuilder() .setDagName(dagName) + .setHiveQueryId(dagName) .setUser("MockUser") .setTokenIdentifier("MockToken_1") .setQueryIdentifier( diff --git llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java index 79c2564..62b90d6 100644 --- llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java +++ llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestFirstInFirstOutComparator.java @@ -77,6 +77,7 @@ private SubmitWorkRequestProto createRequest(int fragmentNumber, int numSelfAndU .build()) .setVertexIndex(vId.getId()) .setDagName(dagName) + .setHiveQueryId(dagName) .setVertexName("MockVertex") .setUser("MockUser") .setTokenIdentifier("MockToken_1")