diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java
index 1ae81949ac..03d25e23c8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java
@@ -118,6 +118,8 @@
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.hive.common.util.ShutdownHookManager;
+import org.apache.tez.dag.history.logging.proto.DatePartitionedLogger;
+import org.apache.tez.dag.history.logging.proto.ProtoMessageWriter;
 import org.json.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -280,13 +282,40 @@ private void generateEvent(HookContext hookContext) {
     }
 
+    /**
+     * Writes one event to today's log file, opening and closing the file on every attempt.
+     * An IOException is retried up to two times (no delay before the first retry, one second
+     * before the second); after that, or if the thread is interrupted, the event is dropped.
+     *
+     * @param event the hook event to write.
+     */
     private void writeEvent(HiveHookEventProto event) {
-      try (ProtoMessageWriter writer = logger.getWriter(logFileName)) {
-        writer.writeProto(event);
-        // This does not work hence, opening and closing file for every event.
-        // writer.hflush();
-      } catch (IOException e) {
-        LOG.error("Error writing proto message for query {}, eventType: {}: ",
-            event.getHiveQueryId(), event.getEventType(), e);
+      final int maxRetries = 2;
+      for (int retryCount = 0; retryCount <= maxRetries; ++retryCount) {
+        try (ProtoMessageWriter writer = logger.getWriter(logFileName)) {
+          writer.writeProto(event);
+          // This does not work hence, opening and closing file for every event.
+          // writer.hflush();
+          return;
+        } catch (IOException e) {
+          if (retryCount == maxRetries) {
+            LOG.error("Error writing proto message for query {}, eventType: {}, retryCount: {},"
+                + " error: {} ", event.getHiveQueryId(), event.getEventType(), retryCount,
+                e.getMessage());
+            // Out of attempts: return instead of sleeping ahead of a retry that never happens.
+            return;
+          }
+          LOG.error("Error writing proto message for query {}, eventType: {}: ",
+              event.getHiveQueryId(), event.getEventType(), e);
+        }
+        try {
+          // 0 seconds, for first retry assuming fs object was closed and open will fix it.
+          Thread.sleep(1000L * retryCount * retryCount);
+        } catch (InterruptedException e1) {
+          LOG.warn("Got interrupted while writing event");
+          // Restore the interrupt status for the caller and stop retrying.
+          Thread.currentThread().interrupt();
+          return;
+        }
       }
     }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/DatePartitionedLogger.java b/ql/src/java/org/apache/tez/dag/history/logging/proto/DatePartitionedLogger.java
similarity index 85%
rename from ql/src/java/org/apache/hadoop/hive/ql/hooks/DatePartitionedLogger.java
rename to ql/src/java/org/apache/tez/dag/history/logging/proto/DatePartitionedLogger.java
index c9d1b93809..d36442e917 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/DatePartitionedLogger.java
+++ b/ql/src/java/org/apache/tez/dag/history/logging/proto/DatePartitionedLogger.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.hive.ql.hooks;
+package org.apache.tez.dag.history.logging.proto;
 
 import java.io.IOException;
 import java.time.LocalDate;
@@ -33,6 +33,8 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.yarn.util.Clock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.protobuf.MessageLite;
 import com.google.protobuf.Parser;
@@ -43,6 +45,7 @@
  * @param The proto message type.
  */
 public class DatePartitionedLogger {
+  private static final Logger LOG = LoggerFactory.getLogger(DatePartitionedLogger.class.getName());
   // Everyone has permission to write, but with sticky set so that delete is restricted.
   // This is required, since the path is same for all users and everyone writes into it.
   private static final FsPermission DIR_PERMISSION = FsPermission.createImmutable((short)01777);
@@ -51,21 +54,39 @@
   private final Path basePath;
   private final Configuration conf;
   private final Clock clock;
-  private final FileSystem fileSystem;
 
   public DatePartitionedLogger(Parser parser, Path baseDir, Configuration conf, Clock clock)
       throws IOException {
     this.conf = conf;
     this.clock = clock;
     this.parser = parser;
-    this.fileSystem = baseDir.getFileSystem(conf);
-    if (!fileSystem.exists(baseDir)) {
-      fileSystem.mkdirs(baseDir);
-      fileSystem.setPermission(baseDir, DIR_PERMISSION);
-    }
+    FileSystem fileSystem = baseDir.getFileSystem(conf);
+    createDirIfNotExists(fileSystem, baseDir);
     this.basePath = fileSystem.resolvePath(baseDir);
   }
 
+  /**
+   * Creates {@code path} with {@link #DIR_PERMISSION} if it does not exist yet. Best effort:
+   * a failure to create the directory or to set its permission is logged and ignored, since
+   * any real problem resurfaces when the directory is read or written.
+   *
+   * @throws IOException if the existence check itself fails.
+   */
+  private void createDirIfNotExists(FileSystem fileSystem, Path path) throws IOException {
+    if (!fileSystem.exists(path)) {
+      try {
+        if (fileSystem.mkdirs(path)) {
+          fileSystem.setPermission(path, DIR_PERMISSION);
+        } else {
+          LOG.warn("Failed to create directory {}", path);
+        }
+      } catch (IOException e) {
+        // Ignore this exception, if there is a problem it'll fail when trying to read or write.
+        LOG.warn("Error while trying to create directory or set permission on {}: ", path, e);
+      }
+    }
+  }
+
   /**
    * Creates a writer for the given fileName, with date as today.
    */
@@ -86,10 +107,8 @@ public DatePartitionedLogger(Parser parser, Path baseDir, Configuration conf,
    */
   public Path getPathForDate(LocalDate date, String fileName) throws IOException {
     Path path = new Path(basePath, getDirForDate(date));
-    if (!fileSystem.exists(path)) {
-      fileSystem.mkdirs(path);
-      fileSystem.setPermission(path, DIR_PERMISSION);
-    }
+    FileSystem fileSystem = path.getFileSystem(conf);
+    createDirIfNotExists(fileSystem, path);
     return new Path(path, fileName);
   }
 
@@ -116,6 +135,7 @@ public String getDirForDate(LocalDate date) {
   public String getNextDirectory(String currentDir) throws IOException {
     // Fast check, if the next day directory exists return it.
     String nextDate = getDirForDate(getDateFromDir(currentDir).plusDays(1));
+    FileSystem fileSystem = basePath.getFileSystem(conf);
     if (fileSystem.exists(new Path(basePath, nextDate))) {
       return nextDate;
     }
@@ -138,6 +158,7 @@ public String getNextDirectory(String currentDir) throws IOException {
   public List scanForChangedFiles(String subDir, Map currentOffsets)
       throws IOException {
     Path dirPath = new Path(basePath, subDir);
+    FileSystem fileSystem = basePath.getFileSystem(conf);
     List newFiles = new ArrayList<>();
     if (!fileSystem.exists(dirPath)) {
       return newFiles;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/ProtoMessageReader.java b/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageReader.java
similarity index 97%
rename from ql/src/java/org/apache/hadoop/hive/ql/hooks/ProtoMessageReader.java
rename to ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageReader.java
index 1c4296c678..e5f5e6befa 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/ProtoMessageReader.java
+++ b/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageReader.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.hive.ql.hooks;
+package org.apache.tez.dag.history.logging.proto;
 
 import java.io.Closeable;
 import java.io.IOException;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/ProtoMessageWritable.java b/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWritable.java
similarity index 98%
rename from ql/src/java/org/apache/hadoop/hive/ql/hooks/ProtoMessageWritable.java
rename to ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWritable.java
index 61d844973b..34e47014e1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/ProtoMessageWritable.java
+++ b/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWritable.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.hive.ql.hooks;
+package org.apache.tez.dag.history.logging.proto;
 
 import java.io.DataInput;
 import java.io.DataOutput;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/ProtoMessageWriter.java b/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWriter.java
similarity index 97%
rename from ql/src/java/org/apache/hadoop/hive/ql/hooks/ProtoMessageWriter.java
rename to ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWriter.java
index ed8de93f36..ca9ba61e02 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/ProtoMessageWriter.java
+++ b/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWriter.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.hive.ql.hooks;
+package org.apache.tez.dag.history.logging.proto;
 
 import java.io.Closeable;
 import java.io.IOException;
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java b/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java
index 5e117fe262..98b73e8108 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java
@@ -38,6 +38,8 @@
 import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.plan.HiveOperation;
 import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.tez.dag.history.logging.proto.DatePartitionedLogger;
+import org.apache.tez.dag.history.logging.proto.ProtoMessageReader;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;