Index: ql/src/java/org/apache/hadoop/hive/ql/Context.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/Context.java (revision 904308) +++ ql/src/java/org/apache/hadoop/hive/ql/Context.java (working copy) @@ -23,7 +23,11 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.net.URI; +import java.text.SimpleDateFormat; import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.Map; import java.util.Random; import org.antlr.runtime.TokenRewriteStream; @@ -48,22 +52,43 @@ private Path[] resDirPaths; private int resDirFilesNum; boolean initialized; + + // Path without a file system + // hive.exec.scratchdir: default: "/tmp/"+System.getProperty("user.name")+"/hive" + // Used for creating temporary path on external file systems private String scratchPath; + // Path on the local file system + // System.getProperty("java.io.tmpdir") + Path.SEPARATOR + // + System.getProperty("user.name") + Path.SEPARATOR + executionId + private Path localScratchDir; + // On the default FileSystem (usually HDFS): + // also based on hive.exec.scratchdir which by default is + // "/tmp/"+System.getProperty("user.name")+"/hive" private Path MRScratchDir; - private Path localScratchDir; - private final ArrayList allScratchDirs = new ArrayList(); + + // allScratchDirs contains all scratch directories including + // localScratchDir and MRScratchDir. + // The external scratch dirs will be also based on hive.exec.scratchdir. + private final Map externalScratchDirs = new HashMap(); + private HiveConf conf; - Random rand = new Random(); - protected int randomid = Math.abs(rand.nextInt()); protected int pathid = 10000; protected boolean explain = false; private TokenRewriteStream tokenRewriteStream; - public Context() { + String executionId; + + public Context(HiveConf conf) throws IOException { + this(conf, generateExecutionId()); } - public Context(HiveConf conf) { + /** + * Create a Context with a given executionId. ExecutionId, together with + * user name and conf, will determine the temporary directory locations. + */ + public Context(HiveConf conf, String executionId) throws IOException { this.conf = conf; + this.executionId = executionId; Path tmpPath = new Path(conf.getVar(HiveConf.ConfVars.SCRATCHDIR)); scratchPath = tmpPath.toUri().getPath(); } @@ -88,67 +113,65 @@ } /** - * Make a tmp directory on the local filesystem - */ - private void makeLocalScratchDir() throws IOException { - while (true) { - localScratchDir = new Path(System.getProperty("java.io.tmpdir") - + File.separator + Math.abs(rand.nextInt())); - FileSystem fs = FileSystem.getLocal(conf); - if (fs.mkdirs(localScratchDir)) { - localScratchDir = fs.makeQualified(localScratchDir); - allScratchDirs.add(localScratchDir); - break; - } - } - } - - /** * Make a tmp directory for MR intermediate data If URI/Scheme are not * supplied - those implied by the default filesystem will be used (which will * typically correspond to hdfs instance on hadoop cluster) + * + * @param mkdir if true, will make the directory. Will throw IOException if that fails. */ - private void makeMRScratchDir() throws IOException { - while (true) { - MRScratchDir = FileUtils.makeQualified(new Path(conf - .getVar(HiveConf.ConfVars.SCRATCHDIR), Integer.toString(Math.abs(rand - .nextInt()))), conf); - - if (explain) { - allScratchDirs.add(MRScratchDir); - return; + private static Path makeMRScratchDir(HiveConf conf, String executionId, boolean mkdir) + throws IOException { + + Path dir = FileUtils.makeQualified( + new Path(conf.getVar(HiveConf.ConfVars.SCRATCHDIR), executionId), conf); + + if (mkdir) { + FileSystem fs = dir.getFileSystem(conf); + if (!fs.mkdirs(dir)) { + throw new IOException("Cannot make directory: " + dir); } - - FileSystem fs = MRScratchDir.getFileSystem(conf); - if (fs.mkdirs(MRScratchDir)) { - allScratchDirs.add(MRScratchDir); - return; - } } + return dir; } /** * Make a tmp directory on specified URI Currently will use the same path as * implied by SCRATCHDIR config variable */ - private Path makeExternalScratchDir(URI extURI) throws IOException { - while (true) { - String extPath = scratchPath + File.separator - + Integer.toString(Math.abs(rand.nextInt())); - Path extScratchDir = new Path(extURI.getScheme(), extURI.getAuthority(), - extPath); - - if (explain) { - allScratchDirs.add(extScratchDir); - return extScratchDir; + private static Path makeExternalScratchDir(HiveConf conf, String executionId, + boolean mkdir, URI extURI) throws IOException { + + Path dir = new Path(extURI.getScheme(), extURI.getAuthority(), + conf.getVar(HiveConf.ConfVars.SCRATCHDIR) + Path.SEPARATOR + executionId); + + if (mkdir) { + FileSystem fs = dir.getFileSystem(conf); + if (!fs.mkdirs(dir)) { + throw new IOException("Cannot make directory: " + dir); } - - FileSystem fs = extScratchDir.getFileSystem(conf); - if (fs.mkdirs(extScratchDir)) { - allScratchDirs.add(extScratchDir); - return extScratchDir; + } + return dir; + } + + /** + * Make a tmp directory for local file system. + * + * @param mkdir if true, will make the directory. Will throw IOException if that fails. + */ + private static Path makeLocalScratchDir(HiveConf conf, String executionId, boolean mkdir) + throws IOException { + + FileSystem fs = FileSystem.getLocal(conf); + Path dir = fs.makeQualified(new Path(System.getProperty("java.io.tmpdir") + + Path.SEPARATOR + System.getProperty("user.name") + Path.SEPARATOR + + executionId)); + + if (mkdir) { + if (!fs.mkdirs(dir)) { + throw new IOException("Cannot make directory: " + dir); } } + return dir; } /** @@ -157,15 +180,13 @@ */ private String getExternalScratchDir(URI extURI) { try { - // first check if we already made a scratch dir on this URI - for (Path p : allScratchDirs) { - URI pURI = p.toUri(); - if (strEquals(pURI.getScheme(), extURI.getScheme()) - && strEquals(pURI.getAuthority(), extURI.getAuthority())) { - return p.toString(); - } + String fileSystem = extURI.getScheme() + ":" + extURI.getAuthority(); + Path dir = externalScratchDirs.get(fileSystem); + if (dir == null) { + dir = makeExternalScratchDir(conf, executionId, !explain, extURI); + externalScratchDirs.put(fileSystem, dir); } - return makeExternalScratchDir(extURI).toString(); + return dir.toString(); } catch (IOException e) { throw new RuntimeException(e); } @@ -175,69 +196,71 @@ * Create a map-reduce scratch directory on demand and return it */ private String getMRScratchDir() { - if (MRScratchDir == null) { - try { - makeMRScratchDir(); - } catch (IOException e) { - throw new RuntimeException(e); - } catch (IllegalArgumentException e) { - throw new RuntimeException("Error while making MR scratch " - + "directory - check filesystem config (" + e.getCause() + ")", e); + try { + if (MRScratchDir == null) { + MRScratchDir = makeMRScratchDir(conf, executionId, !explain); } + return MRScratchDir.toString(); + } catch (IOException e) { + throw new RuntimeException(e); + } catch (IllegalArgumentException e) { + throw new RuntimeException("Error while making MR scratch " + + "directory - check filesystem config (" + e.getCause() + ")", e); } - return MRScratchDir.toString(); } /** * Create a local scratch directory on demand and return it */ private String getLocalScratchDir() { - if (localScratchDir == null) { - try { - makeLocalScratchDir(); - } catch (IOException e) { - throw new RuntimeException(e); - } catch (IllegalArgumentException e) { - throw new RuntimeException("Error while making local scratch " - + "directory - check filesystem config (" + e.getCause() + ")", e); + try { + if (localScratchDir == null) { + localScratchDir = makeLocalScratchDir(conf, executionId, true); } + return localScratchDir.toString(); + } catch (IOException e) { + throw new RuntimeException(e); + } catch (IllegalArgumentException e) { + throw new RuntimeException("Error while making local scratch " + + "directory - check filesystem config (" + e.getCause() + ")", e); } - return localScratchDir.toString(); } + private void removeDir(Path p) { + try { + p.getFileSystem(conf).delete(p, true); + } catch (Exception e) { + LOG.warn("Error Removing Scratch: " + + StringUtils.stringifyException(e)); + } + } + /** * Remove any created scratch directories */ private void removeScratchDir() { - if (explain) { - try { - if (localScratchDir != null) { - FileSystem.getLocal(conf).delete(localScratchDir, true); - } - } catch (Exception e) { - LOG - .warn("Error Removing Scratch: " - + StringUtils.stringifyException(e)); - } - } else { - for (Path p : allScratchDirs) { - try { - p.getFileSystem(conf).delete(p, true); - } catch (Exception e) { - LOG.warn("Error Removing Scratch: " - + StringUtils.stringifyException(e)); - } - } + + for (Map.Entry p : externalScratchDirs.entrySet()) { + removeDir(p.getValue()); } - MRScratchDir = null; - localScratchDir = null; + externalScratchDirs.clear(); + + if (MRScratchDir != null) { + removeDir(MRScratchDir); + MRScratchDir = null; + } + + if (localScratchDir != null) { + removeDir(localScratchDir); + localScratchDir = null; + } } /** * Return the next available path in the current scratch dir */ private String nextPath(String base) { - return base + File.separator + Integer.toString(pathid++); + return base + Path.SEPARATOR + Integer.toString(pathid++); } /** @@ -425,4 +448,20 @@ public TokenRewriteStream getTokenRewriteStream() { return tokenRewriteStream; } + + /** + * Generate a unique executionId. An executionId, together with user name and + * the configuration, will determine the temporary locations of all intermediate + * files. + * + * In the future, users can use the executionId to resume a query. + */ + public static String generateExecutionId() { + Random rand = new Random(); + SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss_SSS"); + String executionId = "hive_" + format.format(new Date()) + "_" + + Math.abs(rand.nextLong()); + return executionId; + } + }