Index: ql/src/test/results/clientnegative/fs_default_name2.q.out
===================================================================
1,6c1,11
< FAILED: Hive Internal Error: java.lang.RuntimeException(Error while making MR scratch directory - check filesystem config (java.net.URISyntaxException: Illegal character in scheme name at index 0: 'http://www.example.com))
< java.lang.RuntimeException: Error while making MR scratch directory - check filesystem config (java.net.URISyntaxException: Illegal character in scheme name at index 0: 'http://www.example.com)
< 	at org.apache.hadoop.hive.ql.Context.getMRScratchDir(Context.java:208)
< 	at org.apache.hadoop.hive.ql.Context.getMRTmpFileURI(Context.java:284)
< 	at org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.getMetaData(SemanticAnalyzer.java:806)
< 	at org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.analyzeInternal(SemanticAnalyzer.java:6041)
---
> FAILED: Hive Internal Error: java.lang.IllegalArgumentException(null)
> java.lang.IllegalArgumentException
> 	at java.net.URI.create(URI.java:842)
> 	at org.apache.hadoop.fs.FileSystem.getDefaultUri(FileSystem.java:103)
> 	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:184)
> 	at org.apache.hadoop.fs.FileSystem.getLocal(FileSystem.java:167)
> 	at org.apache.hadoop.hive.ql.Context.getLocalScratchDir(Context.java:157)
> 	at org.apache.hadoop.hive.ql.Context.getMRScratchDir(Context.java:176)
> 	at org.apache.hadoop.hive.ql.Context.getMRTmpFileURI(Context.java:238)
> 	at org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.getMetaData(SemanticAnalyzer.java:828)
> 	at org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.analyzeInternal(SemanticAnalyzer.java:6128)
8,9c13,14
< 	at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:304)
< 	at org.apache.hadoop.hive.ql.Driver.run(Driver.java:377)
---
> 	at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:301)
> 	at org.apache.hadoop.hive.ql.Driver.run(Driver.java:376)
12,13c17,18
< 	at org.apache.hadoop.hive.ql.QTestUtil.executeClient(QTestUtil.java:504)
< 	at org.apache.hadoop.hive.cli.TestNegativeCliDriver.testNegativeCliDriver_fs_default_name2(TestNegativeCliDriver.java:54)
---
> 	at org.apache.hadoop.hive.ql.QTestUtil.executeClient(QTestUtil.java:559)
> 	at org.apache.hadoop.hive.cli.TestNegativeCliDriver.testNegativeCliDriver_fs_default_name2(TestNegativeCliDriver.java:2166)
26,35c31,33
< 	at org.apache.tools.ant.taskdefs.optional.junit.JUnitTestRunner.run(JUnitTestRunner.java:420)
< 	at org.apache.tools.ant.taskdefs.optional.junit.JUnitTestRunner.launch(JUnitTestRunner.java:911)
< 	at org.apache.tools.ant.taskdefs.optional.junit.JUnitTestRunner.main(JUnitTestRunner.java:768)
< Caused by: java.lang.IllegalArgumentException
< 	at java.net.URI.create(URI.java:842)
< 	at org.apache.hadoop.fs.FileSystem.getDefaultUri(FileSystem.java:103)
< 	at org.apache.hadoop.hive.common.FileUtils.makeQualified(FileUtils.java:58)
< 	at org.apache.hadoop.hive.ql.Context.makeMRScratchDir(Context.java:128)
< 	at org.apache.hadoop.hive.ql.Context.getMRScratchDir(Context.java:202)
< 	... 25 more
---
> 	at org.apache.tools.ant.taskdefs.optional.junit.JUnitTestRunner.run(JUnitTestRunner.java:422)
> 	at org.apache.tools.ant.taskdefs.optional.junit.JUnitTestRunner.launch(JUnitTestRunner.java:931)
> 	at org.apache.tools.ant.taskdefs.optional.junit.JUnitTestRunner.main(JUnitTestRunner.java:785)
43c41
< 	... 29 more
---
> 	... 30 more
Index: ql/src/test/results/clientnegative/fs_default_name1.q.out
===================================================================
1,5c1,9
< FAILED: Hive Internal Error: java.lang.RuntimeException(Error while making local scratch directory - check filesystem config (java.net.URISyntaxException: Illegal character in scheme name at index 0: 'http://www.example.com))
< java.lang.RuntimeException: Error while making local scratch directory - check filesystem config (java.net.URISyntaxException: Illegal character in scheme name at index 0: 'http://www.example.com)
< 	at org.apache.hadoop.hive.ql.Context.getLocalScratchDir(Context.java:225)
< 	at org.apache.hadoop.hive.ql.Context.getLocalTmpFileURI(Context.java:293)
< 	at org.apache.hadoop.hive.ql.parse.DDLSemanticAnalyzer.analyzeInternal(DDLSemanticAnalyzer.java:102)
---
> FAILED: Hive Internal Error: java.lang.IllegalArgumentException(null)
> java.lang.IllegalArgumentException
> 	at java.net.URI.create(URI.java:842)
> 	at org.apache.hadoop.fs.FileSystem.getDefaultUri(FileSystem.java:103)
> 	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:184)
> 	at org.apache.hadoop.fs.FileSystem.getLocal(FileSystem.java:167)
> 	at org.apache.hadoop.hive.ql.Context.getLocalScratchDir(Context.java:157)
> 	at org.apache.hadoop.hive.ql.Context.getLocalTmpFileURI(Context.java:273)
> 	at org.apache.hadoop.hive.ql.parse.DDLSemanticAnalyzer.analyzeInternal(DDLSemanticAnalyzer.java:114)
7,8c11,12
< 	at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:304)
< 	at org.apache.hadoop.hive.ql.Driver.run(Driver.java:377)
---
> 	at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:301)
> 	at org.apache.hadoop.hive.ql.Driver.run(Driver.java:376)
11,12c15,16
< 	at org.apache.hadoop.hive.ql.QTestUtil.executeClient(QTestUtil.java:504)
< 	at org.apache.hadoop.hive.cli.TestNegativeCliDriver.testNegativeCliDriver_fs_default_name1(TestNegativeCliDriver.java:54)
---
> 	at org.apache.hadoop.hive.ql.QTestUtil.executeClient(QTestUtil.java:559)
> 	at org.apache.hadoop.hive.cli.TestNegativeCliDriver.testNegativeCliDriver_fs_default_name1(TestNegativeCliDriver.java:2131)
25,35c29,31
< 	at org.apache.tools.ant.taskdefs.optional.junit.JUnitTestRunner.run(JUnitTestRunner.java:420)
< 	at org.apache.tools.ant.taskdefs.optional.junit.JUnitTestRunner.launch(JUnitTestRunner.java:911)
< 	at org.apache.tools.ant.taskdefs.optional.junit.JUnitTestRunner.main(JUnitTestRunner.java:768)
< Caused by: java.lang.IllegalArgumentException
< 	at java.net.URI.create(URI.java:842)
< 	at org.apache.hadoop.fs.FileSystem.getDefaultUri(FileSystem.java:103)
< 	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:184)
< 	at org.apache.hadoop.fs.FileSystem.getLocal(FileSystem.java:167)
< 	at org.apache.hadoop.hive.ql.Context.makeLocalScratchDir(Context.java:165)
< 	at org.apache.hadoop.hive.ql.Context.getLocalScratchDir(Context.java:219)
< 	... 24 more
---
> 	at org.apache.tools.ant.taskdefs.optional.junit.JUnitTestRunner.run(JUnitTestRunner.java:422)
> 	at org.apache.tools.ant.taskdefs.optional.junit.JUnitTestRunner.launch(JUnitTestRunner.java:931)
> 	at org.apache.tools.ant.taskdefs.optional.junit.JUnitTestRunner.main(JUnitTestRunner.java:785)
43c39
< 	... 29 more
---
> 	... 28 more
Index: ql/src/test/results/clientpositive/sample8.q.out
===================================================================
320c320
<         base file name: 10002
---
>         base file name: -mr-10002
Index: ql/src/test/results/clientpositive/bucketmapjoin5.q.out
===================================================================
433c433
<         base file name: 10002
---
>         base file name: -mr-10002
954c954
<         base file name: 10002
---
>         base file name: -mr-10002
Index: ql/src/test/results/clientpositive/join33.q.out
===================================================================
198c198,214
<         file:/mnt/vol/devrs004.snc1/jssarma/projects/hive_trunk/build/ql/test/data/warehouse/srcpart/ds=2008-04-08/hr=11
---
>         file:/mnt/vol/devrs004.snc1/jssarma/projects/hive_trunk_local_mode/build/ql/scratchdir/hive_2010-07-07_17-51-08_504_8696032566548615513/-mr-10002
>           Partition
>             base file name: -mr-10002
>             input format: org.apache.hadoop.mapred.SequenceFileInputFormat
>             output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
>             properties:
>               columns _col0,_col1,_col3
>               columns.types string,string,string
>               escape.delim \
>
>             input format: org.apache.hadoop.mapred.SequenceFileInputFormat
>             output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
>             properties:
>               columns _col0,_col1,_col3
>               columns.types string,string,string
>               escape.delim \
>         file:/mnt/vol/devrs004.snc1/jssarma/projects/hive_trunk_local_mode/build/ql/test/data/warehouse/srcpart/ds=2008-04-08/hr=11
239,254d254
<         file:/tmp/jssarma/hive_2010-06-10_16-21-45_664_3974168394039456921/10002
<           Partition
<             base file name: 10002
<             input format: org.apache.hadoop.mapred.SequenceFileInputFormat
<             output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
<             properties:
<               columns _col0,_col1,_col3
<               columns.types string,string,string
<               escape.delim \
<
<             input format: org.apache.hadoop.mapred.SequenceFileInputFormat
<             output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
<             properties:
<               columns _col0,_col1,_col3
<               columns.types string,string,string
<               escape.delim \
Index: ql/src/test/results/clientpositive/input_part2.q.out
===================================================================
291c291
<         base file name: 10004
---
>         base file name: -mr-10004
411c411
<         base file name: 10005
---
>         base file name: -mr-10005
Index: ql/src/test/results/clientpositive/sample4.q.out
===================================================================
174c174
<         base file name: 10002
---
>         base file name: -mr-10002
Index: ql/src/test/results/clientpositive/join34.q.out
===================================================================
356c356
<         base file name: 10002
---
>         base file name: -mr-10002
Index: ql/src/test/results/clientpositive/bucketmapjoin1.q.out
===================================================================
369c369
<         base file name: 10002
---
>         base file name: -mr-10002
862c862
<         base file name: 10002
---
>         base file name: -mr-10002
Index: ql/src/test/results/clientpositive/filter_join_breaktask.q.out
===================================================================
274c274
<         base file name: 10002
---
>         base file name: -mr-10002
Index: ql/src/test/results/clientpositive/sample5.q.out
===================================================================
172c172
<         base file name: 10002
---
>         base file name: -mr-10002
Index: ql/src/test/results/clientpositive/join26.q.out
===================================================================
340c340
<         base file name: 10002
---
>         base file name: -mr-10002
Index: ql/src/test/results/clientpositive/bucketmapjoin_negative.q.out
===================================================================
319c319
<         base file name: 10002
---
>         base file name: -mr-10002
Index: ql/src/test/results/clientpositive/join35.q.out
===================================================================
370c370
<         base file name: 10002
---
>         base file name: -mr-10002
386c386
<         base file name: 10004
---
>         base file name: -mr-10004
458c458
<         base file name: 10003
---
>         base file name: -mr-10003
Index: ql/src/test/results/clientpositive/bucketmapjoin2.q.out
===================================================================
360c360
<         base file name: 10002
---
>         base file name: -mr-10002
842c842
<         base file name: 10002
---
>         base file name: -mr-10002
Index: ql/src/test/results/clientpositive/udf_explode.q.out
===================================================================
224c224
<         base file name: 10002
---
>         base file name: -mr-10002
Index: ql/src/test/results/clientpositive/join_map_ppr.q.out
===================================================================
359c359
<         base file name: 10002
---
>         base file name: -mr-10002
965c965
<         base file name: 10002
---
>         base file name: -mr-10002
Index: ql/src/test/results/clientpositive/rand_partitionpruner2.q.out
===================================================================
224c224
<         base file name: 10002
---
>         base file name: -mr-10002
Index: ql/src/test/results/clientpositive/sample6.q.out
===================================================================
172c172
<         base file name: 10002
---
>         base file name: -mr-10002
Index: ql/src/test/results/clientpositive/bucketmapjoin3.q.out
===================================================================
374c374
<         base file name: 10002
---
>         base file name: -mr-10002
868c868
<         base file name: 10002
---
>         base file name: -mr-10002
Index: ql/src/test/results/clientpositive/sample1.q.out
===================================================================
201c201
<         base file name: 10002
---
>         base file name: -mr-10002
Index: ql/src/test/results/clientpositive/ctas.q.out
===================================================================
106c106
<             GlobalTableId: 0
---
>             GlobalTableId: 1
226c226
<             GlobalTableId: 0
---
>             GlobalTableId: 1
346c346
<             GlobalTableId: 0
---
>             GlobalTableId: 1
500c500
<             GlobalTableId: 0
---
>             GlobalTableId: 1
670c670
<         base file name: 10002
---
>         base file name: -mr-10002
689,690c689,690
<             GlobalTableId: 0
<             directory: file:/data/users/njain/hive1/hive1/build/ql/scratchdir/hive_2010-02-17_15-35-13_160_4580041524192799730/10001
---
>             GlobalTableId: 1
>             directory: file:/mnt/vol/devrs004.snc1/jssarma/projects/hive_trunk_local_mode/build/ql/scratchdir/hive_2010-07-07_17-29-30_093_2676145814519753168/-ext-10001
Index: ql/src/test/results/clientpositive/binary_output_format.q.out
===================================================================
212c212
<         base file name: 10002
---
>         base file name: -mr-10002
Index: ql/src/test/results/clientpositive/bucketmapjoin_negative2.q.out
===================================================================
305c305
<         base file name: 10002
---
>         base file name: -mr-10002
Index: ql/src/test/results/clientpositive/sample7.q.out
===================================================================
179c179
<         base file name: 10002
---
>         base file name: -mr-10002
Index: ql/src/test/results/clientpositive/bucketmapjoin4.q.out
===================================================================
350c350
<         base file name: 10002
---
>         base file name: -mr-10002
814c814
<         base file name: 10002
---
>         base file name: -mr-10002
Index: ql/src/test/results/clientpositive/union22.q.out
===================================================================
374c374,390
<         file:/mnt/vol/devrs004.snc1/jssarma/projects/hive_trunk/build/ql/test/data/warehouse/dst_union22_delta/ds=1
---
>         file:/mnt/vol/devrs004.snc1/jssarma/projects/hive_trunk_local_mode/build/ql/scratchdir/hive_2010-07-08_00-56-18_476_5696643614844358621/-mr-10002
>           Partition
>             base file name: -mr-10002
>             input format: org.apache.hadoop.mapred.SequenceFileInputFormat
>             output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
>             properties:
>               columns _col0,_col1,_col8,_col9
>               columns.types string,string,string,string
>               escape.delim \
>
>             input format: org.apache.hadoop.mapred.SequenceFileInputFormat
>             output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
>             properties:
>               columns _col0,_col1,_col8,_col9
>               columns.types string,string,string,string
>               escape.delim \
>         file:/mnt/vol/devrs004.snc1/jssarma/projects/hive_trunk_local_mode/build/ql/test/data/warehouse/dst_union22_delta/ds=1
414,429d429
<         file:/tmp/jssarma/hive_2010-06-10_16-21-01_038_1201658161362559492/10002
<           Partition
<             base file name: 10002
<             input format: org.apache.hadoop.mapred.SequenceFileInputFormat
<             output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
<             properties:
<               columns _col0,_col1,_col8,_col9
<               columns.types string,string,string,string
<               escape.delim \
<
<             input format: org.apache.hadoop.mapred.SequenceFileInputFormat
<             output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
<             properties:
<               columns _col0,_col1,_col8,_col9
<               columns.types string,string,string,string
<               escape.delim \
Index: ql/src/test/results/clientpositive/sample2.q.out
===================================================================
174c174
<         base file name: 10002
---
>         base file name: -mr-10002
Index: ql/src/test/results/clientpositive/join32.q.out
===================================================================
291c291
<         base file name: 10003
---
>         base file name: -mr-10003
363c363
<         base file name: 10002
---
>         base file name: -mr-10002
Index: ql/src/test/results/clientpositive/input_part1.q.out
===================================================================
192c192
<         base file name: 10002
---
>         base file name: -mr-10002
Index: ql/src/test/results/compiler/plan/join2.q.xml
===================================================================
2c2
< 
---
> 
90c90
< 1269980341
---
> 1278576787
182c182
< 1269980341
---
> 1278576785
792c792
< 10002
---
> -mr-10002
899c899
< 1269980341
---
> 1278576785
1408c1408
< 1269980341
---
> 1278576785
1495c1495
< 1269980341
---
> 1278576785
2103c2103
< 1269980341
---
> 1278576785
Index: ql/src/test/results/compiler/plan/input2.q.xml
===================================================================
2c2
< 
---
> 
304c304
< 10006
---
> -mr-10006
359c359
< 1269980309
---
> 1278576704
458c458
< 1269980309
---
> 1278576704
964c964
< 10007
---
> -mr-10007
1019c1019
< 1269980309
---
> 1278576704
1118c1118
< 1269980309
---
> 1278576704
1624c1624
< 10008
---
> -mr-10008
1679c1679
< 1269980309
---
> 1278576704
1782c1782
< 1269980309
---
> 1278576704
2078c2078
< 1269980308
---
> 1278576702
3139c3139
< 1269980308
---
> 1278576702
Index: ql/src/test/results/compiler/plan/input3.q.xml
===================================================================
2c2
< 
---
> 
304c304
< 10007
---
> -mr-10007
359c359
< 1273610222
---
> 1278576717
458c458
< 1273610222
---
> 1278576717
964c964
< 10008
---
> -mr-10008
1019c1019
< 1273610222
---
> 1278576717
1118c1118
< 1273610222
---
> 1278576717
1624c1624
< 10009
---
> -mr-10009
1679c1679
< 1273610222
---
> 1278576717
1782c1782
< 1273610222
---
> 1278576717
2275c2275
< 10010
---
> -mr-10010
2629c2629
< 1273610220
---
> 1278576715
4003c4003
< 1273610220
---
> 1278576715
Index: ql/src/test/results/compiler/plan/input6.q.xml
===================================================================
2c2
< 
---
> 
304c304
< 10002
---
> -mr-10002
359c359
< 1269980321
---
> 1278576737
458c458
< 1269980321
---
> 1278576737
741c741
< 1269980321
---
> 1278576737
1248c1248
< 1269980321
---
> 1278576737
Index: ql/src/test/results/compiler/plan/input7.q.xml
===================================================================
2c2
< 
---
> 
304c304
< 10002
---
> -mr-10002
359c359
< 1269980323
---
> 1278576742
458c458
< 1269980323
---
> 1278576742
745c745
< 1269980323
---
> 1278576742
1088c1088
< 1269980323
---
> 1278576742
Index: ql/src/test/results/compiler/plan/input_testsequencefile.q.xml
===================================================================
2c2
< 
---
> 
304c304
< 10002
---
> -mr-10002
359c359
< 1269980333
---
> 1278576768
458c458
< 1269980333
---
> 1278576768
741c741
< 1269980332
---
> 1278576767
1093c1093
< 1269980332
---
> 1278576767
Index: ql/src/test/results/compiler/plan/union.q.xml
===================================================================
2c2
< 
---
> 
304c304
< 10001
---
> -mr-10001
668c668
< 1268858283
---
> 1278576878
755c755
< 1268858283
---
> 1278576878
1878c1878
< 1268858283
---
> 1278576878
Index: ql/src/test/results/compiler/plan/input9.q.xml
===================================================================
2c2
< 
---
> 
304c304
< 10002
---
> -mr-10002
359c359
< 1269980328
---
> 1278576753
458c458
< 1269980328
---
> 1278576753
745c745
< 1269980328
---
> 1278576753
1262c1262
< 1269980328
---
> 1278576753
Index: ql/src/test/results/compiler/plan/subq.q.xml
===================================================================
2c2
< 
---
> 
304c304
< 10001
---
> -mr-10001
668c668
< 1268858270
---
> 1278576858
1349c1349
< 1268858270
---
> 1278576858
Index: ql/src/test/results/compiler/plan/case_sensitivity.q.xml
===================================================================
2c2
< 
---
> 
304c304
< 10002
---
> -mr-10002
359c359
< 1269980282
---
> 1278576636
458c458
< 1269980282
---
> 1278576636
749c749
< 1269980282
---
> 1278576636
1495c1495
< 1269980282
---
> 1278576636
Index: ql/src/test/results/compiler/plan/sample2.q.xml
===================================================================
2c2
< 
---
> 
304c304
< 10002
---
> -mr-10002
359c359
< 1270751318
---
> 1278576831
458c458
< 1270751318
---
> 1278576831
749c749
< 1270751317
---
> 1278576828
1473c1473
< 1270751317
---
> 1278576828
Index: ql/src/test/results/compiler/plan/sample3.q.xml
===================================================================
2c2
< 
---
> 
304c304
< 10002
---
> -mr-10002
359c359
< 1270751321
---
> 1278576839
458c458
< 1270751321
---
> 1278576839
749c749
< 1270751320
---
> 1278576836
1496c1496
< 1270751320
---
> 1278576836
Index: ql/src/test/results/compiler/plan/sample4.q.xml
===================================================================
2c2
< 
---
> 
304c304
< 10002
---
> -mr-10002
359c359
< 1270751324
---
> 1278576845
458c458
< 1270751324
---
> 1278576845
749c749
< 1270751323
---
> 1278576844
1473c1473
< 1270751323
---
> 1278576844
Index: ql/src/test/results/compiler/plan/sample5.q.xml
===================================================================
2c2
< 
---
> 
304c304
< 10002
---
> -mr-10002
359c359
< 1270751327
---
> 1278576849
458c458
< 1270751327
---
> 1278576849
749c749
< 1270751327
---
> 1278576847
1470c1470
< 1270751327
---
> 1278576847
Index: ql/src/test/results/compiler/plan/sample6.q.xml
===================================================================
2c2
< 
---
> 
304c304
< 10002
---
> -mr-10002
359c359
< 1270751330
---
> 1278576853
458c458
< 1270751330
---
> 1278576853
749c749
< 1270751329
---
> 1278576851
1473c1473
< 1270751329
---
> 1278576851
Index: ql/src/test/results/compiler/plan/sample7.q.xml
===================================================================
2c2
< 
---
> 
304c304
< 10002
---
> -mr-10002
359c359
< 1270751334
---
> 1278576856
458c458
< 1270751334
---
> 1278576856
749c749
< 1270751332
---
> 1278576855
1643c1643
< 1270751332
---
> 1278576855
Index: ql/src/test/results/compiler/plan/input1.q.xml
===================================================================
2c2
< 
---
> 
304c304
< 10002
---
> -mr-10002
359c359
< 1269980306
---
> 1278576696
458c458
< 1269980306
---
> 1278576696
741c741
< 1269980306
---
> 1278576696
1292c1292
< 1269980306
---
> 1278576696
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionPruner.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionPruner.java	(revision 961525)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionPruner.java	(working copy)
@@ -238,13 +238,15 @@
       } else {
         Partition part = Hive.get().getPartition(tab, partSpec, Boolean.FALSE);
+        String state = "retained";
         if (Boolean.TRUE.equals(r)) {
-          LOG.debug("retained partition: " + partSpec);
           true_parts.add(part);
         } else {
-          LOG.debug("unknown partition: " + partSpec);
           unkn_parts.add(part);
+          state = "unknown";
         }
+        if (LOG.isDebugEnabled())
+          LOG.debug(state + " partition: " + partSpec);
       }
     } else {
       // is there is no parition pruning, all of them are needed
Index: ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java	(revision 961525)
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java	(working copy)
@@ -587,7 +587,8 @@
         continue;
       }
       String path = p.toString();
-      LOG.debug("Adding " + path + " of table" + alias_id);
+      if (LOG.isDebugEnabled())
+        LOG.debug("Adding " + path + " of table" + alias_id);
 
       partDir.add(p);
       try {
@@ -615,7 +616,8 @@
       }
       plan.getPathToAliases().get(path).add(alias_id);
       plan.getPathToPartitionInfo().put(path, prtDesc);
-      LOG.debug("Information added for path " + path);
+      if (LOG.isDebugEnabled())
+        LOG.debug("Information added for path " + path);
     }
 
     assert plan.getAliasToWork().get(alias_id) == null;
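Note on the logging hunks above (and the similar one in FunctionRegistry.java further down): they all apply the same idiom of guarding LOG.debug(...) with LOG.isDebugEnabled(), so the message string is never concatenated when debug output is off. A minimal standalone sketch of the idiom, with illustrative class and method names that are not part of this patch:

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class DebugGuardExample {
  private static final Log LOG = LogFactory.getLog(DebugGuardExample.class);

  void addPath(String path, String aliasId) {
    // Without the guard, the argument string is built even when debug
    // logging is disabled; with it, the concatenation is skipped entirely.
    if (LOG.isDebugEnabled()) {
      LOG.debug("Adding " + path + " of table " + aliasId);
    }
  }
}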
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java	(revision 961525)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java	(working copy)
@@ -23,6 +23,7 @@
 import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.CreateFunctionDesc;
@@ -127,4 +128,9 @@
   public String getName() {
     return "FUNCTION";
   }
-}
+
+  @Override
+  protected void localizeMRTmpFilesImpl(Context ctx) {
+    throw new RuntimeException ("Unexpected call");
+  }
+}
\ No newline at end of file
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java	(revision 961525)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java	(working copy)
@@ -24,6 +24,7 @@
 import java.util.Properties;
 
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.plan.FetchWork;
@@ -147,4 +148,15 @@
   public String getName() {
     return "FETCH";
   }
+
+  @Override
+  protected void localizeMRTmpFilesImpl(Context ctx) {
+    String s = work.getTblDir();
+    if ((s != null) && ctx.isMRTmpFileURI(s))
+      work.setTblDir(ctx.localizeMRTmpFileURI(s));
+
+    ArrayList ls = work.getPartDir();
+    if (ls != null)
+      ctx.localizePaths(ls);
+  }
 }
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java	(revision 961525)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java	(working copy)
@@ -427,4 +427,10 @@
   public String getName() {
     return "MAP";
   }
+
+  @Override
+  public int getType() {
+    return -1;
+  }
+
 }
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/CollectOperator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/CollectOperator.java	(revision 961525)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/CollectOperator.java	(working copy)
@@ -77,4 +77,8 @@
     }
   }
 
+  @Override
+  public int getType() {
+    return -1;
+  }
 }
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java	(revision 961525)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java	(working copy)
@@ -28,6 +28,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.metadata.Hive;
@@ -123,18 +124,6 @@
    */
   protected abstract int execute(DriverContext driverContext);
 
-  /**
-   * Update the progress of the task within taskHandle and also dump the
-   * progress information to the history file.
-   *
-   * @param taskHandle
-   *          task handle returned by execute
-   * @throws IOException
-   */
-  public void progress(TaskHandle taskHandle) throws IOException {
-    // do nothing by default
-  }
-
   // dummy method - FetchTask overwrites this
   public boolean fetch(ArrayList res) throws IOException {
     assert false;
@@ -273,10 +262,6 @@
     return false;
   }
 
-  public void updateCounters(TaskHandle th) throws IOException {
-    // default, do nothing
-  }
-
   public HashMap getCounters() {
     return taskCounters;
   }
@@ -291,4 +276,34 @@
     assert false;
     return -1;
   }
+
+  /**
+   * If this task uses any map-reduce intermediate data (either for reading
+   * or for writing), localize them (using the supplied Context). Map-Reduce
+   * intermediate directories are allocated using Context.getMRTmpFileURI()
+   * and can be localized using localizeMRTmpFileURI().
+   *
+   * This method is declared abstract to force any task code to explicitly
+   * deal with this aspect of execution.
+   *
+   * @param ctx context object with which to localize
+   */
+  abstract protected void localizeMRTmpFilesImpl(Context ctx);
+
+  /**
+   * Localize a task tree
+   * @param ctx context object with which to localize
+   */
+  public final void localizeMRTmpFiles(Context ctx) {
+    localizeMRTmpFilesImpl(ctx);
+
+    if (childTasks == null)
+      return;
+
+    for (Task t: childTasks) {
+      t.localizeMRTmpFiles(ctx);
+    }
+  }
+
 }
\ No newline at end of file
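For a task that does touch MR scratch space, the new hook boils down to rewriting any stored URIs, as FetchTask does above. A hedged sketch of what an implementation might look like inside a hypothetical Task subclass whose work object exposes a single output directory (getOutputDir/setOutputDir are assumed accessors for illustration, not Hive APIs; the Context methods are the ones used elsewhere in this patch):

  @Override
  protected void localizeMRTmpFilesImpl(Context ctx) {
    String dir = work.getOutputDir();
    if (dir != null && ctx.isMRTmpFileURI(dir)) {
      // swap the MR scratch URI for its localized equivalent
      work.setOutputDir(ctx.localizeMRTmpFileURI(dir));
    }
  }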
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/LateralViewJoinOperator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/LateralViewJoinOperator.java	(revision 961525)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/LateralViewJoinOperator.java	(working copy)
@@ -28,7 +28,7 @@
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-
+import org.apache.hadoop.hive.ql.plan.api.OperatorType;
 /**
  * The lateral view join operator is used to implement the lateral view
  * functionality. This operator was implemented with the following operator DAG
@@ -126,4 +126,8 @@
     return "LVJ";
   }
 
+  @Override
+  public int getType() {
+    return OperatorType.LATERALVIEWJOIN;
+  }
 }
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java	(revision 961525)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java	(working copy)
@@ -709,7 +709,8 @@
   }
 
   public static GenericUDAFResolver getGenericUDAFResolver(String functionName) {
-    LOG.debug("Looking up GenericUDAF: " + functionName);
+    if (LOG.isDebugEnabled())
+      LOG.debug("Looking up GenericUDAF: " + functionName);
     FunctionInfo finfo = mFunctions.get(functionName.toLowerCase());
     if (finfo == null) {
       return null;
@@ -847,9 +848,10 @@
         conversionCost += cost;
       }
     }
-    LOG.debug("Method " + (match ? "did" : "didn't") + " match: passed = "
-        + argumentsPassed + " accepted = " + argumentsAccepted + " method = "
-        + m);
+    if (LOG.isDebugEnabled())
+      LOG.debug("Method " + (match ? "did" : "didn't") + " match: passed = "
+          + argumentsPassed + " accepted = " + argumentsAccepted
+          + " method = " + m);
     if (match) {
       // Always choose the function with least implicit conversions.
       if (conversionCost < leastConversionCost) {
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java	(revision 961525)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java	(working copy)
@@ -38,6 +38,7 @@
 import org.apache.hadoop.hive.ql.plan.api.StageType;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.Context;
 
 /**
@@ -402,4 +403,12 @@
   public String getName() {
     return "EXPLAIN";
   }
+
+  @Override
+  protected void localizeMRTmpFilesImpl(Context ctx) {
+    // explain task has nothing to localize
+    // we don't expect to enter this code path at all
+    throw new RuntimeException ("Unexpected call");
+  }
+
 }
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java	(revision 961525)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java	(working copy)
@@ -22,6 +22,7 @@
 import java.util.List;
 
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.plan.ConditionalResolver;
@@ -199,4 +200,11 @@
     }
     return ret;
   }
+
+  @Override
+  protected void localizeMRTmpFilesImpl(Context ctx) {
+    if (getListTasks() != null)
+      for(Task t: getListTasks())
+        t.localizeMRTmpFiles(ctx);
+  }
 }
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java	(revision 961525)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java	(working copy)
@@ -65,10 +65,8 @@
         .add(new taskTuple(ExplainWork.class, ExplainTask.class));
     taskvec.add(new taskTuple(ConditionalWork.class, ConditionalTask.class));
-    // we are taking this out to allow us to instantiate either MapRedTask or
-    // ExecDriver dynamically at run time based on configuration
-    // taskvec.add(new taskTuple(mapredWork.class,
-    // ExecDriver.class));
+    taskvec.add(new taskTuple(MapredWork.class,
+        MapRedTask.class));
   }
 
   private static ThreadLocal tid = new ThreadLocal() {
@@ -104,28 +102,6 @@
       }
     }
 
-    if (workClass == MapredWork.class) {
-
-      boolean viachild = conf.getBoolVar(HiveConf.ConfVars.SUBMITVIACHILD);
-
-      try {
-
-        // in local mode - or if otherwise so configured - always submit
-        // jobs via separate jvm
-        Task ret = null;
-        if (conf.getVar(HiveConf.ConfVars.HADOOPJT).equals("local") || viachild) {
-          ret = (Task) MapRedTask.class.newInstance();
-        } else {
-          ret = (Task) ExecDriver.class.newInstance();
-        }
-        ret.setId("Stage-" + Integer.toString(getAndIncrementId()));
-        return ret;
-      } catch (Exception e) {
-        throw new RuntimeException(e.getMessage(), e);
-      }
-
-    }
-
     throw new RuntimeException("No task for work class " + workClass.getName());
   }
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java	(revision 961525)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java	(working copy)
@@ -18,9 +18,9 @@
 
 package org.apache.hadoop.hive.ql.exec;
 
-import java.io.File;
-import java.io.FileOutputStream;
+import java.io.OutputStream;
 import java.io.Serializable;
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
@@ -28,19 +28,28 @@
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.exec.Utilities.StreamPrinter;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapred.JobConf;
 
 /**
- * Alternate implementation (to ExecDriver) of spawning a mapreduce task that
- * runs it from a separate jvm. The primary issue with this is the inability to
- * control logging from a separate jvm in a consistent manner
+ * Extension of ExecDriver:
+ * - can optionally spawn a map-reduce task from a separate jvm
+ * - will make last minute adjustments to map-reduce job parameters, viz:
+ *   * estimating number of reducers
+ *   * estimating whether job should run locally
 **/
-public class MapRedTask extends Task implements Serializable {
+public class MapRedTask extends ExecDriver implements Serializable {
 
   private static final long serialVersionUID = 1L;
@@ -48,21 +57,72 @@
   static final String HADOOP_OPTS_KEY = "HADOOP_OPTS";
   static final String[] HIVE_SYS_PROP = {"build.dir", "build.dir.hive"};
 
+  private transient ContentSummary inputSummary = null;
+  private transient boolean runningViaChild = false;
+
   public MapRedTask() {
     super();
   }
 
+  public MapRedTask(MapredWork plan, JobConf job, boolean isSilent) throws HiveException {
+    throw new RuntimeException("Illegal Constructor call");
+  }
+
   @Override
   public int execute(DriverContext driverContext) {
 
+    Context ctx = driverContext.getCtx();
+    boolean ctxCreated = false;
+
     try {
+      if (ctx == null) {
+        ctx = new Context(conf);
+        ctxCreated = true;
+      }
+
+      // estimate number of reducers
+      setNumberOfReducers();
+
+      // auto-determine local mode if allowed
+      if (!ctx.isLocalOnlyExecutionMode() &&
+          conf.getBoolVar(HiveConf.ConfVars.LOCALMODEAUTO)) {
+
+        if (inputSummary == null)
+          inputSummary = Utilities.getInputSummary(driverContext.getCtx(), work, null);
+
+        // at this point the number of reducers is precisely defined in the plan
+        int numReducers = work.getNumReduceTasks();
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Task: " + getId() + ", Summary: " +
+                    inputSummary.getLength() + "," + inputSummary.getFileCount() + "," +
+                    numReducers);
+        }
+
+        if (MapRedTask.isEligibleForLocalMode(conf, inputSummary, numReducers)) {
+          // set the JT to local for the duration of this job
+          ctx.setOriginalTracker(conf.getVar(HiveConf.ConfVars.HADOOPJT));
+          conf.setVar(HiveConf.ConfVars.HADOOPJT, "local");
+          LOG.info("Selecting local mode for task: " + getId());
+        }
+      }
+
+      runningViaChild =
+        "local".equals(conf.getVar(HiveConf.ConfVars.HADOOPJT)) ||
+        conf.getBoolVar(HiveConf.ConfVars.SUBMITVIACHILD);
+
+      if(!runningViaChild) {
+        // we are not running this mapred task via child jvm
+        // so directly invoke ExecDriver
+        return super.execute(driverContext);
+      }
+
       // enable assertion
       String hadoopExec = conf.getVar(HiveConf.ConfVars.HADOOPBIN);
       String hiveJar = conf.getJar();
 
       String libJarsOption;
-      String addedJars = ExecDriver.getResourceFiles(conf,
-          SessionState.ResourceType.JAR);
+      String addedJars = getResourceFiles(conf, SessionState.ResourceType.JAR);
       conf.setVar(ConfVars.HIVEADDEDJARS, addedJars);
       String auxJars = conf.getAuxJars();
       // Put auxjars and addedjars together into libjars
@@ -80,40 +140,13 @@
         }
       }
       // Generate the hiveConfArgs after potentially adding the jars
-      String hiveConfArgs = ExecDriver.generateCmdLine(conf);
-      String hiveScratchDir;
-      if (driverContext.getCtx() != null && driverContext.getCtx().getQueryPath() != null)
-        hiveScratchDir = driverContext.getCtx().getQueryPath().toString();
-      else
-        hiveScratchDir = conf.getVar(HiveConf.ConfVars.SCRATCHDIR);
+      String hiveConfArgs = generateCmdLine(conf);
 
-      File scratchDir = new File(hiveScratchDir);
-
-      // Check if the scratch directory exists. If not, create it.
-      if (!scratchDir.exists()) {
-        LOG.info("Local scratch directory " + scratchDir.getPath()
-            + " not found. Attempting to create.");
-        if (!scratchDir.mkdirs()) {
-          // Unable to create this directory - it might have been created due
-          // to another process.
-          if (!scratchDir.exists()) {
-            throw new TaskExecutionException(
-                "Cannot create scratch directory "
-                + "\"" + scratchDir.getPath() + "\". "
-                + "To configure a different directory, "
-                + "set the configuration "
-                + "\"hive.exec.scratchdir\" "
-                + "in the session, or permanently by modifying the "
-                + "appropriate hive configuration file such as hive-site.xml.");
-          }
-        }
-      }
-
+      // write out the plan to a local file
+      Path planPath = new Path(ctx.getLocalTmpFileURI(), "plan.xml");
+      OutputStream out = FileSystem.getLocal(conf).create(planPath);
       MapredWork plan = getWork();
-
-      File planFile = File.createTempFile("plan", ".xml", scratchDir);
-      LOG.info("Generating plan file " + planFile.toString());
-      FileOutputStream out = new FileOutputStream(planFile);
+      LOG.info("Generating plan file " + planPath.toString());
       Utilities.serializeMapRedWork(plan, out);
 
       String isSilent = "true".equalsIgnoreCase(System
@@ -127,10 +160,9 @@
       }
 
       String cmdLine = hadoopExec + " jar " + jarCmd + " -plan "
-          + planFile.toString() + " " + isSilent + " " + hiveConfArgs;
+          + planPath.toString() + " " + isSilent + " " + hiveConfArgs;
 
-      String files = ExecDriver.getResourceFiles(conf,
-          SessionState.ResourceType.FILE);
+      String files = getResourceFiles(conf, SessionState.ResourceType.FILE);
       if (!files.isEmpty()) {
         cmdLine = cmdLine + " -files " + files;
       }
@@ -196,27 +228,136 @@
       e.printStackTrace();
       LOG.error("Exception: " + e.getMessage());
       return (1);
+    } finally {
+      try {
+        // in case we decided to run everything in local mode, restore the
+        // the jobtracker setting to its initial value
+        ctx.restoreOriginalTracker();
+
+        // creating the context can create a bunch of files. So make
+        // sure to clear it out
+        if(ctxCreated)
+          ctx.clear();
+
+      } catch (Exception e) {
+        LOG.error("Exception: " + e.getMessage());
+      }
     }
   }
 
   @Override
-  public boolean isMapRedTask() {
-    return true;
+  public boolean mapStarted() {
+    boolean b = super.mapStarted();
+    return runningViaChild ? isdone : b;
   }
 
   @Override
-  public boolean hasReduce() {
-    MapredWork w = getWork();
-    return w.getReducer() != null;
+  public boolean reduceStarted() {
+    boolean b = super.reduceStarted();
+    return runningViaChild ? isdone : b;
   }
 
   @Override
-  public int getType() {
-    return StageType.MAPREDLOCAL;
+  public boolean mapDone() {
+    boolean b = super.mapDone();
+    return runningViaChild ? isdone : b;
   }
 
   @Override
-  public String getName() {
-    return "MAPRED";
+  public boolean reduceDone() {
+    boolean b = super.reduceDone();
+    return runningViaChild ? isdone : b;
   }
+
+  /**
+   * Set the number of reducers for the mapred work.
+   */
+  private void setNumberOfReducers() throws IOException {
+    // this is a temporary hack to fix things that are not fixed in the compiler
+    Integer numReducersFromWork = work.getNumReduceTasks();
+
+    if (work.getReducer() == null) {
+      console
+          .printInfo("Number of reduce tasks is set to 0 since there's no reduce operator");
+      work.setNumReduceTasks(Integer.valueOf(0));
+    } else {
+      if (numReducersFromWork >= 0) {
+        console.printInfo("Number of reduce tasks determined at compile time: "
+            + work.getNumReduceTasks());
+      } else if (job.getNumReduceTasks() > 0) {
+        int reducers = job.getNumReduceTasks();
+        work.setNumReduceTasks(reducers);
+        console
+            .printInfo("Number of reduce tasks not specified. Defaulting to jobconf value of: "
+            + reducers);
+      } else {
+        int reducers = estimateNumberOfReducers();
+        work.setNumReduceTasks(reducers);
+        console
+            .printInfo("Number of reduce tasks not specified. Estimated from input data size: "
+            + reducers);
+
+      }
+      console
+          .printInfo("In order to change the average load for a reducer (in bytes):");
+      console.printInfo("  set " + HiveConf.ConfVars.BYTESPERREDUCER.varname
+          + "=");
+      console.printInfo("In order to limit the maximum number of reducers:");
+      console.printInfo("  set " + HiveConf.ConfVars.MAXREDUCERS.varname
+          + "=");
+      console.printInfo("In order to set a constant number of reducers:");
+      console.printInfo("  set " + HiveConf.ConfVars.HADOOPNUMREDUCERS
+          + "=");
+    }
+  }
+
+  /**
+   * Estimate the number of reducers needed for this job, based on job input,
+   * and configuration parameters.
+   *
+   * @return the number of reducers.
+   */
+  private int estimateNumberOfReducers() throws IOException {
+    long bytesPerReducer = conf.getLongVar(HiveConf.ConfVars.BYTESPERREDUCER);
+    int maxReducers = conf.getIntVar(HiveConf.ConfVars.MAXREDUCERS);
+
+    if(inputSummary == null)
+      // compute the summary and stash it away
+      inputSummary = Utilities.getInputSummary(driverContext.getCtx(), work, null);
+
+    long totalInputFileSize = inputSummary.getLength();
+
+    LOG.info("BytesPerReducer=" + bytesPerReducer + " maxReducers="
+        + maxReducers + " totalInputFileSize=" + totalInputFileSize);
+
+    int reducers = (int) ((totalInputFileSize + bytesPerReducer - 1) / bytesPerReducer);
+    reducers = Math.max(1, reducers);
+    reducers = Math.min(maxReducers, reducers);
+    return reducers;
+  }
+
+
+  /**
+   * Find out if a job can be run in local mode based on its characteristics
+   *
+   * @param conf Hive Configuration
+   * @param inputSummary summary about the input files for this job
+   * @param numReducers total number of reducers for this job
+   */
+  public static boolean isEligibleForLocalMode(HiveConf conf,
+                                               ContentSummary inputSummary,
+                                               int numReducers) {
+
+    long maxBytes = conf.getLongVar(HiveConf.ConfVars.LOCALMODEMAXBYTES);
+    long maxTasks = conf.getIntVar(HiveConf.ConfVars.LOCALMODEMAXTASKS);
+
+    // ideally we would like to do this check based on the number of splits
+    // in the absence of an easy way to get the number of splits - do this
+    // based on the total number of files (pessimistically assuming that
+    // splits are equal to number of files in worst case)
+
+    return (inputSummary.getLength() <= maxBytes &&
+            inputSummary.getFileCount() <= maxTasks &&
+            numReducers <= maxTasks);
+  }
 }
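The reducer estimate above is a clamped ceiling: ceil(totalInputFileSize / bytesPerReducer), bounded below by 1 and above by hive.exec.reducers.max. A worked sketch with assumed sample values (these numbers are illustrative, not values taken from this patch):

  long bytesPerReducer = 1000000000L;    // hive.exec.reducers.bytes.per.reducer
  int maxReducers = 999;                 // hive.exec.reducers.max
  long totalInputFileSize = 3500000000L; // from Utilities.getInputSummary()

  // same clamped ceiling as estimateNumberOfReducers()
  int reducers = (int) ((totalInputFileSize + bytesPerReducer - 1) / bytesPerReducer); // 4
  reducers = Math.max(1, reducers);
  reducers = Math.min(maxReducers, reducers);                                          // still 4

isEligibleForLocalMode applies the same kind of thresholding to the input summary: the job stays local only if its total bytes, file count, and reducer count are all within the configured local-mode limits.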
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java	(revision 961525)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java	(working copy)
@@ -282,9 +282,13 @@
       splitNum = 0;
       serde = tmp.getDeserializerClass().newInstance();
       serde.initialize(job, tmp.getProperties());
-      LOG.debug("Creating fetchTask with deserializer typeinfo: "
-          + serde.getObjectInspector().getTypeName());
-      LOG.debug("deserializer properties: " + tmp.getProperties());
+
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("Creating fetchTask with deserializer typeinfo: "
+            + serde.getObjectInspector().getTypeName());
+        LOG.debug("deserializer properties: " + tmp.getProperties());
+      }
+
       if (currPart != null) {
         setPrtnDesc();
       }
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java	(revision 961525)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java	(working copy)
@@ -1128,15 +1128,12 @@
   }
 
   /**
-   * Should be overridden to return the type of the specific operator among the
+   * Return the type of the specific operator among the
    * types in OperatorType.
   *
-   * @return OperatorType.* or -1 if not overridden
+   * @return OperatorType.*
    */
-  public int getType() {
-    assert false;
-    return -1;
-  }
+  abstract public int getType();
 
   public void setGroupKeyObject(Object keyObject) {
     this.groupKeyObject = keyObject;
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java	(revision 961525)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java	(working copy)
@@ -45,13 +45,13 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
 import org.apache.hadoop.hive.ql.exec.errors.ErrorAndSolution;
@@ -60,7 +60,10 @@
 import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.FetchWork;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
@@ -96,8 +99,6 @@
   protected transient int reduceProgress = 0;
   protected transient boolean success = false; // if job execution is successful
 
-  public static Random randGen = new Random();
-
   /**
    * Constructor when invoked from QL.
    */
@@ -105,7 +106,7 @@
     super();
   }
 
-  public static String getResourceFiles(Configuration conf,
+  protected static String getResourceFiles(Configuration conf,
       SessionState.ResourceType t) {
     // fill in local files to be added to the task environment
     SessionState ss = SessionState.get();
@@ -178,7 +179,7 @@
    * used to kill all running jobs in the event of an unexpected shutdown -
   * i.e., the JVM shuts down while there are still jobs running.
    */
-  public static Map runningJobKillURIs =
+  private static Map runningJobKillURIs =
       Collections.synchronizedMap(new HashMap());
 
  /**
@@ -222,7 +223,7 @@
  /**
   * from StreamJob.java.
   */
-  public void jobInfo(RunningJob rj) {
+  private void jobInfo(RunningJob rj) {
    if (job.get("mapred.job.tracker", "local").equals("local")) {
      console.printInfo("Job running in-process (local Hadoop)");
    } else {
@@ -245,7 +246,7 @@
   * return this handle from execute and Driver can split execute into start,
   * monitorProgess and postProcess.
   */
-  public static class ExecDriverTaskHandle extends TaskHandle {
+  private static class ExecDriverTaskHandle extends TaskHandle {
    JobClient jc;
    RunningJob rj;
@@ -284,8 +285,7 @@
   * @return true if fatal errors happened during job execution, false
   *         otherwise.
   */
-  protected boolean checkFatalErrors(TaskHandle t, StringBuilder errMsg) {
-    ExecDriverTaskHandle th = (ExecDriverTaskHandle) t;
+  private boolean checkFatalErrors(ExecDriverTaskHandle th, StringBuilder errMsg) {
    RunningJob rj = th.getRunningJob();
    try {
      Counters ctrs = th.getCounters();
@@ -311,9 +311,7 @@
    }
  }
 
-  @Override
-  public void progress(TaskHandle taskHandle) throws IOException {
-    ExecDriverTaskHandle th = (ExecDriverTaskHandle) taskHandle;
+  private void progress(ExecDriverTaskHandle th) throws IOException {
    JobClient jc = th.getJobClient();
    RunningJob rj = th.getRunningJob();
    String lastReport = "";
@@ -404,101 +402,9 @@
  }
 
  /**
-   * Estimate the number of reducers needed for this job, based on job input,
-   * and configuration parameters.
-   *
-   * @return the number of reducers.
-   */
-  public int estimateNumberOfReducers(HiveConf hive, JobConf job,
-      MapredWork work) throws IOException {
-    if (hive == null) {
-      hive = new HiveConf();
-    }
-    long bytesPerReducer = hive.getLongVar(HiveConf.ConfVars.BYTESPERREDUCER);
-    int maxReducers = hive.getIntVar(HiveConf.ConfVars.MAXREDUCERS);
-    long totalInputFileSize = getTotalInputFileSize(job, work);
-
-    LOG.info("BytesPerReducer=" + bytesPerReducer + " maxReducers="
-        + maxReducers + " totalInputFileSize=" + totalInputFileSize);
-
-    int reducers = (int) ((totalInputFileSize + bytesPerReducer - 1) / bytesPerReducer);
-    reducers = Math.max(1, reducers);
-    reducers = Math.min(maxReducers, reducers);
-    return reducers;
-  }
-
-  /**
-   * Set the number of reducers for the mapred work.
-   */
-  protected void setNumberOfReducers() throws IOException {
-    // this is a temporary hack to fix things that are not fixed in the compiler
-    Integer numReducersFromWork = work.getNumReduceTasks();
-
-    if (work.getReducer() == null) {
-      console
-          .printInfo("Number of reduce tasks is set to 0 since there's no reduce operator");
-      work.setNumReduceTasks(Integer.valueOf(0));
-    } else {
-      if (numReducersFromWork >= 0) {
-        console.printInfo("Number of reduce tasks determined at compile time: "
-            + work.getNumReduceTasks());
-      } else if (job.getNumReduceTasks() > 0) {
-        int reducers = job.getNumReduceTasks();
-        work.setNumReduceTasks(reducers);
-        console
-            .printInfo("Number of reduce tasks not specified. Defaulting to jobconf value of: "
-            + reducers);
-      } else {
-        int reducers = estimateNumberOfReducers(conf, job, work);
-        work.setNumReduceTasks(reducers);
-        console
-            .printInfo("Number of reduce tasks not specified. Estimated from input data size: "
-            + reducers);
-
-      }
-      console
-          .printInfo("In order to change the average load for a reducer (in bytes):");
-      console.printInfo("  set " + HiveConf.ConfVars.BYTESPERREDUCER.varname
-          + "=");
-      console.printInfo("In order to limit the maximum number of reducers:");
-      console.printInfo("  set " + HiveConf.ConfVars.MAXREDUCERS.varname
-          + "=");
-      console.printInfo("In order to set a constant number of reducers:");
-      console.printInfo("  set " + HiveConf.ConfVars.HADOOPNUMREDUCERS
-          + "=");
-    }
-  }
-
-  /**
-   * Calculate the total size of input files.
-   *
-   * @param job
-   *          the hadoop job conf.
-   * @return the total size in bytes.
-   * @throws IOException
-   */
-  public long getTotalInputFileSize(JobConf job, MapredWork work) throws IOException {
-    long r = 0;
-    // For each input path, calculate the total size.
- for (String path : work.getPathToAliases().keySet()) { - try { - Path p = new Path(path); - FileSystem fs = p.getFileSystem(job); - ContentSummary cs = fs.getContentSummary(p); - r += cs.getLength(); - } catch (IOException e) { - LOG.info("Cannot get size of " + path + ". Safely ignored."); - } - } - return r; - } - - /** * Update counters relevant to this task. */ - @Override - public void updateCounters(TaskHandle t) throws IOException { - ExecDriverTaskHandle th = (ExecDriverTaskHandle) t; + private void updateCounters(ExecDriverTaskHandle th) throws IOException { RunningJob rj = th.getRunningJob(); mapProgress = Math.round(rj.mapProgress() * 100); reduceProgress = Math.round(rj.reduceProgress() * 100); @@ -543,49 +449,31 @@ success = true; - try { - setNumberOfReducers(); - } catch (IOException e) { - String statusMesg = "IOException while accessing HDFS to estimate the number of reducers: " - + e.getMessage(); - console.printError(statusMesg, "\n" - + org.apache.hadoop.util.StringUtils.stringifyException(e)); - return 1; - } - String invalidReason = work.isInvalid(); if (invalidReason != null) { throw new RuntimeException("Plan invalid, Reason: " + invalidReason); } - String hiveScratchDir; - if (driverContext.getCtx() != null && driverContext.getCtx().getQueryPath() != null) { - hiveScratchDir = driverContext.getCtx().getQueryPath().toString(); - } else { - hiveScratchDir = HiveConf.getVar(job, HiveConf.ConfVars.SCRATCHDIR); - } + Context ctx = driverContext.getCtx(); + boolean ctxCreated = false; + String emptyScratchDirStr; + Path emptyScratchDir; - String emptyScratchDirStr = null; - Path emptyScratchDir = null; + try { + if (ctx == null) { + ctx = new Context(job); + ctxCreated = true; + } - int numTries = 3; - while (numTries > 0) { - emptyScratchDirStr = hiveScratchDir + File.separator - + Utilities.randGen.nextInt(); + emptyScratchDirStr = ctx.getMRTmpFileURI(); emptyScratchDir = new Path(emptyScratchDirStr); - - try { - FileSystem fs = emptyScratchDir.getFileSystem(job); - fs.mkdirs(emptyScratchDir); - break; - } catch (Exception e) { - if (numTries > 0) { - numTries--; - } else { - throw new RuntimeException("Failed to make dir " - + emptyScratchDir.toString() + " : " + e.getMessage()); - } - } + FileSystem fs = emptyScratchDir.getFileSystem(job); + fs.mkdirs(emptyScratchDir); + } catch (IOException e) { + e.printStackTrace(); + console.printError("Error launching map-reduce job", "\n" + + org.apache.hadoop.util.StringUtils.stringifyException(e)); + return 5; } ShimLoader.getHadoopShims().setNullOutputFormat(job); @@ -674,13 +562,13 @@ if (noName) { // This is for a special case to ensure unit tests pass HiveConf.setVar(job, HiveConf.ConfVars.HADOOPJOBNAME, "JOB" - + randGen.nextInt()); + + Utilities.randGen.nextInt()); } try { addInputPaths(job, work, emptyScratchDirStr); - Utilities.setMapRedWork(job, work, hiveScratchDir); + Utilities.setMapRedWork(job, work, ctx.getMRTmpFileURI()); // remove the pwd from conf file so that job tracker doesn't show this // logs @@ -699,19 +587,17 @@ HiveConf.setVar(job, HiveConf.ConfVars.METASTOREPWD, pwd); } - // add to list of running jobs so in case of abnormal shutdown can kill - // it. 
+ // add to list of running jobs to kill in case of abnormal shutdown runningJobKillURIs.put(rj.getJobID(), rj.getTrackingURL() + "&action=kill"); - TaskHandle th = new ExecDriverTaskHandle(jc, rj); + ExecDriverTaskHandle th = new ExecDriverTaskHandle(jc, rj); jobInfo(rj); progress(th); // success status will be setup inside progress if (rj == null) { // in the corner case where the running job has disappeared from JT - // memory - // remember that we did actually submit the job. + // memory remember that we did actually submit the job. rj = orig_rj; success = false; } @@ -743,7 +629,9 @@ } finally { Utilities.clearMapRedWork(job); try { - emptyScratchDir.getFileSystem(job).delete(emptyScratchDir, true); + if(ctxCreated) + ctx.clear(); + if (returnVal != 0 && rj != null) { rj.killJob(); } @@ -796,7 +684,7 @@ * @param jobId * @return */ - public static String getJobStartMsg(String jobId) { + private static String getJobStartMsg(String jobId) { return "Starting Job = " + jobId; } @@ -1081,16 +969,10 @@ // log the list of job conf parameters for reference LOG.info(sb.toString()); - URI pathURI = (new Path(planFileName)).toUri(); - InputStream pathData; - if (StringUtils.isEmpty(pathURI.getScheme())) { - // default to local file system - pathData = new FileInputStream(planFileName); - } else { - // otherwise may be in hadoop .. - FileSystem fs = FileSystem.get(conf); - pathData = fs.open(new Path(planFileName)); - } + // the plan file should always be in local directory + Path p = new Path(planFileName); + FileSystem fs = FileSystem.getLocal(conf); + InputStream pathData = fs.open(p); // this is workaround for hadoop-17 - libjars are not added to classpath of the // child process. so we add it here explicitly @@ -1177,13 +1059,13 @@ sb.append(URLEncoder.encode(hconf.get(hadoopWorkDir) + "/" + Utilities.randGen.nextInt(), "UTF-8")); } - + return sb.toString(); } catch (UnsupportedEncodingException e) { throw new RuntimeException(e); } } - + @Override public boolean isMapRedTask() { return true; @@ -1195,19 +1077,6 @@ return w.getReducer() != null; } - private boolean isEmptyPath(JobConf job, String path) throws Exception { - Path dirPath = new Path(path); - FileSystem inpFs = dirPath.getFileSystem(job); - - if (inpFs.exists(dirPath)) { - FileStatus[] fStats = inpFs.listStatus(dirPath); - if (fStats.length > 0) { - return false; - } - } - return true; - } - /** * Handle a empty/null path for a given alias. 
*/ @@ -1309,7 +1178,7 @@ LOG.info("Adding input file " + path); - if (!isEmptyPath(job, path)) { + if (!Utilities.isEmptyPath(job, path)) { FileInputFormat.addInputPaths(job, path); } else { emptyPaths.add(path); @@ -1345,6 +1214,53 @@ @Override public String getName() { - return "EXEC"; + return "MAPRED"; } + + @Override + protected void localizeMRTmpFilesImpl(Context ctx) { + + // localize any map-reduce input paths + ctx.localizeKeys((Map)((Object)work.getPathToAliases())); + ctx.localizeKeys((Map)((Object)work.getPathToPartitionInfo())); + + // localize any input paths for maplocal work + MapredLocalWork l = work.getMapLocalWork(); + if (l != null) { + Map m = l.getAliasToFetchWork(); + if (m != null) { + for (FetchWork fw: m.values()) { + String s = fw.getTblDir(); + if ((s != null) && ctx.isMRTmpFileURI(s)) + fw.setTblDir(ctx.localizeMRTmpFileURI(s)); + } + } + } + + // fix up outputs + Map> pa = work.getPathToAliases(); + if (pa != null) { + for (List ls: pa.values()) + for (String a: ls) { + ArrayList> opList = new + ArrayList> (); + opList.add(work.getAliasToWork().get(a)); + + while (!opList.isEmpty()) { + Operator op = opList.remove(0); + + if (op instanceof FileSinkOperator) { + FileSinkDesc fdesc = ((FileSinkOperator)op).getConf(); + String s = fdesc.getDirName(); + if ((s != null) && ctx.isMRTmpFileURI(s)) + fdesc.setDirName(ctx.localizeMRTmpFileURI(s)); + ((FileSinkOperator)op).setConf(fdesc); + } + + if (op.getChildOperators() != null) + opList.addAll(op.getChildOperators()); + } + } + } + } } Index: ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (revision 961525) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (working copy) @@ -65,10 +65,13 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.ContentSummary; +import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Order; import org.apache.hadoop.hive.ql.QueryPlan; +import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat; import org.apache.hadoop.hive.ql.io.RCFile; import org.apache.hadoop.hive.ql.metadata.HiveException; @@ -269,17 +272,25 @@ } + /** + * Make map-reduce plan available to map-reduce job + * In local mode - we simply cache the plan in the process (since local mode runs + * in same JVM. Otherwise we write it to the passed in temp folder and add to + * DistributedCache. We assume that the temp directory is unique for this job + * + * @param job configuration object for this job + * @param w map-reduce plan to be be made available + * @param hiveScratchDir temp directory available to map-reduce job/tasks + */ + + public static void setMapRedWork(Configuration job, MapredWork w, String hiveScratchDir) { try { - // Serialize the plan to the default hdfs instance - // Except for hadoop local mode execution where we should be - // able to get the plan directly from the cache - if(!HiveConf.getVar(job, HiveConf.ConfVars.HADOOPJT).equals("local")) { - // use the default file system of the job - FileSystem fs = FileSystem.get(job); - Path planPath = new Path(hiveScratchDir, "plan." 
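The generic type parameters in the localizeMRTmpFilesImpl hunk above were stripped from this copy of the patch. The "fix up outputs" walk reads roughly as follows with the types restored; the Operator<? extends Serializable> wildcard follows the convention used elsewhere in ql at this revision, so treat this as a best-guess reconstruction rather than the verbatim patch text:

// Breadth-first walk from each alias's root operator, rewriting FileSinkDesc
// output directories that point at MR tmp space onto the local scratch dir.
Map<String, ArrayList<String>> pa = work.getPathToAliases();
if (pa != null) {
  for (List<String> ls : pa.values()) {
    for (String a : ls) {
      ArrayList<Operator<? extends Serializable>> opList =
          new ArrayList<Operator<? extends Serializable>>();
      opList.add(work.getAliasToWork().get(a));

      while (!opList.isEmpty()) {
        Operator<? extends Serializable> op = opList.remove(0);

        if (op instanceof FileSinkOperator) {
          FileSinkDesc fdesc = ((FileSinkOperator) op).getConf();
          String s = fdesc.getDirName();
          if (s != null && ctx.isMRTmpFileURI(s)) {
            fdesc.setDirName(ctx.localizeMRTmpFileURI(s));
          }
          ((FileSinkOperator) op).setConf(fdesc);
        }

        if (op.getChildOperators() != null) {
          opList.addAll(op.getChildOperators());
        }
      }
    }
  }
}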
+ randGen.nextInt()); + Path p = new Path(hiveScratchDir); + Path planPath = new Path(p, "plan"); + FileSystem fs = p.getFileSystem(job); FSDataOutputStream out = fs.create(planPath); serializeMapRedWork(w, out); HiveConf.setVar(job, HiveConf.ConfVars.PLAN, planPath.toString()); @@ -1324,4 +1335,58 @@ } } + /** + * Calculate the total size of input files. + * + * @param job the hadoop job conf. + * @param work map reduce job plan + * @param filter filter to apply to the input paths before calculating size + * @return the summary of all the input paths. + * @throws IOException + */ + public static ContentSummary getInputSummary + (Context ctx, MapredWork work, PathFilter filter) throws IOException { + + long[] summary = {0, 0, 0}; + + // For each input path, calculate the total size. + for (String path : work.getPathToAliases().keySet()) { + try { + Path p = new Path(path); + + if(filter != null && !filter.accept(p)) + continue; + + ContentSummary cs = ctx.getCS(path); + if (cs == null) { + FileSystem fs = p.getFileSystem(ctx.getConf()); + cs = fs.getContentSummary(p); + ctx.addCS(path, cs); + } + + summary[0] += cs.getLength(); + summary[1] += cs.getFileCount(); + summary[2] += cs.getDirectoryCount(); + + } catch (IOException e) { + LOG.info("Cannot get size of " + path + ". Safely ignored."); + if (path != null) + ctx.addCS(path, new ContentSummary(0, 0, 0)); + } + } + return new ContentSummary(summary[0], summary[1], summary[2]); + } + + public static boolean isEmptyPath(JobConf job, String path) throws Exception { + Path dirPath = new Path(path); + FileSystem inpFs = dirPath.getFileSystem(job); + + if (inpFs.exists(dirPath)) { + FileStatus[] fStats = inpFs.listStatus(dirPath); + if (fStats.length > 0) { + return false; + } + } + return true; + } } Index: ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java (revision 961525) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java (working copy) @@ -56,6 +56,7 @@ import org.apache.hadoop.hive.metastore.api.InvalidOperationException; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.Order; +import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.DriverContext; import org.apache.hadoop.hive.ql.QueryPlan; import org.apache.hadoop.hive.ql.hooks.ReadEntity; @@ -2124,4 +2125,8 @@ return "DDL"; } + @Override + protected void localizeMRTmpFilesImpl(Context ctx) { + // no-op + } } Index: ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java (revision 961525) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java (working copy) @@ -46,6 +46,7 @@ import org.apache.hadoop.hive.ql.plan.MoveWork; import org.apache.hadoop.hive.ql.plan.api.StageType; import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.util.StringUtils; /** @@ -277,4 +278,10 @@ public String getName() { return "MOVE"; } + + + @Override + protected void localizeMRTmpFilesImpl(Context ctx) { + // no-op + } } Index: ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java (revision 961525) +++ ql/src/java/org/apache/hadoop/hive/ql/exec/CopyTask.java 
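The new Utilities.getInputSummary caches per-path ContentSummary objects on the Context, so repeated size estimates for the same query do not re-query the filesystem. A usage sketch (the wrapper class is illustrative; the PathFilter mirrors the one decideExecMode passes in later in this patch):

import java.io.IOException;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.plan.MapredWork;

// Usage sketch for Utilities.getInputSummary(Context, MapredWork, PathFilter):
// exclude intermediate MR output from the size estimate and return total bytes.
public class InputSummarySketch {
  public static long inputBytes(final Context ctx, MapredWork work)
      throws IOException {
    PathFilter skipTmp = new PathFilter() {
      public boolean accept(Path file) {
        return !ctx.isMRTmpFileURI(file.toUri().getPath());
      }
    };
    ContentSummary cs = Utilities.getInputSummary(ctx, work, skipTmp);
    return cs.getLength();   // total bytes across all input paths
  }
}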
(working copy) @@ -24,6 +24,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.parse.LoadSemanticAnalyzer; import org.apache.hadoop.hive.ql.plan.CopyWork; import org.apache.hadoop.hive.ql.plan.api.StageType; @@ -97,4 +98,12 @@ public String getName() { return "COPY"; } + + @Override + protected void localizeMRTmpFilesImpl(Context ctx) { + // copy task is only used by the load command and + // does not use any map-reduce tmp files + // we don't expect to enter this code path at all + throw new RuntimeException ("Unexpected call"); + } } Index: ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java (revision 961525) +++ ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java (working copy) @@ -321,5 +321,4 @@ public void setInputformat(String inputformat) { this.inputformat = inputformat; } - } Index: ql/src/java/org/apache/hadoop/hive/ql/Context.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/Context.java (revision 961525) +++ ql/src/java/org/apache/hadoop/hive/ql/Context.java (working copy) @@ -23,8 +23,11 @@ import java.io.IOException; import java.net.URI; import java.text.SimpleDateFormat; +import java.util.ArrayList; import java.util.Date; import java.util.HashMap; +import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.Random; @@ -34,9 +37,11 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.conf.Configuration; /** * Context for Semantic Analyzers. Usage: not reusable - construct a new one for @@ -50,38 +55,27 @@ private Path[] resDirPaths; private int resDirFilesNum; boolean initialized; + String originalTracker = null; + private HashMap pathToCS; - // all query specific directories are created as sub-directories of queryPath - // this applies to all non-local (ie. hdfs) file system tmp folders - private Path queryScratchPath; + // scratch path to use for all non-local (ie. 
hdfs) file system tmp folders + private final Path nonLocalScratchPath; + // scratch directory to use for local file system tmp folders + private final String localScratchDir; - // Path without a file system - // Used for creating temporary directory on local file system - private String localScratchPath; + // Keeps track of scratch directories created for different scheme/authority + private final Map fsScratchDirs = new HashMap(); - // Fully Qualified path on the local file system - // System.getProperty("java.io.tmpdir") + Path.SEPARATOR - // + System.getProperty("user.name") + Path.SEPARATOR + executionId - private Path localScratchDir; - - // On the default FileSystem (usually HDFS): - // also based on hive.exec.scratchdir which by default is - // "/tmp/"+System.getProperty("user.name")+"/hive" - private Path MRScratchDir; - - // Keeps track of scratch directories created for different scheme/authority - private final Map externalScratchDirs = new HashMap(); - - private HiveConf conf; + private Configuration conf; protected int pathid = 10000; protected boolean explain = false; private TokenRewriteStream tokenRewriteStream; String executionId; - public Context(HiveConf conf) throws IOException { + public Context(Configuration conf) throws IOException { this(conf, generateExecutionId()); } @@ -89,127 +83,104 @@ * Create a Context with a given executionId. ExecutionId, together with * user name and conf, will determine the temporary directory locations. */ - public Context(HiveConf conf, String executionId) throws IOException { + public Context(Configuration conf, String executionId) { this.conf = conf; this.executionId = executionId; + + // non-local tmp location is configurable. however it is the same across + // all external file systems + nonLocalScratchPath = + new Path(HiveConf.getVar(conf, HiveConf.ConfVars.SCRATCHDIR), + executionId); - localScratchPath = System.getProperty("java.io.tmpdir") + // local tmp location is not configurable for now + localScratchDir = System.getProperty("java.io.tmpdir") + Path.SEPARATOR + System.getProperty("user.name") + Path.SEPARATOR + executionId; - - queryScratchPath = new Path(conf.getVar(HiveConf.ConfVars.SCRATCHDIR), executionId); } /** * Set the context on whether the current query is an explain query. - * - * @param value - * true if the query is an explain query, false if not + * @param value true if the query is an explain query, false if not */ public void setExplain(boolean value) { explain = value; } - + /** - * Find out whether the current query is an explain query. - * + * Find whether the current query is an explain query * @return true if the query is an explain query, false if not */ - public boolean getExplain() { + public boolean getExplain () { return explain; } + /** - * Make a tmp directory for MR intermediate data If URI/Scheme are not - * supplied - those implied by the default filesystem will be used (which will - * typically correspond to hdfs instance on hadoop cluster). + * Get a tmp directory on specified URI * - * @param mkdir if true, will make the directory. Will throw IOException if that fails. 
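With the constructor now taking a plain Configuration, a Context can be built from any Hadoop conf object; it derives one scratch root under hive.exec.scratchdir/<executionId> for non-local filesystems and one under java.io.tmpdir/<user>/<executionId> locally. An illustrative sketch (the printed paths depend entirely on local settings):

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Context;

// Illustrative only: build a Context and ask it for its two scratch roots.
public class ContextScratchSketch {
  public static void main(String[] args) throws Exception {
    HiveConf conf = new HiveConf(ContextScratchSketch.class);
    Context ctx = new Context(conf);
    System.out.println("local scratch: " + ctx.getLocalScratchDir(true));
    System.out.println("mr scratch:    " + ctx.getMRScratchDir());
    ctx.clear();   // removes whatever scratch dirs were created
  }
}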
+ * @param scheme Scheme of the target FS + * @param authority Authority of the target FS + * @param mkdir create the directory if true + * @param scratchdir path of tmp directory */ - private Path makeMRScratchDir(HiveConf conf, boolean mkdir) - throws IOException { + private String getScratchDir(String scheme, String authority, + boolean mkdir, String scratchDir) { - Path dir = FileUtils.makeQualified(queryScratchPath, conf); + String fileSystem = scheme + ":" + authority; + String dir = fsScratchDirs.get(fileSystem); - if (mkdir) { - FileSystem fs = dir.getFileSystem(conf); - if (!fs.mkdirs(dir)) { - throw new IOException("Cannot make directory: " + dir); + if (dir == null) { + Path dirPath = new Path(scheme, authority, scratchDir); + if (mkdir) { + try { + FileSystem fs = dirPath.getFileSystem(conf); + if (!fs.mkdirs(dirPath)) + throw new RuntimeException("Cannot make directory: " + + dirPath.toString()); + } catch (IOException e) { + throw new RuntimeException (e); + } } + dir = dirPath.toString(); + fsScratchDirs.put(fileSystem, dir); } return dir; } - /** - * Make a tmp directory on specified URI Currently will use the same path as - * implied by SCRATCHDIR config variable. - */ - private Path makeExternalScratchDir(HiveConf conf, boolean mkdir, URI extURI) - throws IOException { - Path dir = new Path(extURI.getScheme(), extURI.getAuthority(), - queryScratchPath.toUri().getPath()); - - if (mkdir) { - FileSystem fs = dir.getFileSystem(conf); - if (!fs.mkdirs(dir)) { - throw new IOException("Cannot make directory: " + dir); - } - } - return dir; - } - /** - * Make a tmp directory for local file system. - * - * @param mkdir if true, will make the directory. Will throw IOException if that fails. + * Create a local scratch directory on demand and return it. */ - private Path makeLocalScratchDir(boolean mkdir) - throws IOException { - - FileSystem fs = FileSystem.getLocal(conf); - Path dir = fs.makeQualified(new Path(localScratchPath)); - - if (mkdir) { - if (!fs.mkdirs(dir)) { - throw new IOException("Cannot make directory: " + dir); - } - } - return dir; - } - - /** - * Get a tmp directory on specified URI Will check if this has already been - * made (either via MR or Local FileSystem or some other external URI. - */ - private String getExternalScratchDir(URI extURI) { + public String getLocalScratchDir(boolean mkdir) { try { - String fileSystem = extURI.getScheme() + ":" + extURI.getAuthority(); - Path dir = externalScratchDirs.get(fileSystem); - if (dir == null) { - dir = makeExternalScratchDir(conf, !explain, extURI); - externalScratchDirs.put(fileSystem, dir); - } - return dir.toString(); + FileSystem fs = FileSystem.getLocal(conf); + URI uri = fs.getUri(); + return getScratchDir(uri.getScheme(), uri.getAuthority(), + mkdir, localScratchDir); } catch (IOException e) { - throw new RuntimeException(e); + throw new RuntimeException (e); } } + /** * Create a map-reduce scratch directory on demand and return it. 
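Scratch directories are now cached per filesystem, keyed by scheme plus authority, so a query creates at most one scratch dir per distinct filesystem it touches. A minimal standalone sketch of that keying (namenode and paths are hypothetical; directory creation is elided, where the real code calls FileSystem.mkdirs on first use):

import java.net.URI;
import java.util.HashMap;
import java.util.Map;

// Standalone sketch of the fsScratchDirs cache in Context.getScratchDir.
public class ScratchCacheSketch {
  private final Map<String, String> fsScratchDirs = new HashMap<String, String>();

  public String getScratchDir(URI fsUri, String scratchDir) {
    String key = fsUri.getScheme() + ":" + fsUri.getAuthority();
    String dir = fsScratchDirs.get(key);
    if (dir == null) {
      dir = fsUri.getScheme() + "://" + fsUri.getAuthority() + scratchDir;
      fsScratchDirs.put(key, dir);
    }
    return dir;
  }

  public static void main(String[] args) {
    ScratchCacheSketch c = new ScratchCacheSketch();
    System.out.println(c.getScratchDir(URI.create("hdfs://nn:8020/"), "/tmp/hive/exec1"));
    // second call for the same filesystem returns the cached value
    System.out.println(c.getScratchDir(URI.create("hdfs://nn:8020/"), "/tmp/hive/exec1"));
  }
}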
+ * */ public String getMRScratchDir() { + + // if we are executing entirely on the client side - then + // just (re)use the local scratch directory + if(isLocalOnlyExecutionMode()) + return getLocalScratchDir(!explain); + try { - // if we are executing entirely on the client side - then - // just (re)use the local scratch directory - if(isLocalOnlyExecutionMode()) - return getLocalScratchDir(); + Path dir = FileUtils.makeQualified(nonLocalScratchPath, conf); + URI uri = dir.toUri(); + return getScratchDir(uri.getScheme(), uri.getAuthority(), + !explain, uri.getPath()); - if (MRScratchDir == null) { - MRScratchDir = makeMRScratchDir(conf, !explain); - } - return MRScratchDir.toString(); } catch (IOException e) { throw new RuntimeException(e); } catch (IllegalArgumentException e) { @@ -218,87 +189,89 @@ } } - /** - * Create a local scratch directory on demand and return it. - */ - public String getLocalScratchDir() { - try { - if (localScratchDir == null) { - localScratchDir = makeLocalScratchDir(true); - } - return localScratchDir.toString(); - } catch (IOException e) { - throw new RuntimeException(e); - } catch (IllegalArgumentException e) { - throw new RuntimeException("Error while making local scratch " - + "directory - check filesystem config (" + e.getCause() + ")", e); - } + private String getExternalScratchDir(URI extURI) { + return getScratchDir(extURI.getScheme(), extURI.getAuthority(), + !explain, nonLocalScratchPath.toUri().getPath()); } - private void removeDir(Path p) { - try { - p.getFileSystem(conf).delete(p, true); - } catch (Exception e) { - LOG.warn("Error Removing Scratch: " - + StringUtils.stringifyException(e)); - } - } - /** * Remove any created scratch directories. */ private void removeScratchDir() { - - for (Map.Entry p : externalScratchDirs.entrySet()) { - removeDir(p.getValue()); + for (Map.Entry entry : fsScratchDirs.entrySet()) { + try { + Path p = new Path(entry.getValue()); + p.getFileSystem(conf).delete(p, true); + } catch (Exception e) { + LOG.warn("Error Removing Scratch: " + + StringUtils.stringifyException(e)); + } } - externalScratchDirs.clear(); - - if (MRScratchDir != null) { - removeDir(MRScratchDir); - MRScratchDir = null; - } - - if (localScratchDir != null) { - removeDir(localScratchDir); - localScratchDir = null; - } + fsScratchDirs.clear(); } - /** - * Return the next available path in the current scratch dir. - */ - private String nextPath(String base) { - return base + Path.SEPARATOR + Integer.toString(pathid++); + private String nextPathId() { + return Integer.toString(pathid++); } + + private static final String MR_PREFIX = "-mr-"; + private static final String EXT_PREFIX = "-ext-"; + private static final String LOCAL_PREFIX = "-local-"; + /** - * Check if path is tmp path. the assumption is that all uri's relative to - * scratchdir are temporary. - * + * Check if path is for intermediate data * @return true if a uri is a temporary uri for map-reduce intermediate data, * false otherwise */ public boolean isMRTmpFileURI(String uriStr) { - return (uriStr.indexOf(executionId) != -1); + return (uriStr.indexOf(executionId) != -1) && + (uriStr.indexOf(MR_PREFIX) != -1); } /** * Get a path to store map-reduce intermediate data in. 
- * + * * @return next available path for map-red intermediate data */ public String getMRTmpFileURI() { - return nextPath(getMRScratchDir()); + return getMRScratchDir() + Path.SEPARATOR + MR_PREFIX + + nextPathId(); } + /** + * Given a URI for mapreduce intermediate output, swizzle the + * it to point to the local file system. This can be called in + * case the caller decides to run in local mode (in which case + * all intermediate data can be stored locally) + * + * @param originalURI uri to localize + * @return localized path for map-red intermediate data + */ + public String localizeMRTmpFileURI(String originalURI) { + Path o = new Path(originalURI); + Path mrbase = new Path(getMRScratchDir()); + + URI relURI = mrbase.toUri().relativize(o.toUri()); + if (relURI.equals(o.toUri())) + throw new RuntimeException + ("Invalid URI: " + originalURI + ", cannot relativize against" + + mrbase.toString()); + + return getLocalScratchDir(!explain) + Path.SEPARATOR + + relURI.getPath(); + } + + + /** * Get a tmp path on local host to store intermediate data. * * @return next available tmp path on local fs */ public String getLocalTmpFileURI() { - return nextPath(getLocalScratchDir()); + return getLocalScratchDir(true) + Path.SEPARATOR + LOCAL_PREFIX + + nextPathId(); } /** @@ -309,7 +282,8 @@ * @return next available tmp path on the file system corresponding extURI */ public String getExternalTmpFileURI(URI extURI) { - return nextPath(getExternalScratchDir(extURI)); + return getExternalScratchDir(extURI) + Path.SEPARATOR + EXT_PREFIX + + nextPathId(); } /** @@ -368,6 +342,7 @@ } } removeScratchDir(); + originalTracker = null; } public DataInput getStream() { @@ -473,10 +448,6 @@ return executionId; } - public Path getQueryPath() { - return queryScratchPath; - } - /** * Does Hive wants to run tasks entirely on the local machine * (where the query is being compiled)? 
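localizeMRTmpFileURI leans on java.net.URI.relativize: the MR scratch dir is stripped off as a prefix and the remainder is re-rooted under the local scratch dir. A self-contained demonstration of that step, with hypothetical paths standing in for getMRScratchDir() and getLocalScratchDir(...):

import java.net.URI;

// Demonstrates the relativize step inside Context.localizeMRTmpFileURI.
public class RelativizeSketch {
  public static void main(String[] args) {
    URI mrBase   = URI.create("hdfs://nn:8020/tmp/hive-user/hive_2010-07-01_0001");
    URI original = URI.create("hdfs://nn:8020/tmp/hive-user/hive_2010-07-01_0001/-mr-10002");

    URI rel = mrBase.relativize(original);
    if (rel.equals(original)) {
      // not under the MR scratch dir; the patch throws RuntimeException here
      throw new IllegalStateException("cannot relativize " + original);
    }
    // re-root the relative part under the local scratch dir
    String localized = "file:/tmp/user/hive_2010-07-01_0001" + "/" + rel.getPath();
    System.out.println(localized);   // ends with /-mr-10002
  }
}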
@@ -484,6 +455,66 @@ * Today this translates into running hadoop jobs locally */ public boolean isLocalOnlyExecutionMode() { - return conf.getVar(HiveConf.ConfVars.HADOOPJT).equals("local"); + return HiveConf.getVar(conf, HiveConf.ConfVars.HADOOPJT).equals("local"); } + + public void setOriginalTracker(String originalTracker) { + this.originalTracker = originalTracker; + } + + public void restoreOriginalTracker() { + if (originalTracker != null) { + HiveConf.setVar(conf, HiveConf.ConfVars.HADOOPJT, originalTracker); + originalTracker = null; + } + } + + public void addCS(String path, ContentSummary cs) { + if(pathToCS == null) + pathToCS = new HashMap (); + pathToCS.put(path, cs); + } + + public ContentSummary getCS(String path) { + if(pathToCS == null) + pathToCS = new HashMap (); + return pathToCS.get(path); + } + + public Configuration getConf() { + return conf; + } + + + /** + * Given a mapping from paths to objects, localize any MR tmp paths + * @param map mapping from paths to objects + */ + public void localizeKeys(Map map) { + for (Map.Entry entry: map.entrySet()) { + String path = entry.getKey(); + if (isMRTmpFileURI(path)) { + Object val = entry.getValue(); + map.remove(path); + map.put(localizeMRTmpFileURI(path), val); + } + } + } + + /** + * Given a list of paths, localize any MR tmp paths contained therein + * @param paths list of paths to be localized + */ + public void localizePaths(List paths) { + Iterator iter = paths.iterator(); + List toAdd = new ArrayList (); + while(iter.hasNext()) { + String path = iter.next(); + if (isMRTmpFileURI(path)) { + iter.remove(); + toAdd.add(localizeMRTmpFileURI(path)); + } + } + paths.addAll(toAdd); + } } Index: ql/src/java/org/apache/hadoop/hive/ql/io/RCFileOutputFormat.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/io/RCFileOutputFormat.java (revision 961525) +++ ql/src/java/org/apache/hadoop/hive/ql/io/RCFileOutputFormat.java (working copy) @@ -134,8 +134,9 @@ } RCFileOutputFormat.setColumnNumber(jc, cols.length); - final RCFile.Writer outWriter = Utilities.createRCFileWriter(jc, FileSystem - .get(jc), finalOutPath, isCompressed); + final RCFile.Writer outWriter = Utilities.createRCFileWriter + (jc, finalOutPath.getFileSystem(jc), + finalOutPath, isCompressed); return new org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter() { public void write(Writable r) throws IOException { Index: ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (revision 961525) +++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (working copy) @@ -24,6 +24,7 @@ import static org.apache.hadoop.util.StringUtils.stringifyException; import java.io.Serializable; +import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -39,22 +40,26 @@ import org.apache.commons.lang.StringUtils; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.ContentSummary; +import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.common.JavaUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.MetaStoreUtils; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Order; +import org.apache.hadoop.hive.ql.Context; +import 
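localizeKeys above removes and re-inserts entries while iterating the map's entrySet(); with a HashMap that pattern risks a ConcurrentModificationException, since its iterators are fail-fast. A defensive variant, shown here only as a sketch (the two static helpers are stand-ins for the Context methods of the same names), snapshots the keys before rewriting:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;

// Copy-then-rewrite variant of key localization: iterate a snapshot of the
// keys so the map itself is never mutated during iteration.
public class LocalizeKeysSketch {
  static boolean isMRTmpFileURI(String s) { return s.contains("-mr-"); }
  static String localizeMRTmpFileURI(String s) { return s.replace("hdfs://nn:8020", "file:"); }

  public static void localizeKeys(Map<String, Object> map) {
    for (String path : new ArrayList<String>(map.keySet())) {   // snapshot of keys
      if (isMRTmpFileURI(path)) {
        Object val = map.remove(path);
        map.put(localizeMRTmpFileURI(path), val);
      }
    }
  }

  public static void main(String[] args) {
    Map<String, Object> m = new HashMap<String, Object>();
    m.put("hdfs://nn:8020/tmp/hive/exec1/-mr-10002", "work");
    localizeKeys(m);
    System.out.println(m.keySet());   // [file:/tmp/hive/exec1/-mr-10002]
  }
}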
org.apache.hadoop.hive.ql.Driver; import org.apache.hadoop.hive.ql.exec.AbstractMapJoinOperator; import org.apache.hadoop.hive.ql.exec.ColumnInfo; import org.apache.hadoop.hive.ql.exec.ConditionalTask; -import org.apache.hadoop.hive.ql.exec.ExecDriver; import org.apache.hadoop.hive.ql.exec.FetchTask; import org.apache.hadoop.hive.ql.exec.FileSinkOperator; import org.apache.hadoop.hive.ql.exec.FunctionInfo; import org.apache.hadoop.hive.ql.exec.FunctionRegistry; import org.apache.hadoop.hive.ql.exec.GroupByOperator; import org.apache.hadoop.hive.ql.exec.JoinOperator; +import org.apache.hadoop.hive.ql.exec.ExecDriver; import org.apache.hadoop.hive.ql.exec.MapRedTask; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.OperatorFactory; @@ -803,14 +808,26 @@ String fname = stripQuotes(ast.getChild(0).getText()); if ((!qb.getParseInfo().getIsSubQ()) && (((ASTNode) ast.getChild(0)).getToken().getType() == HiveParser.TOK_TMP_FILE)) { - fname = ctx.getMRTmpFileURI(); - ctx.setResDir(new Path(fname)); if (qb.isCTAS()) { qb.setIsQuery(false); + + // allocate a temporary output dir on the location of the table + String location = conf.getVar(HiveConf.ConfVars.METASTOREWAREHOUSE); + try { + fname = ctx.getExternalTmpFileURI + (FileUtils.makeQualified(new Path(location), conf).toUri()); + + } catch (Exception e) { + throw new SemanticException("Error creating temporary folder on: " + + location, e); + } + } else { qb.setIsQuery(true); + fname = ctx.getMRTmpFileURI(); } + ctx.setResDir(new Path(fname)); } qb.getMetaData().setDestForAlias(name, fname, (ast.getToken().getType() == HiveParser.TOK_DIR)); @@ -1172,8 +1189,9 @@ new FilterDesc(genExprNodeDesc(condn, inputRR), false), new RowSchema( inputRR.getColumnInfos()), input), inputRR); - LOG.debug("Created Filter Plan for " + qb.getId() + " row schema: " - + inputRR.toString()); + if (LOG.isDebugEnabled()) + LOG.debug("Created Filter Plan for " + qb.getId() + " row schema: " + + inputRR.toString()); return output; } @@ -1670,15 +1688,20 @@ ASTNode selExprList = qb.getParseInfo().getSelForClause(dest); Operator op = genSelectPlan(selExprList, qb, input); - LOG.debug("Created Select Plan for clause: " + dest); + + if (LOG.isDebugEnabled()) + LOG.debug("Created Select Plan for clause: " + dest); + return op; } @SuppressWarnings("nls") private Operator genSelectPlan(ASTNode selExprList, QB qb, Operator input) throws SemanticException { - LOG.debug("tree: " + selExprList.toStringTree()); + if (LOG.isDebugEnabled()) + LOG.debug("tree: " + selExprList.toStringTree()); + ArrayList col_list = new ArrayList(); RowResolver out_rwsch = new RowResolver(); ASTNode trfm = null; @@ -1753,8 +1776,10 @@ assert (false); } } - LOG.debug("UDTF table alias is " + udtfTableAlias); - LOG.debug("UDTF col aliases are " + udtfColAliases); + if (LOG.isDebugEnabled()) { + LOG.debug("UDTF table alias is " + udtfTableAlias); + LOG.debug("UDTF col aliases are " + udtfColAliases); + } } // The list of expressions after SELECT or SELECT TRANSFORM. @@ -1767,7 +1792,8 @@ exprList = selExprList; } - LOG.debug("genSelectPlan: input = " + inputRR.toString()); + if (LOG.isDebugEnabled()) + LOG.debug("genSelectPlan: input = " + inputRR.toString()); // For UDTF's, skip the function name to get the expressions int startPosn = isUDTF ? 
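For CTAS, the temporary output directory is now allocated on the filesystem that hosts the warehouse, so the final rename stays within one filesystem instead of landing in MR scratch space. A hedged sketch of the same call sequence (the wrapper method is illustrative):

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Context;

// Sketch of the CTAS change above: qualify the warehouse location against the
// current configuration and ask the Context for an external tmp URI there.
public class CtasTmpDirSketch {
  public static String ctasTmpDir(HiveConf conf, Context ctx) throws Exception {
    String location = conf.getVar(HiveConf.ConfVars.METASTOREWAREHOUSE);
    Path warehouse = FileUtils.makeQualified(new Path(location), conf);
    // e.g. hdfs://nn:8020/user/hive/warehouse/-ext-10003 (authority illustrative)
    return ctx.getExternalTmpFileURI(warehouse.toUri());
  }
}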
posn + 1 : posn; @@ -1877,7 +1903,8 @@ output = genUDTFPlan(genericUDTF, udtfTableAlias, udtfColAliases, qb, output); } - LOG.debug("Created Select Plan row schema: " + out_rwsch.toString()); + if (LOG.isDebugEnabled()) + LOG.debug("Created Select Plan row schema: " + out_rwsch.toString()); return output; } @@ -3472,8 +3499,9 @@ .mapDirToFop(ltd.getSourceDir(), (FileSinkOperator)output); } - LOG.debug("Created FileSink Plan for clause: " + dest + "dest_path: " - + dest_path + " row schema: " + inputRR.toString()); + if (LOG.isDebugEnabled()) + LOG.debug("Created FileSink Plan for clause: " + dest + "dest_path: " + + dest_path + " row schema: " + inputRR.toString()); return output; } @@ -3626,8 +3654,9 @@ new LimitDesc(limit), new RowSchema(inputRR.getColumnInfos()), input), inputRR); - LOG.debug("Created LimitOperator Plan for clause: " + dest - + " row schema: " + inputRR.toString()); + if (LOG.isDebugEnabled()) + LOG.debug("Created LimitOperator Plan for clause: " + dest + + " row schema: " + inputRR.toString()); return limitMap; } @@ -3654,8 +3683,9 @@ throw new SemanticException(ErrorMsg.UDTF_LATERAL_VIEW.getMsg()); } - LOG.debug("Table alias: " + outputTableAlias + " Col aliases: " - + colAliases); + if (LOG.isDebugEnabled()) + LOG.debug("Table alias: " + outputTableAlias + " Col aliases: " + + colAliases); // Use the RowResolver from the input operator to generate a input // ObjectInspector that can be used to initialize the UDTF. Then, the @@ -3886,8 +3916,9 @@ Utilities.ReduceField.VALUE.toString(), "", false)), new RowSchema( out_rwsch.getColumnInfos()), interim), out_rwsch); - LOG.debug("Created ReduceSink Plan for table: " + tab.getTableName() + " row schema: " - + out_rwsch.toString()); + if (LOG.isDebugEnabled()) + LOG.debug("Created ReduceSink Plan for table: " + tab.getTableName() + + " row schema: " + out_rwsch.toString()); return output; } @@ -3996,8 +4027,9 @@ Utilities.ReduceField.VALUE.toString(), "", false)), new RowSchema( out_rwsch.getColumnInfos()), interim), out_rwsch); - LOG.debug("Created ReduceSink Plan for clause: " + dest + " row schema: " - + out_rwsch.toString()); + if (LOG.isDebugEnabled()) + LOG.debug("Created ReduceSink Plan for clause: " + dest + " row schema: " + + out_rwsch.toString()); return output; } @@ -5110,7 +5142,9 @@ } } - LOG.debug("Created Body Plan for Query Block " + qb.getId()); + if (LOG.isDebugEnabled()) + LOG.debug("Created Body Plan for Query Block " + qb.getId()); + return curr; } @@ -5495,8 +5529,10 @@ } Operator output = putOpInsertMap(tableOp, rwsch); - LOG.debug("Created Table Plan for " + alias + " " + tableOp.toString()); + if (LOG.isDebugEnabled()) + LOG.debug("Created Table Plan for " + alias + " " + tableOp.toString()); + return output; } @@ -5562,8 +5598,10 @@ } Operator bodyOpInfo = genBodyPlan(qb, srcOpInfo); - LOG.debug("Created Plan for Query Block " + qb.getId()); + if (LOG.isDebugEnabled()) + LOG.debug("Created Plan for Query Block " + qb.getId()); + this.qb = qb; return bodyOpInfo; } @@ -5873,6 +5911,8 @@ } } + decideExecMode(rootTasks, ctx); + if (qb.isCTAS()) { // generate a DDL task and make it a dependent task of the leaf CreateTableDesc crtTblDesc = qb.getTableDesc(); @@ -5922,7 +5962,7 @@ // loop over all the tasks recursviely private void generateCountersTask(Task task) { - if ((task instanceof MapRedTask) || (task instanceof ExecDriver)) { + if (task instanceof ExecDriver) { HashMap> opMap = ((MapredWork) task .getWork()).getAliasToWork(); if (!opMap.isEmpty()) { @@ -5972,7 +6012,7 @@ // loop over all the 
tasks recursviely private void breakTaskTree(Task task) { - if ((task instanceof MapRedTask) || (task instanceof ExecDriver)) { + if (task instanceof ExecDriver) { HashMap> opMap = ((MapredWork) task .getWork()).getAliasToWork(); if (!opMap.isEmpty()) { @@ -6015,7 +6055,7 @@ // loop over all the tasks recursviely private void setKeyDescTaskTree(Task task) { - if ((task instanceof MapRedTask) || (task instanceof ExecDriver)) { + if (task instanceof ExecDriver) { MapredWork work = (MapredWork) task.getWork(); work.deriveExplainAttributes(); HashMap> opMap = work @@ -6353,8 +6393,6 @@ @Override public void validate() throws SemanticException { - // Check if the plan contains atleast one path. - // validate all tasks for (Task rootTask : rootTasks) { validate(rootTask); @@ -6363,13 +6401,6 @@ private void validate(Task task) throws SemanticException { - if ((task instanceof MapRedTask) || (task instanceof ExecDriver)) { - task.getWork(); - - // If the plan does not contain any path, an empty file - // will be added by ExecDriver at execute time - } - if (task.getChildTasks() == null) { return; } @@ -6813,4 +6844,81 @@ } } } + + private void decideExecMode(List> rootTasks, Context ctx) + throws SemanticException { + + // bypass for explain queries for now + if (ctx.getExplain()) + return; + + // user has told us to run in local mode or doesn't want auto-local mode + if (ctx.isLocalOnlyExecutionMode() || + !conf.getBoolVar(HiveConf.ConfVars.LOCALMODEAUTO)) + return; + + final Context lCtx = ctx; + PathFilter p = new PathFilter () { + public boolean accept(Path file) { + return !lCtx.isMRTmpFileURI(file.toUri().getPath()); + } + }; + List mrtasks = Driver.getMRTasks(rootTasks); + + // map-reduce jobs will be run locally based on data size + // first find out if any of the jobs needs to run non-locally + boolean hasNonLocalJob = false; + for (ExecDriver mrtask: mrtasks) { + try { + ContentSummary inputSummary = Utilities.getInputSummary + (ctx, (MapredWork)mrtask.getWork(), p); + int numReducers = getNumberOfReducers(mrtask.getWork(), conf); + + if (LOG.isDebugEnabled()) { + LOG.debug("Task: " + mrtask.getId() + ", Summary: " + + inputSummary.getLength() + "," + inputSummary.getFileCount() + "," + + numReducers); + } + + if(!MapRedTask.isEligibleForLocalMode(conf, inputSummary, numReducers)) { + hasNonLocalJob = true; + break; + } + } catch (IOException e) { + throw new SemanticException (e); + } + } + + if(!hasNonLocalJob) { + // none of the mapred tasks needs to be run locally. That means that the + // query can be executed entirely in local mode. Save the current tracker + // value and restore it when done + ctx.setOriginalTracker(conf.getVar(HiveConf.ConfVars.HADOOPJT)); + conf.setVar(HiveConf.ConfVars.HADOOPJT, "local"); + LOG.info("Selecting local only mode for query"); + + // If all the tasks can be run locally, we can use local disk for + // storing intermediate data. 
+ + /** + * This code is commented out pending further testing/development + * for (Task t: rootTasks) + * t.localizeMRTmpFiles(ctx); + */ + } + } + + /** + * Make a best guess at trying to find the number of reducers + */ + private static int getNumberOfReducers(MapredWork mrwork, HiveConf conf) { + if (mrwork.getReducer() == null) + return 0; + + if (mrwork.getNumReduceTasks() >= 0) + return mrwork.getNumReduceTasks(); + + return conf.getIntVar(HiveConf.ConfVars.HADOOPNUMREDUCERS); + } + } Index: ql/src/java/org/apache/hadoop/hive/ql/Driver.java =================================================================== --- ql/src/java/org/apache/hadoop/hive/ql/Driver.java (revision 961525) +++ ql/src/java/org/apache/hadoop/hive/ql/Driver.java (working copy) @@ -97,32 +97,29 @@ Operator.resetId(); } - public int countJobs(List> tasks) { - return countJobs(tasks, new ArrayList>()); + public static int countJobs(List> tasks) { + return getMRTasks(tasks).size(); } - public int countJobs(List> tasks, - List> seenTasks) { - if (tasks == null) { - return 0; - } - int jobs = 0; + public static List getMRTasks (List> tasks) { + List mrTasks = new ArrayList (); + if(tasks != null) + getMRTasks(tasks, mrTasks); + return mrTasks; + } + + private static void getMRTasks (List> tasks, + List mrTasks) { for (Task task : tasks) { - if (!seenTasks.contains(task)) { - seenTasks.add(task); + if (task instanceof ExecDriver && !mrTasks.contains((ExecDriver)task)) + mrTasks.add((ExecDriver)task); - if (task instanceof ConditionalTask) { - jobs += countJobs(((ConditionalTask) task).getListTasks(), seenTasks); - } else if (task.isMapRedTask()) { // this may be true for conditional - // task, but we will not inc the - // counter - jobs++; - } + if (task instanceof ConditionalTask) + getMRTasks(((ConditionalTask)task).getListTasks(), mrTasks); - jobs += countJobs(task.getChildTasks(), seenTasks); - } + if (task.getChildTasks() != null) + getMRTasks(task.getChildTasks(), mrTasks); } - return jobs; } /** @@ -319,7 +316,7 @@ // test Only - serialize the query plan and deserialize it if("true".equalsIgnoreCase(System.getProperty("test.serialize.qplan"))) { - String queryPlanFileName = ctx.getLocalScratchDir() + Path.SEPARATOR_CHAR + String queryPlanFileName = ctx.getLocalScratchDir(true) + Path.SEPARATOR_CHAR + "queryplan.xml"; LOG.info("query plan = " + queryPlanFileName); queryPlanFileName = new Path(queryPlanFileName).toUri().getPath(); @@ -539,6 +536,10 @@ } } + // in case we decided to run everything in local mode, restore the + // the jobtracker setting to its initial value + ctx.restoreOriginalTracker(); + // Get all the post execution hooks and execute them. for (PostExecute peh : getPostExecHooks()) { peh.run(SessionState.get(), plan.getInputs(), plan.getOutputs(),
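Both decideExecMode and the reworked countJobs lean on the new static Driver.getMRTasks, which flattens the task DAG (including ConditionalTask branches) into its ExecDriver members. A usage sketch, assuming the generics stripped from this copy of the patch are Task<? extends Serializable> and ExecDriver as elsewhere in ql, and that rootTasks comes from an already-compiled plan:

import java.io.Serializable;
import java.util.List;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.exec.ExecDriver;
import org.apache.hadoop.hive.ql.exec.Task;

// Usage sketch for the new static helpers on Driver.
public class MRTaskCountSketch {
  public static void report(List<Task<? extends Serializable>> rootTasks) {
    List<ExecDriver> mrTasks = Driver.getMRTasks(rootTasks);
    System.out.println("map-reduce jobs in plan: " + mrTasks.size());
    // equivalently: Driver.countJobs(rootTasks)
  }
}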