diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/exec/spark/TestSparkSessionTimeout.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/exec/spark/TestSparkSessionTimeout.java
index c887297bc2..d8dd80a1c2 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/exec/spark/TestSparkSessionTimeout.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/exec/spark/TestSparkSessionTimeout.java
@@ -68,32 +68,7 @@ public void testMultiSessionSparkSessionTimeout() throws InterruptedException,
         HiveConf conf = new HiveConf();
         conf.setBoolVar(HiveConf.ConfVars.SPARK_OPTIMIZE_SHUFFLE_SERDE, false);
         conf.set("spark.local.dir", Paths.get(System.getProperty("test.tmp.dir"),
-            "TestSparkSessionTimeout-testMultiSparkSessionTimeout-local-dir").toString());
-
-        SessionState.start(conf);
-
-        runTestSparkSessionTimeout(conf);
-        return null;
-      }));
-    }
-    for (Future<Void> future : futures) {
-      future.get();
-    }
-  }
-
-  @Test
-  public void testMultiSparkSessionTimeout() throws ExecutionException, InterruptedException {
-    List<Future<Void>> futures = new ArrayList<>();
-    ExecutorService es = Executors.newFixedThreadPool(10);
-    for (int i = 0; i < 10; i++) {
-      futures.add(es.submit(() -> {
-        String confDir = "../../data/conf/spark/local/hive-site.xml";
-        HiveConf.setHiveSiteLocation(new File(confDir).toURI().toURL());
-
-        HiveConf conf = new HiveConf();
-        conf.setBoolVar(HiveConf.ConfVars.SPARK_OPTIMIZE_SHUFFLE_SERDE, false);
-        conf.set("spark.local.dir", Paths.get(System.getProperty("test.tmp.dir"),
-            "TestSparkSessionTimeout-testMultiSparkSessionTimeout-local-dir").toString());
+            "TestSparkSessionTimeout-testMultiSessionSparkSessionTimeout-local-dir").toString());
 
         SessionState.start(conf);
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 76a30eb912..10aa94ed00 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -255,6 +255,9 @@
 
   public static Random randGen = new Random();
 
+  private static final Object INPUT_SUMMARY_LOCK = new Object();
+  private static final Object ROOT_HDFS_DIR_LOCK = new Object();
+
   /**
    * ReduceField:
    * KEY: record key
@@ -2317,8 +2320,6 @@ public static void copyJobSecretToTableProperties(TableDesc tbl) throws IOExcept
     }
   }
 
-  private static final Object INPUT_SUMMARY_LOCK = new Object();
-
   /**
    * Returns the maximum number of executors required to get file information from several input locations.
    * It checks whether HIVE_EXEC_INPUT_LISTING_MAX_THREADS or DEPRECATED_MAPRED_DFSCLIENT_PARALLELISM_MAX are > 1
@@ -4463,11 +4464,16 @@ public static boolean isHiveManagedFile(Path path) {
   public static void ensurePathIsWritable(Path rootHDFSDirPath, HiveConf conf) throws IOException {
     FsPermission writableHDFSDirPermission = new FsPermission((short)00733);
     FileSystem fs = rootHDFSDirPath.getFileSystem(conf);
+
     if (!fs.exists(rootHDFSDirPath)) {
-      Utilities.createDirsWithPermission(conf, rootHDFSDirPath, writableHDFSDirPermission, true);
+      synchronized (ROOT_HDFS_DIR_LOCK) {
+        if (!fs.exists(rootHDFSDirPath)) {
+          Utilities.createDirsWithPermission(conf, rootHDFSDirPath, writableHDFSDirPermission, true);
+        }
+      }
     }
     FsPermission currentHDFSDirPermission = fs.getFileStatus(rootHDFSDirPath).getPermission();
-    if (rootHDFSDirPath != null && rootHDFSDirPath.toUri() != null) {
+    if (rootHDFSDirPath.toUri() != null) {
       String schema = rootHDFSDirPath.toUri().getScheme();
       LOG.debug("HDFS dir: " + rootHDFSDirPath + " with schema " + schema + ", permission: " +
           currentHDFSDirPermission);