Uploaded image for project: 'Pig'
  1. Pig
  2. PIG-4856 Optimization for pig on spark
  3. PIG-5051

Initialize PigContants.TASK_INDEX in spark mode correctly

    XMLWordPrintableJSON

Details

    • Sub-task
    • Status: Closed
    • Major
    • Resolution: Fixed
    • None
    • spark-branch
    • spark
    • None

    Description

      in MR, we initialize PigContants.TASK_INDEX in org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce.Reduce#setup

      protected void setup(Context context) throws IOException, InterruptedException {
         ...
          context.getConfiguration().set(PigConstants.TASK_INDEX, Integer.toString(context.getTaskAttemptID().getTaskID().getId()));
      ...
      }
      

      But spark does not provide funtion like PigGenericMapReduce.Reduce#setup to initialize PigContants.TASK_INDEX when job starts. We need find a solution to initialize PigContants.TASK_INDEX correctly.

      After this jira is fixed. The behavior of TestBuiltin#testUniqueID in spark mode will be same with what in mr.
      Now we divide two cases in TestBuiltin#testUniqueID

       @Test
          public void testUniqueID() throws Exception {
           ...
              if (!Util.isSparkExecType(cluster.getExecType())) {
                  assertEquals("0-0", iter.next().get(1));
                  assertEquals("0-1", iter.next().get(1));
                  assertEquals("0-2", iter.next().get(1));
                  assertEquals("0-3", iter.next().get(1));
                  assertEquals("0-4", iter.next().get(1));
                  assertEquals("1-0", iter.next().get(1));
                  assertEquals("1-1", iter.next().get(1));
                  assertEquals("1-2", iter.next().get(1));
                  assertEquals("1-3", iter.next().get(1));
                  assertEquals("1-4", iter.next().get(1));
              } else {
                  // because we set PigConstants.TASK_INDEX as 0 in
                  // ForEachConverter#ForEachFunction#initializeJobConf
                  // UniqueID.exec() will output like 0-*
                  // the behavior of spark will be same with mr until PIG-5051 is fixed.
                  assertEquals(iter.next().get(1), "0-0");
                  assertEquals(iter.next().get(1), "0-1");
                  assertEquals(iter.next().get(1), "0-2");
                  assertEquals(iter.next().get(1), "0-3");
                  assertEquals(iter.next().get(1), "0-4");
                  assertEquals(iter.next().get(1), "0-0");
                  assertEquals(iter.next().get(1), "0-1");
                  assertEquals(iter.next().get(1), "0-2");
                  assertEquals(iter.next().get(1), "0-3");
                  assertEquals(iter.next().get(1), "0-4");
              }
         ...
      }
      

      Attachments

        1. PIG-4920_6_5051.patch
          27 kB
          liyunzhang
        2. PIG-5051.patch
          4 kB
          liyunzhang

        Activity

          People

            kellyzly liyunzhang
            kellyzly liyunzhang
            Votes:
            0 Vote for this issue
            Watchers:
            3 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: