diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index a7687d59004..bf03fe6ea3f 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -675,6 +675,8 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal "Table alias will be added to column names for queries of type \"select *\" or \n" + "if query explicitly uses table alias \"select r1.x..\"."), + HIVE_PROTO_EVENTS_QUEUE_CAPACITY("hive.hook.proto.queue.capacity", 64, + "Queue capacity for the proto events logging threads."), HIVE_PROTO_EVENTS_BASE_PATH("hive.hook.proto.base-directory", "", "Base directory into which the proto event messages are written by HiveProtoLoggingHook."), HIVE_PROTO_EVENTS_ROLLOVER_CHECK_INTERVAL("hive.hook.proto.rollover-interval", "600s", diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java index 8eab54859bf..86a68008515 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java @@ -92,9 +92,8 @@ import java.util.Map; import java.util.Set; import java.util.UUID; -import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -179,7 +178,8 @@ private final Clock clock; private final String logFileName; private final DatePartitionedLogger logger; - private final ScheduledExecutorService logWriter; + private final ScheduledThreadPoolExecutor logWriter; + private final int queueCapacity; private int logFileCount = 0; private ProtoMessageWriter writer; private LocalDate writerDate; @@ -189,6 +189,8 @@ this.clock = clock; // randomUUID is slow, since its cryptographically secure, only first query will take time. this.logFileName = "hive_" + UUID.randomUUID().toString(); + this.queueCapacity = conf.getInt(ConfVars.HIVE_PROTO_EVENTS_QUEUE_CAPACITY.varname, + ConfVars.HIVE_PROTO_EVENTS_QUEUE_CAPACITY.defaultIntVal); String baseDir = conf.getVar(ConfVars.HIVE_PROTO_EVENTS_BASE_PATH); if (StringUtils.isBlank(baseDir)) { baseDir = null; @@ -214,7 +216,7 @@ ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true) .setNameFormat("Hive Hook Proto Log Writer %d").build(); - logWriter = Executors.newSingleThreadScheduledExecutor(threadFactory); + logWriter = new ScheduledThreadPoolExecutor(1, threadFactory); long rolloverInterval = conf.getTimeVar( HiveConf.ConfVars.HIVE_PROTO_EVENTS_ROLLOVER_CHECK_INTERVAL, TimeUnit.MICROSECONDS); @@ -267,10 +269,17 @@ void handle(HookContext hookContext) { } if (event != null) { try { - logWriter.execute(() -> writeEvent(event)); + // ScheduledThreadPoolExecutor uses an unbounded queue which cannot be replaced with a bounded queue. + // Therefore checking queue capacity manually here. + if (logWriter.getQueue().size() < queueCapacity) { + logWriter.execute(() -> writeEvent(event)); + } else { + LOG.warn("Writer queue full ignoring event {} for query {}", + hookContext.getHookType(), plan.getQueryId()); + } } catch (RejectedExecutionException e) { LOG.warn("Writer queue full ignoring event {} for query {}", - hookContext.getHookType(), plan.getQueryId()); + hookContext.getHookType(), plan.getQueryId()); } } } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java b/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java index 450a0b544d6..add4b6863d8 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.hooks; +import java.io.EOFException; import java.io.IOException; import java.util.ArrayList; import java.util.HashSet; @@ -73,6 +74,7 @@ public void setup() throws Exception { conf = new HiveConf(); conf.set(HiveConf.ConfVars.LLAP_DAEMON_QUEUE_NAME.varname, "llap_queue"); + conf.set(HiveConf.ConfVars.HIVE_PROTO_EVENTS_QUEUE_CAPACITY.varname, "3"); conf.set(MRJobConfig.QUEUE_NAME, "mr_queue"); conf.set(TezConfiguration.TEZ_QUEUE_NAME, "tez_queue"); tmpFolder = folder.newFolder().getAbsolutePath(); @@ -164,6 +166,26 @@ public void testQueueLogs() throws Exception { Assert.assertEquals(event.getQueue(), "llap_queue"); } + @org.junit.Ignore("might fail intermittently") + @Test + public void testDropsEventWhenQueueIsFull() throws Exception { + EventLogger evtLogger = new EventLogger(conf, SystemClock.getInstance()); + context.setHookType(HookType.PRE_EXEC_HOOK); + evtLogger.handle(context); + evtLogger.handle(context); + evtLogger.handle(context); + evtLogger.handle(context); + evtLogger.shutdown(); + ProtoMessageReader reader = getTestReader(conf, tmpFolder); + reader.readEvent(); + reader.readEvent(); + reader.readEvent(); + try { + reader.readEvent(); + Assert.fail("Expected 3 events due to queue capacity limit, got 4."); + } catch (EOFException expected) {} + } + @Test public void testPreAndPostEventBoth() throws Exception { context.setHookType(HookType.PRE_EXEC_HOOK);