Index: common/src/java/org/apache/hadoop/hive/common/FileUtils.java
===================================================================
--- common/src/java/org/apache/hadoop/hive/common/FileUtils.java	(revision 1350716)
+++ common/src/java/org/apache/hadoop/hive/common/FileUtils.java	(working copy)
@@ -28,6 +28,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
+import org.apache.hadoop.util.Shell;
 
 /**
  * Collection of file manipulation utilities common across Hive.
@@ -246,16 +247,58 @@
  * @param outputFile
  * @throws IOException
  */
+  public static void cab(String parentDir, String[] inputFiles, String outputFile)
+      throws IOException {
+    if (!Shell.WINDOWS) {
+      throw new IOException("Cannot use FileUtils.cab in non-Windows environment.");
+    }
+
+    String hadoopHome = System.getenv("HADOOP_HOME");
+    if (hadoopHome == null) {
+      // Fail fast: otherwise the shell command below would be "null\bin\cabarc ...".
+      throw new IOException("HADOOP_HOME must be set to locate cabarc.");
+    }
+    StringBuilder cabCommand = new StringBuilder();
+    cabCommand.append("cd " + parentDir);
+    cabCommand.append(" & ");
+    cabCommand.append(hadoopHome + "\\bin\\");
+    cabCommand.append("cabarc -r -p n ");
+    cabCommand.append(" " + outputFile);
+    for (int i = 0; i < inputFiles.length; i++) {
+      cabCommand.append(" " + inputFiles[i]);
+    }
+    String[] shellCmd = {"cmd", "/c", cabCommand.toString()};
+    ShellCommandExecutor shexec = new ShellCommandExecutor(shellCmd);
+    shexec.execute();
+    int exitcode = shexec.getExitCode();
+    if (exitcode != 0) {
+      throw new IOException("Error cabbing file " + outputFile
+          + ". Cab process exited with exit code " + exitcode);
+    }
+  }
+
+  /**
+   * Archive all the files in inputFiles into outputFile using tar+gzip.
+   *
+   * @param parentDir
+   * @param inputFiles
+   * @param outputFile
+   * @throws IOException
+   */
   public static void tar(String parentDir, String[] inputFiles, String outputFile)
       throws IOException {
     StringBuffer tarCommand = new StringBuffer();
-    tarCommand.append("cd " + parentDir + " ; ");
-    tarCommand.append(" tar -zcvf ");
+    tarCommand.append("cd " + parentDir);
+    tarCommand.append((Shell.WINDOWS ? " & " : " ; ") + "tar -zcvf ");
     tarCommand.append(" " + outputFile);
     for (int i = 0; i < inputFiles.length; i++) {
       tarCommand.append(" " + inputFiles[i]);
     }
-    String[] shellCmd = {"bash", "-c", tarCommand.toString()};
+    String shell, arg;
+    if (Shell.WINDOWS) {
+      shell = "cmd.exe";
+      arg = "/c";
+    } else {
+      shell = "bash";
+      arg = "-c";
+    }
+    String[] shellCmd = {shell, arg, tarCommand.toString()};
     ShellCommandExecutor shexec = new ShellCommandExecutor(shellCmd);
     shexec.execute();
     int exitcode = shexec.getExitCode();
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java	(revision 1350716)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java	(working copy)
@@ -82,6 +82,7 @@
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Partitioner;
 import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.util.Shell;
 import org.apache.log4j.Appender;
 import org.apache.log4j.BasicConfigurator;
 import org.apache.log4j.FileAppender;
@@ -383,16 +384,27 @@
       //package and compress all the hashtable files to an archive file
       String parentDir = localPath.toUri().getPath();
       String stageId = this.getId();
-      String archiveFileURI = Utilities.generateTarURI(parentDir, stageId);
-      String archiveFileName = Utilities.generateTarFileName(stageId);
-      localwork.setStageID(stageId);
+      localwork.setStageID(stageId);
+      String archiveFileURI;
+      String archiveFileName;
+      if (Shell.WINDOWS) {
+        // Windows has no tar/gzip; package the hash tables with cabarc instead.
+        archiveFileURI = Utilities.generateCabURI(parentDir, stageId);
+        archiveFileName = Utilities.generateCabFileName(stageId);
+        FileUtils.cab(parentDir, fileNames, archiveFileName);
+      } else {
+        archiveFileURI = Utilities.generateTarURI(parentDir, stageId);
+        archiveFileName = Utilities.generateTarFileName(stageId);
+        FileUtils.tar(parentDir, fileNames, archiveFileName);
+      }
 
-      FileUtils.tar(parentDir, fileNames,archiveFileName);
       Path archivePath = new Path(archiveFileURI);
       LOG.info("Archive "+ hashtableFiles.length+" hash table files to " + archiveFileURI);
 
       //upload archive file to hdfs
-      String hdfsFile =Utilities.generateTarURI(hdfsPath, stageId);
+      String hdfsFile = Shell.WINDOWS
+          ? Utilities.generateCabURI(hdfsPath, stageId)
+          : Utilities.generateTarURI(hdfsPath, stageId);
       Path hdfsFilePath = new Path(hdfsFile);
       short replication = (short) job.getInt("mapred.submit.replication", 10);
       hdfs.setReplication(hdfsFilePath, replication);
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java	(revision 1350716)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java	(working copy)
@@ -44,6 +44,7 @@
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.Shell;
 
 /**
  * Map side Join operator implementation.
@@ -171,7 +172,9 @@
     } else {
       Path[] localArchives;
       String stageID = this.getExecContext().getLocalWork().getStageID();
-      String suffix = Utilities.generateTarFileName(stageID);
+      String suffix = Shell.WINDOWS
+          ? Utilities.generateCabFileName(stageID)
+          : Utilities.generateTarFileName(stageID);
       FileSystem localFs = FileSystem.getLocal(hconf);
       localArchives = DistributedCache.getLocalCacheArchives(this.hconf);
       Path archive;
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java	(revision 1350716)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java	(working copy)
@@ -1964,6 +1964,21 @@
     return tmpFileURI;
   }
 
+  /** Returns the path of the cab archive for the given stage under baseURI. */
+  public static String generateCabURI(String baseURI, String filename) {
+    return baseURI + Path.SEPARATOR + filename + ".cab";
+  }
+
+  /** Returns the path of the cab archive for the given stage under baseURI. */
+  public static String generateCabURI(Path baseURI, String filename) {
+    return baseURI + Path.SEPARATOR + filename + ".cab";
+  }
+
+  /** Returns the file name of the cab archive for the given stage. */
+  public static String generateCabFileName(String name) {
+    return name + ".cab";
+  }
+
   public static String generateTarURI(String baseURI, String filename) {
     String tmpFileURI = new String(baseURI + Path.SEPARATOR + filename + ".tar.gz");
     return tmpFileURI;