Index: ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java (revision 1350716) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java (working copy) @@ -21,6 +21,7 @@ import java.io.File; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.io.Serializable; import java.io.UnsupportedEncodingException; import java.lang.management.ManagementFactory; @@ -47,6 +48,7 @@ import org.apache.hadoop.filecache.DistributedCache; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.common.LogUtils; @@ -82,6 +84,7 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.Partitioner; import org.apache.hadoop.mapred.RunningJob; +import org.apache.hadoop.util.Shell; import org.apache.log4j.Appender; import org.apache.log4j.BasicConfigurator; import org.apache.log4j.FileAppender; @@ -96,6 +99,7 @@ public class ExecDriver extends Task implements Serializable, HadoopJobExecHook { private static final long serialVersionUID = 1L; + private static final String JOBCONF_FILENAME = "jobconf.xml"; protected transient JobConf job; public static MemoryMXBean memoryMXBean; @@ -533,7 +537,7 @@ private static void printUsage() { System.err.println("ExecDriver -plan [-jobconf k1=v1 [-jobconf k2=v2] ...] 
" - + "[-files [,] ...]"); + + "[-jobconffile ] [-files [,] ...]"); System.exit(1); } @@ -569,6 +573,7 @@ public static void main(String[] args) throws IOException, HiveException { String planFileName = null; + String jobConfFileName = null; ArrayList jobConfArgs = new ArrayList(); boolean noLog = false; String files = null; @@ -579,6 +584,8 @@ planFileName = args[++i]; } else if (args[i].equals("-jobconf")) { jobConfArgs.add(args[++i]); + } else if (args[i].equals("-jobconffile")) { + jobConfFileName = args[++i]; } else if (args[i].equals("-nolog")) { noLog = true; } else if (args[i].equals("-files")) { @@ -616,6 +623,10 @@ } } + if (jobConfFileName != null) { + conf.addResource(new Path(jobConfFileName)); + } + if (files != null) { conf.set("tmpfiles", files); } @@ -702,53 +713,25 @@ * Given a Hive Configuration object - generate a command line fragment for passing such * configuration information to ExecDriver. */ - public static String generateCmdLine(HiveConf hconf) { - try { - StringBuilder sb = new StringBuilder(); - Properties deltaP = hconf.getChangedProperties(); - boolean hadoopLocalMode = hconf.getVar(HiveConf.ConfVars.HADOOPJT).equals("local"); - String hadoopSysDir = "mapred.system.dir"; - String hadoopWorkDir = "mapred.local.dir"; + public static String generateCmdLine(HiveConf hconf, Context ctx) + throws IOException { - for (Object one : deltaP.keySet()) { - String oneProp = (String) one; - - if (hadoopLocalMode && (oneProp.equals(hadoopSysDir) || oneProp.equals(hadoopWorkDir))) { - continue; - } - - String oneValue = deltaP.getProperty(oneProp); - - sb.append("-jobconf "); - sb.append(oneProp); - sb.append("="); - sb.append(URLEncoder.encode(oneValue, "UTF-8")); - sb.append(" "); - } - - // Multiple concurrent local mode job submissions can cause collisions in - // working dirs - // Workaround is to rename map red working dir to a temp dir in such cases - - if (hadoopLocalMode) { - sb.append("-jobconf "); - sb.append(hadoopSysDir); - 
sb.append("="); - sb.append(URLEncoder.encode(hconf.get(hadoopSysDir) + "/" + Utilities.randGen.nextInt(), - "UTF-8")); - - sb.append(" "); - sb.append("-jobconf "); - sb.append(hadoopWorkDir); - sb.append("="); - sb.append(URLEncoder.encode(hconf.get(hadoopWorkDir) + "/" + Utilities.randGen.nextInt(), - "UTF-8")); - } - - return sb.toString(); - } catch (UnsupportedEncodingException e) { - throw new RuntimeException(e); + JobConfigCommandLine jobConfCommand = null; + if (Shell.WINDOWS) { + // The maximum length of the DOS command string is 8191 characters + // (in Windows latest versions http://support.microsoft.com/kb/830473). + // This limit will be exceeded easily when it appends individual -hconf + // values to the command string. To work around this problem, write all + // hconf values to a temp file and pass the temp file path to the child + // jvm to read and initialize the -hconf parameters from file. + Path hConfFilePath = new Path(ctx.getLocalTmpFileURI(), JOBCONF_FILENAME); + jobConfCommand = new JobConfigFileCommandLine(hConfFilePath, + FileSystem.getLocal(hconf)); } + else{ + jobConfCommand = new JobConfigArgumentsCommandLine(); + } + return jobConfCommand.generateCmdLine(hconf); } @Override @@ -991,4 +974,149 @@ public void logPlanProgress(SessionState ss) throws IOException { ss.getHiveHistory().logPlanProgress(queryPlan); } + + + /** + * Abstract class to compose the JobConfig related command line string + */ + static abstract class JobConfigCommandLine { + + /** + * Given a Hive Configuration object - generate a command line fragment for passing such + * configuration information to ExecDriver. 
+ */ + public String generateCmdLine(HiveConf hconf) + throws IOException { + StringBuilder sb = new StringBuilder(); + Properties deltaP = hconf.getChangedProperties(); + boolean hadoopLocalMode = hconf.getVar(HiveConf.ConfVars.HADOOPJT).equals("local"); + String hadoopSysDir = "mapred.system.dir"; + String hadoopWorkDir = "mapred.local.dir"; + + for (Object one : deltaP.keySet()) { + String oneProp = (String) one; + + if (hadoopLocalMode && (oneProp.equals(hadoopSysDir) || oneProp.equals(hadoopWorkDir))) { + continue; + } + + AddChangedProperty(oneProp, deltaP.getProperty(oneProp)); + } + + // Multiple concurrent local mode job submissions can cause collisions in + // working dirs and system dirs + // Workaround is to rename map red working dir to a temp dir in such cases + if (hadoopLocalMode) { + AddChangedProperty(hadoopSysDir, + hconf.get(hadoopSysDir) + "/" + Utilities.randGen.nextInt()); + + AddChangedProperty(hadoopWorkDir, + hconf.get(hadoopWorkDir) + "/" + Utilities.randGen.nextInt()); + } + + return getCommandString(); + } + + /** + * Gets the final command line string for all added properties + */ + protected abstract String getCommandString() throws IOException; + + /** + * Adds the value of the name property to the + * command line. + * + * @param name + * property name. + * @param value + * property value. + */ + protected abstract void AddChangedProperty(String name, String value); + } + + /* + * Composes a command line string by passing all JobConf values as command line + * arguments in the format of "-jobconf k1=v1 -jobconf k2=v2" + */ + static class JobConfigArgumentsCommandLine extends JobConfigCommandLine { + + StringBuilder sb = new StringBuilder(); + + /** + * Gets the final command line string for all added properties + */ + @Override + protected String getCommandString() { + return sb.toString(); + } + + /** + * Adds the value of the name property to the + * command line. + * + * @param name + * property name. 
+ * @param value + * property value. + */ + @Override + protected void AddChangedProperty(String name, String value) { + try { + sb.append("-jobconf "); + sb.append(name); + sb.append("="); + sb.append(URLEncoder.encode(value, "UTF-8")); + sb.append(" "); + } catch (UnsupportedEncodingException e) { + throw new RuntimeException(e); + } + } + } + + /* + * Writes all JobConf key and values to a temp file and passes the file path as a + * command line argument in the format of "-jobconffile " + */ + static class JobConfigFileCommandLine extends JobConfigCommandLine { + HiveConf conf = new HiveConf(); + Path confFilePath; + LocalFileSystem localFS; + + public JobConfigFileCommandLine(Path f, LocalFileSystem fs) { + confFilePath = f; + localFS = fs; + } + + /** + * Gets the final command line string for all added properties + */ + @Override + protected String getCommandString() + throws IOException { + OutputStream out = null; + try { + out = localFS.create(confFilePath); + conf.writeXml(out); + } finally { + if (out != null) { + out.close(); + } + } + return " -jobconffile " + confFilePath.toString(); + } + + /** + * Adds the value of the name property to the + * command line. + * + * @param name + * property name. + * @param value + * property value. 
+ */ + @Override + protected void AddChangedProperty(String name, String value) { + conf.set(name, value); + } + } } Index: ql/src/java/org/apache/hadoop/hive/ql/exec/MapredLocalTask.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/MapredLocalTask.java (revision 1350716) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/MapredLocalTask.java (working copy) @@ -129,8 +129,7 @@ String jarCmd; jarCmd = hiveJar + " " + ExecDriver.class.getName(); - - String hiveConfArgs = ExecDriver.generateCmdLine(conf); + String hiveConfArgs = ExecDriver.generateCmdLine(conf, ctx); String cmdLine = hadoopExec + " jar " + jarCmd + " -localtask -plan " + planPath.toString() + " " + isSilent + " " + hiveConfArgs; Index: ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java (revision 1350716) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java (working copy) @@ -42,7 +42,6 @@ import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.mapred.JobConf; - /** * Extension of ExecDriver: * - can optionally spawn a map-reduce task from a separate jvm @@ -165,8 +164,9 @@ libJarsOption = " -libjars " + addedJars + "," + auxJars + " "; } } + // Generate the hiveConfArgs after potentially adding the jars - String hiveConfArgs = generateCmdLine(conf); + String hiveConfArgs = generateCmdLine(conf, ctx); // write out the plan to a local file Path planPath = new Path(ctx.getLocalTmpFileURI(), "plan.xml");