Details
Type: Sub-task
Status: Closed
Priority: Major
Resolution: Fixed
Description
In MR mode, PigConstants.TASK_INDEX is initialized in org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce.Reduce#setup:
protected void setup(Context context) throws IOException, InterruptedException {
    ...
    context.getConfiguration().set(PigConstants.TASK_INDEX,
            Integer.toString(context.getTaskAttemptID().getTaskID().getId()));
    ...
}
Spark, however, does not provide a function like PigGenericMapReduce.Reduce#setup in which PigConstants.TASK_INDEX can be initialized when a job starts. We need to find a way to initialize PigConstants.TASK_INDEX correctly in Spark mode.
Once this JIRA is fixed, the behavior of TestBuiltin#testUniqueID in Spark mode will be the same as in MR mode.
For now, TestBuiltin#testUniqueID is split into two cases:
@Test
public void testUniqueID() throws Exception {
    ...
    if (!Util.isSparkExecType(cluster.getExecType())) {
        assertEquals("0-0", iter.next().get(1));
        assertEquals("0-1", iter.next().get(1));
        assertEquals("0-2", iter.next().get(1));
        assertEquals("0-3", iter.next().get(1));
        assertEquals("0-4", iter.next().get(1));
        assertEquals("1-0", iter.next().get(1));
        assertEquals("1-1", iter.next().get(1));
        assertEquals("1-2", iter.next().get(1));
        assertEquals("1-3", iter.next().get(1));
        assertEquals("1-4", iter.next().get(1));
    } else {
        // because we set PigConstants.TASK_INDEX as 0 in
        // ForEachConverter#ForEachFunction#initializeJobConf
        // UniqueID.exec() will output like 0-*
        // the behavior of spark will be same with mr until PIG-5051 is fixed.
        assertEquals(iter.next().get(1), "0-0");
        assertEquals(iter.next().get(1), "0-1");
        assertEquals(iter.next().get(1), "0-2");
        assertEquals(iter.next().get(1), "0-3");
        assertEquals(iter.next().get(1), "0-4");
        assertEquals(iter.next().get(1), "0-0");
        assertEquals(iter.next().get(1), "0-1");
        assertEquals(iter.next().get(1), "0-2");
        assertEquals(iter.next().get(1), "0-3");
        assertEquals(iter.next().get(1), "0-4");
    }
    ...
}
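To make the difference between the two branches concrete, here is a minimal, self-contained sketch that does not use Pig, Hadoop, or Spark at all. It only imitates the "<task index>-<sequence>" shape of the IDs asserted above. The class UniqueIdSketch, the IdGenerator helper, and the "task.index" key are hypothetical stand-ins for illustration, not Pig's actual code or the actual value of PigConstants.TASK_INDEX.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class UniqueIdSketch {
    // Hypothetical stand-in for PigConstants.TASK_INDEX; the real key string
    // is not shown in this issue.
    static final String TASK_INDEX = "task.index";

    /** Imitates the "<taskIndex>-<sequence>" IDs asserted in testUniqueID. */
    static final class IdGenerator {
        private final String taskIndex;
        private long sequence = 0;

        IdGenerator(Properties conf) {
            // Fall back to "0" when nobody initialized the task index.
            this.taskIndex = conf.getProperty(TASK_INDEX, "0");
        }

        String next() {
            return taskIndex + "-" + sequence++;
        }
    }

    /**
     * Generates IDs for several simulated tasks (partitions).
     * perTaskIndex = true  : each task stores its own index, as setup() does in MR.
     * perTaskIndex = false : every task stores 0, as the current Spark workaround does.
     */
    static List<String> run(int tasks, int recordsPerTask, boolean perTaskIndex) {
        List<String> ids = new ArrayList<>();
        for (int task = 0; task < tasks; task++) {
            Properties conf = new Properties();
            conf.setProperty(TASK_INDEX, Integer.toString(perTaskIndex ? task : 0));
            IdGenerator generator = new IdGenerator(conf);
            for (int i = 0; i < recordsPerTask; i++) {
                ids.add(generator.next());
            }
        }
        return ids;
    }

    public static void main(String[] args) {
        System.out.println(run(2, 5, true));   // per-task index, MR-like
        System.out.println(run(2, 5, false));  // index fixed at 0, workaround-like
    }
}
```

With two tasks of five records each, the per-task variant yields 0-0 through 0-4 followed by 1-0 through 1-4, while the fixed-index variant repeats 0-0 through 0-4 twice, which mirrors the two assertion branches in the test. In the fixed-index case the IDs are no longer unique across tasks, which is why the task index has to be initialized per task in Spark mode as well.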