From 2b0cb07e4f549e64ca776024caf9cf0b5ba01176 Mon Sep 17 00:00:00 2001 From: Anishek Agarwal Date: Fri, 29 Jun 2018 15:05:17 +0530 Subject: [PATCH] HIVE-20011: Move away from append mode in proto logging hook (Harish JP, reviewed by Anishek Agarwal) --- .../hadoop/hive/ql/hooks/HiveProtoLoggingHook.java | 24 +++++++++++++++++++--- .../logging/proto/DatePartitionedLogger.java | 18 +++++++++++----- .../history/logging/proto/ProtoMessageReader.java | 9 +++++--- .../history/logging/proto/ProtoMessageWriter.java | 12 +++++------ 4 files changed, 46 insertions(+), 17 deletions(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java index bddca1acf8..0820beabf7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java @@ -86,6 +86,7 @@ import java.io.IOException; import java.net.InetAddress; import java.net.UnknownHostException; +import java.time.LocalDate; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -100,6 +101,7 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import org.apache.commons.compress.utils.IOUtils; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService; @@ -180,6 +182,9 @@ private final DatePartitionedLogger logger; private final ExecutorService eventHandler; private final ExecutorService logWriter; + private int logFileCount = 0; + private ProtoMessageWriter writer; + private LocalDate writerDate; EventLogger(HiveConf conf, Clock clock) { this.clock = clock; @@ -233,6 +238,7 @@ void shutdown() { LOG.warn("Got interrupted exception while waiting for events to be flushed", e); } } + IOUtils.closeQuietly(writer); } void handle(HookContext hookContext) { @@ -284,12 +290,24 @@ private void 
generateEvent(HookContext hookContext) { private static final int MAX_RETRIES = 2; private void writeEvent(HiveHookEventProto event) { for (int retryCount = 0; retryCount <= MAX_RETRIES; ++retryCount) { - try (ProtoMessageWriter writer = logger.getWriter(logFileName)) { + try { + if (writer == null || !logger.getNow().toLocalDate().equals(writerDate)) { + if (writer != null) { + // Day change over case, reset the logFileCount. + logFileCount = 0; + IOUtils.closeQuietly(writer); + } + // increment log file count, if creating a new writer. + writer = logger.getWriter(logFileName + "_" + ++logFileCount); + writerDate = logger.getDateFromDir(writer.getPath().getParent().getName()); + } writer.writeProto(event); - // This does not work hence, opening and closing file for every event. - // writer.hflush(); + writer.hflush(); return; } catch (IOException e) { + // Something wrong with writer, lets close and reopen. + IOUtils.closeQuietly(writer); + writer = null; if (retryCount < MAX_RETRIES) { LOG.warn("Error writing proto message for query {}, eventType: {}, retryCount: {}," + " error: {} ", event.getHiveQueryId(), event.getEventType(), retryCount, diff --git a/ql/src/java/org/apache/tez/dag/history/logging/proto/DatePartitionedLogger.java b/ql/src/java/org/apache/tez/dag/history/logging/proto/DatePartitionedLogger.java index d6a512179e..58cec7eace 100644 --- a/ql/src/java/org/apache/tez/dag/history/logging/proto/DatePartitionedLogger.java +++ b/ql/src/java/org/apache/tez/dag/history/logging/proto/DatePartitionedLogger.java @@ -45,11 +45,14 @@ * @param The proto message type. */ public class DatePartitionedLogger { - private static final Logger LOG = LoggerFactory.getLogger(DatePartitionedLogger.class.getName()); + private static final Logger LOG = LoggerFactory.getLogger(DatePartitionedLogger.class); // Everyone has permission to write, but with sticky set so that delete is restricted. 
// This is required, since the path is same for all users and everyone writes into it. private static final FsPermission DIR_PERMISSION = FsPermission.createImmutable((short)01777); + // Since the directories have broad permissions restrict the file read access. + private static final FsPermission FILE_UMASK = FsPermission.createImmutable((short)0066); + private final Parser parser; private final Path basePath; private final Configuration conf; @@ -57,11 +60,12 @@ public DatePartitionedLogger(Parser parser, Path baseDir, Configuration conf, Clock clock) throws IOException { - this.conf = conf; + this.conf = new Configuration(conf); this.clock = clock; this.parser = parser; createDirIfNotExists(baseDir); this.basePath = baseDir.getFileSystem(conf).resolvePath(baseDir); + FsPermission.setUMask(this.conf, FILE_UMASK); } private void createDirIfNotExists(Path path) throws IOException { @@ -101,6 +105,10 @@ public Path getPathForDate(LocalDate date, String fileName) throws IOException { return new Path(path, fileName); } + public Path getPathForSubdir(String dirName, String fileName) { + return new Path(new Path(basePath, dirName), fileName); + } + /** * Extract the date from the directory name, this should be a directory created by this class. */ @@ -144,11 +152,11 @@ public String getNextDirectory(String currentDir) throws IOException { * Returns new or changed files in the given directory. The offsets are used to find * changed files. 
*/ - public List scanForChangedFiles(String subDir, Map currentOffsets) + public List scanForChangedFiles(String subDir, Map currentOffsets) throws IOException { Path dirPath = new Path(basePath, subDir); FileSystem fileSystem = basePath.getFileSystem(conf); - List newFiles = new ArrayList<>(); + List newFiles = new ArrayList<>(); if (!fileSystem.exists(dirPath)) { return newFiles; } @@ -157,7 +165,7 @@ public String getNextDirectory(String currentDir) throws IOException { Long offset = currentOffsets.get(fileName); // If the offset was never added or offset < fileSize. if (offset == null || offset < status.getLen()) { - newFiles.add(new Path(dirPath, fileName)); + newFiles.add(status); } } return newFiles; diff --git a/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageReader.java b/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageReader.java index 5a3c63ad7e..b56f06673e 100644 --- a/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageReader.java +++ b/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageReader.java @@ -24,19 +24,22 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.SequenceFile.Reader; import com.google.protobuf.MessageLite; import com.google.protobuf.Parser; public class ProtoMessageReader implements Closeable { private final Path filePath; - private final SequenceFile.Reader reader; + private final Reader reader; private final ProtoMessageWritable writable; ProtoMessageReader(Configuration conf, Path filePath, Parser parser) throws IOException { this.filePath = filePath; - this.reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(filePath)); + // The writer does not flush the length during hflush. 
Using length options lets us read + // past length in the FileStatus but it will throw EOFException during a read instead + // of returning null. + this.reader = new Reader(conf, Reader.file(filePath), Reader.length(Long.MAX_VALUE)); this.writable = new ProtoMessageWritable<>(parser); } diff --git a/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWriter.java b/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWriter.java index c746bb665e..9c086ef0d7 100644 --- a/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWriter.java +++ b/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWriter.java @@ -26,24 +26,24 @@ import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.SequenceFile.CompressionType; +import org.apache.hadoop.io.SequenceFile.Writer; import com.google.protobuf.MessageLite; import com.google.protobuf.Parser; public class ProtoMessageWriter implements Closeable { private final Path filePath; - private final SequenceFile.Writer writer; + private final Writer writer; private final ProtoMessageWritable writable; ProtoMessageWriter(Configuration conf, Path filePath, Parser parser) throws IOException { this.filePath = filePath; this.writer = SequenceFile.createWriter( conf, - SequenceFile.Writer.file(filePath), - SequenceFile.Writer.keyClass(NullWritable.class), - SequenceFile.Writer.valueClass(ProtoMessageWritable.class), - SequenceFile.Writer.appendIfExists(true), - SequenceFile.Writer.compression(CompressionType.RECORD)); + Writer.file(filePath), + Writer.keyClass(NullWritable.class), + Writer.valueClass(ProtoMessageWritable.class), + Writer.compression(CompressionType.RECORD)); this.writable = new ProtoMessageWritable<>(parser); } -- 2.15.1 (Apple Git-101)