diff --git ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
index 9132a21..1fce294 100644
--- ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
+++ ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
@@ -35,6 +35,7 @@
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.metastore.api.Schema;
 import org.apache.hadoop.hive.ql.exec.ConditionalTask;
 import org.apache.hadoop.hive.ql.exec.ExplainTask;
@@ -147,6 +148,25 @@ public String getQueryId() {
     return queryId;
   }
 
+  /**
+   * Get the query name from the configuration, falling back to the query id.
+   */
+  public String getQueryName(Configuration conf) {
+    String name = conf.get("mapred.job.name");
+
+    if (name == null) {
+      name = conf.get("hive.query.name");
+    }
+
+    if (name == null) {
+      // fall back to id
+      name = getQueryId();
+    }
+
+    assert name != null;
+    return name;
+  }
+
   public static String makeQueryId() {
     GregorianCalendar gc = new GregorianCalendar();
     String userid = System.getProperty("user.name");
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
index a6d911d..df62d63 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
@@ -167,9 +167,8 @@ public int execute(DriverContext driverContext) {
     if (driverContext.getCtx() == null) {
       boolean a = false;
     }
 
-    CallerContext callerContext = CallerContext.create("HIVE",
-        conf.getLogIdVar(SessionState.get().getSessionId()) + " "
-            + conf.getVar(HiveConf.ConfVars.HIVEQUERYID),
+    CallerContext callerContext = CallerContext.create(
+        "HIVE", queryPlan.getQueryId(),
         "HIVE_QUERY_ID", queryPlan.getQueryStr());
     dag.setCallerContext(callerContext);
@@ -311,7 +310,7 @@ DAG build(JobConf conf, TezWork work, Path scratchDir,
     FileSystem fs = scratchDir.getFileSystem(conf);
 
     // the name of the dag is what is displayed in the AM/Job UI
-    DAG dag = DAG.create(work.getName());
+    DAG dag = DAG.create(queryPlan.getQueryName(conf));
 
     // set some info for the query
     JSONObject json = new JSONObject(new LinkedHashMap()).put("context", "Hive")
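Note on the lookup order introduced in QueryPlan.getQueryName(conf): an explicit mapred.job.name wins, then hive.query.name, and only then the generated query id, so the Tez DAG name above is always non-null. A minimal standalone sketch of that precedence (the two configuration keys are the ones read by the patch; the class name and sample values are hypothetical):

    import org.apache.hadoop.conf.Configuration;

    public class QueryNameDemo {
      public static void main(String[] args) {
        // start from an empty Configuration so no loaded defaults interfere
        Configuration conf = new Configuration(false);
        conf.set("hive.query.name", "nightly-etl");

        // mapred.job.name is unset, so the lookup falls through
        String name = conf.get("mapred.job.name");
        if (name == null) {
          name = conf.get("hive.query.name");
        }
        if (name == null) {
          name = "query-id-here"; // QueryPlan would return getQueryId()
        }
        System.out.println(name); // prints "nightly-etl"
      }
    }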
diff --git ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java
index 38b6b5d..af2d759 100644
--- ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java
+++ ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java
@@ -50,15 +50,16 @@
   private static final Logger LOG = LoggerFactory.getLogger(ATSHook.class.getName());
   private static final Object LOCK = new Object();
+  private static final String VERSION = "0.2";
   private static ExecutorService executor;
   private static TimelineClient timelineClient;
   private enum EntityTypes { HIVE_QUERY_ID };
   private enum EventTypes { QUERY_SUBMITTED, QUERY_COMPLETED };
 
   private enum OtherInfoTypes {
-    QUERY, STATUS, TEZ, MAPRED, INVOKER_INFO, THREAD_NAME
+    QUERY, STATUS, TEZ, MAPRED, INVOKER_INFO, THREAD_NAME, VERSION
   };
-  private enum PrimaryFilterTypes { user, requestuser, operationid };
+  private enum PrimaryFilterTypes { user, requestuser, operationid, queryname };
   private static final int WAIT_TIME = 3;
@@ -108,6 +109,7 @@ public void run() {
           long queryStartTime = plan.getQueryStartTime();
           String user = hookContext.getUgi().getUserName();
           String requestuser = hookContext.getUserName();
+          String queryName = plan.getQueryName(conf);
           if (hookContext.getUserName() == null ){
             requestuser = hookContext.getUgi().getUserName() ;
           }
@@ -140,13 +142,13 @@ public void run() {
                 JSONObject explainPlan = explain.getJSONPlan(null, work);
                 String logID = conf.getLogIdVar(SessionState.get().getSessionId());
                 fireAndForget(conf, createPreHookEvent(queryId, query, explainPlan, queryStartTime,
-                    user, requestuser, numMrJobs, numTezJobs, opId, logID));
+                    user, requestuser, numMrJobs, numTezJobs, opId, logID, queryName));
                 break;
               case POST_EXEC_HOOK:
-                fireAndForget(conf, createPostHookEvent(queryId, currentTime, user, requestuser, true, opId));
+                fireAndForget(conf, createPostHookEvent(queryId, currentTime, user, requestuser, true, opId, queryName));
                 break;
               case ON_FAILURE_HOOK:
-                fireAndForget(conf, createPostHookEvent(queryId, currentTime, user, requestuser , false, opId));
+                fireAndForget(conf, createPostHookEvent(queryId, currentTime, user, requestuser , false, opId, queryName));
                 break;
               default:
                 //ignore
@@ -161,7 +163,7 @@ public void run() {
 
   TimelineEntity createPreHookEvent(String queryId, String query, JSONObject explainPlan,
       long startTime, String user, String requestuser, int numMrJobs, int numTezJobs, String opId,
-      String logID) throws Exception {
+      String logID, String queryName) throws Exception {
 
     JSONObject queryObj = new JSONObject(new LinkedHashMap<>());
     queryObj.put("queryText", query);
@@ -178,6 +180,7 @@ TimelineEntity createPreHookEvent(String queryId, String query, JSONObject expla
     atsEntity.setEntityType(EntityTypes.HIVE_QUERY_ID.name());
     atsEntity.addPrimaryFilter(PrimaryFilterTypes.user.name(), user);
     atsEntity.addPrimaryFilter(PrimaryFilterTypes.requestuser.name(), requestuser);
+    atsEntity.addPrimaryFilter(PrimaryFilterTypes.queryname.name(), queryName);
 
     if (opId != null) {
       atsEntity.addPrimaryFilter(PrimaryFilterTypes.operationid.name(), opId);
@@ -193,11 +196,12 @@ TimelineEntity createPreHookEvent(String queryId, String query, JSONObject expla
     atsEntity.addOtherInfo(OtherInfoTypes.MAPRED.name(), numMrJobs > 0);
     atsEntity.addOtherInfo(OtherInfoTypes.INVOKER_INFO.name(), logID);
     atsEntity.addOtherInfo(OtherInfoTypes.THREAD_NAME.name(), Thread.currentThread().getName());
+    atsEntity.addOtherInfo(OtherInfoTypes.VERSION.name(), VERSION);
 
     return atsEntity;
   }
 
   TimelineEntity createPostHookEvent(String queryId, long stopTime, String user, String requestuser, boolean success,
-      String opId) {
+      String opId, String queryName) {
     LOG.info("Received post-hook notification for :" + queryId);
 
@@ -205,6 +209,7 @@ TimelineEntity createPostHookEvent(String queryId, long stopTime, String user, S
     atsEntity.setEntityType(EntityTypes.HIVE_QUERY_ID.name());
     atsEntity.addPrimaryFilter(PrimaryFilterTypes.user.name(), user);
     atsEntity.addPrimaryFilter(PrimaryFilterTypes.requestuser.name(), requestuser);
+    atsEntity.addPrimaryFilter(PrimaryFilterTypes.queryname.name(), queryName);
     if (opId != null) {
       atsEntity.addPrimaryFilter(PrimaryFilterTypes.operationid.name(), opId);
     }
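For reference, the new queryname primary filter means a finished query can be looked up in the timeline store by its human-readable name, not just by query id. A stripped-down sketch of the entity the hook now emits (the field values are made up; the real hook also attaches events, the user filters, and the other-info map):

    import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;

    public class AtsEntitySketch {
      public static void main(String[] args) {
        TimelineEntity e = new TimelineEntity();
        e.setEntityId("hive_20160101000000_example");   // the query id
        e.setEntityType("HIVE_QUERY_ID");               // EntityTypes.HIVE_QUERY_ID.name()
        e.addPrimaryFilter("queryname", "nightly-etl"); // the filter added by this patch
      }
    }

With that filter stored, the ATS v1 REST API should allow retrieval along the lines of GET /ws/v1/timeline/HIVE_QUERY_ID?primaryFilter=queryname:nightly-etl, though the exact endpoint shape should be verified against the Hadoop version in use.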
diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
index 858cca0..f95afd5 100644
--- ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
+++ ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
@@ -41,6 +41,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
@@ -158,6 +159,9 @@ public Edge answer(InvocationOnMock invocation) throws Throwable {
     task = new TezTask(utils);
     task.setWork(work);
     task.setConsole(mock(LogHelper.class));
+    QueryPlan plan = new QueryPlan();
+    plan.setQueryId("foo");
+    task.setQueryPlan(plan);
 
     conf = new JobConf();
     appLr = mock(LocalResource.class);
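The test change above only wires a QueryPlan with a known id into TezTask, so that DAG.create(queryPlan.getQueryName(conf)) has something to resolve. A direct unit test for the new accessor is not part of this patch; a sketch of what one could look like, reusing the same no-arg constructor and setQueryId seen above (test class name is hypothetical):

    import static org.junit.Assert.assertEquals;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.ql.QueryPlan;
    import org.junit.Test;

    public class TestQueryPlanName {

      @Test
      public void queryNameFallsBackToQueryId() {
        QueryPlan plan = new QueryPlan();
        plan.setQueryId("foo");
        // neither mapred.job.name nor hive.query.name is set
        assertEquals("foo", plan.getQueryName(new Configuration(false)));
      }

      @Test
      public void explicitQueryNameWins() {
        QueryPlan plan = new QueryPlan();
        plan.setQueryId("foo");
        Configuration conf = new Configuration(false);
        conf.set("hive.query.name", "bar");
        assertEquals("bar", plan.getQueryName(conf));
      }
    }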