diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 093b4a73f3..0fbe696bf5 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -4458,6 +4458,12 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
         (long) 10 * 1024 * 1024,
         "Maximum size in bytes that a single query result is allowed to use in the results cache directory"),
 
+    HIVE_QUERY_RESULTS_CACHE_DIRECTORY_MARKER_UPDATE_FREQUENCY(
+        "hive.query.results.cache.directory.marker.update.frequency", "3600s",
+        new TimeValidator(TimeUnit.SECONDS),
+        "How often a marker file should be created in the results cache directory so that the directory " +
+        "is not deleted by the DirectoryMarkerCleanup"),
+
     HIVE_NOTFICATION_EVENT_POLL_INTERVAL("hive.notification.event.poll.interval", "60s",
       new TimeValidator(TimeUnit.SECONDS),
       "How often the notification log is polled for new NotificationEvents from the metastore." +
diff --git a/common/src/java/org/apache/hive/common/util/DirectoryMarkerCleanup.java b/common/src/java/org/apache/hive/common/util/DirectoryMarkerCleanup.java
new file mode 100644
index 0000000000..4a7f9fd8c6
--- /dev/null
+++ b/common/src/java/org/apache/hive/common/util/DirectoryMarkerCleanup.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.common.util;
+
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.FileUtils;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DirectoryMarkerCleanup {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DirectoryMarkerCleanup.class);
+
+  protected ScheduledExecutorService executor;
+  protected FileSystem fs;
+  protected Path baseDir;
+  protected String prefix;
+  protected long staleThresholdSecs;
+  protected Set<Path> pathsToIgnore;
+
+  /**
+   * Sets up a periodic check that deletes subdirectories of baseDir whose marker files
+   * (named prefix.*) have not been refreshed within staleThresholdSecs.
+   * @param fs the file system holding baseDir
+   * @param baseDir the parent directory whose subdirectories are checked
+   * @param prefix the marker file name prefix to look for
+   * @param initialDelaySecs delay in seconds before the first cleanup check
+   * @param checkPeriodSecs interval in seconds between cleanup checks
+   * @param staleThresholdSecs age in seconds after which an unrefreshed directory is deleted
+   * @param pathsToIgnore fully-qualified paths that should never be deleted
+   */
+  public DirectoryMarkerCleanup(FileSystem fs, Path baseDir, String prefix,
+      long initialDelaySecs, long checkPeriodSecs, long staleThresholdSecs, Set<Path> pathsToIgnore) {
+    this.fs = fs;
+    this.baseDir = baseDir;
+    this.prefix = prefix;
+    this.staleThresholdSecs = staleThresholdSecs;
+    this.pathsToIgnore = pathsToIgnore;
+
+    LOG.info("Setting up DirectoryMarkerCleanup at path {}, prefix {}, initialDelay {}, frequency {}, staleThreshold {}, pathsToIgnore {}",
+        baseDir, prefix, initialDelaySecs,
+        checkPeriodSecs, staleThresholdSecs, pathsToIgnore);
+
+    // Initialize scheduled task
+    ThreadFactory threadFactory =
+        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("DirectoryMarkerCleanup %d").build();
+    executor = Executors.newSingleThreadScheduledExecutor(threadFactory);
+    executor.scheduleAtFixedRate(
+        () -> { this.runCleanup(); },
+        initialDelaySecs, checkPeriodSecs, TimeUnit.SECONDS);
+  }
+
+  public void stop() {
+    if (executor != null) {
+      // Stop scheduled task
+      executor.shutdownNow();
+      executor = null;
+    }
+  }
+
+  void runCleanup() {
+    try {
+      LOG.debug("DirectoryMarkerCleanup for {}", baseDir);
+      FileStatus[] files = fs.listStatus(baseDir, FileUtils.HIDDEN_FILES_PATH_FILTER);
+      for (FileStatus file : files) {
+        if (file.isDirectory()) {
+          checkDirectory(file);
+        }
+      }
+    } catch (Exception err) {
+      LOG.error("Error during DirectoryMarkerCleanup", err);
+    }
+  }
+
+  void checkDirectory(FileStatus dir) {
+    try {
+      if (pathsToIgnore != null && pathsToIgnore.contains(dir.getPath())) {
+        return;
+      }
+
+      long lastUpdated = -1;
+      boolean foundMarkers = false;
+      // Check the directory for marker files
+      FileStatus[] files = fs.globStatus(new Path(dir.getPath(), prefix + ".*"));
+      // globStatus() can return null (e.g. the directory vanished between listing and check).
+      if (files != null) {
+        for (FileStatus file : files) {
+          long modifyTime = file.getModificationTime();
+          foundMarkers = true;
+          LOG.debug("Marker file {} with modifyTime of {}", file.getPath(), modifyTime);
+          lastUpdated = Math.max(lastUpdated, modifyTime);
+        }
+      }
+
+      if (!foundMarkers) {
+        LOG.debug("No marker files found for {} - skipping directory", dir.getPath());
+        return;
+      }
+
+      // currentDate is in millis, staleThreshold is in secs. Adjust accordingly.
+      long deleteDate = System.currentTimeMillis() - (staleThresholdSecs * 1000);
+      LOG.debug("Directory {}: lastModifyDate of {}", dir.getPath(), lastUpdated);
+      if (lastUpdated < deleteDate) {
+        LOG.info("Deleting directory {}, last modify date of {}", dir.getPath(), lastUpdated);
+        boolean deleted = fs.delete(dir.getPath(), true);
+        if (!deleted) {
+          LOG.error("Directory {} was not deleted by DirectoryMarkerCleanup", dir.getPath());
+        }
+      }
+    } catch (Exception err) {
+      LOG.error("Error while checking path " + dir.getPath() + " for DirectoryMarkerCleanup", err);
+    }
+  }
+}
diff --git a/common/src/java/org/apache/hive/common/util/DirectoryMarkerUpdate.java b/common/src/java/org/apache/hive/common/util/DirectoryMarkerUpdate.java
new file mode 100644
index 0000000000..39ba45dac2
--- /dev/null
+++ b/common/src/java/org/apache/hive/common/util/DirectoryMarkerUpdate.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.common.util;
+
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility to mark that a directory is in use by periodically adding a marker file
+ * with a known prefix to the directory.
+ */
+public class DirectoryMarkerUpdate {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DirectoryMarkerUpdate.class);
+
+  protected ScheduledExecutorService executor;
+  protected FileSystem fs;
+  protected Path dir;
+  protected String prefix;
+  protected Queue<Path> createdMarkers = new LinkedList<>();
+
+  public DirectoryMarkerUpdate(FileSystem fs, Path dir, String prefix, long updateTimeSecs) {
+    this.fs = fs;
+    this.dir = dir;
+    this.prefix = prefix;
+
+    LOG.info("Setting up DirectoryMarkerUpdate at path {}, prefix {}, frequency {}",
+        dir, prefix, updateTimeSecs);
+
+    // Initialize scheduled task
+    ThreadFactory threadFactory =
+        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("DirectoryMarkerUpdate %d").build();
+    executor = Executors.newSingleThreadScheduledExecutor(threadFactory);
+    executor.scheduleAtFixedRate(
+        () -> { this.updateMarker(); },
+        0, updateTimeSecs, TimeUnit.SECONDS);
+  }
+
+  public void stop() {
+    if (executor != null) {
+      // Stop scheduled task
+      executor.shutdownNow();
+      executor = null;
+    }
+  }
+
+  void updateMarker() {
+    // First add marker for current timestamp
+    Path marker = new Path(dir, prefix + "."
+        + System.currentTimeMillis());
+    try {
+      boolean created = fs.createNewFile(marker);
+      if (created) {
+        createdMarkers.add(marker);
+      } else {
+        LOG.error("Unable to create marker file " + marker);
+      }
+    } catch (Exception err) {
+      LOG.error("Error while trying to create marker file " + marker, err);
+    }
+
+    // If we have created more than one marker file then clean up the older one.
+    if (createdMarkers.size() > 1) {
+      Path oldMarker = createdMarkers.remove();
+      try {
+        boolean deleted = fs.delete(oldMarker, false);
+        if (!deleted) {
+          LOG.error("Unable to delete old marker file " + oldMarker);
+        }
+      } catch (Exception err) {
+        LOG.error("Error while trying to delete old marker file " + oldMarker, err);
+      }
+    }
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java b/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java
index 1ca7c11b43..cd405b262e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/cache/results/QueryResultsCache.java
@@ -72,6 +72,8 @@
 import org.apache.hadoop.hive.ql.parse.TableAccessInfo;
 import org.apache.hadoop.hive.ql.plan.FetchWork;
 import org.apache.hadoop.hive.ql.plan.HiveOperation;
+import org.apache.hive.common.util.DirectoryMarkerCleanup;
+import org.apache.hive.common.util.DirectoryMarkerUpdate;
 import org.apache.hive.common.util.TxnIdUtils;
 
 import org.slf4j.Logger;
@@ -353,7 +355,8 @@ public boolean waitForValidStatus() {
   private long maxEntrySize;
   private long maxEntryLifetime;
   private ReadWriteLock rwLock = new ReentrantReadWriteLock();
-  private ScheduledFuture<?> invalidationPollFuture;
+  private DirectoryMarkerUpdate markerUpdate;
+  private DirectoryMarkerCleanup markerCleanup;
 
   private QueryResultsCache(HiveConf configuration) throws IOException {
     this.conf = configuration;
@@ -375,6 +378,27 @@ private QueryResultsCache(HiveConf configuration) throws IOException {
     // Results cache directory should be cleaned up at process termination.
     fs.deleteOnExit(cacheDirPath);
 
+    // Setup directory marker update/cleanup
+    long markerUpdateFreq = conf.getTimeVar(
+        HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_DIRECTORY_MARKER_UPDATE_FREQUENCY,
+        TimeUnit.SECONDS);
+    if (markerUpdateFreq > 0) {
+      long staleThreshold = 3 * markerUpdateFreq;
+      String cacheMarkerPrefix = ".cacheupdate";
+
+      // Set up marker update on current cache dir path so it doesn't get cleaned up by marker cleanup
+      // while this Hive instance is still running.
+      markerUpdate = new DirectoryMarkerUpdate(fs, cacheDirPath, cacheMarkerPrefix, markerUpdateFreq);
+
+      // Set up marker cleanup on the root cache directory to cleanup old query cache directories from
+      // Hive instances which may have terminated abnormally without deleting their cache directories.
+      // No need to check the current cache directory; qualify it so it matches listStatus() output.
+      Set<Path> pathsToIgnore = new HashSet<>();
+      pathsToIgnore.add(fs.makeQualified(cacheDirPath));
+      markerCleanup = new DirectoryMarkerCleanup(fs, rootCacheDir, cacheMarkerPrefix,
+          markerUpdateFreq, staleThreshold, staleThreshold, pathsToIgnore);
+    }
+
     maxCacheSize = conf.getLongVar(HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_MAX_SIZE);
     maxEntrySize = conf.getLongVar(HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_MAX_ENTRY_SIZE);
     maxEntryLifetime = conf.getTimeVar(
@@ -840,6 +864,17 @@ private boolean clearSpaceForCacheEntry(CacheEntry entry, long size) {
     return false;
   }
 
+  private void cleanup() {
+    if (markerUpdate != null) {
+      markerUpdate.stop();
+      markerUpdate = null;
+    }
+    if (markerCleanup != null) {
+      markerCleanup.stop();
+      markerCleanup = null;
+    }
+  }
+
   private static void addToEntryMap(Map<String, Set<CacheEntry>> entryMap,
       String key, CacheEntry entry) {
     Set<CacheEntry> entriesForKey = entryMap.get(key);
@@ -871,11 +906,8 @@ public static void cleanupInstance() {
     // This should only ever be called in testing scenarios.
     // There should not be any other users of the cache or its entries or this may mess up cleanup.
     if (inited.get()) {
-      if (instance.invalidationPollFuture != null) {
-        instance.invalidationPollFuture.cancel(true);
-        instance.invalidationPollFuture = null;
-      }
       instance.clear();
+      instance.cleanup();
       instance = null;
       inited.set(false);
     }