Keep the staging directory on the same filesystem scheme as the path it serves.

Context.getStagingDir() is made public and annotated @VisibleForTesting. When
the configured hive.exec.stagingdir carries a URI scheme that differs
(case-insensitively) from the scheme of the input path, a warning is logged and
the staging path is rebuilt from the input path and
HiveConf.ConfVars.STAGINGDIR.defaultStrVal instead of the configured value.
TestContext exercises the default value, a file:// value and an hdfs:// value
against both an hdfs:// and a file:// table path, and resets the shared conf
after every test.

NOTE(review): line breaks and leading whitespace of this patch were
reconstructed (2-space indent assumed for Context.java, 4-space for
TestContext.java) - confirm against the tree or apply with
`git apply --ignore-whitespace`. The blob ids on the `index` lines were not
recomputed.

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
index 318c2071d3..454c1933e5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Context.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
@@ -34,6 +34,7 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.antlr.runtime.TokenRewriteStream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ContentSummary;
@@ -449,10 +450,12 @@ public String getCmd () {
    * @param mkdir Create the directory if True.
    * @return A temporary path.
    */
-  private Path getStagingDir(Path inputPath, boolean mkdir) {
+  @VisibleForTesting
+  public Path getStagingDir(Path inputPath, boolean mkdir) {
     final URI inputPathUri = inputPath.toUri();
     final String inputPathName = inputPathUri.getPath();
     final String fileSystem = inputPathUri.getScheme() + ":" + inputPathUri.getAuthority();
+    final String configuredStagingPathScheme = new Path(stagingDir).toUri().getScheme();
     final FileSystem fs;
 
     try {
@@ -463,7 +466,20 @@ private Path getStagingDir(Path inputPath, boolean mkdir) {
 
     String stagingPathName;
     if (inputPathName.indexOf(stagingDir) == -1) {
-      stagingPathName = new Path(inputPathName, stagingDir).toString();
+      Path stagingPath = new Path(inputPathName, stagingDir);
+      if (configuredStagingPathScheme != null &&
+          !configuredStagingPathScheme.equalsIgnoreCase(inputPathUri.getScheme())) {
+        // The configured hive.exec.stagingdir is not on the same FS as the requested inputPath.
+        // This can happen if the client sets hive.exec.stagingdir to something like s3a://bucket/stagingdir
+        // but creates a table on HDFS. To avoid failing due to an impossible cross-filesystem rename,
+        // we instead create this stagingdir using defaults, overriding the client configuration.
+        // NOTE(review): only the URI scheme is compared, so a differing authority is not detected.
+        LOG.warn("Cannot use configured staging dir " + stagingPath + " for " + inputPath +
+            " because they are on separate FileSystems. Using the default: " +
+            HiveConf.ConfVars.STAGINGDIR.defaultStrVal + " instead.");
+        stagingPath = new Path(inputPathName, HiveConf.ConfVars.STAGINGDIR.defaultStrVal);
+      }
+      stagingPathName = stagingPath.toString();
     } else {
       stagingPathName = inputPathName.substring(0, inputPathName.indexOf(stagingDir) + stagingDir.length());
     }
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestContext.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestContext.java
index c9f67579ca..a452fe9d40 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestContext.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestContext.java
@@ -22,6 +22,7 @@
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.session.SessionState;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -44,6 +45,11 @@ public void setUp() throws IOException {
         context = new Context(conf);
     }
 
+    @After
+    public void cleanUp() {
+        conf = new HiveConf();
+    }
+
     @Test
     public void testGetScratchDirectoriesForPaths() throws IOException {
         Context spyContext = spy(context);
@@ -70,4 +76,24 @@ public void testGetScratchDirectoriesForPaths() throws IOException {
         assertEquals(mrTmpPath, spyContext.getTempDirForInterimJobPath(new Path("file:///user")));
         conf.setBoolean(HiveConf.ConfVars.HIVE_BLOBSTORE_OPTIMIZATIONS_ENABLED.varname, true);
     }
+
+    @Test
+    public void testStagingDirOnSameFSDespiteConfiguration() throws IOException {
+        assertStagingDirOnSameFS(HiveConf.ConfVars.STAGINGDIR.defaultStrVal);
+        assertStagingDirOnSameFS("file:///tmp/staging");
+        assertStagingDirOnSameFS("hdfs://localhost/tmp/staging");
+    }
+
+    private void assertStagingDirOnSameFS(String configuredStagingDir) throws IOException {
+        conf.set(HiveConf.ConfVars.STAGINGDIR.varname, configuredStagingDir);
+        Context stagingContext = new Context(conf);
+
+        Path hdfsPath = new Path("hdfs://localhost/user/tableDir");
+        Path hdfsStagingDir = stagingContext.getStagingDir(hdfsPath, false);
+        assertEquals(hdfsPath.toUri().getScheme(), hdfsStagingDir.toUri().getScheme());
+
+        Path filePath = new Path("file:///user/tableDir");
+        Path fileStagingDir = stagingContext.getStagingDir(filePath, false);
+        assertEquals(filePath.toUri().getScheme(), fileStagingDir.toUri().getScheme());
+    }
 }