diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
index d31a2022618507e79c59c5aeed1a4d2cf4fb3729..49b761450cb42fc333baf480af0eece9a8ae1cec 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
@@ -229,7 +229,7 @@ private SparkJobRef submit(final DriverContext driverContext, final SparkWork sp
     return new RemoteSparkJobRef(hiveConf, jobHandle, sparkJobStatus);
   }
 
-  private void refreshLocalResources(SparkWork sparkWork, HiveConf conf) throws IOException {
+  private synchronized void refreshLocalResources(SparkWork sparkWork, HiveConf conf) throws IOException {
     // add hive-exec jar
     addJars((new JobConf(this.getClass())).getJar());
 
@@ -264,6 +264,7 @@ private void refreshLocalResources(SparkWork sparkWork, HiveConf conf) throws IO
     addResources(addedArchives);
   }
 
+  //This method is not thread safe
   private void addResources(String addedFiles) throws IOException {
     for (String addedFile : CSV_SPLITTER.split(Strings.nullToEmpty(addedFiles))) {
       try {
@@ -281,6 +282,7 @@ private void addResources(String addedFiles) throws IOException {
     }
   }
 
+  //This method is not thread safe
   private void addJars(String addedJars) throws IOException {
     for (String addedJar : CSV_SPLITTER.split(Strings.nullToEmpty(addedJars))) {
       try {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
index fdc53619893b230c71786c0c517af96f32407277..945eb58c7ed0ec67c3bc6fba09c280536a7b15fe 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
@@ -78,7 +78,8 @@ public static BytesWritable copyBytesWritable(BytesWritable bw) {
 
   /**
    * Uploads a local file to HDFS
-   *
+   * This method is not thread safe
+   *
    * @param source
    * @param conf
    * @return
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
index 6a8b42e92689aa943fc662e4a63e3eb6633116ab..bb501295183b045a4870cfde716c4ccdbb3933b6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
@@ -98,7 +98,6 @@ private final String sessionId;
 
   private volatile HiveSparkClient hiveSparkClient;
   private volatile Path scratchDir;
-  private final Object dirLock = new Object();
 
   /**
    * The timestamp of the last completed Spark job.
@@ -317,6 +316,7 @@ private boolean matches(String input, String regex, StringBuilder matchedString)
     return result;
   }
 
+  //This method is not thread safe
   private void cleanScratchDir() throws IOException {
     if (scratchDir != null) {
       FileSystem fs = scratchDir.getFileSystem(conf);
@@ -324,15 +324,16 @@ private void cleanScratchDir() throws IOException {
       scratchDir = null;
     }
   }
-
+  /**
+   * Create scratch directory for spark session if it does not exist.
+   * This method is not thread safe.
+   * @return Path to Spark session scratch directory.
+   * @throws IOException
+   */
   @Override
   public Path getHDFSSessionDir() throws IOException {
     if (scratchDir == null) {
-      synchronized (dirLock) {
-        if (scratchDir == null) {
-          scratchDir = createScratchDir();
-        }
-      }
+      scratchDir = createScratchDir();
     }
     return scratchDir;
   }