diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java index e1496e9..f1d65ea 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java @@ -165,7 +165,9 @@ private void addResources(String addedFiles) throws IOException { try { URI fileUri = SparkUtilities.getURI(addedFile); if (fileUri != null && !localFiles.contains(fileUri)) { - fileUri = SparkUtilities.uploadToHDFS(fileUri, hiveConf); + if (SparkUtilities.needUploadToHDFS(fileUri, sparkConf)) { + fileUri = SparkUtilities.uploadToHDFS(fileUri, hiveConf); + } localFiles.add(fileUri); remoteClient.addFile(fileUri); } @@ -180,7 +182,9 @@ private void addJars(String addedJars) throws IOException { try { URI jarUri = SparkUtilities.getURI(addedJar); if (jarUri != null && !localJars.contains(jarUri)) { - jarUri = SparkUtilities.uploadToHDFS(jarUri, hiveConf); + if (SparkUtilities.needUploadToHDFS(jarUri, sparkConf)) { + jarUri = SparkUtilities.uploadToHDFS(jarUri, hiveConf); + } localJars.add(jarUri); remoteClient.addJar(jarUri); } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java index a93f1f2..91d83f3 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.io.BytesWritable; +import org.apache.spark.SparkConf; /** * Contains utilities methods used as part of Spark tasks. @@ -69,7 +70,7 @@ public static URI getURI(String path) throws URISyntaxException { } /** - * Copies local file to HDFS in yarn-cluster mode. + * Uploads a local file to HDFS * * @param source * @param conf @@ -77,18 +78,18 @@ public static URI getURI(String path) throws URISyntaxException { * @throws IOException */ public static URI uploadToHDFS(URI source, HiveConf conf) throws IOException { - URI result = source; - if (conf.get("spark.master").equals("yarn-cluster")) { - if (!source.getScheme().equals("hdfs")) { - Path tmpDir = SessionState.getHDFSSessionPath(conf); - FileSystem fileSystem = FileSystem.get(conf); - fileSystem.copyFromLocalFile(new Path(source.getPath()), tmpDir); - String filePath = tmpDir + File.separator + getFileName(source); - Path fullPath = fileSystem.getFileStatus(new Path(filePath)).getPath(); - result = fullPath.toUri(); - } - } - return result; + Path tmpDir = SessionState.getHDFSSessionPath(conf); + FileSystem fileSystem = FileSystem.get(conf); + fileSystem.copyFromLocalFile(new Path(source.getPath()), tmpDir); + String filePath = tmpDir + File.separator + getFileName(source); + Path fullPath = fileSystem.getFileStatus(new Path(filePath)).getPath(); + return fullPath.toUri(); + } + + // checks if a resource has to be uploaded to HDFS for yarn-cluster mode + public static boolean needUploadToHDFS(URI source, SparkConf sparkConf) { + return sparkConf.get("spark.master").equals("yarn-cluster") && + !source.getScheme().equals("hdfs"); } private static String getFileName(URI uri) {