diff --git ql/src/java/org/apache/hadoop/hive/ql/Context.java ql/src/java/org/apache/hadoop/hive/ql/Context.java index 6466275..b809dc8 100644 --- ql/src/java/org/apache/hadoop/hive/ql/Context.java +++ ql/src/java/org/apache/hadoop/hive/ql/Context.java @@ -42,6 +42,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.TaskRunner; import org.apache.hadoop.hive.ql.hooks.WriteEntity; import org.apache.hadoop.hive.ql.lockmgr.HiveLock; import org.apache.hadoop.hive.ql.lockmgr.HiveLockManager; @@ -171,10 +172,11 @@ private String getScratchDir(String scheme, String authority, boolean mkdir, String scratchDir) { String fileSystem = scheme + ":" + authority; - String dir = fsScratchDirs.get(fileSystem); + String dir = fsScratchDirs.get(fileSystem + "-" + TaskRunner.getTaskID()); if (dir == null) { - Path dirPath = new Path(scheme, authority, scratchDir); + Path dirPath = new Path(scheme, authority, + scratchDir + "-" + TaskRunner.getTaskID()); if (mkdir) { try { FileSystem fs = dirPath.getFileSystem(conf); @@ -191,7 +193,7 @@ private String getScratchDir(String scheme, String authority, } } dir = dirPath.toString(); - fsScratchDirs.put(fileSystem, dir); + fsScratchDirs.put(fileSystem + "-" + TaskRunner.getTaskID(), dir); } return dir; @@ -228,9 +230,10 @@ public String getMRScratchDir() { try { Path dir = FileUtils.makeQualified(nonLocalScratchPath, conf); URI uri = dir.toUri(); - return getScratchDir(uri.getScheme(), uri.getAuthority(), + String newScratchDir = getScratchDir(uri.getScheme(), uri.getAuthority(), !explain, uri.getPath()); - + LOG.info("New scratch dir is " + newScratchDir); + return newScratchDir; } catch (IOException e) { throw new RuntimeException(e); } catch (IllegalArgumentException e) { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java index 56c2be6..4f179b1 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.exec; import java.io.Serializable; +import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.hive.ql.session.SessionState; @@ -30,6 +31,13 @@ protected Task tsk; protected TaskResult result; protected SessionState ss; + private static AtomicLong taskCounter = new AtomicLong(0); + private static ThreadLocal taskID = new ThreadLocal() { + @Override + protected Long initialValue() { + return taskCounter.incrementAndGet(); + } + }; public TaskRunner(Task tsk, TaskResult result) { this.tsk = tsk; @@ -44,6 +52,7 @@ public TaskRunner(Task tsk, TaskResult result) { @Override public void run() { SessionState.start(ss); + taskID.set(taskCounter.incrementAndGet()); runSequential(); } @@ -61,4 +70,7 @@ public void runSequential() { result.setExitVal(exitVal); } + public static long getTaskID () { + return taskID.get(); + } }