diff --git a/common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsConstant.java b/common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsConstant.java index af0f87bac3..cdf70ca330 100644 --- a/common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsConstant.java +++ b/common/src/java/org/apache/hadoop/hive/common/metrics/common/MetricsConstant.java @@ -50,6 +50,13 @@ // The number of Hive operations that are waiting to enter the compile block public static final String WAITING_COMPILE_OPS = "waiting_compile_ops"; + // The number of Hive operations that are waiting for a TEZ session + public static final String WAITING_TEZ_SESSION = "waiting_tez_session"; + // Statistics on wait times for the queries (if any) that are currently waiting for a TEZ session + public static final String WAITING_TEZ_SESSION_MAX = "waiting_tez_session_max"; + public static final String WAITING_TEZ_SESSION_AVG = "waiting_tez_session_avg"; + public static final String WAITING_TEZ_SESSION_STDDEV = "waiting_tez_session_stddev"; + // The number of map reduce tasks executed by the HiveServer2 since the last restart public static final String HIVE_MR_TASKS = "hive_mapred_tasks"; // The number of spark tasks executed by the HiveServer2 since the last restart diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java index dd7ccd4764..31150d0fae 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java @@ -184,19 +184,24 @@ public int execute(DriverContext driverContext) { CallerContext callerContext = CallerContext.create( "HIVE", queryPlan.getQueryId(), "HIVE_QUERY_ID", queryPlan.getQueryStr()); - perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_GET_SESSION); - session = sessionRef.value = WorkloadManagerFederation.getSession( - sessionRef.value, conf, mi, getWork().getLlapMode(), wmContext); - perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_GET_SESSION); - try { - ss.setTezSession(session); - LOG.info("Subscribed to counters: {} for queryId: {}", wmContext.getSubscribedCounters(), - wmContext.getQueryId()); - - // Ensure the session is open and has the necessary local resources. - // This would refresh any conf resources and also local resources. - ensureSessionHasResources(session, allNonConfFiles); + perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_GET_SESSION); + ss.setWaitingTezSession(); + try { + session = sessionRef.value = WorkloadManagerFederation.getSession( + sessionRef.value, conf, mi, getWork().getLlapMode(), wmContext); + + ss.setTezSession(session); + LOG.info("Subscribed to counters: {} for queryId: {}", wmContext.getSubscribedCounters(), + wmContext.getQueryId()); + + // Ensure the session is open and has the necessary local resources. + // This would refresh any conf resources and also local resources. + ensureSessionHasResources(session, allNonConfFiles); + } finally { + ss.resetWaitingTezSession(); + perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_GET_SESSION); + } // This is a combination of the jar stuff from conf, and not from conf. List allNonAppResources = session.getLocalizedResources(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java index 9d631ed43d..618c207d83 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java @@ -325,6 +325,8 @@ private final AtomicLong sparkSessionId = new AtomicLong(); + private volatile long waitingTezSession; + public HiveConf getConf() { return sessionConf; } @@ -2041,6 +2043,8 @@ public double progressedPercentage() { return percentage; } }; + + } public void updateProgressMonitor(ProgressMonitor progressMonitor) { @@ -2078,6 +2082,18 @@ public void addCleanupItem(Closeable item) { public String getNewSparkSessionId() { return getSessionId() + "_" + Long.toString(this.sparkSessionId.getAndIncrement()); } + + public void setWaitingTezSession() { + this.waitingTezSession = System.currentTimeMillis(); + } + + public void resetWaitingTezSession() { + this.waitingTezSession = 0; + } + + public long getWaitingTezSession() { + return this.waitingTezSession; + } } class ResourceMaps { diff --git a/service/src/java/org/apache/hive/service/cli/session/SessionManager.java b/service/src/java/org/apache/hive/service/cli/session/SessionManager.java index 694a691450..2a9ad5a4f7 100644 --- a/service/src/java/org/apache/hive/service/cli/session/SessionManager.java +++ b/service/src/java/org/apache/hive/service/cli/session/SessionManager.java @@ -46,6 +46,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.ql.hooks.HookUtils; +import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hive.service.CompositeService; import org.apache.hive.service.cli.HiveSQLException; import org.apache.hive.service.cli.SessionHandle; @@ -115,6 +116,7 @@ public synchronized void init(HiveConf hiveConf) { if(metrics != null){ registerOpenSesssionMetrics(metrics); registerActiveSesssionMetrics(metrics); + registerTezSessionMetrics(metrics); } userLimit = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER); @@ -179,6 +181,98 @@ public Integer getValue() { metrics.addRatio(MetricsConstant.HS2_AVG_ACTIVE_SESSION_TIME, activeSessionTime, activeSessionCnt); } + private Iterable getHiveSessionsWaitingForTezSession() { + Iterable filtered = Iterables.filter(getSessions(), new Predicate() { + @Override + public boolean apply(HiveSession hiveSession) { + SessionState ss = hiveSession.getSessionState(); + if (ss != null) { + return ss.getWaitingTezSession() != 0; + } + return false; + } + }); + return filtered; + } + + private ArrayList getWaitingTezSessionTimes() { + long currentTime = System.currentTimeMillis(); + ArrayList waitTimes = new ArrayList(); + for (HiveSession session : getHiveSessionsWaitingForTezSession()) { + long waitingSince = session.getSessionState().getWaitingTezSession(); + // Note it is possible that the query has by now received a Tez session, in which case + // waitingSince would have been reset to 0. + if (waitingSince != 0) { + waitTimes.add(currentTime - waitingSince); + } + } + return waitTimes; + } + + private double findAvg(ArrayList vals) { + int count = 0; + double sum = 0; + for (Long val : vals) { + count++; + sum += val.doubleValue(); + } + return (count == 0 ? 0 : sum / count); + } + + private void registerTezSessionMetrics(Metrics metrics) { + MetricsVariable waitingTezSessionCnt = new MetricsVariable() { + @Override + public Integer getValue() { + return Iterables.size(getHiveSessionsWaitingForTezSession()); + } + }; + + MetricsVariable waitingTezSessionTimeMax = new MetricsVariable() { + @Override + public Double getValue() { + double max = 0; + for (Long waitTime : getWaitingTezSessionTimes()) { + if (waitTime.doubleValue() > max) { + max = waitTime; + } + } + return max; + } + }; + + MetricsVariable waitingTezSessionTimeAvg = new MetricsVariable() { + @Override + public Double getValue() { + return findAvg(getWaitingTezSessionTimes()); + } + }; + + MetricsVariable waitingTezSessionTimeStddev = new MetricsVariable() { + @Override + public Double getValue() { + ArrayList waitTimes = getWaitingTezSessionTimes(); + double avg = findAvg(waitTimes); + int count = 0; + double result = 0; + for (long val : waitTimes) { + count++; + double diffFromMean = val - avg; + result += diffFromMean * diffFromMean; + } + if (count == 0) { + return Double.valueOf(0); + } + result = result / count; + return Math.sqrt(result); + } + }; + + metrics.addGauge(MetricsConstant.WAITING_TEZ_SESSION, waitingTezSessionCnt); + metrics.addGauge(MetricsConstant.WAITING_TEZ_SESSION_MAX, waitingTezSessionTimeMax); + metrics.addGauge(MetricsConstant.WAITING_TEZ_SESSION_AVG, waitingTezSessionTimeAvg); + metrics.addGauge(MetricsConstant.WAITING_TEZ_SESSION_STDDEV, waitingTezSessionTimeStddev); + } + private void initSessionImplClassName() { this.sessionImplclassName = hiveConf.getVar(ConfVars.HIVE_SESSION_IMPL_CLASSNAME); this.sessionImplWithUGIclassName = hiveConf.getVar(ConfVars.HIVE_SESSION_IMPL_WITH_UGI_CLASSNAME);