diff --git a/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestE2EScenarios.java b/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestE2EScenarios.java
index f9630cc..31e74ff 100644
--- a/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestE2EScenarios.java
+++ b/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestE2EScenarios.java
@@ -40,6 +40,7 @@
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hcatalog.HcatTestUtils;
 import org.apache.hcatalog.common.HCatConstants;
 import org.apache.hcatalog.common.HCatContext;
@@ -163,13 +164,14 @@ private void copyTable(String in, String out) throws IOException, InterruptedExc
     System.err.println("Copying from ["+in+"] to ["+out+"] with schema : "+ tableSchema.toString());
     oupy.setSchema(ojob, tableSchema);
     oupy.checkOutputSpecs(ojob);
-    OutputCommitter oc = oupy.getOutputCommitter(createTaskAttemptContext(ojob.getConfiguration()));
+    OutputCommitter oc = oupy.getOutputCommitter(createTaskAttemptContext(ojob,TaskType.JOB_SETUP,0));
     oc.setupJob(ojob);
 
+    int i = 0;
     for (InputSplit split : inpy.getSplits(ijob)){
 
-      TaskAttemptContext rtaskContext = createTaskAttemptContext(ijob.getConfiguration());
-      TaskAttemptContext wtaskContext = createTaskAttemptContext(ojob.getConfiguration());
+      TaskAttemptContext rtaskContext = createTaskAttemptContext(ijob,TaskType.MAP,i);
+      TaskAttemptContext wtaskContext = createTaskAttemptContext(ojob,TaskType.REDUCE,i);
 
       RecordReader rr = inpy.createRecordReader(split, rtaskContext);
       rr.initialize(split, rtaskContext);
@@ -184,17 +186,18 @@ private void copyTable(String in, String out) throws IOException, InterruptedExc
       rw.close(wtaskContext);
       taskOc.commitTask(wtaskContext);
       rr.close();
+      i++;
     }
 
     oc.commitJob(ojob);
   }
 
-  private TaskAttemptContext createTaskAttemptContext(Configuration tconf) {
-    Configuration conf = (tconf == null) ? (new Configuration()) : tconf;
-    TaskAttemptID taskId = new TaskAttemptID();
-    conf.setInt("mapred.task.partition", taskId.getId());
-    conf.set("mapred.task.id", "attempt__0000_r_000000_" + taskId.getId());
-    TaskAttemptContext rtaskContext = HCatHadoopShims.Instance.get().createTaskAttemptContext(conf , taskId );
+  private TaskAttemptContext createTaskAttemptContext(Job j, TaskType ttype, int taskId) {
+    Configuration conf = (j.getConfiguration() == null) ? (new Configuration()) : j.getConfiguration();
+    TaskAttemptID taskAttemptId = new TaskAttemptID("", 0, ttype, taskId, 0 );
+    conf.setInt("mapred.task.partition", taskAttemptId.getId());
+    conf.set("mapred.task.id", taskAttemptId.toString());
+    TaskAttemptContext rtaskContext = HCatHadoopShims.Instance.get().createTaskAttemptContext(conf , taskAttemptId );
     return rtaskContext;
   }