Pig
  1. Pig
  2. PIG-3642

Direct HDFS access for small jobs (fetch)

    Details

    • Type: Improvement Improvement
    • Status: Resolved
    • Priority: Major Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.13.0
    • Component/s: None
    • Labels:
      None
    • Release Note:
      Hide
      When the DUMP operator is used to execute Pig Latin statements, Pig can take the advantage to minimize latency by directly reading data from HDFS rather than launching MapReduce jobs.

      Direct fetch is turned on by default. To turn it off set the property opt.fetch to false or start Pig with the "-N" or "-no_fetch" option.
      Show
      When the DUMP operator is used to execute Pig Latin statements, Pig can take the advantage to minimize latency by directly reading data from HDFS rather than launching MapReduce jobs. Direct fetch is turned on by default. To turn it off set the property opt.fetch to false or start Pig with the "-N" or "-no_fetch" option.

      Description

      With this patch I'd like to add the possibility to directly read data from HDFS instead of launching MR jobs in case of simple (map-only) tasks. Hive already has this feature (fetch). This patch shares some similarities with the local mode of Pig 0.6. Here, fetching kicks off when the following holds for a script:

      • it contains only LIMIT, FILTER, UNION (if no split is generated), STREAM, (nested) FOREACH with expression operators, custom UDFs..etc
      • no scalar aliases
      • no SampleLoader
      • single leaf job
      • DUMP (no STORE)

      The feature is enabled by default and can be toggled with:

      • -N or -no_fetch
      • set opt.fetch true/false;

      There's no STORE support because I wanted to make it explicit that this "optimization" is for launching small/simple scripts during development, rather than querying and filtering large number of rows on the client machine. However, a threshold could be given on the input size (an estimation) to determine whether to prefer fetch over MR jobs, similar to what Hive's 'hive.fetch.task.conversion.threshold' does. (through Pig's LoadMetadata#getStatistic ?)

      1. PIG-3642.patch
        64 kB
        Lorand Bendig
      2. PIG-3642-3.patch
        72 kB
        Cheolsoo Park
      3. PIG-3642-4.patch
        73 kB
        Lorand Bendig
      4. PIG-3642-5.patch
        80 kB
        Lorand Bendig
      5. PIG-3642-6.patch
        82 kB
        Cheolsoo Park

        Issue Links

          Activity

          Hide
          Cheolsoo Park added a comment -

          Lorand Bendig, is this patch ready for review? If so, do you mind uploading it to the review board?

          This feature will be very useful!

          Show
          Cheolsoo Park added a comment - Lorand Bendig , is this patch ready for review? If so, do you mind uploading it to the review board ? This feature will be very useful!
          Hide
          Lorand Bendig added a comment -

          Cheolsoo Park es, I'd like to have it reviewed

          Show
          Lorand Bendig added a comment - Cheolsoo Park es, I'd like to have it reviewed
          Hide
          Lorand Bendig added a comment -

          Please find attached the review request at : https://reviews.apache.org/r/16507/

          Show
          Lorand Bendig added a comment - Please find attached the review request at : https://reviews.apache.org/r/16507/
          Hide
          Gianmarco De Francisci Morales added a comment -

          I am -0 on this idea.
          Skipping MR requires rewriting good part of the execution logic, and might introduce weird optimization bugs.
          More importantly, the added advantage brought by this feature is small.
          Usually, if you want to test your program on a small input, you copy it locally and run Pig in local mode.

          Show
          Gianmarco De Francisci Morales added a comment - I am -0 on this idea. Skipping MR requires rewriting good part of the execution logic, and might introduce weird optimization bugs. More importantly, the added advantage brought by this feature is small. Usually, if you want to test your program on a small input, you copy it locally and run Pig in local mode.
          Hide
          Lorand Bendig added a comment -

          Gianmarco De Francisci Morales, I took the idea of this patch from HIVE-2925 and PIG-2864. I agree, that the benefit is limited, however simple scripts/queries will run significantly faster than in local MR mode. As far as I can judge, aside from some mocking and initialization
          the execution logic literally follows Pig's pull-based model. What optimization bugs do you think that can happen?

          Show
          Lorand Bendig added a comment - Gianmarco De Francisci Morales , I took the idea of this patch from HIVE-2925 and PIG-2864 . I agree, that the benefit is limited, however simple scripts/queries will run significantly faster than in local MR mode. As far as I can judge, aside from some mocking and initialization the execution logic literally follows Pig's pull-based model. What optimization bugs do you think that can happen?
          Hide
          Gianmarco De Francisci Morales added a comment -

          I haven't reviewed the patch thoroughly so take my comments with the due care.
          I am just afraid that we will redo the same "mistake" we did with the local mode execution of Pig that you mention in the ticket.
          That mode of execution was removed because it was a burden to maintain, and in the end the two implementations (MR and local mode) were out of synch, resulting in the same script doing different things.
          I just want to avoid the same thing happening again.

          If Cheolsoo Park has reviewed the patch, I would like to hear his comments on this issue.

          Show
          Gianmarco De Francisci Morales added a comment - I haven't reviewed the patch thoroughly so take my comments with the due care. I am just afraid that we will redo the same "mistake" we did with the local mode execution of Pig that you mention in the ticket. That mode of execution was removed because it was a burden to maintain, and in the end the two implementations (MR and local mode) were out of synch, resulting in the same script doing different things. I just want to avoid the same thing happening again. If Cheolsoo Park has reviewed the patch, I would like to hear his comments on this issue.
          Hide
          Cheolsoo Park added a comment -

          Gianmarco De Francisci Morales, thank you for raising a concern. But I still think we should commit this patch for the following reasons-

          1. Fetch optimization happens after physical plan is fully built. If the plan is fetchable (i.e. meets all the conditions Lorand listed in the description), Pig will launch a job via FetchLauncher instead via MapReduceLauncher. Given this code path, I think the possibility of introducing a weird optimization bug is minimal. In addition, the optimization is only applicable to fairly small queries.
          2. There are indeed changes to some backend operators such as POStream. This is because the logic about when to pull data from pipeline is different in some cases. But these changes are fairly minimal too.
          3. IMO, the benefit of this optimization is big. I am constantly asked by users about this feature. True that it won't improve any performance of production ETL jobs, but it will shorten development iteration. In addition, launching a full MR job for a simple load/dump query definitely makes a bad impression to new users.
          Show
          Cheolsoo Park added a comment - Gianmarco De Francisci Morales , thank you for raising a concern. But I still think we should commit this patch for the following reasons- Fetch optimization happens after physical plan is fully built. If the plan is fetchable (i.e. meets all the conditions Lorand listed in the description), Pig will launch a job via FetchLauncher instead via MapReduceLauncher. Given this code path, I think the possibility of introducing a weird optimization bug is minimal. In addition, the optimization is only applicable to fairly small queries. There are indeed changes to some backend operators such as POStream. This is because the logic about when to pull data from pipeline is different in some cases. But these changes are fairly minimal too. IMO, the benefit of this optimization is big. I am constantly asked by users about this feature. True that it won't improve any performance of production ETL jobs, but it will shorten development iteration. In addition, launching a full MR job for a simple load/dump query definitely makes a bad impression to new users.
          Hide
          Alan Gates added a comment -

          I don't think this will result in the same local mode/mr mode problem that we had before. The issue there was we tried (and failed) to have two modes where Pig provided all features. This is much more limited to doing things locally that can easily be done locally.

          Show
          Alan Gates added a comment - I don't think this will result in the same local mode/mr mode problem that we had before. The issue there was we tried (and failed) to have two modes where Pig provided all features. This is much more limited to doing things locally that can easily be done locally.
          Hide
          Cheolsoo Park added a comment -

          Uploading the patch that I am running unit/e2e tests.

          This patch adds a "return" to HExecutuinEngine#explain(). (See my comment in the RB.) I also cleaned trailing white spaces.

          Show
          Cheolsoo Park added a comment - Uploading the patch that I am running unit/e2e tests. This patch adds a "return" to HExecutuinEngine#explain(). (See my comment in the RB.) I also cleaned trailing white spaces.
          Hide
          Cheolsoo Park added a comment -

          I see 6 failures in unit tests-

          >>> org.apache.pig.impl.builtin.TestStreamingUDF.testPythonUDF_onCluster
          >>> org.apache.pig.test.TestDefaultDateTimeZone.testLocalExecution
          >>> org.apache.pig.test.TestEvalPipeline2.testNonStandardDataWithoutFetch
          >>> org.apache.pig.test.TestEvalPipeline2.testBinStorageByteArrayCastsSimple
          >>> org.apache.pig.test.TestEvalPipeline2.testLoadWithDifferentSchema
          >>> org.apache.pig.test.TestStoreInstances.testBackendStoreCommunication
          

          Can you please take a look at them?

          Show
          Cheolsoo Park added a comment - I see 6 failures in unit tests- >>> org.apache.pig.impl.builtin.TestStreamingUDF.testPythonUDF_onCluster >>> org.apache.pig.test.TestDefaultDateTimeZone.testLocalExecution >>> org.apache.pig.test.TestEvalPipeline2.testNonStandardDataWithoutFetch >>> org.apache.pig.test.TestEvalPipeline2.testBinStorageByteArrayCastsSimple >>> org.apache.pig.test.TestEvalPipeline2.testLoadWithDifferentSchema >>> org.apache.pig.test.TestStoreInstances.testBackendStoreCommunication Can you please take a look at them?
          Hide
          Lorand Bendig added a comment -

          Cheolsoo Park, thanks for pointing out these issues!

          org.apache.pig.test.TestDefaultDateTimeZone.testLocalExecution

          It's because fetch didn't initialize pig.datetime.default.tz with the current timezone. Fixed.

          org.apache.pig.test.TestEvalPipeline2.testNonStandardDataWithoutFetch
          org.apache.pig.test.TestEvalPipeline2.testBinStorageByteArrayCastsSimple
          org.apache.pig.test.TestEvalPipeline2.testLoadWithDifferentSchema

          This was a non-fetch issue, now fixed in PIG-3662

          org.apache.pig.test.TestStoreInstances.testBackendStoreCommunication

          The problem was here that FetchOptimizer initialized FileLocalizer#relativeRoot to check whether POStore is related to a dump.
          This initialized temporary path is in the threadlocal and it might happen that a Wrong FS: file:/..., expected: hdfs://... exception
          is thrown in those cases if the test is executed both in local and mapreduce mode in the same session. The temp path is initialized
          to file:/ for the local mode and is reused for the mapreduce mode which causes the exception.
          Now relativeRoot is not initialized by FetchOptimizer.

          I managed to run test-core successfully with -Dhadoopversion=23.

          Show
          Lorand Bendig added a comment - Cheolsoo Park , thanks for pointing out these issues! org.apache.pig.test.TestDefaultDateTimeZone.testLocalExecution It's because fetch didn't initialize pig.datetime.default.tz with the current timezone. Fixed. org.apache.pig.test.TestEvalPipeline2.testNonStandardDataWithoutFetch org.apache.pig.test.TestEvalPipeline2.testBinStorageByteArrayCastsSimple org.apache.pig.test.TestEvalPipeline2.testLoadWithDifferentSchema This was a non-fetch issue, now fixed in PIG-3662 org.apache.pig.test.TestStoreInstances.testBackendStoreCommunication The problem was here that FetchOptimizer initialized FileLocalizer#relativeRoot to check whether POStore is related to a dump. This initialized temporary path is in the threadlocal and it might happen that a Wrong FS: file:/ ..., expected: hdfs://... exception is thrown in those cases if the test is executed both in local and mapreduce mode in the same session. The temp path is initialized to file:/ for the local mode and is reused for the mapreduce mode which causes the exception. Now relativeRoot is not initialized by FetchOptimizer. I managed to run test-core successfully with -Dhadoopversion=23.
          Hide
          Cheolsoo Park added a comment -

          Thank you Lorand! I am running unit tests again with your new patch.

          Show
          Cheolsoo Park added a comment - Thank you Lorand! I am running unit tests again with your new patch.
          Hide
          Cheolsoo Park added a comment -

          Unit tests all pass! Thank you so much!

          I am running e2e tests now.

          Show
          Cheolsoo Park added a comment - Unit tests all pass! Thank you so much! I am running e2e tests now.
          Hide
          Aniket Mokashi added a comment -

          We have PIG-3463 that makes small jobs run in hadoop local mode. If that looks good, we can avoid adding complexity of FetchOptimizer.

          Show
          Aniket Mokashi added a comment - We have PIG-3463 that makes small jobs run in hadoop local mode. If that looks good, we can avoid adding complexity of FetchOptimizer.
          Hide
          Cheolsoo Park added a comment -

          Aniket Mokashi, are you saying we should commit PIG-3463 instead of this patch because the former includes the latter?

          Show
          Cheolsoo Park added a comment - Aniket Mokashi , are you saying we should commit PIG-3463 instead of this patch because the former includes the latter?
          Hide
          Lorand Bendig added a comment -

          Aniket Mokashi PIG-3463 is a great add-on! Although the two patches target the same issue, as far as I see, they are not overlapping.
          I think, in this case PIG-3642 can be regarded as a further optimization on top of PIG-3463 when only a subset of simple operators are used and there's even no need of kicking off the LocalJobRunner. Having both add-ons can result in a way faster development cycle, I think. Why do not have both?

          Show
          Lorand Bendig added a comment - Aniket Mokashi PIG-3463 is a great add-on! Although the two patches target the same issue, as far as I see, they are not overlapping. I think, in this case PIG-3642 can be regarded as a further optimization on top of PIG-3463 when only a subset of simple operators are used and there's even no need of kicking off the LocalJobRunner. Having both add-ons can result in a way faster development cycle, I think. Why do not have both?
          Hide
          Aniket Mokashi added a comment -

          Lorand Bendig, I'm not against the adding this feature to pig. However, from feature point of view, PIG-3463 is super set of PIG-3642 with less complexity (pre-tested code etc) and code maintenance. Hence, we should avoid adding this extra complexity and burden of maintenance, if possible.

          Show
          Aniket Mokashi added a comment - Lorand Bendig , I'm not against the adding this feature to pig. However, from feature point of view, PIG-3463 is super set of PIG-3642 with less complexity (pre-tested code etc) and code maintenance. Hence, we should avoid adding this extra complexity and burden of maintenance, if possible.
          Hide
          Cheolsoo Park added a comment -

          I will leave the decision to Aniket and Lorand.

          Just FYI- I have been running e2e tests, and I found many test failures even without this patch. So it was hard to tell whether this patch breaks any tests or not.

          Here is the result in the current trunk (without this patch)-

          [exec] Final results ,    PASSED: 536  FAILED: 22   SKIPPED: 24   ABORTED: 62   FAILED DEPENDENCY: 0
          

          We should fix these before things get worse.

          Show
          Cheolsoo Park added a comment - I will leave the decision to Aniket and Lorand. Just FYI- I have been running e2e tests, and I found many test failures even without this patch. So it was hard to tell whether this patch breaks any tests or not. Here is the result in the current trunk (without this patch)- [exec] Final results , PASSED: 536 FAILED: 22 SKIPPED: 24 ABORTED: 62 FAILED DEPENDENCY: 0 We should fix these before things get worse.
          Hide
          Cheolsoo Park added a comment -

          Actually, discard my e2e results. I found some environment issues and am rerunning them.

          Show
          Cheolsoo Park added a comment - Actually, discard my e2e results. I found some environment issues and am rerunning them.
          Hide
          Lorand Bendig added a comment -

          Cheolsoo, thanks for testing it through.
          However, I don't see the complexity as crucial, and it's also fairly separated from the existing code,
          I see Aniket's point of view. I of course have +1 but obviously you guys on board have the right to make the decision.

          Show
          Lorand Bendig added a comment - Cheolsoo, thanks for testing it through. However, I don't see the complexity as crucial, and it's also fairly separated from the existing code, I see Aniket's point of view. I of course have +1 but obviously you guys on board have the right to make the decision.
          Hide
          Cheolsoo Park added a comment -

          Lorand Bendig, you're right. I was reviewing Aniket's patch today and realized that these two patches are fairly independent of each other.

          Aniket Mokashi, after understanding your patch more, I agree with Lorand regarding the complexity. Besides, it makes mapper only jobs almost instant. I couldn't compare runtime between PIG-3463 and PIG-3642 because the current patch for PIG-3463 didn't work for mapper only jobs. However, I imagine it would be quite slower since it still launches local MR jobs, etc. So shall we commit both?

          In fact, what really concerns me is that these optimizations make many tests run differently than before. For eg, many e2e tests that are running as MR jobs now can run as fetch jobs. That significantly changes our code coverage. So I'd like to explicitly disable these optimizations in all the existing e2e tests. It should be trivial to do via conf files. Do you agree?

          Show
          Cheolsoo Park added a comment - Lorand Bendig , you're right. I was reviewing Aniket's patch today and realized that these two patches are fairly independent of each other. Aniket Mokashi , after understanding your patch more, I agree with Lorand regarding the complexity. Besides, it makes mapper only jobs almost instant. I couldn't compare runtime between PIG-3463 and PIG-3642 because the current patch for PIG-3463 didn't work for mapper only jobs. However, I imagine it would be quite slower since it still launches local MR jobs, etc. So shall we commit both? In fact, what really concerns me is that these optimizations make many tests run differently than before. For eg, many e2e tests that are running as MR jobs now can run as fetch jobs. That significantly changes our code coverage. So I'd like to explicitly disable these optimizations in all the existing e2e tests. It should be trivial to do via conf files. Do you agree?
          Hide
          Cheolsoo Park added a comment -

          Just FYI- e2e tests in trunk pass as of 01/21 except StreamingPythonUDFs_10. I will file a jira for this failure.

          Show
          Cheolsoo Park added a comment - Just FYI- e2e tests in trunk pass as of 01/21 except StreamingPythonUDFs_10. I will file a jira for this failure.
          Hide
          Lorand Bendig added a comment -

          Cheolsoo Park, do you think this patch is a good candidate to have it in trunk? If so, I'd rebase it, had a look at the e2e conf and would run the unit tests again (e.g to see if it really doesn't conflict with PIG-3463).

          Show
          Lorand Bendig added a comment - Cheolsoo Park , do you think this patch is a good candidate to have it in trunk? If so, I'd rebase it, had a look at the e2e conf and would run the unit tests again (e.g to see if it really doesn't conflict with PIG-3463 ).
          Hide
          Cheolsoo Park added a comment -

          Lorand Bendig, yes, I still want to commit this patch. As soon as you rebase your patch, I can re-run the tests.

          For e2e tests, I think you add "opt.fetch=false" to "./test/e2e/pig/conf/testpropertiesfile.conf". (I haven't tested it.)

          Show
          Cheolsoo Park added a comment - Lorand Bendig , yes, I still want to commit this patch. As soon as you rebase your patch, I can re-run the tests. For e2e tests, I think you add "opt.fetch=false" to "./test/e2e/pig/conf/testpropertiesfile.conf". (I haven't tested it.)
          Hide
          Lorand Bendig added a comment -

          Cheolsoo Park, the patch is rebased, minor adjustments were needed.
          Only a few e2e tests are affected, so I turned off fetch at these
          individually through the 'java_params' property. (It seems to
          me that setting opt.fetch=false in testpropertiesfile.conf doesn't work)

          Show
          Lorand Bendig added a comment - Cheolsoo Park , the patch is rebased, minor adjustments were needed. Only a few e2e tests are affected, so I turned off fetch at these individually through the 'java_params' property. (It seems to me that setting opt.fetch=false in testpropertiesfile.conf doesn't work)
          Hide
          Cheolsoo Park added a comment -

          +1.

          All the e2e tests pass except PIG-3679 and PIG-3739. (Both are unrelated.)

          I also confirmed that the fetch optimization is explicitly disabled in all the effected e2e test cases, so they will run in the same manner as before. In fact, I found one "commented-out" test case that can be effected by the fetch optimization. So I made an one-line change just to be safe as follows-

          cmdline.conf#L71
          #			'java_params' => ['-Dopt.fetch=false'],
          

          I will commit PIG-3642-6.patch if I don't hear any objection.

          Show
          Cheolsoo Park added a comment - +1. All the e2e tests pass except PIG-3679 and PIG-3739 . (Both are unrelated.) I also confirmed that the fetch optimization is explicitly disabled in all the effected e2e test cases, so they will run in the same manner as before. In fact, I found one "commented-out" test case that can be effected by the fetch optimization. So I made an one-line change just to be safe as follows- cmdline.conf#L71 # 'java_params' => ['-Dopt.fetch= false '], I will commit PIG-3642 -6.patch if I don't hear any objection.
          Hide
          Cheolsoo Park added a comment -

          Committed to trunk. Thank you Lorand Bendig for the great work!

          As discussed in the RB, I opened a documentation jira and assign it to you, It would be nice if you could add a brief description to the Pig manual about this new feature.

          Show
          Cheolsoo Park added a comment - Committed to trunk. Thank you Lorand Bendig for the great work! As discussed in the RB, I opened a documentation jira and assign it to you, It would be nice if you could add a brief description to the Pig manual about this new feature.
          Hide
          Lorand Bendig added a comment -

          Cheolsoo Park, great! Thank you for your assistance! I'll take care of the docs.

          Show
          Lorand Bendig added a comment - Cheolsoo Park , great! Thank you for your assistance! I'll take care of the docs.
          Hide
          Aniket Mokashi added a comment -

          However, a threshold could be given on the input size (an estimation) to determine whether to prefer fetch over MR jobs, similar to what Hive's 'hive.fetch.task.conversion.threshold' does. (through Pig's LoadMetadata#getStatistic ?)

          Lorand Bendig, we currently do not have such a restriction, correct? Should we add it? (The only case that doesn't require a size restriction is - load-limit-dump).

          Show
          Aniket Mokashi added a comment - However, a threshold could be given on the input size (an estimation) to determine whether to prefer fetch over MR jobs, similar to what Hive's 'hive.fetch.task.conversion.threshold' does. (through Pig's LoadMetadata#getStatistic ?) Lorand Bendig , we currently do not have such a restriction, correct? Should we add it? (The only case that doesn't require a size restriction is - load-limit-dump).
          Hide
          Lorand Bendig added a comment -

          Aniket Mokashi Currently the restriction is that only dump is allowed which implies that users won't query large amount of data.
          If store were supported then 'fetch.task.conversion.threshold' would make sense. My concern is however, that most of the loaders return null for LoadMetadata#getStatistic.

          Show
          Lorand Bendig added a comment - Aniket Mokashi Currently the restriction is that only dump is allowed which implies that users won't query large amount of data. If store were supported then 'fetch.task.conversion.threshold' would make sense. My concern is however, that most of the loaders return null for LoadMetadata#getStatistic.

            People

            • Assignee:
              Lorand Bendig
              Reporter:
              Lorand Bendig
            • Votes:
              0 Vote for this issue
              Watchers:
              6 Start watching this issue

              Dates

              • Created:
                Updated:
                Resolved:

                Development