diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java
index 82ec668..020ca08 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java
@@ -36,6 +36,8 @@ import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.spark.SparkConf;
+import org.apache.spark.SparkEnv;
 import org.apache.spark.SparkFiles;
 
 import java.io.BufferedInputStream;
@@ -330,6 +332,7 @@ public void process(Object row, int tag) throws HiveException {
     // initialize the user's process only when you receive the first row
     if (firstRow) {
       firstRow = false;
+      SparkConf sparkConf = null;
       try {
         String[] cmdArgs = splitArgs(conf.getScriptCmd());
@@ -342,6 +345,7 @@ public void process(Object row, int tag) throws HiveException {
           // In spark local mode, we need to search added files in root directory.
           if (HiveConf.getVar(hconf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) {
+            sparkConf = SparkEnv.get().conf();
             finder.prependPathComponent(SparkFiles.getRootDirectory());
           }
           File f = finder.getAbsolutePath(prog);
@@ -372,6 +376,14 @@ public void process(Object row, int tag) throws HiveException {
         String idEnvVarVal = getOperatorId();
         env.put(safeEnvVarName(idEnvVarName), idEnvVarVal);
 
+        // For Spark, in non-local mode, any added dependencies are stored at
+        // SparkFiles::getRootDirectory, which is the executor's working directory.
+        // In local mode, we need to manually point the process's working directory
+        // there to make the dependencies accessible.
+        if (sparkConf != null && sparkConf.get("spark.master").equals("local")) {
+          pb.directory(new File(SparkFiles.getRootDirectory()));
+        }
+
         scriptPid = pb.start(); // Runtime.getRuntime().exec(wrappedCmdArgs);
 
         DataOutputStream scriptOut = new DataOutputStream(
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
index cf2c3bc..c377716 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
@@ -22,12 +22,11 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.Collection;
-import java.util.UUID;
-import java.util.Collection;
 
 import com.google.common.base.Preconditions;
 import org.apache.commons.io.FilenameUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -91,11 +90,15 @@ public static URI getURI(String path) throws URISyntaxException {
    */
   public static URI uploadToHDFS(URI source, HiveConf conf) throws IOException {
     Path localFile = new Path(source.getPath());
-    // give the uploaded file a UUID
-    Path remoteFile = new Path(SessionState.getHDFSSessionPath(conf),
-        UUID.randomUUID() + "-" + getFileName(source));
+    Path remoteFile = new Path(SessionState.getHDFSSessionPath(conf), getFileName(source));
     FileSystem fileSystem = FileSystem.get(conf);
-    fileSystem.copyFromLocalFile(localFile, remoteFile);
+    if (fileSystem.exists(remoteFile)) {
+      // If we get here, the user added two files from different paths but
+      // with the same file name. Since we shouldn't rename the file, just
+      // throw an exception. See HIVE-12229.
+      throw new FileAlreadyExistsException(remoteFile + " already exists.");
+    }
+    fileSystem.copyFromLocalFile(false, false, localFile, remoteFile);
     Path fullPath = fileSystem.getFileStatus(remoteFile).getPath();
     return fullPath.toUri();
   }
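
Note (not part of the patch): a minimal standalone sketch of the ProcessBuilder behavior the ScriptOperator change relies on. The directory handed to ProcessBuilder.directory() becomes the child process's working directory, so a script that refers to an added dependency by bare file name resolves it against that directory. The class name and the /tmp/spark-root path are illustrative stand-ins for SparkFiles.getRootDirectory().

import java.io.File;
import java.io.IOException;

public class WorkingDirDemo {
  public static void main(String[] args) throws IOException, InterruptedException {
    // Stand-in for SparkFiles.getRootDirectory(); assumed to already exist.
    File depRoot = new File("/tmp/spark-root");
    ProcessBuilder pb = new ProcessBuilder("ls");
    pb.directory(depRoot);  // the same call the patch adds for spark.master=local
    pb.inheritIO();
    Process child = pb.start();
    child.waitFor();        // lists depRoot's contents: the cwd change took effect
  }
}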
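
Note (not part of the patch): the uploadToHDFS change drops the UUID prefix in favor of a fail-fast collision check. Below is a sketch of that exists-then-throw pattern against a generic Hadoop FileSystem; copyNoClobber and the paths are hypothetical, while the FileSystem and FileAlreadyExistsException APIs are real.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CopyNoClobber {
  // Copies src to destDir/<src name>, refusing duplicates the way
  // SparkUtilities.uploadToHDFS does after this change.
  public static Path copyNoClobber(FileSystem fs, Path src, Path destDir)
      throws java.io.IOException {
    Path dest = new Path(destDir, src.getName());
    if (fs.exists(dest)) {
      throw new FileAlreadyExistsException(dest + " already exists.");
    }
    // delSrc=false, overwrite=false: never clobber an existing destination.
    fs.copyFromLocalFile(false, false, src, dest);
    return fs.getFileStatus(dest).getPath();
  }

  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.getLocal(new Configuration());
    // Paths here are illustrative; the source file must exist locally.
    Path out = copyNoClobber(fs, new Path("/tmp/udfs.jar"), new Path("/tmp/session"));
    System.out.println("uploaded to " + out);
  }
}

Passing overwrite=false to copyFromLocalFile matters because the exists() check is inherently racy: if another upload lands between the check and the copy, the copy itself fails instead of silently clobbering the file.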