diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java
index fa69f13965..49aef70701 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java
@@ -46,6 +46,7 @@
 import org.apache.hadoop.hive.ql.parse.ExplainConfiguration;
 import org.apache.hadoop.hive.ql.plan.ExplainWork;
 import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
@@ -84,10 +85,12 @@
     QUERY, STATUS, TEZ, MAPRED, INVOKER_INFO, SESSION_ID, THREAD_NAME, VERSION,
     CLIENT_IP_ADDRESS, HIVE_ADDRESS, HIVE_INSTANCE_TYPE, CONF, PERF, LLAP_APP_ID
   };
-  private enum ExecutionMode {
+  @VisibleForTesting
+  enum ExecutionMode {
     MR, TEZ, LLAP, SPARK, NONE
   };
-  private enum PrimaryFilterTypes {
+  @VisibleForTesting
+  enum PrimaryFilterTypes {
     user, requestuser, operationid, executionmode, tablesread, tableswritten, queue
   };
@@ -113,13 +116,13 @@ private static void setupAtsExecutor(HiveConf conf) {
       // Executor to create the ATS events.
       // This can use significant resources and should not be done on the main query thread.
       LOG.info("Creating ATS executor queue with capacity " + queueCapacity);
-      BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(queueCapacity);
+      BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(queueCapacity);
       ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ATS Logger %d").build();
       executor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS, queue, threadFactory);

       // Create a separate thread to send the events.
       // Keep separate from the creating events in case the send blocks.
-      BlockingQueue<Runnable> senderQueue = new LinkedBlockingQueue<Runnable>(queueCapacity);
+      BlockingQueue<Runnable> senderQueue = new LinkedBlockingQueue<>(queueCapacity);
       senderExecutor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS, senderQueue, threadFactory);

       YarnConfiguration yarnConf = new YarnConfiguration();
@@ -225,7 +228,7 @@ public void run(final HookContext hookContext) throws Exception {
     final QueryState queryState = hookContext.getQueryState();
     final String queryId = queryState.getQueryId();

-    final Map<String, Long> durations = new HashMap<String, Long>();
+    final Map<String, Long> durations = new HashMap<>();
     for (String key : hookContext.getPerfLogger().getEndTimes().keySet()) {
       durations.put(key, hookContext.getPerfLogger().getDuration(key));
     }
@@ -269,7 +272,6 @@ public void run() {
                   null, // cboInfo
                   plan.getOptimizedQueryString() // optimizedSQL
               );
-              @SuppressWarnings("unchecked")
               ExplainTask explain = (ExplainTask) TaskFactory.get(work);
               explain.initialize(queryState, plan, null, null);
               String query = plan.getQueryStr();
@@ -277,7 +279,7 @@ public void run() {
               String logID = conf.getLogIdVar(hookContext.getSessionId());
               List<String> tablesRead = getTablesFromEntitySet(hookContext.getInputs());
               List<String> tablesWritten = getTablesFromEntitySet(hookContext.getOutputs());
-              String executionMode = getExecutionMode(plan).name();
+              ExecutionMode executionMode = getExecutionMode(plan);
               String hiveInstanceAddress = hookContext.getHiveInstanceAddress();
               if (hiveInstanceAddress == null) {
                 hiveInstanceAddress = InetAddress.getLocalHost().getHostAddress();
@@ -312,7 +314,7 @@ public void run() {
   }

   protected List<String> getTablesFromEntitySet(Set<? extends Entity> entities) {
-    List<String> tableNames = new ArrayList<String>();
+    List<String> tableNames = new ArrayList<>();
     for (Entity entity : entities) {
       if (entity.getType() == Entity.Type.TABLE) {
         tableNames.add(entity.getTable().getFullyQualifiedName());
@@ -348,7 +350,7 @@ protected ExecutionMode getExecutionMode(QueryPlan plan) {

   TimelineEntity createPreHookEvent(String queryId, String query, JSONObject explainPlan,
       long startTime, String user, String requestuser, int numMrJobs, int numTezJobs, String opId,
       String clientIpAddress, String hiveInstanceAddress, String hiveInstanceType,
-      String sessionID, String logID, String threadId, String executionMode,
+      String sessionID, String logID, String threadId, ExecutionMode executionMode,
       List<String> tablesRead, List<String> tablesWritten, HiveConf conf, ApplicationId llapAppId,
       String domainId) throws Exception {
@@ -364,19 +366,22 @@ TimelineEntity createPreHookEvent(String queryId, String query, JSONObject expla
     }
     conf.stripHiddenConfigurations(conf);
-    Map<String, String> confMap = new HashMap<String, String>();
+    Map<String, String> confMap = new HashMap<>();
     for (Map.Entry<String, String> setting : conf) {
       confMap.put(setting.getKey(), setting.getValue());
     }
-    JSONObject confObj = new JSONObject((Map) confMap);
+    JSONObject confObj = new JSONObject(confMap);

     TimelineEntity atsEntity = new TimelineEntity();
     atsEntity.setEntityId(queryId);
     atsEntity.setEntityType(EntityTypes.HIVE_QUERY_ID.name());

     atsEntity.addPrimaryFilter(PrimaryFilterTypes.user.name(), user);
     atsEntity.addPrimaryFilter(PrimaryFilterTypes.requestuser.name(), requestuser);
-    atsEntity.addPrimaryFilter(PrimaryFilterTypes.executionmode.name(), executionMode);
-    atsEntity.addPrimaryFilter(PrimaryFilterTypes.queue.name(), conf.get("mapreduce.job.queuename"));
+    atsEntity.addPrimaryFilter(PrimaryFilterTypes.executionmode.name(), executionMode.name());
+    String queueName = getQueueName(executionMode, conf);
+    if (queueName != null) {
+      atsEntity.addPrimaryFilter(PrimaryFilterTypes.queue.name(), queueName);
+    }

     if (opId != null) {
       atsEntity.addPrimaryFilter(PrimaryFilterTypes.operationid.name(), opId);
@@ -459,6 +464,21 @@ public void run() {
     });
   }

+  private String getQueueName(ExecutionMode mode, HiveConf conf) {
+    switch (mode) {
+      case LLAP:
+        return conf.get(HiveConf.ConfVars.LLAP_DAEMON_QUEUE_NAME.varname);
+      case MR:
+        return conf.get(MRJobConfig.QUEUE_NAME);
+      case TEZ:
+        return conf.get(TezConfiguration.TEZ_QUEUE_NAME);
+      case SPARK:
+      case NONE:
+      default:
+        return null;
+    }
+  }
+
   private ApplicationId determineLlapId(final HiveConf conf, QueryPlan plan) throws IOException {
     // Note: for now, LLAP is only supported in Tez tasks. Will never come to MR; others may
     // be added here, although this is only necessary to have extra debug information.
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestATSHook.java b/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestATSHook.java
index 3386d7b747..5654e8c307 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestATSHook.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestATSHook.java
@@ -20,12 +20,18 @@
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
+
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.hooks.ATSHook.ExecutionMode;
+import org.apache.hadoop.hive.ql.hooks.ATSHook.PrimaryFilterTypes;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.tez.dag.api.TezConfiguration;
 import org.junit.Before;
 import org.junit.Test;

 import java.util.Collections;
+import java.util.Set;

 import static org.junit.Assert.assertEquals;
@@ -45,7 +51,7 @@ public void testCreatePreHookEventJsonShhouldMatch() throws Exception {
         "test-query-id", "test-query", new org.json.JSONObject(), 0L,
         "test-user", "test-request-user", 0, 0, "test-opid",
         "client-ip-address", "hive-instance-address", "hive-instance-type", "session-id", "log-id",
-        "thread-id", "execution-mode", Collections.emptyList(), Collections.emptyList(),
+        "thread-id", ExecutionMode.TEZ, Collections.emptyList(), Collections.emptyList(),
         new HiveConf(), null, "domain-id");
     String resultStr = (String) timelineEntity.getOtherInfo()
         .get(ATSHook.OtherInfoTypes.QUERY.name());
@@ -56,4 +62,38 @@ public void testCreatePreHookEventJsonShhouldMatch() throws Exception {

     assertEquals(expected, result);
   }
+
+  @Test
+  public void testQueueNames() throws Exception {
+    HiveConf conf = new HiveConf();
+    conf.set(HiveConf.ConfVars.LLAP_DAEMON_QUEUE_NAME.varname, "llap_queue");
+    conf.set(MRJobConfig.QUEUE_NAME, "mr_queue");
+    conf.set(TezConfiguration.TEZ_QUEUE_NAME, "tez_queue");
+    TimelineEntity timelineEntity = uut.createPreHookEvent(
+        "test-query-id", "test-query", new org.json.JSONObject(), 0L,
+        "test-user", "test-request-user", 0, 0, "test-opid",
+        "client-ip-address", "hive-instance-address", "hive-instance-type", "session-id",
+        "log-id", "thread-id", ExecutionMode.TEZ, Collections.emptyList(),
+        Collections.emptyList(), conf, null, "domain-id");
+    Set<Object> result = timelineEntity.getPrimaryFilters().get(PrimaryFilterTypes.queue.name());
+    assertEquals(Collections.singleton("tez_queue"), result);
+
+    timelineEntity = uut.createPreHookEvent(
+        "test-query-id", "test-query", new org.json.JSONObject(), 0L,
+        "test-user", "test-request-user", 0, 0, "test-opid",
+        "client-ip-address", "hive-instance-address", "hive-instance-type", "session-id",
+        "log-id", "thread-id", ExecutionMode.LLAP, Collections.emptyList(),
null, "domain-id"); + result = timelineEntity.getPrimaryFilters().get(PrimaryFilterTypes.queue.name()); + assertEquals(Collections.singleton("llap_queue"), result); + + timelineEntity = uut.createPreHookEvent( + "test-query-id", "test-query", new org.json.JSONObject(), 0L, + "test-user", "test-request-user", 0, 0, "test-opid", + "client-ip-address", "hive-instance-address", "hive-instance-type", "session-id", + "log-id", "thread-id", ExecutionMode.MR, Collections.emptyList(), + Collections.emptyList(), conf, null, "domain-id"); + result = timelineEntity.getPrimaryFilters().get(PrimaryFilterTypes.queue.name()); + assertEquals(Collections.singleton("mr_queue"), result); + } }