Index: build-common.xml
===================================================================
--- build-common.xml (revision 965024)
+++ build-common.xml (working copy)
@@ -407,6 +407,7 @@
-->
+
Index: ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java
===================================================================
--- ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java (revision 965024)
+++ ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java (working copy)
@@ -26,6 +26,7 @@
import junit.framework.TestCase;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.parse.TypeCheckProcFactory;
import org.apache.hadoop.hive.ql.plan.CollectDesc;
@@ -137,48 +138,47 @@
}
}
+  private void testTaskIds(String[] taskIds, String expectedAttemptId, String expectedTaskId) {
+ Configuration conf = new JobConf(TestOperators.class);
+ for (String one: taskIds) {
+ conf.set("mapred.task.id", one);
+ String attemptId = Utilities.getTaskId(conf);
+ assertEquals(expectedAttemptId, attemptId);
+      assertEquals(expectedTaskId, Utilities.getTaskIdFromFilename(attemptId));
+      assertEquals(expectedTaskId, Utilities.getTaskIdFromFilename(attemptId + ".gz"));
+      assertEquals(expectedTaskId, Utilities.getTaskIdFromFilename(
+          Utilities.toTempPath(new Path(attemptId + ".gz")).toString()));
+ }
+ }
+
+  /**
+   * More tests need to be added here. Currently this only checks some basic
+   * file-naming utilities.
+   * The old test was deactivated as part of HIVE-405.
+   */
public void testFileSinkOperator() throws Throwable {
+
try {
- System.out.println("Testing FileSink Operator");
- // col1
- ExprNodeDesc exprDesc1 = TestExecDriver.getStringColumn("col1");
+      testTaskIds(new String[] {
+ "attempt_200707121733_0003_m_000005_0",
+ "attempt_local_0001_m_000005_0",
+ "task_200709221812_0001_m_000005_0",
+ "task_local_0001_m_000005_0"
+ }, "000005_0", "000005");
- // col2
- ExprNodeDesc expr1 = TestExecDriver.getStringColumn("col0");
- ExprNodeDesc expr2 = new ExprNodeConstantDesc("1");
- ExprNodeDesc exprDesc2 = TypeCheckProcFactory.DefaultExprProcessor
- .getFuncExprNodeDesc("concat", expr1, expr2);
+      testTaskIds(new String[] {
+ "job_local_0001_map_000005",
+ "job_local_0001_reduce_000005",
+ }, "000005", "000005");
- // select operator to project these two columns
- ArrayList earr = new ArrayList();
- earr.add(exprDesc1);
- earr.add(exprDesc2);
- ArrayList outputCols = new ArrayList();
- for (int i = 0; i < earr.size(); i++) {
- outputCols.add("_col" + i);
- }
- SelectDesc selectCtx = new SelectDesc(earr, outputCols);
- Operator op = OperatorFactory.get(SelectDesc.class);
- op.setConf(selectCtx);
+      testTaskIds(new String[] {"1234567"},
+ "1234567", "1234567");
- // fileSinkOperator to dump the output of the select
- // fileSinkDesc fsd = new fileSinkDesc ("file:///tmp" + File.separator +
- // System.getProperty("user.name") + File.separator +
- // "TestFileSinkOperator",
- // Utilities.defaultTd, false);
- // Operator flop = OperatorFactory.getAndMakeChild(fsd, op);
+      assertEquals("000005",
+          Utilities.getTaskIdFromFilename(
+              "/mnt/dev005/task_local_0001_m_000005_0"));
- op.initialize(new JobConf(TestOperators.class),
- new ObjectInspector[] {r[0].oi});
-
- // evaluate on row
- for (int i = 0; i < 5; i++) {
- op.process(r[i].o, 0);
- }
- op.close(false);
-
System.out.println("FileSink Operator ok");
-
} catch (Throwable e) {
e.printStackTrace();
throw e;
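
Aside on the rewritten test: the expectations it encodes come straight from the
replaceAll chain this patch adds to Utilities.getTaskId (see the Utilities.java
hunk below). A standalone sketch of that mapping, illustrative only and not part
of the patch; the class name and main() are scaffolding:

    public class TaskIdChainSketch {
      public static void main(String[] args) {
        String[] ids = {
            "attempt_200707121733_0003_m_000005_0",  // post-17 attempt id
            "task_200709221812_0001_m_000005_0",     // hadoop 17 task id
            "job_local_0001_map_000005"              // hadoop 17 local-mode id
        };
        for (String id : ids) {
          // Strip everything up to and including the last _m_/_r_ marker,
          // then handle the older _map_/_reduce_ spelling.
          String attempt = id.replaceAll(".*_[mr]_", "")
                             .replaceAll(".*_(map|reduce)_", "");
          System.out.println(id + " -> " + attempt);
        }
        // Prints: 000005_0, 000005_0, 000005
      }
    }
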
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (revision 965024)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (working copy)
@@ -278,8 +278,8 @@
if(!HiveConf.getVar(job, HiveConf.ConfVars.HADOOPJT).equals("local")) {
// use the default file system of the job
- FileSystem fs = FileSystem.get(job);
Path planPath = new Path(hiveScratchDir, "plan." + randGen.nextInt());
+ FileSystem fs = planPath.getFileSystem(job);
FSDataOutputStream out = fs.create(planPath);
serializeMapRedWork(w, out);
HiveConf.setVar(job, HiveConf.ConfVars.PLAN, planPath.toString());
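
Aside: the FileSystem change above is subtle but important. FileSystem.get(job)
returns the job's default filesystem, while resolving from the Path honors the
scratch directory's own scheme. A minimal sketch, assuming Hadoop is on the
classpath; the namenode URI is made up:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class PlanPathSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // fs.default.name may be file:/// while the scratch dir lives on HDFS.
        Path planPath = new Path("hdfs://namenode:8020/tmp/hive/plan.42");
        FileSystem fs = planPath.getFileSystem(conf); // resolved from the scheme
        System.out.println(fs.getUri());              // hdfs://namenode:8020
        // FileSystem.get(conf) would have returned the default (local) fs here.
      }
    }
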
@@ -467,9 +467,16 @@
public static String getTaskId(Configuration hconf) {
String taskid = (hconf == null) ? null : hconf.get("mapred.task.id");
if ((taskid == null) || taskid.equals("")) {
- return ("" + randGen.nextInt());
+ return ("" + Math.abs(randGen.nextInt()));
} else {
- return taskid.replaceAll("task_[0-9]+_", "");
+      /* Extract the task and attempt id from the Hadoop taskid.
+         In version 17 the leading component was 'task_'; thereafter
+         the leading component is 'attempt_'. Version 17 also
+         appears to have used '_map_' and '_reduce_' to denote
+         map/reduce task types.
+      */
+ String ret = taskid.replaceAll(".*_[mr]_", "").replaceAll(".*_(map|reduce)_", "");
+ return (ret);
}
}
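
One caveat on the Math.abs change: Math.abs(Integer.MIN_VALUE) is still
Integer.MIN_VALUE, so a negative id remains possible roughly once in four
billion draws. A sketch of a strictly non-negative alternative, for
illustration only:

    import java.util.Random;

    public class NonNegativeIdSketch {
      public static void main(String[] args) {
        // abs() has no positive counterpart for the minimum int value.
        System.out.println(Math.abs(Integer.MIN_VALUE)); // -2147483648
        Random randGen = new Random();
        // nextInt(bound) is uniform in [0, bound), so it is never negative.
        System.out.println(randGen.nextInt(Integer.MAX_VALUE));
      }
    }
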
@@ -958,31 +965,28 @@
/**
* The first group will contain the task id. The second group is the optional
- * extension. The file name looks like: "24931_r_000000_0" or
- * "24931_r_000000_0.gz"
+   * extension. The file name looks like "0_0" or "0_0.gz". There may be a leading
+   * prefix (tmp_). Since getTaskId() can return a plain integer, this must match
+   * a pure integer as well.
*/
- private static Pattern fileNameTaskIdRegex = Pattern.compile("^.*_([0-9]*)_[0-9](\\..*)?$");
+ private static Pattern fileNameTaskIdRegex = Pattern.compile("^.*?([0-9]+)(_[0-9])?(\\..*)?$");
/**
- * Local job name looks like "job_local_1_map_0000", where 1 is job ID and 0000 is task ID.
+ * Get the task id from the filename.
+ * It is assumed that the filename is derived from the output of getTaskId
+ *
+ * @param filename filename to extract taskid from
*/
- private static Pattern fileNameLocalTaskIdRegex = Pattern.compile(".*local.*_([0-9]*)$");
-
- /**
- * Get the task id from the filename. E.g., get "000000" out of
- * "24931_r_000000_0" or "24931_r_000000_0.gz"
- */
public static String getTaskIdFromFilename(String filename) {
String taskId = filename;
- Matcher m = fileNameTaskIdRegex.matcher(filename);
+    int dirEnd = filename.lastIndexOf(Path.SEPARATOR);
+    if (dirEnd != -1) {
+      taskId = filename.substring(dirEnd + 1);
+    }
+ Matcher m = fileNameTaskIdRegex.matcher(taskId);
if (!m.matches()) {
- Matcher m2 = fileNameLocalTaskIdRegex.matcher(filename);
- if (!m2.matches()) {
- LOG.warn("Unable to get task id from file name: " + filename
- + ". Using full filename as task id.");
- } else {
- taskId = m2.group(1);
- }
+      LOG.warn("Unable to get task id from file name: " + filename
+          + ". Using last component " + taskId + " as task id.");
} else {
taskId = m.group(1);
}
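
The broadened fileNameTaskIdRegex carries most of the weight here. A quick
standalone check of the name shapes it must accept; illustrative only, with the
pattern copied from the hunk above:

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class TaskIdRegexSketch {
      public static void main(String[] args) {
        Pattern p = Pattern.compile("^.*?([0-9]+)(_[0-9])?(\\..*)?$");
        // Covers attempt-derived names, compressed files, and the
        // pure-integer ids produced when mapred.task.id is unset.
        for (String name : new String[] {"000005_0", "000005_0.gz",
                                         "1234567", "task_local_0001_m_000005_0"}) {
          Matcher m = p.matcher(name);
          System.out.println(name + " -> " + (m.matches() ? m.group(1) : "<no match>"));
        }
        // Prints: 000005, 000005, 1234567, 000005
      }
    }
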
@@ -991,17 +995,21 @@
}
/**
- * Replace the task id from the filename. E.g., replace "000000" out of
- * "24931_r_000000_0" or "24931_r_000000_0.gz" by 33 to
- * "24931_r_000033_0" or "24931_r_000033_0.gz"
+   * Replace the task id in the filename.
+   * It is assumed that the filename is derived from the output of getTaskId.
+   *
+   * @param filename filename in which to replace the task id, e.g., replacing
+   *          the task id in "0_0" or "0_0.gz" by 33 gives
+   *          "33_0" or "33_0.gz"
*/
public static String replaceTaskIdFromFilename(String filename, int bucketNum) {
String taskId = getTaskIdFromFilename(filename);
String newTaskId = replaceTaskId(taskId, bucketNum);
- return replaceTaskIdFromFilename(filename, taskId, newTaskId);
+ String ret = replaceTaskIdFromFilename(filename, taskId, newTaskId);
+ return (ret);
}
- public static String replaceTaskId(String taskId, int bucketNum) {
+ private static String replaceTaskId(String taskId, int bucketNum) {
String strBucketNum = String.valueOf(bucketNum);
int bucketNumLen = strBucketNum.length();
int taskIdLen = taskId.length();
@@ -1020,7 +1028,7 @@
* @param newTaskId
* @return
*/
- public static String replaceTaskIdFromFilename(String filename,
+ private static String replaceTaskIdFromFilename(String filename,
String oldTaskId, String newTaskId) {
String[] spl = filename.split(oldTaskId);
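
Finally, an end-to-end sketch of what the bucket renumbering is meant to do.
This assumes replaceTaskId zero-pads the bucket number to the old task id's
width (the length bookkeeping above suggests exactly that), and uses
String.replace in place of the split-based splice; it is not the patch's code:

    public class ReplaceTaskIdSketch {
      public static void main(String[] args) {
        String filename = "000005_0.gz";
        String oldTaskId = "000005";   // what getTaskIdFromFilename returns
        int bucketNum = 33;
        // Zero-pad the bucket number to the old task id's width (assumed).
        String newTaskId = String.format("%0" + oldTaskId.length() + "d", bucketNum);
        System.out.println(filename.replace(oldTaskId, newTaskId)); // 000033_0.gz
      }
    }
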