diff --git ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java index ed18f7e..677cbf3 100644 --- ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java +++ ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java @@ -19,13 +19,17 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; -import java.util.concurrent.Executors; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.QueryPlan; import org.apache.hadoop.hive.ql.exec.ExplainTask; import org.apache.hadoop.hive.ql.exec.Utilities; @@ -37,8 +41,6 @@ import org.json.JSONObject; -import static org.apache.hadoop.hive.ql.hooks.HookContext.HookType.*; - /** * ATSHook sends query + plan info to Yarn App Timeline Server. To enable (hadoop 2.4 and up) set * hive.exec.pre.hooks/hive.exec.post.hooks/hive.exec.failure.hooks to include this class. 
@@ -53,14 +55,36 @@ private enum EventTypes { QUERY_SUBMITTED, QUERY_COMPLETED }; private enum OtherInfoTypes { QUERY, STATUS, TEZ, MAPRED }; private enum PrimaryFilterTypes { user }; + + private static final String ATS_QUEUE_SIZE = "hive.ats.queue.size"; + private static final String ATS_SHUTDOWN_WAIT = "hive.ats.shutdown.wait"; + + private static final int QUEUE_SIZE = -1; private static final int WAIT_TIME = 3; + private static final int SUBMIT_DELAY_LOG_THRESHOLD = 20; - public ATSHook() { + private ExecutorService getExecutors(HiveConf conf) { synchronized(LOCK) { if (executor == null) { - executor = Executors.newSingleThreadExecutor( - new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ATS Logger %d").build()); + final int queue = conf.getInt(ATS_QUEUE_SIZE, QUEUE_SIZE); + final int wait = conf.getInt(ATS_SHUTDOWN_WAIT, WAIT_TIME); + + BlockingQueue<Runnable> workQueue = + new LinkedBlockingQueue<Runnable>(queue < 1 ? Integer.MAX_VALUE : queue); + + executor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, workQueue, + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ATS Logger %d").build(), + new RejectedExecutionHandler() { + @Override + public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { + if (!e.isShutdown()) { + Runnable oldest = e.getQueue().poll(); + LOG.warn("Discarded " + oldest + " because the queue is full"); + e.execute(r); + } + } + }); YarnConfiguration yarnConf = new YarnConfiguration(); timelineClient = TimelineClient.createTimelineClient(); @@ -72,64 +96,24 @@ public ATSHook() { public void run() { try { executor.shutdown(); - executor.awaitTermination(WAIT_TIME, TimeUnit.SECONDS); + executor.awaitTermination(wait, TimeUnit.SECONDS); executor = null; - } catch(InterruptedException ie) { /* ignore */ } - timelineClient.stop(); + } catch(InterruptedException ie) { + /* ignore */ + } finally { + timelineClient.stop(); + } } }); } } - - LOG.info("Created ATS Hook"); + return executor; } @Override - public void run(final HookContext 
hookContext) throws Exception { - final long currentTime = System.currentTimeMillis(); - executor.submit(new Runnable() { - @Override - public void run() { - try { - QueryPlan plan = hookContext.getQueryPlan(); - if (plan == null) { - return; - } - String queryId = plan.getQueryId(); - long queryStartTime = plan.getQueryStartTime(); - String user = hookContext.getUgi().getUserName(); - int numMrJobs = Utilities.getMRTasks(plan.getRootTasks()).size(); - int numTezJobs = Utilities.getTezTasks(plan.getRootTasks()).size(); - - if (numMrJobs + numTezJobs <= 0) { - return; // ignore client only queries - } - - switch(hookContext.getHookType()) { - case PRE_EXEC_HOOK: - ExplainTask explain = new ExplainTask(); - explain.initialize(hookContext.getConf(), plan, null); - String query = plan.getQueryStr(); - JSONObject explainPlan = explain.getJSONPlan(null, null, plan.getRootTasks(), - plan.getFetchTask(), true, false, false); - fireAndForget(hookContext.getConf(), createPreHookEvent(queryId, query, - explainPlan, queryStartTime, user, numMrJobs, numTezJobs)); - break; - case POST_EXEC_HOOK: - fireAndForget(hookContext.getConf(), createPostHookEvent(queryId, currentTime, user, true)); - break; - case ON_FAILURE_HOOK: - fireAndForget(hookContext.getConf(), createPostHookEvent(queryId, currentTime, user, false)); - break; - default: - //ignore - break; - } - } catch (Exception e) { - LOG.info("Failed to submit plan to ATS: " + StringUtils.stringifyException(e)); - } - } - }); + public void run(HookContext hookContext) throws Exception { + long currentTime = System.currentTimeMillis(); + getExecutors(hookContext.getConf()).submit(new ATSWork(hookContext, currentTime)); } TimelineEntity createPreHookEvent(String queryId, String query, JSONObject explainPlan, @@ -178,7 +162,70 @@ TimelineEntity createPostHookEvent(String queryId, long stopTime, String user, b return atsEntity; } + // synchronous call synchronized void fireAndForget(Configuration conf, TimelineEntity entity) 
throws Exception { timelineClient.putEntities(entity); } + + private class ATSWork implements Runnable { + + private final HookContext hookContext; + private final long currentTime; + + public ATSWork(HookContext hookContext, long currentTime) { + this.hookContext = hookContext; + this.currentTime = currentTime; + } + + @Override + public void run() { + long delay = System.currentTimeMillis() - currentTime; + if (delay > SUBMIT_DELAY_LOG_THRESHOLD * 1000) { + LOG.info("Submission " + this + " delayed " + delay + " msec"); + } + try { + QueryPlan plan = hookContext.getQueryPlan(); + if (plan == null) { + return; + } + String queryId = plan.getQueryId(); + long queryStartTime = plan.getQueryStartTime(); + String user = hookContext.getUgi().getUserName(); + int numMrJobs = Utilities.getMRTasks(plan.getRootTasks()).size(); + int numTezJobs = Utilities.getTezTasks(plan.getRootTasks()).size(); + + if (numMrJobs + numTezJobs <= 0) { + return; // ignore client only queries + } + + switch (hookContext.getHookType()) { + case PRE_EXEC_HOOK: + ExplainTask explain = new ExplainTask(); + explain.initialize(hookContext.getConf(), plan, null); + String query = plan.getQueryStr(); + JSONObject explainPlan = explain.getJSONPlan(null, null, plan.getRootTasks(), + plan.getFetchTask(), true, false, false); + fireAndForget(hookContext.getConf(), createPreHookEvent(queryId, query, + explainPlan, queryStartTime, user, numMrJobs, numTezJobs)); + break; + case POST_EXEC_HOOK: + fireAndForget(hookContext.getConf(), createPostHookEvent(queryId, currentTime, user, true)); + break; + case ON_FAILURE_HOOK: + fireAndForget(hookContext.getConf(), createPostHookEvent(queryId, currentTime, user, false)); + break; + default: + //ignore + break; + } + } catch (Exception e) { + LOG.info("Failed to submit plan to ATS: " + StringUtils.stringifyException(e)); + } + } + + @Override + public String toString() { + return hookContext.getHookType() + ":" + (hookContext.getQueryPlan() == null ? "unknown" : hookContext.getQueryPlan().getQueryId()); + } + } }