commit 3af07cc46b4794727084f6f6801d101b3e00501c Author: Daniel Dai Date: Tue Apr 5 15:44:16 2016 -0700 HIVE-13429: Tool to remove dangling scratch dir diff --git a/bin/ext/cleardanglingscratchdir.cmd b/bin/ext/cleardanglingscratchdir.cmd new file mode 100644 index 0000000..cab2905 --- /dev/null +++ b/bin/ext/cleardanglingscratchdir.cmd @@ -0,0 +1,35 @@ +@echo off +@rem Licensed to the Apache Software Foundation (ASF) under one or more +@rem contributor license agreements. See the NOTICE file distributed with +@rem this work for additional information regarding copyright ownership. +@rem The ASF licenses this file to You under the Apache License, Version 2.0 +@rem (the "License"); you may not use this file except in compliance with +@rem the License. You may obtain a copy of the License at +@rem +@rem http://www.apache.org/licenses/LICENSE-2.0 +@rem +@rem Unless required by applicable law or agreed to in writing, software +@rem distributed under the License is distributed on an "AS IS" BASIS, +@rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +@rem See the License for the specific language governing permissions and +@rem limitations under the License. + +set CLASS=org.apache.hadoop.hive.ql.session.ClearDanglingScratchDir +set HIVE_OPTS= +set HADOOP_CLASSPATH= + +pushd %HIVE_LIB% +for /f %%a IN ('dir /b hive-exec-*.jar') do ( + set JAR=%HIVE_LIB%\%%a +) +popd + +if [%1]==[cleardanglingscratchdir_help] goto :cleardanglingscratchdir_help + +:cleardanglingscratchdir + call %HIVE_BIN_PATH%\ext\util\execHiveCmd.cmd %CLASS% +goto :EOF + +:cleardanglingscratchdir_help + call %HIVE_BIN_PATH%\ext\util\execHiveCmd.cmd %CLASS% -h +goto :EOF diff --git a/bin/ext/cleardanglingscratchdir.sh b/bin/ext/cleardanglingscratchdir.sh new file mode 100644 index 0000000..de0e4eb --- /dev/null +++ b/bin/ext/cleardanglingscratchdir.sh @@ -0,0 +1,28 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. 
See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +THISSERVICE=cleardanglingscratchdir +export SERVICE_LIST="${SERVICE_LIST}${THISSERVICE} " + +cleardanglingscratchdir () { + CLASS=org.apache.hadoop.hive.ql.session.ClearDanglingScratchDir + HIVE_OPTS='' + execHiveCmd $CLASS "$@" +} + +cleardanglingscratchdir_help () { + echo "" + echo "usage ./hive --service cleardanglingscratchdir" +} diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index b8870f2..3f86a7b 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -1973,6 +1973,8 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal "Must be a subclass of org.apache.hadoop.hive.ql.log.PerfLogger"), HIVE_START_CLEANUP_SCRATCHDIR("hive.start.cleanup.scratchdir", false, "To cleanup the Hive scratchdir when starting the Hive Server"), + HIVE_SCRATCH_DIR_LOCK("hive.scratchdir.lock", false, + "To hold a lock file in scratchdir to prevent to be removed by cleardanglingscratchdir"), HIVE_INSERT_INTO_MULTILEVEL_DIRS("hive.insert.into.multilevel.dirs", false, "Where to insert into multilevel directories like\n" + "\"insert directory '/HIVEFT25686/chinna/' from table\""), diff --git 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/session/TestClearDanglingScratchDir.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/session/TestClearDanglingScratchDir.java new file mode 100644 index 0000000..1007113 --- /dev/null +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/session/TestClearDanglingScratchDir.java @@ -0,0 +1,161 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.session; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.PrintStream; +import java.io.PrintWriter; +import java.nio.channels.FileChannel; +import java.util.UUID; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hive.conf.HiveConf; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.LoggerFactory; + +public class TestClearDanglingScratchDir { + private static MiniDFSCluster m_dfs = null; + private static HiveConf conf; + private static Path scratchDir; + private ByteArrayOutputStream stdout; + private PrintStream origStdoutPs; + private static File logFile; + + @BeforeClass + static public void oneTimeSetup() throws Exception { + logFile = File.createTempFile("log", ""); + File log4jConfig = File.createTempFile("config", ".properties"); + log4jConfig.deleteOnExit(); + PrintWriter pw = new PrintWriter(log4jConfig); + pw.println("appenders = console, file"); + pw.println("appender.console.type = Console"); + pw.println("appender.console.name = STDOUT"); + pw.println("appender.console.layout.type = PatternLayout"); + pw.println("appender.console.layout.pattern = %t %-5p %c{2} - %m%n"); + pw.println("appender.file.type = File"); + pw.println("appender.file.name = LOGFILE"); + pw.println("appender.file.fileName = " + logFile.getAbsolutePath()); + pw.println("appender.file.layout.type = PatternLayout"); + pw.println("appender.file.layout.pattern = %t %-5p %c{2} - %m%n"); + pw.println("rootLogger.level = debug"); + pw.println("rootLogger.appenderRefs = stdout"); + 
pw.println("rootLogger.appenderRef.stdout.ref = STDOUT"); + pw.println("loggers = file"); + pw.println("logger.file.name = SessionState"); + pw.println("logger.file.level = debug"); + pw.println("logger.file.appenderRefs = file"); + pw.println("logger.file.appenderRef.file.ref = LOGFILE"); + pw.close(); + System.setProperty("log4j.configurationFile", log4jConfig.getAbsolutePath()); + + m_dfs = new MiniDFSCluster.Builder(new Configuration()).numDataNodes(1).format(true).build(); + conf = new HiveConf(); + conf.set(HiveConf.ConfVars.HIVE_SCRATCH_DIR_LOCK.toString(), "true"); + conf.set(HiveConf.ConfVars.METASTORE_AUTO_CREATE_ALL.toString(), "true"); + LoggerFactory.getLogger("SessionState"); + conf.setVar(HiveConf.ConfVars.METASTOREWAREHOUSE, + new Path(System.getProperty("test.tmp.dir"), "warehouse").toString()); + conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, + m_dfs.getFileSystem().getUri().toString()); + + scratchDir = new Path(HiveConf.getVar(conf, HiveConf.ConfVars.SCRATCHDIR)); + m_dfs.getFileSystem().mkdirs(scratchDir); + m_dfs.getFileSystem().setPermission(scratchDir, new FsPermission("777")); + } + + @AfterClass + static public void shutdown() throws Exception { + m_dfs.shutdown(); + } + + public void redirectOutput() throws IOException { + stdout = new ByteArrayOutputStream(); + PrintStream psStdout = new PrintStream(stdout); + origStdoutPs = System.out; + System.setOut(psStdout); + + FileOutputStream fos = new FileOutputStream(logFile, true); + FileChannel outChan = fos.getChannel(); + outChan.truncate(0); + outChan.close(); + fos.close(); + } + + public void rollbackOutput() { + System.setOut(origStdoutPs); + } + + @Test + public void testClearDanglingScratchDir() throws Exception { + + // No scratch dir initially + redirectOutput(); + ClearDanglingScratchDir.main(new String[]{"-s", + m_dfs.getFileSystem().getUri().toString() + scratchDir.toUri().toString()}); + rollbackOutput(); + 
Assert.assertTrue(FileUtils.readFileToString(logFile).contains("Cannot find any scratch directory to clear")); + + // Create scratch dir without lock files + m_dfs.getFileSystem().mkdirs(new Path(new Path(scratchDir, "dummy"), UUID.randomUUID().toString())); + redirectOutput(); + ClearDanglingScratchDir.main(new String[]{"-s", + m_dfs.getFileSystem().getUri().toString() + scratchDir.toUri().toString()}); + rollbackOutput(); + Assert.assertEquals(StringUtils.countMatches(FileUtils.readFileToString(logFile), + "since it does not contain " + SessionState.LOCK_FILE_NAME), 1); + Assert.assertTrue(FileUtils.readFileToString(logFile).contains("Cannot find any scratch directory to clear")); + + // One live session + SessionState ss = SessionState.start(conf); + redirectOutput(); + ClearDanglingScratchDir.main(new String[]{"-s", + m_dfs.getFileSystem().getUri().toString() + scratchDir.toUri().toString()}); + rollbackOutput(); + Assert.assertEquals(StringUtils.countMatches(FileUtils.readFileToString(logFile), "is being used by live process"), 1); + + // One dead session with dry-run + ss.releaseSessionLockFile(); + redirectOutput(); + ClearDanglingScratchDir.main(new String[]{"-r", "-s", + m_dfs.getFileSystem().getUri().toString() + scratchDir.toUri().toString()}); + rollbackOutput(); + // Find one session dir to remove + Assert.assertFalse(stdout.toString().isEmpty()); + + // Remove the dead session dir + redirectOutput(); + ClearDanglingScratchDir.main(new String[]{"-s", + m_dfs.getFileSystem().getUri().toString() + scratchDir.toUri().toString()}); + rollbackOutput(); + Assert.assertTrue(FileUtils.readFileToString(logFile).contains("Removing 1 scratch directories")); + Assert.assertEquals(StringUtils.countMatches(FileUtils.readFileToString(logFile), "removed"), 1); + ss.close(); + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/ClearDanglingScratchDir.java b/ql/src/java/org/apache/hadoop/hive/ql/session/ClearDanglingScratchDir.java new file mode 100644 
index 0000000..587d178 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/session/ClearDanglingScratchDir.java @@ -0,0 +1,174 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.session; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.GnuParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.OptionBuilder; +import org.apache.commons.cli.Options; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.ipc.RemoteException; + +/** + * A tool to remove dangling scratch directory. A scratch directory could be left behind + * in some cases, such as when vm restarts and leaves no chance for Hive to run shutdown hook. + * The tool will test whether a scratch directory is in use; if not, remove it. + * We rely on HDFS write lock to detect if a scratch directory is in use: + * 1. A HDFS client open HDFS file ($scratchdir/inuse.lck) for write and only close + * it at the time the session is closed + * 2. cleardanglingscratchDir can try to open $scratchdir/inuse.lck for write. If the + * corresponding HiveCli/HiveServer2 is still running, we will get exception. + * Otherwise, we know the session is dead + * 3. If the HiveCli/HiveServer2 dies without closing the HDFS file, NN will reclaim the + * lease after 10 min, ie, the HDFS file held by the dead HiveCli/HiveServer2 is writable + * again after 10 min.
Once it become writable, cleardanglingscratchDir will be able to + * remove it + */ +public class ClearDanglingScratchDir { + + public static void main(String[] args) throws Exception { + Options opts = createOptions(); + CommandLine cli = new GnuParser().parse(opts, args); + + if (cli.hasOption('h')) { + HelpFormatter formatter = new HelpFormatter(); + formatter.printHelp("cleardanglingscratchdir" + + " (clear scratch dir left behind by dead HiveCli or HiveServer2)", opts); + return; + } + + boolean dryRun = false; + boolean verbose = false; + + if (cli.hasOption("r")) { + dryRun = true; + } + + if (cli.hasOption("v")) { + verbose = true; + } + + HiveConf conf = new HiveConf(); + + Path rootHDFSDirPath; + if (cli.hasOption("s")) { + rootHDFSDirPath = new Path(cli.getOptionValue("s")); + } else { + rootHDFSDirPath = new Path(HiveConf.getVar(conf, HiveConf.ConfVars.SCRATCHDIR)); + } + + FileSystem fs = FileSystem.get(rootHDFSDirPath.toUri(), conf); + FileStatus[] userHDFSDirList = fs.listStatus(rootHDFSDirPath); + + List scratchDirToRemove = new ArrayList(); + for (FileStatus userHDFSDir : userHDFSDirList) { + FileStatus[] scratchDirList = fs.listStatus(userHDFSDir.getPath()); + for (FileStatus scratchDir : scratchDirList) { + Path lockFilePath = new Path(scratchDir.getPath(), SessionState.LOCK_FILE_NAME); + if (!fs.exists(lockFilePath)) { + String message = "Skipping " + scratchDir.getPath() + " since it does not contain " + + SessionState.LOCK_FILE_NAME; + if (verbose) { + SessionState.getConsole().printInfo(message); + } else { + SessionState.getConsole().logInfo(message); + } + continue; + } + try { + IOUtils.closeStream(fs.append(lockFilePath)); + scratchDirToRemove.add(scratchDir.getPath()); + } catch (RemoteException e) { + // RemoteException with AlreadyBeingCreatedException will be thrown + // if the file is currently held by a writer + if(AlreadyBeingCreatedException.class.getName().equals(e.getClassName())){ + // Cannot open the lock file for writing, 
must be held by a live process + String message = scratchDir.getPath() + " is being used by live process"; + if (verbose) { + SessionState.getConsole().printInfo(message); + } else { + SessionState.getConsole().logInfo(message); + } + } + } + } + } + + if (scratchDirToRemove.size()==0) { + SessionState.getConsole().printInfo("Cannot find any scratch directory to clear"); + return; + } + SessionState.getConsole().printInfo("Removing " + scratchDirToRemove.size() + " scratch directories"); + for (Path scratchDir : scratchDirToRemove) { + if (dryRun) { + System.out.println(scratchDir); + } else { + boolean succ = fs.delete(scratchDir, true); + if (!succ) { + SessionState.getConsole().printInfo("Cannot remove " + scratchDir); + } else { + String message = scratchDir + " removed"; + if (verbose) { + SessionState.getConsole().printInfo(message); + } else { + SessionState.getConsole().logInfo(message); + } + } + } + } + } + + static Options createOptions() { + Options result = new Options(); + + // add -r and --dry-run to generate list only + result.addOption(OptionBuilder + .withLongOpt("dry-run") + .withDescription("Generate a list of dangling scratch dir, printed on console") + .create('r')); + + // add -s and --scratchdir to specify a non-default scratch dir + result.addOption(OptionBuilder + .withLongOpt("scratchdir") + .withDescription("Specify a non-default location of the scratch dir") + .hasArg() + .create('s')); + + // add -v and --verbose to print verbose message + result.addOption(OptionBuilder + .withLongOpt("verbose") + .withDescription("Print verbose message") + .create('v')); + + result.addOption(OptionBuilder + .withLongOpt("help") + .withDescription("print help message") + .create('h')); + + return result; + } +} \ No newline at end of file diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java index 23b8a96..98ecd54 100644 --- 
a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java @@ -23,6 +23,8 @@ import java.io.IOException; import java.io.InputStream; import java.io.PrintStream; +import java.lang.management.ManagementFactory; +import java.net.InetAddress; import java.net.URI; import java.net.URISyntaxException; import java.net.URLClassLoader; @@ -46,6 +48,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; @@ -105,6 +108,7 @@ private static final String LOCAL_SESSION_PATH_KEY = "_hive.local.session.path"; private static final String HDFS_SESSION_PATH_KEY = "_hive.hdfs.session.path"; private static final String TMP_TABLE_SPACE_KEY = "_hive.tmp_table_space"; + static final String LOCK_FILE_NAME = "inuse.lck"; private final Map> tempTables = new HashMap>(); private final Map> tempTableColStats = @@ -228,6 +232,8 @@ */ private Path hdfsSessionPath; + private FSDataOutputStream hdfsSessionPathLockFile = null; + /** * sub dir of hdfs session path. used to keep tmp tables * @return Path for temporary tables created by the current session @@ -618,8 +624,9 @@ private static void start(SessionState startSs, boolean isAsync, LogHelper conso * 2. Local scratch dir * 3. Local downloaded resource dir * 4. HDFS session path - * 5. Local session path - * 6. HDFS temp table space + * 5. hold a lock file in HDFS session dir to indicate that it is in use + * 6. Local session path + * 7. 
HDFS temp table space + * @param userName + * @throws IOException */ @@ -647,11 +654,19 @@ private void createSessionDirs(String userName) throws IOException { hdfsSessionPath = new Path(hdfsScratchDirURIString, sessionId); createPath(conf, hdfsSessionPath, scratchDirPermission, false, true); conf.set(HDFS_SESSION_PATH_KEY, hdfsSessionPath.toUri().toString()); - // 5. Local session path + // 5. hold a lock file in HDFS session dir to indicate that it is in use + if (conf.getBoolVar(HiveConf.ConfVars.HIVE_SCRATCH_DIR_LOCK)) { + FileSystem fs = FileSystem.get(conf); + hdfsSessionPathLockFile = fs.create(new Path(hdfsSessionPath, LOCK_FILE_NAME), true); + hdfsSessionPathLockFile.writeUTF("hostname: " + InetAddress.getLocalHost().getHostName() + "\n"); + hdfsSessionPathLockFile.writeUTF("process: " + ManagementFactory.getRuntimeMXBean().getName() + "\n"); + hdfsSessionPathLockFile.hsync(); + } + // 6. Local session path localSessionPath = new Path(HiveConf.getVar(conf, HiveConf.ConfVars.LOCALSCRATCHDIR), sessionId); createPath(conf, localSessionPath, scratchDirPermission, true, true); conf.set(LOCAL_SESSION_PATH_KEY, localSessionPath.toUri().toString()); - // 6. HDFS temp table space + // 7. 
HDFS temp table space hdfsTmpTableSpace = new Path(hdfsSessionPath, TMP_PREFIX); createPath(conf, hdfsTmpTableSpace, scratchDirPermission, false, true); conf.set(TMP_TABLE_SPACE_KEY, hdfsTmpTableSpace.toUri().toString()); @@ -766,8 +781,18 @@ public Path getTempTableSpace() { return this.hdfsTmpTableSpace; } + @VisibleForTesting + void releaseSessionLockFile() throws IOException { + if (hdfsSessionPath != null && hdfsSessionPathLockFile != null) { + hdfsSessionPathLockFile.close(); + } + } + private void dropSessionPaths(Configuration conf) throws IOException { if (hdfsSessionPath != null) { + if (hdfsSessionPathLockFile != null) { + hdfsSessionPathLockFile.close(); + } hdfsSessionPath.getFileSystem(conf).delete(hdfsSessionPath, true); LOG.info("Deleted HDFS directory: " + hdfsSessionPath); }