diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 47db0c0..2aaf96b 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -503,6 +503,9 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
         "Comma-separated list of statistics publishers to be invoked on counters on each job. \n" +
         "A client stats publisher is specified as the name of a Java class which implements the \n" +
         "org.apache.hadoop.hive.ql.stats.ClientStatsPublisher interface."),
+    ATSHOOKQUEUECAPACITY("hive.ats.hook.queue.capacity", 64,
+        "Queue size for the ATS Hook executor. If the number of outstanding submissions \n" +
+        "to the ATS executor exceeds this amount, the Hive ATS Hook will not try to log queries to ATS."),
     EXECPARALLEL("hive.exec.parallel", false, "Whether to execute jobs in parallel"),
     EXECPARALLETHREADNUMBER("hive.exec.parallel.thread.number", 8,
         "How many jobs at most can be executed in parallel"),
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java
index 3651c9c..4878130 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/ATSHook.java
@@ -25,8 +25,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.conf.Configuration;
@@ -84,12 +88,22 @@
     PerfLogger.GET_SPLITS, PerfLogger.RUN_TASKS,
   };
 
-  public ATSHook() {
+  private static void setupAtsExecutor(HiveConf conf) {
     synchronized(LOCK) {
       if (executor == null) {
-        executor = Executors.newSingleThreadExecutor(
-            new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ATS Logger %d").build());
+        // The call to ATS appears to block indefinitely, blocking the ATS thread while
+        // the hook continues to submit work to the ExecutorService with each query.
+        // Over time the queued items can cause OOM as the HookContext seems to contain
+        // some items which use a lot of memory.
+        // Prevent this situation by creating the executor with bounded capacity -
+        // the event will not be sent to ATS if there are too many outstanding work submissions.
+        int queueCapacity = conf.getIntVar(HiveConf.ConfVars.ATSHOOKQUEUECAPACITY);
+
+        LOG.info("Creating ATS executor queue with capacity " + queueCapacity);
+        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(queueCapacity);
+        ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ATS Logger %d").build();
+        executor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS, queue, threadFactory);
 
         YarnConfiguration yarnConf = new YarnConfiguration();
         timelineClient = TimelineClient.createTimelineClient();
@@ -109,7 +123,9 @@ public void run() {
         });
       }
     }
+  }
 
+  public ATSHook() {
     LOG.info("Created ATS Hook");
   }
 
@@ -118,77 +134,84 @@ public void run(final HookContext hookContext) throws Exception {
     final long currentTime = System.currentTimeMillis();
     final HiveConf conf = new HiveConf(hookContext.getConf());
     final QueryState queryState = hookContext.getQueryState();
+    final String queryId = queryState.getQueryId();
 
-    executor.submit(new Runnable() {
-        @Override
-        public void run() {
-          try {
-            QueryPlan plan = hookContext.getQueryPlan();
-            if (plan == null) {
-              return;
-            }
-            String queryId = plan.getQueryId();
-            String opId = hookContext.getOperationId();
-            long queryStartTime = plan.getQueryStartTime();
-            String user = hookContext.getUgi().getUserName();
-            String requestuser = hookContext.getUserName();
-            if (hookContext.getUserName() == null ){
-              requestuser = hookContext.getUgi().getUserName() ;
-            }
-            int numMrJobs = Utilities.getMRTasks(plan.getRootTasks()).size();
-            int numTezJobs = Utilities.getTezTasks(plan.getRootTasks()).size();
-            if (numMrJobs + numTezJobs <= 0) {
-              return; // ignore client only queries
-            }
+    try {
+      setupAtsExecutor(conf);
 
-            switch(hookContext.getHookType()) {
-            case PRE_EXEC_HOOK:
-              ExplainConfiguration config = new ExplainConfiguration();
-              config.setFormatted(true);
-              ExplainWork work = new ExplainWork(null,// resFile
-                  null,// pCtx
-                  plan.getRootTasks(),// RootTasks
-                  plan.getFetchTask(),// FetchTask
-                  null,// analyzer
-                  config, //explainConfig
-                  null// cboInfo
-              );
-              @SuppressWarnings("unchecked")
-              ExplainTask explain = (ExplainTask) TaskFactory.get(work, conf);
-              explain.initialize(queryState, plan, null, null);
-              String query = plan.getQueryStr();
-              JSONObject explainPlan = explain.getJSONPlan(null, work);
-              String logID = conf.getLogIdVar(hookContext.getSessionId());
-              List<String> tablesRead = getTablesFromEntitySet(hookContext.getInputs());
-              List<String> tablesWritten = getTablesFromEntitySet(hookContext.getOutputs());
-              String executionMode = getExecutionMode(plan).name();
-              String hiveInstanceAddress = hookContext.getHiveInstanceAddress();
-              if (hiveInstanceAddress == null) {
-                hiveInstanceAddress = InetAddress.getLocalHost().getHostAddress();
+      executor.submit(new Runnable() {
+          @Override
+          public void run() {
+            try {
+              QueryPlan plan = hookContext.getQueryPlan();
+              if (plan == null) {
+                return;
+              }
+              String queryId = plan.getQueryId();
+              String opId = hookContext.getOperationId();
+              long queryStartTime = plan.getQueryStartTime();
+              String user = hookContext.getUgi().getUserName();
+              String requestuser = hookContext.getUserName();
+              if (hookContext.getUserName() == null ){
+                requestuser = hookContext.getUgi().getUserName() ;
+              }
+              int numMrJobs = Utilities.getMRTasks(plan.getRootTasks()).size();
+              int numTezJobs = Utilities.getTezTasks(plan.getRootTasks()).size();
+              if (numMrJobs + numTezJobs <= 0) {
+                return; // ignore client only queries
+              }
+
+              switch(hookContext.getHookType()) {
+              case PRE_EXEC_HOOK:
+                ExplainConfiguration config = new ExplainConfiguration();
+                config.setFormatted(true);
+                ExplainWork work = new ExplainWork(null,// resFile
+                    null,// pCtx
+                    plan.getRootTasks(),// RootTasks
+                    plan.getFetchTask(),// FetchTask
+                    null,// analyzer
+                    config, //explainConfig
+                    null// cboInfo
+                );
+                @SuppressWarnings("unchecked")
+                ExplainTask explain = (ExplainTask) TaskFactory.get(work, conf);
+                explain.initialize(queryState, plan, null, null);
+                String query = plan.getQueryStr();
+                JSONObject explainPlan = explain.getJSONPlan(null, work);
+                String logID = conf.getLogIdVar(hookContext.getSessionId());
+                List<String> tablesRead = getTablesFromEntitySet(hookContext.getInputs());
+                List<String> tablesWritten = getTablesFromEntitySet(hookContext.getOutputs());
+                String executionMode = getExecutionMode(plan).name();
+                String hiveInstanceAddress = hookContext.getHiveInstanceAddress();
+                if (hiveInstanceAddress == null) {
+                  hiveInstanceAddress = InetAddress.getLocalHost().getHostAddress();
+                }
+                String hiveInstanceType = hookContext.isHiveServerQuery() ? "HS2" : "CLI";
+                fireAndForget(conf,
+                    createPreHookEvent(queryId, query, explainPlan, queryStartTime,
+                        user, requestuser, numMrJobs, numTezJobs, opId,
+                        hookContext.getIpAddress(), hiveInstanceAddress, hiveInstanceType,
+                        hookContext.getSessionId(), logID, hookContext.getThreadId(), executionMode,
+                        tablesRead, tablesWritten, conf));
+                break;
+              case POST_EXEC_HOOK:
+                fireAndForget(conf, createPostHookEvent(queryId, currentTime, user, requestuser, true, opId, hookContext.getPerfLogger()));
+                break;
+              case ON_FAILURE_HOOK:
+                fireAndForget(conf, createPostHookEvent(queryId, currentTime, user, requestuser , false, opId, hookContext.getPerfLogger()));
+                break;
+              default:
+                //ignore
+                break;
               }
-              String hiveInstanceType = hookContext.isHiveServerQuery() ? "HS2" : "CLI";
-              fireAndForget(conf,
-                  createPreHookEvent(queryId, query, explainPlan, queryStartTime,
-                      user, requestuser, numMrJobs, numTezJobs, opId,
-                      hookContext.getIpAddress(), hiveInstanceAddress, hiveInstanceType,
-                      hookContext.getSessionId(), logID, hookContext.getThreadId(), executionMode,
-                      tablesRead, tablesWritten, conf));
-              break;
-            case POST_EXEC_HOOK:
-              fireAndForget(conf, createPostHookEvent(queryId, currentTime, user, requestuser, true, opId, hookContext.getPerfLogger()));
-              break;
-            case ON_FAILURE_HOOK:
-              fireAndForget(conf, createPostHookEvent(queryId, currentTime, user, requestuser , false, opId, hookContext.getPerfLogger()));
-              break;
-            default:
-              //ignore
-              break;
+            } catch (Exception e) {
+              LOG.info("Failed to submit plan to ATS for " + queryId + ": " + StringUtils.stringifyException(e));
             }
-          } catch (Exception e) {
-            LOG.info("Failed to submit plan to ATS: " + StringUtils.stringifyException(e));
           }
-        }
-      });
+        });
+    } catch (Exception e) {
+      LOG.info("Failed to submit to ATS for " + queryId + ": " + StringUtils.stringifyException(e));
+    }
   }
 
   protected List<String> getTablesFromEntitySet(Set<? extends Entity> entities) {
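
A note for reviewers, not part of the patch: the new try/catch around executor.submit() exists because a ThreadPoolExecutor built over a bounded LinkedBlockingQueue uses the default AbortPolicy, which throws RejectedExecutionException once the single worker is busy and the queue is full. The sketch below demonstrates that behavior in isolation; the class name BoundedQueueDemo, the capacity of 2 (standing in for hive.ats.hook.queue.capacity, default 64), and the deliberately stuck task are illustrative only.

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BoundedQueueDemo {
  public static void main(String[] args) {
    // Same shape as the hook's executor: one worker thread over a bounded queue.
    ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<Runnable>(2));

    // Stands in for an ATS call that blocks indefinitely.
    Runnable stuck = new Runnable() {
      @Override
      public void run() {
        try {
          Thread.sleep(Long.MAX_VALUE);
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt();
        }
      }
    };

    // The first task occupies the worker; the next two fill the queue.
    for (int i = 0; i < 3; i++) {
      executor.submit(stuck);
    }

    // Worker busy and queue full: the default AbortPolicy rejects the submission
    // instead of buffering it without bound. This is the failure the hook now
    // catches and logs, dropping the event rather than accumulating HookContext
    // objects in memory.
    try {
      executor.submit(stuck);
      System.out.println("unexpected: task was accepted");
    } catch (RejectedExecutionException expected) {
      System.out.println("rejected as expected: queue is full");
    }

    executor.shutdownNow(); // interrupts the stuck worker so the JVM can exit
  }
}

Keeping corePoolSize == maximumPoolSize == 1 preserves the previous single-threaded, in-order delivery of events; only the unbounded queue behind Executors.newSingleThreadExecutor() is replaced with a bounded one.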