Details

    • Type: New Feature
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.6.0
    • Component/s: Query Processor
    • Labels:
    • Hadoop Flags:
      Reviewed

      Description

      Hive already has support for map-join. Map-join treats the big table as the job input, and each mapper loads all of the data from the small table.

      If the big table is already bucketed on the join key, we don't have to load the whole small table in each mapper. This will greatly reduce memory pressure and make map-join work with medium-sized tables.
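      To make the reasoning concrete, here is a minimal sketch of hash bucketing. It assumes a rule of the form bucket = (hash & Integer.MAX_VALUE) % numBuckets and uses Java's String.hashCode() as a stand-in for Hive's own hash function; the class and method names are hypothetical. Because a given key always lands in the same bucket index, the mapper reading big-table bucket i only needs small-table bucket i.

```java
import java.util.List;

// Minimal sketch of hash bucketing. ASSUMPTION: a Hive-style rule
// bucket = (hash & Integer.MAX_VALUE) % numBuckets, with String.hashCode()
// standing in for Hive's real hash function. Names are hypothetical.
final class Bucketing {
    private Bucketing() {}

    static int bucketOf(String joinKey, int numBuckets) {
        if (numBuckets <= 0) {
            throw new IllegalArgumentException("numBuckets must be positive");
        }
        // Mask the sign bit so negative hash codes still map into [0, numBuckets).
        return (joinKey.hashCode() & Integer.MAX_VALUE) % numBuckets;
    }

    public static void main(String[] args) {
        int numBuckets = 4;
        // Equal keys always hash to the same bucket index in both tables.
        for (String key : List.of("alice", "bob", "carol")) {
            System.out.println(key + " -> bucket " + bucketOf(key, numBuckets));
        }
    }
}
```

      A side effect of the modulo rule is that if one bucket count divides the other, (h % 4) % 2 equals h % 2, which is what lets a table with fewer buckets still line up with one that has more.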

      We can approach this in four steps:

      S0. This is what the user can already do today: create a new bucketed table and insert all data from the small table into it; then submit NBUCKETS jobs, each doing a map-side join of "bigtable TABLESAMPLE(BUCKET i OUT OF NBUCKETS)" with "smallbucketedtable TABLESAMPLE(BUCKET i OUT OF NBUCKETS)".

      S1. Change the code so that, when map-join loads the small table, it automatically drops rows whose keys are NOT in the same bucket as the big-table input. This alleviates the memory problem, but we might still have thousands of mappers each reading the whole small table.

      S2. Suppose the user has already bucketed the small table on the join key into exactly the same number of buckets as the big table (or a factor of that number); then map-join can load only the buckets that are useful.

      S3. Add a new hint (e.g. /*+ MAPBUCKETJOIN(a) */) so that Hive does S2 automatically, without asking the user to create a temporary bucketed table for the small table.
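      Step S1 can be sketched as a hash-table build that skips non-matching rows. This assumes each small-table row is a (joinKey, value) pair, that both tables use the same number of buckets, and the same hypothetical modulo bucketing rule as above; the names are illustrative, not Hive's classes. Note that the loop still scans every small-table row, which is exactly the limitation S1 leaves open.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of step S1 under stated assumptions: rows are (joinKey, value) pairs,
// both tables have numBuckets buckets, and bucket = (hash & MAX_VALUE) % n.
// All names are hypothetical.
final class FilteredSmallTableLoader {
    private FilteredSmallTableLoader() {}

    record Row(String key, String value) {}

    static int bucketOf(String key, int numBuckets) {
        return (key.hashCode() & Integer.MAX_VALUE) % numBuckets;
    }

    // Builds the in-memory join table for a mapper reading big-table bucket
    // `bigBucket`. Rows hashing to any other bucket can never match this
    // mapper's input, so they are dropped instead of being kept in memory.
    static Map<String, List<String>> load(List<Row> smallTable, int bigBucket, int numBuckets) {
        Map<String, List<String>> table = new HashMap<>();
        for (Row row : smallTable) {
            if (bucketOf(row.key(), numBuckets) != bigBucket) {
                continue;
            }
            table.computeIfAbsent(row.key(), k -> new ArrayList<>()).add(row.value());
        }
        return table;
    }
}
```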

      1. hive-917-2010-2-15.patch
        117 kB
        He Yongqiang
      2. hive-917-2010-2-16.patch
        219 kB
        He Yongqiang
      3. hive-917-2010-2-3.patch
        55 kB
        He Yongqiang
      4. hive-917-2010-2-8.patch
        117 kB
        He Yongqiang


          Activity

          Namit Jain added a comment -

          Generalizing further, we can use bucketing/sorting information to do a lot more.
          This can be the parent JIRA, from which we can create various dependent JIRAs.
          I will file one for group by.

          Namit Jain added a comment -

          Started looking at the patch:

          1. Can you change the test to add an EXPLAIN for all queries?
          2. Can you change the default value of hive.optimize.bucketmapjoin to true?
          3. Instead of changing QTestUtil.java, can you create the tables/partitions you need in the new test only?

          Namit Jain added a comment -

          BucketMapJoinOptimizer.java, line 80, has a wrong comment:
          // process group-by pattern

          Can you add a correct comment?

          // mapper. That means there is not reducer between the root table scan and

          change to:

          // mapper. That means there is no reducer between the root table scan and

          Add some comments in:
          private boolean checkBucketColumns(List<String> bucketColumns, MapJoinDesc mjDesc, int index) { ... }

          A mapjoin B, where A is the big table and is partitioned and B is not partitioned,
          should also be optimized (assuming both A and B are bucketed).
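          The kind of check that checkBucketColumns could document might look like the sketch below. It is a hypothetical eligibility test, not Hive's implementation: it assumes each table must be bucketed on exactly its join-key columns, in join-key order, and that one bucket count divides the other (the S2 condition from the description). Partitioning is deliberately not part of the check, matching the point that A may be partitioned while B is not.

```java
import java.util.List;

// Hypothetical eligibility check for a bucketed map join (not Hive's code).
// ASSUMPTIONS: each side must be bucketed on exactly its join keys, in the
// same order, and one bucket count must divide the other.
final class BucketMapJoinEligibility {
    private BucketMapJoinEligibility() {}

    static boolean isEligible(List<String> bigJoinKeys, List<String> bigBucketCols, int bigNumBuckets,
                              List<String> smallJoinKeys, List<String> smallBucketCols, int smallNumBuckets) {
        if (bigNumBuckets <= 0 || smallNumBuckets <= 0) {
            return false; // at least one side is not bucketed
        }
        if (!bigBucketCols.equals(bigJoinKeys) || !smallBucketCols.equals(smallJoinKeys)) {
            return false; // bucketing is not aligned with the join key
        }
        int larger = Math.max(bigNumBuckets, smallNumBuckets);
        int smaller = Math.min(bigNumBuckets, smallNumBuckets);
        return larger % smaller == 0; // bucket counts must be compatible
    }
}
```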

          if(partNumber == 0) {
          Integer num = new Integer(0);
          bucketNumbers.add(num);
          aliasToBucketNumber.put(alias, num);
          aliasToBucketFileNames.put(alias, new ArrayList<String>());

          No need to do this; the results are empty anyway.

          ExecMapper:

          if (bucketMatcherCls == null) {
            bucketMatcherCls = org.apache.hadoop.hive.ql.exec.DefaultBucketMatcher.class;
          }

          Add the class name to MapredLocalWork and initialize it using reflection.
          Keep the file-name-to-file-name mapping in MapredLocalWork (only useful for bucketed map join, not for skew join).
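          A rough sketch of the reflection-based setup suggested above, assuming MapredLocalWork stores the matcher's fully qualified class name as a String and that matchers have a public no-argument constructor. BucketMatcher and DefaultBucketMatcher here are simplified stand-ins (returning file names as Strings), not Hive's real types, and MatcherFactory is a hypothetical helper.

```java
import java.util.List;

// Simplified stand-in for the matcher interface (not Hive's real type).
interface BucketMatcher {
    List<String> getAliasBucketFiles(String bigTableFile, String bigAlias, String smallAlias);
}

// Simplified default implementation; a real one would consult the mapping.
class DefaultBucketMatcher implements BucketMatcher {
    public DefaultBucketMatcher() {}

    @Override
    public List<String> getAliasBucketFiles(String bigTableFile, String bigAlias, String smallAlias) {
        return List.of();
    }
}

// Hypothetical helper: instantiates the matcher named in the plan, falling
// back to the default when no class name was recorded.
final class MatcherFactory {
    private MatcherFactory() {}

    static BucketMatcher create(String className) {
        String name = (className == null) ? DefaultBucketMatcher.class.getName() : className;
        try {
            // asSubclass fails fast with ClassCastException if the class
            // does not implement BucketMatcher.
            Class<? extends BucketMatcher> cls = Class.forName(name).asSubclass(BucketMatcher.class);
            return cls.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Cannot instantiate bucket matcher: " + name, e);
        }
    }
}
```

          Passing a class name rather than a Class object keeps the plan serializable as plain strings and lets alternative matchers be plugged in without changing the mapper.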

          MapredLocalWork:

          private LinkedHashMap<String, Integer> aliasToBucketNumber;
          private LinkedHashMap<String, List<String>> aliasToBucketFileNames;
          private String mapJoinBigTableAlias;
          private Class<? extends BucketMatcher> bucketMatcker;

          create a new class for the above
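          One possible shape for that new class is sketched below. The name BucketMapJoinContext is an illustrative choice, and storing the matcher as a class name (rather than a Class object) is an assumption that follows the reflection suggestion above; the field spelling uses "Matcher". Grouping the fields lets MapredLocalWork hold a single reference that is set only for bucketed map joins.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative holder for the bucket-map-join fields (hypothetical name).
final class BucketMapJoinContext {
    private final LinkedHashMap<String, Integer> aliasToBucketNumber = new LinkedHashMap<>();
    private final LinkedHashMap<String, List<String>> aliasToBucketFileNames = new LinkedHashMap<>();
    private String mapJoinBigTableAlias;
    // Fully qualified class name, instantiated by reflection in the mapper.
    private String bucketMatcherClassName;

    // Records one table's bucket count and its bucket file names.
    void addTable(String alias, int numBuckets, List<String> bucketFiles) {
        aliasToBucketNumber.put(alias, numBuckets);
        aliasToBucketFileNames.put(alias, new ArrayList<>(bucketFiles));
    }

    Map<String, Integer> getAliasToBucketNumber() {
        return Collections.unmodifiableMap(aliasToBucketNumber);
    }

    Map<String, List<String>> getAliasToBucketFileNames() {
        return Collections.unmodifiableMap(aliasToBucketFileNames);
    }

    String getMapJoinBigTableAlias() { return mapJoinBigTableAlias; }

    void setMapJoinBigTableAlias(String alias) { this.mapJoinBigTableAlias = alias; }

    String getBucketMatcherClassName() { return bucketMatcherClassName; }

    void setBucketMatcherClassName(String name) { this.bucketMatcherClassName = name; }
}
```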

          public Class<? extends BucketMatcher> getBucketMatcker() {
            return bucketMatcker;
          }

          public void setBucketMatcker(Class<? extends BucketMatcher> bucketMatcker) {
            this.bucketMatcker = bucketMatcker;
          }

          spelling: should be Matcher

          DefaultBucketMatcher:

          public List<Path> getAliasBucketFiles(String refTableInputFile, String refTableAlias, String alias) {
            int bigTblBucketNum = aliasToBucketNumber.get(refTableAlias);
            int smallTblBucketNum = aliasToBucketNumber.get(alias);
            Collections.sort(aliasToBucketFileNames.get(refTableAlias));
            Collections.sort(aliasToBucketFileNames.get(alias));

            List<Path> resultFileNames = new ArrayList<Path>();
            if (bigTblBucketNum >= smallTblBucketNum) {
              int temp = bigTblBucketNum / smallTblBucketNum;
              int index = aliasToBucketFileNames.get(refTableAlias).indexOf(refTableInputFile);
              int toAddSmallIndex = index/temp;
              if(toAddSmallIndex < aliasToBucketFileNames.get(alias).size()) {
                resultFileNames.add(new Path(aliasToBucketFileNames.get(alias).get(toAddSmallIndex)));
              }
            } else {
              int jump = smallTblBucketNum / bigTblBucketNum;
              int index = aliasToBucketFileNames.get(refTableAlias).indexOf(refTableInputFile);
              for (int i = index; i < aliasToBucketFileNames.get(alias).size(); i = i + jump) {
                if(i <= aliasToBucketFileNames.get(alias).size()) {
                  resultFileNames.add(new Path(aliasToBucketFileNames.get(alias).get(i)));
                }
              }
            }
            return resultFileNames;
          }

          move this to compile time and add some more comments
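          A compile-time version could precompute, once per query, which small-table bucket files each big-table bucket file must load. The sketch below assumes rows go to bucket hash % numBuckets, one file per bucket, each file list sorted by bucket index, and that one bucket count divides the other; BucketFileMapping is a hypothetical name. Under these assumptions the index arithmetic is a modulo: big bucket i needs small bucket i % n_small, and when the small table has more buckets, the matching small buckets are i, i + n_big, i + 2*n_big, and so on. This differs from the division and the n_small / n_big stride used in the snippet quoted above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch of a compile-time big-file -> small-files mapping (hypothetical).
// ASSUMES bucket = hash % numBuckets, one file per bucket, lists sorted by
// bucket index, and bucket counts that divide one another.
final class BucketFileMapping {
    private BucketFileMapping() {}

    static Map<String, List<String>> build(List<String> bigFiles, List<String> smallFiles) {
        int big = bigFiles.size();
        int small = smallFiles.size();
        if (big == 0 || small == 0 || Math.max(big, small) % Math.min(big, small) != 0) {
            throw new IllegalArgumentException("bucket counts must divide one another");
        }
        Map<String, List<String>> mapping = new LinkedHashMap<>();
        for (int i = 0; i < big; i++) {
            List<String> matches = new ArrayList<>();
            if (big >= small) {
                // small divides big, so hash % small == (hash % big) % small.
                matches.add(smallFiles.get(i % small));
            } else {
                // big divides small: small bucket j pairs with big bucket j % big.
                for (int j = i; j < small; j += big) {
                    matches.add(smallFiles.get(j));
                }
            }
            mapping.put(bigFiles.get(i), Collections.unmodifiableList(matches));
        }
        return mapping;
    }
}
```

          Each mapper would then only look up its own input file in the precomputed map instead of sorting file lists at run time.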

          FetchOperator.java:

          boolean ret = false;
          try {
            value = currRecReader.createValue();
            ret = currRecReader.next(key, value);
          } catch (Exception e) {
            e.printStackTrace();
          }
          Namit Jain added a comment -

          reviewed with Yongqiang

          Namit Jain added a comment -

          Also, filing a new JIRA for supporting bucketed map-joins where more than one partition of the
          big table is being joined.

          He Yongqiang added a comment -

          A new patch incorporating Namit's comments. Thanks, Namit.

          Namit Jain added a comment -

          Can you generate the patch again? It does not apply cleanly.

          He Yongqiang added a comment -

          Regenerated the patch against trunk; see hive-917...2-15.patch.

          Namit Jain added a comment -

          Can you add EXPLAIN EXTENDED to the tests so that it is possible to see which buckets are joined with which buckets?

          He Yongqiang added a comment -

          A new patch:
          1) added EXPLAIN EXTENDED
          2) split the test into 4 tests.

          Namit Jain added a comment -

          Added some more tasks to the follow-up JIRA after talking to Yongqiang.
          Will commit this if the tests pass.

          Namit Jain added a comment -

          Committed. Thanks Yongqiang


            People

            • Assignee:
              He Yongqiang
              Reporter:
              Zheng Shao
            • Votes:
              0
              Watchers:
              6

              Dates

              • Created:
                Updated:
                Resolved:
