diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java index 1ae81949ac..eef6ac9462 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java @@ -118,6 +118,8 @@ import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.SystemClock; import org.apache.hive.common.util.ShutdownHookManager; +import org.apache.tez.dag.history.logging.proto.DatePartitionedLogger; +import org.apache.tez.dag.history.logging.proto.ProtoMessageWriter; import org.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -279,14 +281,30 @@ private void generateEvent(HookContext hookContext) { } } + private static final int MAX_RETRIES = 2; private void writeEvent(HiveHookEventProto event) { - try (ProtoMessageWriter writer = logger.getWriter(logFileName)) { - writer.writeProto(event); - // This does not work hence, opening and closing file for every event. - // writer.hflush(); - } catch (IOException e) { - LOG.error("Error writing proto message for query {}, eventType: {}: ", - event.getHiveQueryId(), event.getEventType(), e); + for (int retryCount = 0; retryCount <= MAX_RETRIES; ++retryCount) { + try (ProtoMessageWriter writer = logger.getWriter(logFileName)) { + writer.writeProto(event); + // This does not work hence, opening and closing file for every event. + // writer.hflush(); + return; + } catch (IOException e) { + if (retryCount < MAX_RETRIES) { + LOG.warn("Error writing proto message for query {}, eventType: {}, retryCount: {}," + + " error: {} ", event.getHiveQueryId(), event.getEventType(), retryCount, + e.getMessage()); + } else { + LOG.error("Error writing proto message for query {}, eventType: {}: ", + event.getHiveQueryId(), event.getEventType(), e); + } + try { + // 0 seconds, for first retry assuming fs object was closed and open will fix it. + Thread.sleep(1000 * retryCount * retryCount); + } catch (InterruptedException e1) { + LOG.warn("Got interrupted in retry sleep.", e1); + } + } } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/DatePartitionedLogger.java b/ql/src/java/org/apache/tez/dag/history/logging/proto/DatePartitionedLogger.java similarity index 84% rename from ql/src/java/org/apache/hadoop/hive/ql/hooks/DatePartitionedLogger.java rename to ql/src/java/org/apache/tez/dag/history/logging/proto/DatePartitionedLogger.java index c9d1b93809..d6a512179e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/DatePartitionedLogger.java +++ b/ql/src/java/org/apache/tez/dag/history/logging/proto/DatePartitionedLogger.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

+ * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.hadoop.hive.ql.hooks; +package org.apache.tez.dag.history.logging.proto; import java.io.IOException; import java.time.LocalDate; @@ -33,6 +33,8 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.yarn.util.Clock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.protobuf.MessageLite; import com.google.protobuf.Parser; @@ -43,6 +45,7 @@ * @param The proto message type. */ public class DatePartitionedLogger { + private static final Logger LOG = LoggerFactory.getLogger(DatePartitionedLogger.class.getName()); // Everyone has permission to write, but with sticky set so that delete is restricted. // This is required, since the path is same for all users and everyone writes into it. private static final FsPermission DIR_PERMISSION = FsPermission.createImmutable((short)01777); @@ -51,19 +54,27 @@ private final Path basePath; private final Configuration conf; private final Clock clock; - private final FileSystem fileSystem; public DatePartitionedLogger(Parser parser, Path baseDir, Configuration conf, Clock clock) throws IOException { this.conf = conf; this.clock = clock; this.parser = parser; - this.fileSystem = baseDir.getFileSystem(conf); - if (!fileSystem.exists(baseDir)) { - fileSystem.mkdirs(baseDir); - fileSystem.setPermission(baseDir, DIR_PERMISSION); + createDirIfNotExists(baseDir); + this.basePath = baseDir.getFileSystem(conf).resolvePath(baseDir); + } + + private void createDirIfNotExists(Path path) throws IOException { + FileSystem fileSystem = path.getFileSystem(conf); + try { + if (!fileSystem.exists(path)) { + fileSystem.mkdirs(path); + fileSystem.setPermission(path, DIR_PERMISSION); + } + } catch (IOException e) { + // Ignore this exception, if there is a problem it'll fail when trying to read or write. + LOG.warn("Error while trying to set permission: ", e); } - this.basePath = fileSystem.resolvePath(baseDir); } /** @@ -86,10 +97,7 @@ public DatePartitionedLogger(Parser parser, Path baseDir, Configuration conf, */ public Path getPathForDate(LocalDate date, String fileName) throws IOException { Path path = new Path(basePath, getDirForDate(date)); - if (!fileSystem.exists(path)) { - fileSystem.mkdirs(path); - fileSystem.setPermission(path, DIR_PERMISSION); - } + createDirIfNotExists(path); return new Path(path, fileName); } @@ -116,6 +124,7 @@ public String getDirForDate(LocalDate date) { public String getNextDirectory(String currentDir) throws IOException { // Fast check, if the next day directory exists return it. String nextDate = getDirForDate(getDateFromDir(currentDir).plusDays(1)); + FileSystem fileSystem = basePath.getFileSystem(conf); if (fileSystem.exists(new Path(basePath, nextDate))) { return nextDate; } @@ -138,6 +147,7 @@ public String getNextDirectory(String currentDir) throws IOException { public List scanForChangedFiles(String subDir, Map currentOffsets) throws IOException { Path dirPath = new Path(basePath, subDir); + FileSystem fileSystem = basePath.getFileSystem(conf); List newFiles = new ArrayList<>(); if (!fileSystem.exists(dirPath)) { return newFiles; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/ProtoMessageReader.java b/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageReader.java similarity index 94% rename from ql/src/java/org/apache/hadoop/hive/ql/hooks/ProtoMessageReader.java rename to ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageReader.java index 1c4296c678..5a3c63ad7e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/ProtoMessageReader.java +++ b/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageReader.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

+ * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.hadoop.hive.ql.hooks; +package org.apache.tez.dag.history.logging.proto; import java.io.Closeable; import java.io.IOException; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/ProtoMessageWritable.java b/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWritable.java similarity index 96% rename from ql/src/java/org/apache/hadoop/hive/ql/hooks/ProtoMessageWritable.java rename to ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWritable.java index 61d844973b..7a08e202d3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/ProtoMessageWritable.java +++ b/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWritable.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

+ * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.hadoop.hive.ql.hooks; +package org.apache.tez.dag.history.logging.proto; import java.io.DataInput; import java.io.DataOutput; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/ProtoMessageWriter.java b/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWriter.java similarity index 95% rename from ql/src/java/org/apache/hadoop/hive/ql/hooks/ProtoMessageWriter.java rename to ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWriter.java index ed8de93f36..c746bb665e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/ProtoMessageWriter.java +++ b/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWriter.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

+ * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.hadoop.hive.ql.hooks; +package org.apache.tez.dag.history.logging.proto; import java.io.Closeable; import java.io.IOException; diff --git a/ql/src/java/org/apache/tez/dag/history/logging/proto/package-info.java b/ql/src/java/org/apache/tez/dag/history/logging/proto/package-info.java new file mode 100644 index 0000000000..23ed46062e --- /dev/null +++ b/ql/src/java/org/apache/tez/dag/history/logging/proto/package-info.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Logger code copied from tez codebase, this should be removed when we swtich + * to 0.9.2 tez version and we should depend on the tez libraries for this. + */ +package org.apache.tez.dag.history.logging.proto; diff --git a/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java b/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java index 5e117fe262..98b73e8108 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java @@ -38,6 +38,8 @@ import org.apache.hadoop.hive.ql.log.PerfLogger; import org.apache.hadoop.hive.ql.plan.HiveOperation; import org.apache.hadoop.yarn.util.SystemClock; +import org.apache.tez.dag.history.logging.proto.DatePartitionedLogger; +import org.apache.tez.dag.history.logging.proto.ProtoMessageReader; import org.junit.Assert; import org.junit.Before; import org.junit.Rule;