Index: build-common.xml
===================================================================
--- build-common.xml	(revision 965024)
+++ build-common.xml	(working copy)
@@ -407,6 +407,7 @@
   -->
+
Index: ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java
===================================================================
--- ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java	(revision 965024)
+++ ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java	(working copy)
@@ -26,6 +26,7 @@
 import junit.framework.TestCase;

 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.parse.TypeCheckProcFactory;
 import org.apache.hadoop.hive.ql.plan.CollectDesc;
@@ -137,48 +138,47 @@
     }
   }

+  private void testTaskIds(String [] taskIds, String expectedAttemptId, String expectedTaskId) {
+    Configuration conf = new JobConf(TestOperators.class);
+    for (String one: taskIds) {
+      conf.set("mapred.task.id", one);
+      String attemptId = Utilities.getTaskId(conf);
+      assertEquals(expectedAttemptId, attemptId);
+      assertEquals(Utilities.getTaskIdFromFilename(attemptId), expectedTaskId);
+      assertEquals(Utilities.getTaskIdFromFilename(attemptId + ".gz"), expectedTaskId);
+      assertEquals(Utilities.getTaskIdFromFilename
+                   (Utilities.toTempPath(new Path(attemptId + ".gz")).toString()), expectedTaskId);
+    }
+  }
+
+  /**
+   * More tests should be added here. Currently this only checks some basic
+   * file-naming utilities.
+   * The old test was deactivated as part of HIVE-405.
+   */
   public void testFileSinkOperator() throws Throwable {
+
     try {
-      System.out.println("Testing FileSink Operator");
-      // col1
-      ExprNodeDesc exprDesc1 = TestExecDriver.getStringColumn("col1");
+      testTaskIds (new String [] {
+          "attempt_200707121733_0003_m_000005_0",
+          "attempt_local_0001_m_000005_0",
+          "task_200709221812_0001_m_000005_0",
+          "task_local_0001_m_000005_0"
+        }, "000005_0", "000005");

-      // col2
-      ExprNodeDesc expr1 = TestExecDriver.getStringColumn("col0");
-      ExprNodeDesc expr2 = new ExprNodeConstantDesc("1");
-      ExprNodeDesc exprDesc2 = TypeCheckProcFactory.DefaultExprProcessor
-          .getFuncExprNodeDesc("concat", expr1, expr2);
+      testTaskIds (new String [] {
+          "job_local_0001_map_000005",
+          "job_local_0001_reduce_000005",
+        }, "000005", "000005");

-      // select operator to project these two columns
-      ArrayList<ExprNodeDesc> earr = new ArrayList<ExprNodeDesc>();
-      earr.add(exprDesc1);
-      earr.add(exprDesc2);
-      ArrayList<String> outputCols = new ArrayList<String>();
-      for (int i = 0; i < earr.size(); i++) {
-        outputCols.add("_col" + i);
-      }
-      SelectDesc selectCtx = new SelectDesc(earr, outputCols);
-      Operator<SelectDesc> op = OperatorFactory.get(SelectDesc.class);
-      op.setConf(selectCtx);
+      testTaskIds (new String [] {"1234567"},
+        "1234567", "1234567");

-      // fileSinkOperator to dump the output of the select
-      // fileSinkDesc fsd = new fileSinkDesc ("file:///tmp" + File.separator +
-      // System.getProperty("user.name") + File.separator +
-      // "TestFileSinkOperator",
-      // Utilities.defaultTd, false);
-      // Operator<fileSinkDesc> flop = OperatorFactory.getAndMakeChild(fsd, op);
+      assertEquals(Utilities.getTaskIdFromFilename
+                   ("/mnt/dev005/task_local_0001_m_000005_0"),
+                   "000005");

-      op.initialize(new JobConf(TestOperators.class),
-          new ObjectInspector[] {r[0].oi});
-
-      // evaluate on row
-      for (int i = 0; i < 5; i++) {
-        op.process(r[i].o, 0);
-      }
-      op.close(false);
-      System.out.println("FileSink Operator ok");
-
     } catch (Throwable e) {
       e.printStackTrace();
       throw e;
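The testTaskIds helper above drives Utilities.getTaskId through the task-id formats emitted by different Hadoop versions. As a rough standalone sketch of the two-step stripping the patched getTaskId applies to a "mapred.task.id" value (the class and variable names here are hypothetical illustrations, not part of the patch):

    import java.util.Arrays;

    // Hypothetical illustration class, not part of the patch.
    public class GetTaskIdSketch {
      public static void main(String[] args) {
        // Sample "mapred.task.id" values in the formats the test exercises.
        for (String id : Arrays.asList(
            "attempt_200707121733_0003_m_000005_0",  // post-0.17 attempt id
            "task_200709221812_0001_m_000005_0",     // 0.17-era task id
            "job_local_0001_map_000005")) {          // 0.17-era local job name
          // The same two replaceAll calls the patched getTaskId() performs.
          String stripped = id.replaceAll(".*_[mr]_", "")
                              .replaceAll(".*_(map|reduce)_", "");
          System.out.println(id + " -> " + stripped);
          // prints 000005_0, 000005_0 and 000005 respectively
        }
      }
    }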
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java	(revision 965024)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java	(working copy)
@@ -278,8 +278,8 @@

     if(!HiveConf.getVar(job, HiveConf.ConfVars.HADOOPJT).equals("local")) {
       // use the default file system of the job
-      FileSystem fs = FileSystem.get(job);
       Path planPath = new Path(hiveScratchDir, "plan." + randGen.nextInt());
+      FileSystem fs = planPath.getFileSystem(job);
       FSDataOutputStream out = fs.create(planPath);
       serializeMapRedWork(w, out);
       HiveConf.setVar(job, HiveConf.ConfVars.PLAN, planPath.toString());
@@ -467,9 +467,16 @@
   public static String getTaskId(Configuration hconf) {
     String taskid = (hconf == null) ? null : hconf.get("mapred.task.id");
     if ((taskid == null) || taskid.equals("")) {
-      return ("" + randGen.nextInt());
+      return ("" + Math.abs(randGen.nextInt()));
     } else {
+      /* Extract the task and attempt id from the Hadoop taskid.
+         In Hadoop 0.17 the leading component was 'task_'; thereafter
+         the leading component is 'attempt_'. Hadoop 0.17 also seems
+         to have used '_map_' and '_reduce_' to denote the map/reduce
+         task types.
+      */
-      return taskid.replaceAll("task_[0-9]+_", "");
+      String ret = taskid.replaceAll(".*_[mr]_", "").replaceAll(".*_(map|reduce)_", "");
+      return (ret);
     }
   }
@@ -958,31 +965,28 @@

   /**
    * The first group will contain the task id. The second group is the optional
-   * extension. The file name looks like: "24931_r_000000_0" or
-   * "24931_r_000000_0.gz"
+   * extension. The file name looks like "0_0" or "0_0.gz". There may be a
+   * leading prefix (tmp_). Since getTaskId() can return a plain integer, this
+   * should match a pure integer as well.
    */
-  private static Pattern fileNameTaskIdRegex = Pattern.compile("^.*_([0-9]*)_[0-9](\\..*)?$");
+  private static Pattern fileNameTaskIdRegex = Pattern.compile("^.*?([0-9]+)(_[0-9])?(\\..*)?$");

   /**
-   * Local job name looks like "job_local_1_map_0000", where 1 is job ID and 0000 is task ID.
+   * Get the task id from the filename.
+   * It is assumed that the filename is derived from the output of getTaskId.
+   *
+   * @param filename filename to extract taskid from
    */
-  private static Pattern fileNameLocalTaskIdRegex = Pattern.compile(".*local.*_([0-9]*)$");
-
-  /**
-   * Get the task id from the filename. E.g., get "000000" out of
-   * "24931_r_000000_0" or "24931_r_000000_0.gz"
-   */
   public static String getTaskIdFromFilename(String filename) {
     String taskId = filename;
-    Matcher m = fileNameTaskIdRegex.matcher(filename);
+    int dirEnd = filename.lastIndexOf(Path.SEPARATOR);
+    if (dirEnd != -1)
+      taskId = filename.substring(dirEnd + 1);
+
+    Matcher m = fileNameTaskIdRegex.matcher(taskId);
     if (!m.matches()) {
-      Matcher m2 = fileNameLocalTaskIdRegex.matcher(filename);
-      if (!m2.matches()) {
-        LOG.warn("Unable to get task id from file name: " + filename
-            + ". Using full filename as task id.");
-      } else {
-        taskId = m2.group(1);
-      }
+      LOG.warn("Unable to get task id from file name: " + filename
+          + ". Using last component " + taskId + " as task id.");
     } else {
       taskId = m.group(1);
     }
@@ -991,17 +995,21 @@
   }

   /**
-   * Replace the task id from the filename. E.g., replace "000000" out of
-   * "24931_r_000000_0" or "24931_r_000000_0.gz" by 33 to
-   * "24931_r_000033_0" or "24931_r_000033_0.gz"
+   * Replace the task id from the filename.
+   * It is assumed that the filename is derived from the output of getTaskId.
+   *
+   * @param filename filename in which to replace the taskid, e.g.
+   *          "0_0" or "0_0.gz" becomes "33_0" or "33_0.gz" for bucketNum 33
    */
   public static String replaceTaskIdFromFilename(String filename, int bucketNum) {
     String taskId = getTaskIdFromFilename(filename);
     String newTaskId = replaceTaskId(taskId, bucketNum);
-    return replaceTaskIdFromFilename(filename, taskId, newTaskId);
+    String ret = replaceTaskIdFromFilename(filename, taskId, newTaskId);
+    return (ret);
   }

-  public static String replaceTaskId(String taskId, int bucketNum) {
+  private static String replaceTaskId(String taskId, int bucketNum) {
     String strBucketNum = String.valueOf(bucketNum);
     int bucketNumLen = strBucketNum.length();
     int taskIdLen = taskId.length();
@@ -1020,7 +1028,7 @@
    * @param newTaskId
    * @return
    */
-  public static String replaceTaskIdFromFilename(String filename,
+  private static String replaceTaskIdFromFilename(String filename,
       String oldTaskId, String newTaskId) {
     String[] spl = filename.split(oldTaskId);
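For reference, a minimal standalone sketch exercising the new fileNameTaskIdRegex against the filename shapes the comments describe; the class and the extract helper are hypothetical illustrations, not part of the patch:

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    // Hypothetical illustration class, not part of the patch.
    public class TaskIdRegexSketch {
      // The new pattern introduced by the patch.
      private static final Pattern fileNameTaskIdRegex =
          Pattern.compile("^.*?([0-9]+)(_[0-9])?(\\..*)?$");

      // Mirrors the matching step of the patched getTaskIdFromFilename().
      static String extract(String name) {
        Matcher m = fileNameTaskIdRegex.matcher(name);
        return m.matches() ? m.group(1) : name;
      }

      public static void main(String[] args) {
        System.out.println(extract("000005_0"));                    // 000005
        System.out.println(extract("000005_0.gz"));                 // 000005
        System.out.println(extract("tmp_000005_0"));                // 000005 (leading tmp_ prefix)
        System.out.println(extract("1234567"));                     // 1234567 (pure integer from getTaskId)
        System.out.println(extract("task_local_0001_m_000005_0"));  // 000005
      }
    }

The reluctant leading group lets the pattern skip any prefix (such as tmp_) while still matching a bare integer, which is why the patched getTaskIdFromFilename first strips everything up to the last Path.SEPARATOR and only then applies the regex.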