diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
index 0f3c93d..e1496e9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
@@ -22,8 +22,8 @@
 import java.io.IOException;
 import java.io.Serializable;
-import java.net.MalformedURLException;
-import java.net.URL;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -77,8 +77,8 @@
   private transient SparkConf sparkConf;
   private transient HiveConf hiveConf;
 
-  private transient List<URL> localJars = new ArrayList<URL>();
-  private transient List<URL> localFiles = new ArrayList<URL>();
+  private transient List<URI> localJars = new ArrayList<URI>();
+  private transient List<URI> localFiles = new ArrayList<URI>();
 
   private final transient long sparkClientTimtout;
@@ -128,7 +128,7 @@ public SparkJobRef execute(final DriverContext driverContext, final SparkWork sp
     return new RemoteSparkJobRef(hiveConf, jobHandle, sparkJobStatus);
   }
 
-  private void refreshLocalResources(SparkWork sparkWork, HiveConf conf) {
+  private void refreshLocalResources(SparkWork sparkWork, HiveConf conf) throws IOException {
     // add hive-exec jar
     addJars((new JobConf(this.getClass())).getJar());
@@ -160,30 +160,32 @@ private void refreshLocalResources(SparkWork sparkWork, HiveConf conf) {
     addResources(addedArchives);
   }
 
-  private void addResources(String addedFiles) {
+  private void addResources(String addedFiles) throws IOException {
     for (String addedFile : CSV_SPLITTER.split(Strings.nullToEmpty(addedFiles))) {
       try {
-        URL fileUrl = SparkUtilities.getURL(addedFile);
-        if (fileUrl != null && !localFiles.contains(fileUrl)) {
-          localFiles.add(fileUrl);
-          remoteClient.addFile(fileUrl);
+        URI fileUri = SparkUtilities.getURI(addedFile);
+        if (fileUri != null && !localFiles.contains(fileUri)) {
+          fileUri = SparkUtilities.uploadToHDFS(fileUri, hiveConf);
+          localFiles.add(fileUri);
+          remoteClient.addFile(fileUri);
         }
-      } catch (MalformedURLException e) {
-        LOG.warn("Failed to add file:" + addedFile);
+      } catch (URISyntaxException e) {
+        LOG.warn("Failed to add file:" + addedFile, e);
       }
     }
   }
 
-  private void addJars(String addedJars) {
+  private void addJars(String addedJars) throws IOException {
     for (String addedJar : CSV_SPLITTER.split(Strings.nullToEmpty(addedJars))) {
       try {
-        URL jarUrl = SparkUtilities.getURL(addedJar);
-        if (jarUrl != null && !localJars.contains(jarUrl)) {
-          localJars.add(jarUrl);
-          remoteClient.addJar(jarUrl);
+        URI jarUri = SparkUtilities.getURI(addedJar);
+        if (jarUri != null && !localJars.contains(jarUri)) {
+          jarUri = SparkUtilities.uploadToHDFS(jarUri, hiveConf);
+          localJars.add(jarUri);
+          remoteClient.addJar(jarUri);
         }
-      } catch (MalformedURLException e) {
-        LOG.warn("Failed to add jar:" + addedJar);
+      } catch (URISyntaxException e) {
+        LOG.warn("Failed to add jar:" + addedJar, e);
       }
     }
   }
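[Note, not part of the patch: the net effect of the RemoteHiveSparkClient changes above is that every added file or jar is first normalized to a URI and, when Spark runs in yarn-cluster mode, copied into the session's HDFS scratch directory before being registered with the remote client. A minimal self-contained sketch of that upload step, using the same Hadoop FileSystem calls the patch relies on; the class and method names below are illustrative, not Hive's.]

import java.io.File;
import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Illustrative stand-in for the upload step this patch adds: local resources
// are copied to an HDFS staging directory when the driver runs remotely.
public class UploadSketch {
  static URI uploadIfNeeded(URI source, Configuration conf, Path stagingDir)
      throws IOException {
    if (!"hdfs".equals(source.getScheme())) {
      FileSystem fs = FileSystem.get(conf);
      // Copy the local file into the staging dir, then resolve its final URI.
      fs.copyFromLocalFile(new Path(source.getPath()), stagingDir);
      String name = new File(source.getPath()).getName();
      return fs.getFileStatus(new Path(stagingDir, name)).getPath().toUri();
    }
    return source; // already on HDFS, nothing to copy
  }
}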
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
index 20f542d..a93f1f2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
@@ -18,11 +18,15 @@
 package org.apache.hadoop.hive.ql.exec.spark;
 
 import java.io.File;
+import java.io.IOException;
 import java.net.MalformedURLException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.URL;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession;
 import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManager;
@@ -50,25 +54,50 @@ public static BytesWritable copyBytesWritable(BytesWritable bw) {
     return copy;
   }
 
-  public static URL getURL(String path) throws MalformedURLException {
+  public static URI getURI(String path) throws URISyntaxException {
     if (path == null) {
       return null;
     }
-    URL url = null;
-    try {
       URI uri = new URI(path);
-      if (uri.getScheme() != null) {
-        url = uri.toURL();
-      } else {
+      if (uri.getScheme() == null) {
         // if no file schema in path, we assume it's file on local fs.
-        url = new File(path).toURI().toURL();
+        uri = new File(path).toURI();
       }
-    } catch (URISyntaxException e) {
-      // do nothing here, just return null if input path is not a valid URI.
+
+    return uri;
+  }
+
+  /**
+   * Copies a local file to HDFS when running in yarn-cluster mode.
+   *
+   * @param source the URI of the local file
+   * @param conf the Hive configuration
+   * @return the URI of the copy on HDFS, or the source URI if no copy was made
+   * @throws IOException
+   */
+  public static URI uploadToHDFS(URI source, HiveConf conf) throws IOException {
+    URI result = source;
+    if (conf.get("spark.master").equals("yarn-cluster")) {
+      if (!source.getScheme().equals("hdfs")) {
+        Path tmpDir = SessionState.getHDFSSessionPath(conf);
+        FileSystem fileSystem = FileSystem.get(conf);
+        fileSystem.copyFromLocalFile(new Path(source.getPath()), tmpDir);
+        String filePath = tmpDir + File.separator + getFileName(source);
+        Path fullPath = fileSystem.getFileStatus(new Path(filePath)).getPath();
+        result = fullPath.toUri();
+      }
+    }
+    return result;
+  }
+
+  private static String getFileName(URI uri) {
+    if (uri == null) {
+      return null;
     }
-    return url;
+    String[] splits = uri.getPath().split(File.separator);
+    return splits[splits.length - 1];
   }
 
   public static SparkSession getSparkSession(HiveConf conf,
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java
index 36526c1..13c2dbc 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java
@@ -18,7 +18,7 @@
 package org.apache.hive.spark.client;
 
 import java.io.Serializable;
-import java.net.URL;
+import java.net.URI;
 import java.util.concurrent.Future;
 
 import org.apache.hadoop.hive.common.classification.InterfaceAudience;
@@ -68,10 +68,10 @@
    * in cluster mode, it may reside on a different host, meaning "file:" URLs have to exist
    * on that node (and not on the client machine).
    *
-   * @param url The location of the jar file.
+   * @param uri The location of the jar file.
    * @return A future that can be used to monitor the operation.
    */
-  Future<?> addJar(URL url);
+  Future<?> addJar(URI uri);
 
   /**
    * Adds a file to the running remote context.
@@ -80,10 +80,10 @@
    * in cluster mode, it may reside on a different host, meaning "file:" URLs have to exist
    * on that node (and not on the client machine).
    *
-   * @param url The location of the file.
+   * @param uri The location of the file.
    * @return A future that can be used to monitor the operation.
    */
-  Future<?> addFile(URL url);
+  Future<?> addFile(URI uri);
 
   /**
    * Get the count of executors.
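[Note, not part of the patch: the contract of the new SparkUtilities.getURI is that a scheme-less path is treated as a file on the local filesystem and comes back as a file: URI, while a path that already carries a scheme passes through unchanged. A small standalone probe mirroring the post-patch logic:]

import java.io.File;
import java.net.URI;
import java.net.URISyntaxException;

public class GetUriProbe {
  // Mirrors the post-patch SparkUtilities.getURI logic for local inspection.
  static URI getUri(String path) throws URISyntaxException {
    if (path == null) {
      return null;
    }
    URI uri = new URI(path);
    if (uri.getScheme() == null) {
      // No scheme: assume a file on the local filesystem, as the patch does.
      uri = new File(path).toURI();
    }
    return uri;
  }

  public static void main(String[] args) throws URISyntaxException {
    System.out.println(getUri("/tmp/my.jar"));          // file:/tmp/my.jar
    System.out.println(getUri("hdfs://nn:8020/a.jar")); // hdfs://nn:8020/a.jar
    System.out.println(getUri(null));                   // null
  }
}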
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
index 536f85d..30fd400 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
@@ -17,6 +17,14 @@
 package org.apache.hive.spark.client;
 
+import com.google.common.base.Charsets;
+import com.google.common.base.Joiner;
+import com.google.common.base.Strings;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.util.concurrent.GenericFutureListener;
 import io.netty.util.concurrent.Promise;
@@ -30,14 +38,12 @@
 import java.io.OutputStreamWriter;
 import java.io.Serializable;
 import java.io.Writer;
-import java.net.URL;
+import java.net.URI;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.UUID;
 import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -49,14 +55,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Charsets;
-import com.google.common.base.Joiner;
-import com.google.common.base.Strings;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
 class SparkClientImpl implements SparkClient {
 
   private static final long serialVersionUID = 1L;
@@ -153,13 +151,13 @@ public void stop() {
   }
 
   @Override
-  public Future<?> addJar(URL url) {
-    return run(new AddJarJob(url.toString()));
+  public Future<?> addJar(URI uri) {
+    return run(new AddJarJob(uri.toString()));
   }
 
   @Override
-  public Future<?> addFile(URL url) {
-    return run(new AddFileJob(url.toString()));
+  public Future<?> addFile(URI uri) {
+    return run(new AddFileJob(uri.toString()));
   }
 
   @Override
diff --git a/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java b/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java
index 6e7a09f..d33ad7e 100644
--- a/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java
+++ b/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java
@@ -22,7 +22,7 @@
 import java.io.FileOutputStream;
 import java.io.InputStream;
 import java.io.Serializable;
-import java.net.URL;
+import java.net.URI;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
@@ -204,7 +204,7 @@ public void call(SparkClient client) throws Exception {
         jarFile.closeEntry();
         jarFile.close();
 
-        client.addJar(new URL("file:" + jar.getAbsolutePath()))
+        client.addJar(new URI("file:" + jar.getAbsolutePath()))
             .get(TIMEOUT, TimeUnit.SECONDS);
 
         // Need to run a Spark job to make sure the jar is added to the class loader. Monitoring
@@ -220,7 +220,7 @@ public void call(SparkClient client) throws Exception {
         fileStream.write("test file".getBytes("UTF-8"));
         fileStream.close();
 
-        client.addJar(new URL("file:" + file.getAbsolutePath()))
+        client.addJar(new URI("file:" + file.getAbsolutePath()))
             .get(TIMEOUT, TimeUnit.SECONDS);
 
         // The same applies to files added with "addFile". They're only guaranteed to be available
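[Note, not part of the patch: for callers, the visible change is constructing a java.net.URI instead of a java.net.URL before handing a resource to the client, as the updated test shows. A minimal usage sketch against the new interface; obtaining the SparkClient instance is elided, and the 20-second timeout is an illustrative value, not one the patch prescribes.]

import java.io.File;
import java.net.URI;
import java.util.concurrent.TimeUnit;
import org.apache.hive.spark.client.SparkClient;

// Sketch of a caller migrating from URL to URI with the updated SparkClient.
public class AddJarExample {
  public static void addLocalJar(SparkClient client, File jar) throws Exception {
    URI jarUri = new URI("file:" + jar.getAbsolutePath());
    // addJar returns a Future; block until the remote context has the jar.
    client.addJar(jarUri).get(20, TimeUnit.SECONDS);
  }
}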