commit a2b51f2c2b099830ef3edc99eca649fe84521fc3
Author: Daniel Dai
Date:   Wed Oct 26 00:11:42 2016 -0700

    HIVE-15068: Run ClearDanglingScratchDir periodically inside HS2

diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 6f168b5..d97ac6b 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2172,6 +2172,11 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
         "SSL Versions to disable for all Hive Servers"),
 
     // HiveServer2 specific configs
+    HIVE_SERVER2_CLEAR_DANGLING_SCRATCH_DIR("hive.server2.clear.dangling.scratchdir", false,
+        "Clear dangling scratch dir periodically in HS2"),
+    HIVE_SERVER2_CLEAR_DANGLING_SCRATCH_DIR_INTERVAL("hive.server2.clear.dangling.scratchdir.interval",
+        "1800s", new TimeValidator(TimeUnit.SECONDS),
+        "Interval to clear dangling scratch dir periodically in HS2"),
     HIVE_SERVER2_SLEEP_INTERVAL_BETWEEN_START_ATTEMPTS("hive.server2.sleep.interval.between.start.attempts",
         "60s", new TimeValidator(TimeUnit.MILLISECONDS, 0l, true, Long.MAX_VALUE, true),
         "Amount of time to sleep between HiveServer2 start attempts. Primarily meant for tests"),
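The two properties above are the entire user-facing surface of the change: an off-by-default boolean gate and a TimeValidator-backed interval defaulting to 1800s. For orientation, here is a minimal sketch, not part of the patch, of reading both values back through the same HiveConf API this commit uses; the class name ScratchDirConfProbe is hypothetical:

    import java.util.concurrent.TimeUnit;

    import org.apache.hadoop.hive.conf.HiveConf;

    public class ScratchDirConfProbe {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        // Off by default; operators opt in via hive-site.xml or --hiveconf.
        boolean enabled = conf.getBoolVar(
            HiveConf.ConfVars.HIVE_SERVER2_CLEAR_DANGLING_SCRATCH_DIR);
        // TimeValidator-backed values carry a unit suffix ("1800s", "30m");
        // getTimeVar normalizes to the requested unit, here seconds.
        long intervalSec = HiveConf.getTimeVar(conf,
            HiveConf.ConfVars.HIVE_SERVER2_CLEAR_DANGLING_SCRATCH_DIR_INTERVAL,
            TimeUnit.SECONDS);
        System.out.println("enabled=" + enabled + ", interval=" + intervalSec + "s");
      }
    }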
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/service/server/TestHS2ClearDanglingScratchDir.java b/itests/hive-unit/src/test/java/org/apache/hive/service/server/TestHS2ClearDanglingScratchDir.java
new file mode 100644
index 0000000..081ac96
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hive/service/server/TestHS2ClearDanglingScratchDir.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.service.server;
+
+import java.util.UUID;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.WindowsPathUtil;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.shims.Utils;
+import org.apache.hadoop.util.Shell;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestHS2ClearDanglingScratchDir {
+  @Test
+  public void testScratchDirCleared() throws Exception {
+    MiniDFSCluster m_dfs = new MiniDFSCluster.Builder(new Configuration()).numDataNodes(1).format(true).build();
+    HiveConf conf = new HiveConf();
+    conf.addResource(m_dfs.getConfiguration(0));
+    if (Shell.WINDOWS) {
+      WindowsPathUtil.convertPathsFromWindowsToHdfs(conf);
+    }
+    conf.set(HiveConf.ConfVars.HIVE_SCRATCH_DIR_LOCK.toString(), "true");
+    conf.set(HiveConf.ConfVars.HIVE_SERVER2_CLEAR_DANGLING_SCRATCH_DIR.toString(), "true");
+
+    Path scratchDir = new Path(HiveConf.getVar(conf, HiveConf.ConfVars.SCRATCHDIR));
+    m_dfs.getFileSystem().mkdirs(scratchDir);
+    m_dfs.getFileSystem().setPermission(scratchDir, new FsPermission("777"));
+
+    // Fake two live sessions
+    SessionState.start(conf);
+    conf.setVar(HiveConf.ConfVars.HIVESESSIONID, UUID.randomUUID().toString());
+    SessionState.start(conf);
+
+    // Fake a dead session
+    Path fakeSessionPath = new Path(new Path(scratchDir, Utils.getUGI().getShortUserName()),
+        UUID.randomUUID().toString());
+    m_dfs.getFileSystem().mkdirs(fakeSessionPath);
+    m_dfs.getFileSystem().create(new Path(fakeSessionPath, "inuse.lck")).close();
+
+    FileStatus[] scratchDirs = m_dfs.getFileSystem()
+        .listStatus(new Path(scratchDir, Utils.getUGI().getShortUserName()));
+
+    Assert.assertEquals(3, scratchDirs.length);
+
+    HiveServer2.scheduleClearDanglingScratchDir(conf, 0);
+
+    // Check that the dead session gets cleared
+    long start = System.currentTimeMillis();
+    long end;
+    do {
+      Thread.sleep(200);
+      end = System.currentTimeMillis();
+      if (end - start > 5000) {
+        Assert.fail("timeout, scratch dir has not been cleared");
+      }
+      scratchDirs = m_dfs.getFileSystem()
+          .listStatus(new Path(scratchDir, Utils.getUGI().getShortUserName()));
+    } while (scratchDirs.length != 2);
+  }
+}
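A note on the tail of the test: rather than sleeping for a fixed period, it polls every 200 ms against a 5 s deadline, because the cleaner runs asynchronously on its own scheduler thread. The idiom generalizes; below is a sketch of a hypothetical reusable helper (WaitUtil is not part of the patch) under the same JUnit 4 assumptions:

    import java.util.concurrent.Callable;

    import org.junit.Assert;

    public final class WaitUtil {
      private WaitUtil() {}

      /** Polls condition every pollMs until it returns true; fails the test after timeoutMs. */
      public static void waitFor(Callable<Boolean> condition, long timeoutMs, long pollMs)
          throws Exception {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (!condition.call()) {
          if (System.currentTimeMillis() > deadline) {
            Assert.fail("timeout after " + timeoutMs + " ms waiting for condition");
          }
          Thread.sleep(pollMs);
        }
      }
    }

With it, and assuming Java 8 lambdas, the do/while loop above would collapse to something like waitFor(() -> fs.listStatus(userDir).length == 2, 5000, 200), where fs and userDir stand for the mini-cluster filesystem and the per-user scratch path.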
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/ClearDanglingScratchDir.java b/ql/src/java/org/apache/hadoop/hive/ql/session/ClearDanglingScratchDir.java
index 725f954..2fff92e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/session/ClearDanglingScratchDir.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/session/ClearDanglingScratchDir.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hive.ql.session;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -35,6 +36,8 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.RemoteException;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A tool to remove dangling scratch directory. A scratch directory could be left behind
@@ -51,7 +54,13 @@
  * again after 10 min. Once it becomes writable, ClearDanglingScratchDir will be able to
  * remove it
  */
-public class ClearDanglingScratchDir {
+public class ClearDanglingScratchDir implements Runnable {
+  private static final Logger LOG = LoggerFactory.getLogger(ClearDanglingScratchDir.class);
+  boolean dryRun = false;
+  boolean verbose = false;
+  boolean useConsole = false;
+  String rootHDFSDir;
+  HiveConf conf;
 
   public static void main(String[] args) throws Exception {
     try {
@@ -82,97 +91,119 @@ public static void main(String[] args) throws Exception {
     HiveConf conf = new HiveConf();
-    Path rootHDFSDirPath;
+    String rootHDFSDir;
     if (cli.hasOption("s")) {
-      rootHDFSDirPath = new Path(cli.getOptionValue("s"));
+      rootHDFSDir = cli.getOptionValue("s");
     } else {
-      rootHDFSDirPath = new Path(HiveConf.getVar(conf, HiveConf.ConfVars.SCRATCHDIR));
+      rootHDFSDir = HiveConf.getVar(conf, HiveConf.ConfVars.SCRATCHDIR);
     }
+    ClearDanglingScratchDir clearDanglingScratchDirMain = new ClearDanglingScratchDir(dryRun,
+        verbose, true, rootHDFSDir, conf);
+    clearDanglingScratchDirMain.run();
+  }
 
-    FileSystem fs = FileSystem.get(rootHDFSDirPath.toUri(), conf);
-    FileStatus[] userHDFSDirList = fs.listStatus(rootHDFSDirPath);
-
-    List<Path> scratchDirToRemove = new ArrayList<Path>();
-    for (FileStatus userHDFSDir : userHDFSDirList) {
-      FileStatus[] scratchDirList = fs.listStatus(userHDFSDir.getPath());
-      for (FileStatus scratchDir : scratchDirList) {
-        Path lockFilePath = new Path(scratchDir.getPath(), SessionState.LOCK_FILE_NAME);
-        if (!fs.exists(lockFilePath)) {
-          String message = "Skipping " + scratchDir.getPath() + " since it does not contain " +
-              SessionState.LOCK_FILE_NAME;
-          if (verbose) {
-            SessionState.getConsole().printInfo(message);
-          } else {
-            SessionState.getConsole().logInfo(message);
+  public ClearDanglingScratchDir(boolean dryRun, boolean verbose, boolean useConsole,
+      String rootHDFSDir, HiveConf conf) {
+    this.dryRun = dryRun;
+    this.verbose = verbose;
+    this.useConsole = useConsole;
+    this.rootHDFSDir = rootHDFSDir;
+    this.conf = conf;
+  }
+
+  @Override
+  public void run() {
+    try {
+      Path rootHDFSDirPath = new Path(rootHDFSDir);
+      FileSystem fs = FileSystem.get(rootHDFSDirPath.toUri(), conf);
+      FileStatus[] userHDFSDirList = fs.listStatus(rootHDFSDirPath);
+
+      List<Path> scratchDirToRemove = new ArrayList<Path>();
+      for (FileStatus userHDFSDir : userHDFSDirList) {
+        FileStatus[] scratchDirList = fs.listStatus(userHDFSDir.getPath());
+        for (FileStatus scratchDir : scratchDirList) {
+          Path lockFilePath = new Path(scratchDir.getPath(), SessionState.LOCK_FILE_NAME);
+          if (!fs.exists(lockFilePath)) {
+            String message = "Skipping " + scratchDir.getPath() + " since it does not contain " +
+                SessionState.LOCK_FILE_NAME;
+            if (verbose) {
+              consoleMessage(message);
+            }
+            continue;
           }
-          continue;
-        }
-        boolean removable = false;
-        boolean inuse = false;
-        try {
-          IOUtils.closeStream(fs.append(lockFilePath));
-          removable = true;
-        } catch (RemoteException eAppend) {
-          // RemoteException with AlreadyBeingCreatedException will be thrown
-          // if the file is currently held by a writer
-          if(AlreadyBeingCreatedException.class.getName().equals(eAppend.getClassName())){
-            inuse = true;
-          } else if (UnsupportedOperationException.class.getName().equals(eAppend.getClassName())) {
-            // Append is not supported in the cluster, try to use create
-            try {
-              IOUtils.closeStream(fs.create(lockFilePath, false));
-            } catch (RemoteException eCreate) {
-              if (AlreadyBeingCreatedException.class.getName().equals(eCreate.getClassName())){
-                // If the file is held by a writer, will throw AlreadyBeingCreatedException
-                inuse = true;
-              } else {
-                SessionState.getConsole().printInfo("Unexpected error:" + eCreate.getMessage());
+          boolean removable = false;
+          boolean inuse = false;
+          try {
+            IOUtils.closeStream(fs.append(lockFilePath));
+            removable = true;
+          } catch (RemoteException eAppend) {
+            // RemoteException with AlreadyBeingCreatedException will be thrown
+            // if the file is currently held by a writer
+            if(AlreadyBeingCreatedException.class.getName().equals(eAppend.getClassName())){
+              inuse = true;
+            } else if (UnsupportedOperationException.class.getName().equals(eAppend.getClassName())) {
+              // Append is not supported in the cluster, try to use create
+              try {
+                IOUtils.closeStream(fs.create(lockFilePath, false));
+              } catch (RemoteException eCreate) {
+                if (AlreadyBeingCreatedException.class.getName().equals(eCreate.getClassName())){
+                  // If the file is held by a writer, will throw AlreadyBeingCreatedException
+                  inuse = true;
+                } else {
+                  consoleMessage("Unexpected error:" + eCreate.getMessage());
+                }
+              } catch (FileAlreadyExistsException eCreateNormal) {
+                // Otherwise, throw FileAlreadyExistsException, which means the file owner is
+                // dead
+                removable = true;
               }
-            } catch (FileAlreadyExistsException eCreateNormal) {
-              // Otherwise, throw FileAlreadyExistsException, which means the file owner is
-              // dead
-              removable = true;
+            } else {
+              consoleMessage("Unexpected error:" + eAppend.getMessage());
             }
-          } else {
-            SessionState.getConsole().printInfo("Unexpected error:" + eAppend.getMessage());
           }
-        }
-        if (inuse) {
-          // Cannot open the lock file for writing, must be held by a live process
-          String message = scratchDir.getPath() + " is being used by live process";
-          if (verbose) {
-            SessionState.getConsole().printInfo(message);
-          } else {
-            SessionState.getConsole().logInfo(message);
+          if (inuse) {
+            // Cannot open the lock file for writing, must be held by a live process
+            String message = scratchDir.getPath() + " is being used by live process";
+            if (verbose) {
+              consoleMessage(message);
+            }
+          }
+          if (removable) {
+            scratchDirToRemove.add(scratchDir.getPath());
           }
-        }
-        if (removable) {
-          scratchDirToRemove.add(scratchDir.getPath());
         }
       }
-    }
-    if (scratchDirToRemove.size()==0) {
-      SessionState.getConsole().printInfo("Cannot find any scratch directory to clear");
-      return;
-    }
-    SessionState.getConsole().printInfo("Removing " + scratchDirToRemove.size() + " scratch directories");
-    for (Path scratchDir : scratchDirToRemove) {
-      if (dryRun) {
-        System.out.println(scratchDir);
-      } else {
-        boolean succ = fs.delete(scratchDir, true);
-        if (!succ) {
-          SessionState.getConsole().printInfo("Cannot remove " + scratchDir);
+      if (scratchDirToRemove.size()==0) {
+        consoleMessage("Cannot find any scratch directory to clear");
+        return;
+      }
+      consoleMessage("Removing " + scratchDirToRemove.size() + " scratch directories");
+      for (Path scratchDir : scratchDirToRemove) {
+        if (dryRun) {
+          System.out.println(scratchDir);
         } else {
-          String message = scratchDir + " removed";
-          if (verbose) {
-            SessionState.getConsole().printInfo(message);
+          boolean succ = fs.delete(scratchDir, true);
+          if (!succ) {
+            consoleMessage("Cannot remove " + scratchDir);
           } else {
-            SessionState.getConsole().logInfo(message);
+            String message = scratchDir + " removed";
+            if (verbose) {
+              consoleMessage(message);
+            }
           }
         }
       }
+    } catch (IOException e) {
+      consoleMessage("Unexpected exception " + e.getMessage());
+    }
+  }
+
+  private void consoleMessage(String message) {
+    if (useConsole) {
+      SessionState.getConsole().printInfo(message);
+    } else {
+      LOG.info(message);
     }
   }
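The heart of run() is the lease probe: appending to a session's inuse.lck succeeds only after the HDFS client that created the file has died and its write lease has expired, so a successful append marks the scratch directory as removable. A condensed sketch of just that probe, with names borrowed from the class above; this deliberately omits the create() fallback the patch uses for filesystems that do not support append:

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.ipc.RemoteException;

    final class LockProbe {
      /** Returns true when no live process holds the lock file's HDFS lease. */
      static boolean sessionIsDead(FileSystem fs, Path lockFilePath) throws java.io.IOException {
        try {
          IOUtils.closeStream(fs.append(lockFilePath));
          return true; // append succeeded: the original writer is gone
        } catch (RemoteException e) {
          if (AlreadyBeingCreatedException.class.getName().equals(e.getClassName())) {
            return false; // lease still held: the session is alive
          }
          throw e; // anything else is unexpected here
        }
      }
    }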
diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2.java b/service/src/java/org/apache/hive/service/server/HiveServer2.java
index 590b1f3..9c94611 100644
--- a/service/src/java/org/apache/hive/service/server/HiveServer2.java
+++ b/service/src/java/org/apache/hive/service/server/HiveServer2.java
@@ -25,7 +25,10 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Random;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.cli.GnuParser;
@@ -34,6 +37,7 @@
 import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.api.ACLProvider;
@@ -51,9 +55,9 @@
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.coordinator.LlapCoordinator;
-import org.apache.hadoop.hive.llap.io.api.LlapProxy;
 import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl;
 import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager;
+import org.apache.hadoop.hive.ql.session.ClearDanglingScratchDir;
 import org.apache.hadoop.hive.ql.util.ZooKeeperHiveHelper;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.hive.shims.Utils;
@@ -81,6 +85,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 
 /**
@@ -538,6 +543,21 @@ public synchronized void stop() {
     }
   }
 
+  @VisibleForTesting
+  public static void scheduleClearDanglingScratchDir(HiveConf hiveConf, int initialWaitInSec) {
+    if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_CLEAR_DANGLING_SCRATCH_DIR)) {
+      ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(
+          new BasicThreadFactory.Builder()
+              .namingPattern("cleardanglingscratchdir-%d")
+              .daemon(true)
+              .build());
+      executor.scheduleAtFixedRate(new ClearDanglingScratchDir(false, false, false,
+          HiveConf.getVar(hiveConf, HiveConf.ConfVars.SCRATCHDIR), hiveConf), initialWaitInSec,
+          HiveConf.getTimeVar(hiveConf, ConfVars.HIVE_SERVER2_CLEAR_DANGLING_SCRATCH_DIR_INTERVAL,
+              TimeUnit.SECONDS), TimeUnit.SECONDS);
+    }
+  }
+
   private static void startHiveServer2() throws Throwable {
     long attempts = 0, maxAttempts = 1;
     while (true) {
@@ -558,6 +578,11 @@ private static void startHiveServer2() throws Throwable {
       // Cleanup the scratch dir before starting
       ServerUtils.cleanUpScratchDir(hiveConf);
 
+      // Schedule task to clean up dangling scratch dirs periodically;
+      // initial wait is a random time between 0-10 min to
+      // avoid an initial spike when multiple HS2 instances start
+      scheduleClearDanglingScratchDir(hiveConf, new Random().nextInt(600));
+
       server = new HiveServer2();
       server.init(hiveConf);
       server.start();
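Finally, the scheduling idiom in scheduleClearDanglingScratchDir, shown in isolation: a single daemon thread with a recognizable name runs the task at a fixed rate, so it is easy to spot in thread dumps and never blocks JVM shutdown. A self-contained sketch; PeriodicTaskDemo, its task, and its two-second period are illustrative only:

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    import org.apache.commons.lang3.concurrent.BasicThreadFactory;

    public class PeriodicTaskDemo {
      public static void main(String[] args) throws InterruptedException {
        // Same commons-lang3 builder the patch uses: named, daemonized thread.
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(
            new BasicThreadFactory.Builder()
                .namingPattern("demo-cleaner-%d") // placeholder name
                .daemon(true)
                .build());
        // initialDelay=0 here; HS2 passes new Random().nextInt(600) instead,
        // staggering the first run across servers that share one scratch root.
        executor.scheduleAtFixedRate(
            () -> System.out.println("tick on " + Thread.currentThread().getName()),
            0, 2, TimeUnit.SECONDS);
        Thread.sleep(7000); // let a few ticks fire; the daemon thread dies with the JVM
      }
    }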