HIVE-1408

add option to let hive automatically run in local mode based on tunable heuristics

    Details

    • Type: New Feature
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.7.0
    • Component/s: Query Processor
    • Labels: None

      Description

      as a followup to HIVE-543 - we should have a simple option (enabled by default) to let hive run in local mode if possible.

      two levels of options are desirable:
      1. hive.exec.mode.local.auto=true/false // control whether local mode is automatically chosen
      2. Options to control different heuristics, some naive examples:
      hive.exec.mode.local.auto.input.size.max=1G // don't choose local mode if data > 1G
      hive.exec.mode.local.auto.script.enable=true/false // choose if local mode is enabled for queries with user scripts

      this can be implemented as a pre/post execution hook. It makes sense to provide this as a standard hook in the hive codebase since it's likely to improve response time for many users (especially for test queries).

      the initial proposal is to choose this at the query level and not at the per hive-task (i.e. hadoop job) level. the per-job level requires more changes to compilation (to not pre-commit to hdfs or local scratch directories at compile time).
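      as a rough illustration of how the two levels of options could combine, here is a minimal sketch (not an actual implementation - the option names follow this proposal and may change, and inputBytes/hasScriptOperator stand in for values the compiler would supply):

        import org.apache.hadoop.conf.Configuration;

        // sketch only: fold the proposed options into a single local-mode decision
        public class LocalModeHeuristicSketch {
          public static boolean chooseLocalMode(Configuration conf,
                                                long inputBytes,
                                                boolean hasScriptOperator) {
            // level 1: is automatic local mode enabled at all?
            if (!conf.getBoolean("hive.exec.mode.local.auto", true)) {
              return false;
            }
            // level 2a: don't choose local mode if the input is too large (1G default here)
            long maxBytes = conf.getLong("hive.exec.mode.local.auto.input.size.max",
                                         1024L * 1024 * 1024);
            if (inputBytes > maxBytes) {
              return false;
            }
            // level 2b: optionally disallow local mode for queries with user scripts
            if (hasScriptOperator
                && !conf.getBoolean("hive.exec.mode.local.auto.script.enable", true)) {
              return false;
            }
            return true;
          }
        }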

      1. hive-1408.6.patch
        1.09 MB
        Joydeep Sen Sarma
      2. 1408.7.patch
        1.09 MB
        Joydeep Sen Sarma
      3. 1408.2.q.out.patch
        113 kB
        Joydeep Sen Sarma
      4. 1408.2.patch
        1.47 MB
        Joydeep Sen Sarma
      5. 1408.1.patch
        114 kB
        Joydeep Sen Sarma


          Activity

          Joydeep Sen Sarma added a comment -

          this is somewhat more complicated than i had bargained for:

          • we choose local/hdfs files at query compile time based on the local mode setting. however, we won't choose local mode until query compilation is complete
          • we choose whether to submit the job via a child jvm before the point at which the pre-hook is called. we (currently) have to submit the job via a child jvm for local mode
          • hooks don't have access to map-reduce plans or to whether there are any script operators (for instance).

          so it's not possible to implement this via hooks. (and the changes required are somewhat invasive)

          Joydeep Sen Sarma added a comment -

          v1 - i will update with tests.

          couple of main objectives:
          1. decide whether each mr job can be run locally
          2. decide whether local disk can be used for intermediate data (if all jobs are going to run locally)

          right now - both #1 and #2 are code complete - but only #1 has been enabled in the code (#2 needs more testing)

          the general strategy is:

          • after compilation/optimization - look at input size of each mr job.
          • if all the jobs are small - then we can use local disk for intermediate data (#2)
          • else - we use hdfs for intermediate input and before launching each job - we (re)test whether the input data set is such that we can execute locally.

          had to do substantial restructuring to make this happen:
          a. MapRedTask is now a wrapper around ExecDriver. This allows us to have a single task implementation for running mr jobs. MapRedTask decides at execute time whether it should run locally or not.
          b. Context.java is pretty much rewritten - the path management code was somewhat buggy (in particular isMRTmpFileURI was incorrect). the code was rewritten to make it easy to swizzle tmp paths so they are directed to local disk after plan generation
          c. added a small cache for DFS file metadata (sizes). this is because we now look up file metadata many times over (for determining local mode as well as for estimating reducer count) and this cuts the overhead of repeated DFS rpcs. a rough sketch of the idea follows below.
          d. most test output changes are because of the altered temporary path naming convention due to (b)
          e. bug fixes: CTAS and RCFileOutputFormat were broken for local mode execution. some cleanup (debug log statements should be wrapped in isDebugEnabled()).
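          as promised in (c), a rough sketch of the idea - class and field names are made up, and it assumes the standard FileSystem.getContentSummary() call; the cache in the patch may be organized differently:

            import java.io.IOException;
            import java.util.HashMap;
            import java.util.Map;

            import org.apache.hadoop.conf.Configuration;
            import org.apache.hadoop.fs.FileSystem;
            import org.apache.hadoop.fs.Path;

            // hypothetical helper: remembers input sizes so that local-mode selection
            // and reducer estimation don't repeat the same DFS RPCs within one query
            public class InputSizeCacheSketch {
              private final Map<String, Long> sizes = new HashMap<String, Long>();

              public long getSize(Path p, Configuration conf) throws IOException {
                Long cached = sizes.get(p.toString());
                if (cached != null) {
                  return cached.longValue();                     // later lookups hit the cache
                }
                FileSystem fs = p.getFileSystem(conf);
                long len = fs.getContentSummary(p).getLength();  // one RPC per distinct path
                sizes.put(p.toString(), len);
                return len;
              }
            }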

          Joydeep Sen Sarma added a comment -

          v2. this is ready for review.

          added tests:

          • the tests now use the 'pfile:///' namespace as the default warehouse filesystem. This is served by a proxy filesystem class that passes requests through to the local file system (a simplified sketch of the idea appears after this list)
          • this comprehensively tests all the file system issues related to running in local mode (where there is now a difference between the intermediate data's file system and the warehouse's file system). there are several small bug fixes related to bugs discovered because of this test mode.
          • there are changes in a lot of test results as a result of the new namespace as well as because of the changes in tmp file naming. i am attaching an extra diff (.q.out.patch) that shows only the interesting changes.
          • some tests have been modified to run with a non-local setting for the jobtracker and with auto-local-mode turned on. this tests the new functionality.
          • there is one test (archive.q) that's still breaking because of the filesystem issues. waiting for a fix from pyang. but it should not stop the review.
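          as mentioned in the first bullet, a much-simplified sketch of the proxy idea (the class name is made up and it assumes delegation through Hadoop's FilterFileSystem; the real ProxyFileSystem in the patch also rewrites paths between the two schemes):

            import java.io.IOException;
            import java.net.URI;

            import org.apache.hadoop.conf.Configuration;
            import org.apache.hadoop.fs.FilterFileSystem;
            import org.apache.hadoop.fs.LocalFileSystem;

            // hypothetical stand-in for the test-only proxy filesystem: advertises the
            // pfile:// scheme while forwarding every operation to the local filesystem
            public class PFileSketchFileSystem extends FilterFileSystem {
              public PFileSketchFileSystem() {
                super(new LocalFileSystem());    // all calls fall through to the local FS
              }

              @Override
              public void initialize(URI name, Configuration conf) throws IOException {
                // initialize the wrapped local filesystem; the real class additionally
                // swizzles paths so results come back under the pfile:// scheme
                super.initialize(name, conf);
              }

              @Override
              public URI getUri() {
                return URI.create("pfile:///");
              }
            }
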
          Joydeep Sen Sarma added a comment -

          additional changes:

          • pyang's fix for archive.q
          • move Proxy*FileSystem.java to shims/hadoop-20. The FileSystem interface changed between 0.17 and 0.20 and the class i wrote only compiles for 0.20
          • the change to use the pfile:/// namespace for the test warehouse is now applied only for hadoop-20 (because of the above). it's also excluded for minimr. the namespace is now controlled by ant.
          Joydeep Sen Sarma added a comment -

          summarizing comments from internal review:

          • log why local mode was not chosen (not clear whether this should be printed all the way to the console)
          • turn it on by default in trunk
          • use mapred.child.java.opts for child jvm memory in local mode (as opposed to the current policy of passing down HADOOP_HEAPMAX). this will let the map-reduce engine run with more memory and allow us to differentiate between compiler and execution memory requirements
          • set the auto-local reducer threshold to 1. local mode doesn't run more than one reducer. (a rough sketch of the resulting launch-time check follows at the end of this comment)

          follow on jiras:
          1. don't scan all partitions for determining local mode (may apply to estimateReducers as well)
          2. use # of splits instead of # files for determining local mode.
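          purely as an illustration of the logging and reducer-threshold points above (placeholder names, not the patch's code), the per-job launch-time check roughly becomes:

            // sketch only: returns a reason when a job cannot run locally, null otherwise;
            // the caller would log the reason (and possibly print it to the console)
            public class LocalModeReasonSketch {
              public static String whyNotLocal(long inputBytes, long maxBytes, int numReducers) {
                if (inputBytes > maxBytes) {
                  return "input size " + inputBytes + " exceeds the limit " + maxBytes;
                }
                if (numReducers > 1) {
                  return "local mode does not run more than one reducer (" + numReducers + " requested)";
                }
                return null;
              }
            }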

          Joydeep Sen Sarma added a comment -
          • added messages explaining why local mode was not chosen
          • added a negative test for the above, verifying that we don't choose local mode when the max size limit is small
          • turned on by default in hive-default.xml
          • turned off by default for tests because it might bypass minimr completely
          • set the reducer threshold to 1 for choosing local mode

          regarding child jvm memory - there's already a separate option to control this (hive.mapred.local.mem). So no work is required.

          patch passes all tests on 0.20. testing on 0.17 is in progress.

          Joydeep Sen Sarma added a comment -

          final patch i hope!

          had to go through some hoops to make the tests pass on all versions. it turns out that not having the pfile implementation on the other hadoop versions makes the test outputs differ (ignoring pfile: in diffs is not enough because path order in different lists changes)

          so i have ported the ProxyFileSystem to all the shims (only 0.17 required significant changes).

          tests on 0.17 and 0.20 both pass now (0.18 and 0.19 are still running).

          Joydeep Sen Sarma added a comment -

          sigh - some more changes are required in the shims to get all versions to pass. should have a final patch by tomorrow.

          Joydeep Sen Sarma added a comment -

          final round of fixes. didn't realize that shim classes have to be uniquely named per hadoop version. added an exclusion for Proxy* so that only one version of ProxyFileSystem is compiled, depending on the target hadoop version. this is ok since it's only for tests.

          tests pass on 0.17, 0.18 and 0.20. a couple of tests on 0.19 are broken because of bad existing source - filed HIVE-1488 for that.

          Ning Zhang added a comment -

          Some questions:

          1) the local file system classes handled in the shims share the same file name (class name) and are compiled conditionally depending on the hadoop version at compile time. This may cause problems when deploying the same hive jar to clusters running different hadoop versions. The current shims are implemented by naming the classes differently and using ShimLoader to load the correct class at execution time, which allows the same hive jars to be deployed to different hadoop clusters.

          2) data/conf/hive-site.xml fs.pfile.impl is not needed if ShimsLoader is used as described above.

          3) the hive.exec.mode.local.auto default values are different in HiveConf.java and conf/hive-default.xml. They should be the same to avoid confusion.

          4) ctas.q.out: do you know why the GlobalTableID was changed?

          5) MapRedTask.java:149 The plan file name is not randomized as before. It may cause problems when parallel execution mode is enabled and multiple MapRedTasks are running at the same time (e.g., parallel multi-table inserts).

          6) If there are 2 MapRed tasks and MR2 depends on MR1 and MR1 is chosen to run locally, it seems MR2 has to be local as well, since the intermediate files are stored in the local file system? What about parallel execution, when MR1 and MR2 run in parallel and only one of them is local? It seems the info about whether a task is "local" is stored in Context (and HiveConf), which is shared among parallel MR tasks?

          7) ExecDriver.localizeMRTmpFileImpl changes FileSinkDesc.dirName after the MR tasks have been generated; this breaks the dynamic partition code, which runs when the FileSinkOperator is generated. In particular, the DynamicPartitionCtx also stores the dirName, so it has to be changed in localizeMRTmpFileImpl as well.

          8) MoveTask previously moved an intermediate directory in HDFS to a final directory also in HDFS. In local mode, should we change the MoveTask execution as well?

          9) Driver.java:100 the two functions are made static. Should they be moved to Utilities?

          Joydeep Sen Sarma added a comment -

          #1 - we decided that i would try to take ProxyFileSystem out of the hive jars in the distribution. unfortunately, i am unable to do so - all the simple ways seem to break the tests. i don't see much of a downside with the current arrangement - ProxyFileSystem is test-only code and there's no reason why anyone should invoke it, so it shouldn't cause any problems (even though it ships with the hive jars). the pfile:// -> ProxyFileSystem mapping exists only in test mode.

          btw - i can't use ShimLoader, because Hadoop doesn't accept a factory class for creating file system objects. it expects a file system class directly. that makes it impossible to write a portable filesystem class using the ShimLoader paradigm. i am beginning to appreciate factory classes more.
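          to spell that out (sketch only - the filesystem class name below is the hypothetical one from the earlier sketch, not the class shipped with the patch):

            import java.io.IOException;
            import java.net.URI;

            import org.apache.hadoop.conf.Configuration;
            import org.apache.hadoop.fs.FileSystem;

            // Hadoop binds a scheme to a concrete FileSystem subclass via fs.<scheme>.impl
            // and instantiates that class directly - there is no factory hook where a
            // ShimLoader-style indirection could pick a version-specific implementation
            public class PFileBindingSketch {
              public static void main(String[] args) throws IOException {
                Configuration conf = new Configuration();
                conf.set("fs.pfile.impl", "org.example.PFileSketchFileSystem"); // hypothetical class
                FileSystem fs = FileSystem.get(URI.create("pfile:///tmp/warehouse"), conf);
                System.out.println(fs.getUri());
              }
            }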

          #2 not an issue - can't use ShimLoader as per above.

          #3 fixed

          #4, #5, #6, #7, #8 - not an issue, as we discussed. HIVE-1484 has already been filed as follow-up work to use the local dir for intermediate data when possible

          #9 - fixed. moved one public func to Utilities.java and eliminated the other.

          Ning Zhang added a comment -

          Looks good in general. One minor thing though: I tried it on real clusters and it works great except that I need to manually set mapred.local.dir even though hive.exec.mode.local.auto is already set to true. Should we treat mapred.local.dir the same as HADOOPJT so that it can be set automatically when local mode is on and reset it back in Driver and Context?

          Joydeep Sen Sarma added a comment -

          yeah - so the solution is that mapred.local.dir needs to be set correctly in the hive/hadoop client-side xml. for our internal install - i will send a diff changing the client side to point to /tmp (instead of relying on server-side config).

          there's nothing to do on the hive open source side. mapred.local.dir is a client-only variable and needs to be set specifically for the client side by the admin. basically our internal client-side config has a bug
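          in other words (illustrative only - the /tmp value just mirrors the internal fix described above and is not a recommended default):

            import org.apache.hadoop.conf.Configuration;

            // mapred.local.dir is read from the client-side configuration, so the admin
            // sets it in the client's *-site.xml; setting it programmatically is equivalent
            public class ClientLocalDirSketch {
              public static void main(String[] args) {
                Configuration conf = new Configuration();   // loads the client-side config files
                conf.set("mapred.local.dir", "/tmp");        // example value only
                System.out.println(conf.get("mapred.local.dir"));
              }
            }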

          Joydeep Sen Sarma added a comment -

          Ning - anything else you need from me? i was hoping to get it in before HIVE-417. otherwise i am sure i would have to regenerate/reconcile a ton of stuff

          Ning Zhang added a comment -

          Committed. Thanks Joydeep!


            People

            • Assignee: Joydeep Sen Sarma
            • Reporter: Joydeep Sen Sarma
            • Votes: 0
            • Watchers: 2
