diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 29958b3e50..4864aad353 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -637,8 +637,9 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
     HIVE_PROTO_EVENTS_BASE_PATH("hive.hook.proto.base-directory", "",
         "Base directory into which the proto event messages are written by HiveProtoLoggingHook."),
-    HIVE_PROTO_EVENTS_QUEUE_CAPACITY("hive.hook.proto.queue.capacity", 64,
-        "Queue capacity for the proto events logging threads."),
+    HIVE_PROTO_EVENTS_ROLLOVER_CHECK_INTERVAL("hive.hook.proto.rollover-interval", "600s",
+        new TimeValidator(TimeUnit.SECONDS, 0L, true, 3600 * 24L, true),
+        "Frequency at which the file rollover check is triggered."),
     HIVE_PROTO_EVENTS_CLEAN_FREQ("hive.hook.proto.events.clean.freq", "1d",
         new TimeValidator(TimeUnit.DAYS),
         "Frequency at which timer task runs to purge expired proto event files."),
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java
index 0af30d48f3..030d824ddf 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java
@@ -93,11 +93,10 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Executors;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
@@ -162,7 +161,6 @@
         .collect(Collectors.toSet());
   }
 
-  private static final int HIVE_HOOK_PROTO_QUEUE_CAPACITY_DEFAULT = 64;
   private static final int WAIT_TIME = 5;
 
   public enum EventType {
@@ -182,7 +180,7 @@
     private final Clock clock;
     private final String logFileName;
     private final DatePartitionedLogger<HiveHookEventProto> logger;
-    private final ExecutorService logWriter;
+    private final ScheduledExecutorService logWriter;
     private int logFileCount = 0;
     private ProtoMessageWriter<HiveHookEventProto> writer;
     private LocalDate writerDate;
@@ -215,13 +213,14 @@
         return;
       }
 
-      int queueCapacity = conf.getInt(ConfVars.HIVE_PROTO_EVENTS_QUEUE_CAPACITY.varname,
-          HIVE_HOOK_PROTO_QUEUE_CAPACITY_DEFAULT);
-
       ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true)
           .setNameFormat("Hive Hook Proto Log Writer %d").build();
-      logWriter = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
-          new LinkedBlockingQueue<>(queueCapacity), threadFactory);
+      logWriter = Executors.newSingleThreadScheduledExecutor(threadFactory);
+
+      long rolloverInterval = conf.getTimeVar(
+          HiveConf.ConfVars.HIVE_PROTO_EVENTS_ROLLOVER_CHECK_INTERVAL, TimeUnit.MICROSECONDS);
+      logWriter.scheduleWithFixedDelay(() -> handleTick(), rolloverInterval, rolloverInterval,
+          TimeUnit.MICROSECONDS);
     }
 
     void shutdown() {
@@ -277,29 +276,45 @@ void handle(HookContext hookContext) {
       }
     }
+
+    private void handleTick() {
+      try {
+        maybeRolloverWriterForDay();
+      } catch (IOException e) {
+        LOG.error("Got IOException while trying to roll over: ", e);
+      }
+    }
+
+    private boolean maybeRolloverWriterForDay() throws IOException {
+      if (writer == null || !logger.getNow().toLocalDate().equals(writerDate)) {
+        if (writer != null) {
+          // Day changed over; reset the logFileCount.
+          logFileCount = 0;
+          IOUtils.closeQuietly(writer);
+          writer = null;
+        }
+        // Increment the log file count when creating a new writer.
+        writer = logger.getWriter(logFileName + "_" + ++logFileCount);
+        writerDate = logger.getDateFromDir(writer.getPath().getParent().getName());
+        return true;
+      }
+      return false;
+    }
+
     private static final int MAX_RETRIES = 2;
     private void writeEvent(HiveHookEventProto event) {
       for (int retryCount = 0; retryCount <= MAX_RETRIES; ++retryCount) {
         try {
-          if (writer == null || !logger.getNow().toLocalDate().equals(writerDate)) {
-            if (writer != null) {
-              // Day change over case, reset the logFileCount.
-              logFileCount = 0;
-              IOUtils.closeQuietly(writer);
-            }
-            // increment log file count, if creating a new writer.
-            writer = logger.getWriter(logFileName + "_" + ++logFileCount);
-            writerDate = logger.getDateFromDir(writer.getPath().getParent().getName());
-          }
-
-          writer.writeProto(event);
           if (eventPerFile) {
-            if (writer != null) {
-              LOG.debug("Event per file enabled. Closing proto event file: {}", writer.getPath());
-              IOUtils.closeQuietly(writer);
+            if (!maybeRolloverWriterForDay()) {
+              writer = logger.getWriter(logFileName + "_" + ++logFileCount);
             }
-            // rollover to next file
-            writer = logger.getWriter(logFileName + "_" + ++logFileCount);
+            writer.writeProto(event);
+            LOG.debug("Event per file enabled. Closing proto event file: {}", writer.getPath());
+            IOUtils.closeQuietly(writer);
+            writer = null;
           } else {
+            maybeRolloverWriterForDay();
+            writer.writeProto(event);
             writer.hflush();
           }
           return;
@@ -311,6 +326,7 @@ private void writeEvent(HiveHookEventProto event) {
             LOG.warn("Error writing proto message for query {}, eventType: {}, retryCount: {}," +
                 " error: {} ", event.getHiveQueryId(), event.getEventType(), retryCount,
                 e.getMessage());
+            LOG.trace("Exception", e);
           } else {
             LOG.error("Error writing proto message for query {}, eventType: {}: ",
                 event.getHiveQueryId(), event.getEventType(), e);
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java b/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java
index a5939fa9c0..450a0b544d 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java
@@ -22,11 +22,14 @@
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.QueryState;
 import org.apache.hadoop.hive.ql.exec.mr.ExecDriver;
@@ -224,6 +227,40 @@ public void testFailureEventLog() throws Exception {
     assertOtherInfo(event, OtherInfoType.PERF, null);
   }
 
+  @Test
+  public void testRolloverFiles() throws Exception {
+    long waitTime = 100;
+    context.setHookType(HookType.PRE_EXEC_HOOK);
+    conf.setTimeDuration(ConfVars.HIVE_PROTO_EVENTS_ROLLOVER_CHECK_INTERVAL.varname, waitTime,
+        TimeUnit.MICROSECONDS);
+    Path path = new Path(tmpFolder);
+    FileSystem fs = path.getFileSystem(conf);
+    AtomicLong time = new AtomicLong();
+    EventLogger evtLogger = new EventLogger(conf, () -> time.get());
+    evtLogger.handle(context);
+    int statusLen = 0;
+    // Retry a few times to allow grace for scheduler delays; the sleep is in milliseconds.
+    for (int i = 0; i < 3; ++i) {
+      Thread.sleep(waitTime + 100);
+      statusLen = fs.listStatus(path).length;
+      if (statusLen > 0) {
+        break;
+      }
+    }
+    Assert.assertEquals(1, statusLen);
+
+    // Move the clock to the next day and ensure a new file gets created.
+    time.set(24 * 60 * 60 * 1000 + 1000);
+    for (int i = 0; i < 3; ++i) {
+      Thread.sleep(waitTime + 100);
+      statusLen = fs.listStatus(path).length;
+      if (statusLen > 1) {
+        break;
+      }
+    }
+    Assert.assertEquals(2, statusLen);
+  }
+
   private ProtoMessageReader<HiveHookEventProto> getTestReader(HiveConf conf, String tmpFolder)
       throws IOException {
     Path path = new Path(tmpFolder);
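
For anyone who wants to experiment with the scheduling pattern this patch adopts, below is a minimal standalone sketch. It is not Hive code: the class name, the printed messages, and the 200 ms interval are invented for illustration. It mirrors the hook's approach, where a single-threaded ScheduledExecutorService both serializes event writes and runs the periodic rollover check, so the check never runs concurrently with a write and the unsynchronized writer/writerDate fields are only ever touched from one thread.

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

public class RolloverSketch {
  public static void main(String[] args) throws InterruptedException {
    // Daemon threads, mirroring the hook's ThreadFactoryBuilder setup.
    ThreadFactory daemonFactory = runnable -> {
      Thread thread = new Thread(runnable, "rollover-sketch");
      thread.setDaemon(true);
      return thread;
    };
    ScheduledExecutorService writerThread =
        Executors.newSingleThreadScheduledExecutor(daemonFactory);

    long intervalMs = 200; // stand-in for hive.hook.proto.rollover-interval
    writerThread.scheduleWithFixedDelay(
        () -> System.out.println("rollover check"),
        intervalMs, intervalMs, TimeUnit.MILLISECONDS);

    // Writes go through the same single thread, so they are serialized
    // with the periodic check and need no locking.
    writerThread.execute(() -> System.out.println("write event"));

    Thread.sleep(700); // let a few checks fire
    writerThread.shutdown();
  }
}

Note the use of scheduleWithFixedDelay rather than scheduleAtFixedRate: the next rollover check is scheduled only after the previous one returns, so a slow filesystem close cannot cause ticks to queue up behind one another.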