HIVE-1642

Convert join queries to map-join based on size of table/row

    Details

    • Type: Improvement
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.7.0
    • Component/s: Query Processor
    • Labels:
      None
    • Hadoop Flags:
      Reviewed

      Description

      Based on the number of rows and size of each table, Hive should automatically be able to convert a join into map-join.

      Attachments

      1. hive_1642_1.patch
        434 kB
        Liyin Tang
      2. hive_1642_2.patch
        770 kB
        Liyin Tang
      3. hive_1642_4.patch
        769 kB
        Liyin Tang
      4. hive-1642_10.patch
        767 kB
        Liyin Tang
      5. hive-1642_11.patch
        767 kB
        Liyin Tang
      6. hive-1642_5.patch
        769 kB
        Liyin Tang
      7. hive-1642_6.patch
        767 kB
        Liyin Tang
      8. hive-1642_7.patch
        767 kB
        Liyin Tang
      9. hive-1642_9.patch
        767 kB
        Liyin Tang

        Activity

        Liyin Tang added a comment -

        Are we talking about dynamically changing execution plan based on the table size?

        Namit Jain added a comment -

        yes

        Liyin Tang added a comment -

        I just finished converting the common join into a map join based on file size. There are 3 flags to control this optimization.
        1) set hive.auto.convert.join = true; This means the optimization is enabled. By default, this flag is disabled right now in order not to break any existing test cases. I also added 25 additional test cases, auto_join0.q - auto_join25.q, which cover this optimization code.
        2) set hive.hashtable.max.memory.usage = 0.9; This means that if the memory usage of the local task is more than 90% of its heap size, the local task will abort by itself. The Driver will know that the local work failed, and it won't submit the MapJoinTask (a map-only MapRedTask) to Hadoop; instead, it will submit the original CommonJoinTask to Hadoop to run.
        3) set hive.smalltable.filesize = 25000000L; This means that if the sum of the small tables' file sizes is less than 25M, the map join task will run. If not, the original common join task runs.
        The following is the basic flow of how it works. For each common join, create a conditional task.
        1) For each join table, generate a map join task by assuming this table is the big table.
        a. The left side of a right outer join must be a small table.
        b. The right side of a left outer join must be a small table.
        c. No full outer join can be optimized.
        d. E.g. A left outer join B right outer join C: only C can be the big table.
        e. E.g. A right outer join B left outer join C: only B can be the big table.
        f. E.g. A left outer join B left outer join C: only A can be the big table.
        g. E.g. A right outer join B right outer join C: both B and C can be the big table.
        2) Put all these generated map join tasks into a conditional task and set the mapping between each big table's alias and the corresponding map join task.
        3) At execution time, the resolver reads the input file sizes. If the input file size of the small tables is less than a threshold, run the converted map join task (a minimal sketch of this check is below, after this list).
        4) Set a backup task for each map join task. The backup task is the original common join task. This mapping relationship is set at execution time.
        5) If the map join task returns abnormally, launch the backup task.
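        To make step 3 concrete, here is a minimal, self-contained Java sketch of the size-based decision. The names (aliasToFileSize, smallTableFileSizeThreshold, chooseTask) are illustrative assumptions for this sketch, not the actual ConditionalResolverCommonJoin code.

            import java.util.HashMap;
            import java.util.Map;

            // Illustrative sketch only: pick the map-join task when the combined size of the
            // small tables (everything except the largest input) is under the threshold,
            // otherwise fall back to the original common-join task.
            public class MapJoinAutoConverterSketch {

              static String chooseTask(Map<String, Long> aliasToFileSize, long smallTableFileSizeThreshold) {
                String bigAlias = null;
                long totalSize = 0;
                long maxSize = -1;
                for (Map.Entry<String, Long> e : aliasToFileSize.entrySet()) {
                  totalSize += e.getValue();
                  if (e.getValue() > maxSize) {
                    maxSize = e.getValue();
                    bigAlias = e.getKey();   // treat the largest input as the big table
                  }
                }
                long smallTablesSize = totalSize - maxSize;
                if (smallTablesSize < smallTableFileSizeThreshold) {
                  return "map join task with big table " + bigAlias;
                }
                return "original common join task";   // the backup path
              }

              public static void main(String[] args) {
                Map<String, Long> sizes = new HashMap<String, Long>();
                sizes.put("a", 1000L);
                sizes.put("b", 500000000L);
                System.out.println(chooseTask(sizes, 25000000L));   // small side is 1 KB -> map join
              }
            }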

        Namit Jain added a comment -

        great work Liyin - I will take a look

        Ted Yu added a comment -

        For this example:
        d. E.g. A left outer join B right outer join C

        why must A be a small table?

        Ted Yu added a comment -

        Do the new test cases cover the backup task?
        If so, do we know the percentage of test cases where the backup task is executed?

        Liyin Tang added a comment -

        In the case A left outer join B right outer join C, A must be a small table.

        I have a test case, auto_join25.q, to test the backup task. There are several queries in this test case.

        The idea is to set hive.hashtable.max.memory.usage = 0.00001. It means that if the local task uses more than a 0.00001 fraction of its memory, it will abort. Obviously, all local tasks will always fail in this test case, so the backup task will run after the local task fails.
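        For illustration, here is a minimal sketch of how such a memory-usage self-check could look; the class and method names are assumptions for this sketch, not the actual local-task code.

            // Hypothetical sketch: abort the local task when the used heap exceeds the configured fraction.
            public final class MemoryUsageCheckSketch {

              static boolean overLimit(double maxMemoryUsage) {
                Runtime rt = Runtime.getRuntime();
                long used = rt.totalMemory() - rt.freeMemory();
                double usedFraction = (double) used / rt.maxMemory();
                return usedFraction > maxMemoryUsage;
              }

              public static void main(String[] args) {
                double threshold = 0.00001;   // the value used in auto_join25.q to force the backup path
                if (overLimit(threshold)) {
                  // In the real flow the local task exits abnormally here, and the Driver
                  // then launches the backup (common join) task instead.
                  System.exit(2);
                }
              }
            }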

        Namit Jain added a comment -

        1. All the new parameters in HiveConf.java need to be added to hive-default.xml along with comments.
        2. Have you run all the tests with hive.auto.convert.join enabled? All the diffs should only be in the plan - no results
        change.
        3. DriverContext.java: comments for the backup* fields.
        I think the logic can be simplified by having a backupTask in Task itself.
        4. System.out.println in Driver.java.
        5. Backup task not generic.
        6. ExecMapper: undoes the isLogInforEnabled optimization put in by Siying.
        7. ExplainTask: where is the backup task being printed?
        8. Why do you need to make so many classes Serializable?
        9. I don't see any explain plan in the new tests.

        I was thinking about it - I think you can handle all this backup task business much more easily.
        No need for any special casing - every task has a backup task (currently, only valid
        for joins, but nothing special from a task point of view).

        The change is needed in ConditionalTask execute. If a conditional task consists
        of tasks 1, 2 and 3, and 1 is getting executed, we remove 2 and 3 from the parent lists
        of their children, as if they never existed. This does not work if 1 is followed by 1.1,
        2 is followed by 2.2, etc., all sharing a common child X (which is the case
        in your scenario) - we should fix that. Instead of removing only the immediate child's
        parent, we check whether the child has any remaining parents; if not, we recurse. This
        way, the conditional task can contain a tree - you don't need the grand-child/mapred local
        task special logic all over.
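        For illustration, a small, self-contained sketch of the recursive pruning described above; the Node class and method names are assumptions for this sketch, not the committed ConditionalTask code.

            import java.util.ArrayList;
            import java.util.List;

            // Sketch: when a branch of a conditional task is not selected, detach it from its
            // children; if a child loses its last parent, prune that child too, so a whole
            // unselected subtree disappears without any special-casing.
            public class ConditionalPruningSketch {

              static class Node {
                final String name;
                final List<Node> parents = new ArrayList<Node>();
                final List<Node> children = new ArrayList<Node>();
                Node(String name) { this.name = name; }
                void addChild(Node c) { children.add(c); c.parents.add(this); }
              }

              static void pruneUnselected(Node unselected) {
                for (Node child : new ArrayList<Node>(unselected.children)) {
                  child.parents.remove(unselected);
                  if (child.parents.isEmpty()) {
                    pruneUnselected(child);   // recurse: the child has no other parent left
                  }
                }
                unselected.children.clear();
              }

              public static void main(String[] args) {
                Node t1 = new Node("1"), t11 = new Node("1.1");
                Node t2 = new Node("2"), t22 = new Node("2.2");
                Node x = new Node("X");   // common child of both branches
                t1.addChild(t11); t2.addChild(t22); t11.addChild(x); t22.addChild(x);
                pruneUnselected(t2);      // task 1 was selected; drop the task-2 branch
                System.out.println(x.parents.size());   // 1 -> X now depends only on the selected branch
              }
            }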

        I will continue to review more

        Namit Jain added a comment -

        Can you add more comments for the use of the new TaskWalker?
        Also, since it is specific to Tasks, should it be in the lib directory, or should
        it be moved to exec?

        Liyin Tang added a comment -

        Thanks for reviewing.

        1. I will add these parameters in the config xml file.

        2. By default, hive.auto.convert.join = false right now, so all the existing test cases won't be affected.

        3. I am also thinking about putting the backup task into the task directly, which is the simplest way to implement this. My only concern is that it will take more time de/serializing the task.

        4. I will remove the print statement.
        5. The same as point 3.
        6. I will fix it; it was an svn synchronization problem.

        7. Right now the backup task is generated at execution time. That's why it is not easy to make it work with the explain task. But if we put the backup task into the task directly, we can solve this problem. Also, we should set the backup task at compile time instead of execution time. The only cost is the task serialization time.

        8. Because we need to reuse the code of MapJoinProcessor, which uses the join tree and row resolver to generate the new map join operator. So each time a new map join operator is generated, we need a deep copy of the join tree and op context. Several classes need to be Serializable.

        9. I generated these test case outputs by setting hive.auto.convert.join = false first, then resetting the flag to true, so I can compare whether the results are correct or not.
        Since the join results are correct now, I can add explain to the test cases.

        10. I will fix the conditional task to make it more generic.

        Namit Jain added a comment -

        Let us talk about it - I don't think Tasks need to be serialized; we should add backup tasks at compile time itself.

        Liyin Tang added a comment -

        There are 2 kinds of backup: 1) task level and 2) branch level. I think the approach you mentioned above is the branch level: the conditional task maintains a tree, and if one branch fails, it tries another branch.
        I think both of them are fine right now, but the branch level is more complicated to implement, because the backup task may not be a single task but a tree of tasks. The design goal is to replace one branch of tasks with another branch.

        I think the problem right now is that there are 2 tasks involved in a map join. Imagine that, 3 months ago, there was no map join local task; it would have been very easy to implement this. Once the map join task fails, we replace it with the backup task. That is the task-level backup.

        The problem is that we split the map join task into 2 tasks. But we can still logically argue that the local task is PART of the map-reduce task; actually, they do come from the same task. That's why, if it is the local task, we look ahead one more task.

        In the future, we may have more situations like this, where one task is split into multiple tasks. Then we may need a loop here: if this task was split from another task, keep looking ahead (see the sketch below).
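        For illustration, a hypothetical sketch of that look-ahead loop; the field names (splitContinuation, backupTask) are assumptions for this sketch, not existing Hive APIs.

            // Sketch: if a failed task is only a fragment split off from a larger logical task,
            // follow the chain forward until reaching the task that actually owns the backup.
            public class BackupLookAheadSketch {

              static class SimpleTask {
                String name;
                SimpleTask splitContinuation;   // next fragment of the same logical task, if any
                SimpleTask backupTask;          // the original common-join task, if set
                SimpleTask(String name) { this.name = name; }
              }

              static SimpleTask findBackup(SimpleTask failed) {
                SimpleTask cur = failed;
                // keep looking ahead while the current task is just a split fragment
                while (cur.backupTask == null && cur.splitContinuation != null) {
                  cur = cur.splitContinuation;
                }
                return cur.backupTask;   // may be null if no backup was registered
              }

              public static void main(String[] args) {
                SimpleTask localTask = new SimpleTask("map join local task");
                SimpleTask mapRedTask = new SimpleTask("map join mapred task");
                SimpleTask commonJoin = new SimpleTask("common join task");
                localTask.splitContinuation = mapRedTask;   // the local task is part of the mapred task
                mapRedTask.backupTask = commonJoin;
                System.out.println(findBackup(localTask).name);   // -> common join task
              }
            }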

        Any other thoughts?

        Namit Jain added a comment -

        TaskGraphWalker:

            if (!(nd instanceof Task)) {
              throw new SemanticException("Task Graph Walker only walk fro Task Graph");
            }

        spelling mistake in the message above ("fro").

            taskListInConditionalTask = ((ConditionalTask) nd).getListTasks();
            for (Task<? extends Serializable> tsk : taskListInConditionalTask) {
              List<Task<? extends Serializable>> childTask = tsk.getChildTasks();
              if (childTask != null) {
                nextTaskSet.addAll(tsk.getChildTasks());
              }
            }

        Maybe I am missing something: don't we need to add the listTasks of conditional tasks to the nextTaskSet as well?

        This is correct - but can you add more comments?

        CommonJoinResolver:
        Do we go here even if it is not possible to convert?
        Check the size of smallTableSet against the total number of tables.

        MapJoinProcessor:

            public MapJoinOperator generateMapJoniOperator(ParseContext pctx, JoinOperator op,

        spelling

        Namit Jain added a comment -

        Some more minor comments:

            // Qualify the path against the filesystem. The user configured path might contain default port which is skipped
            // in the file status. This makes sure that all paths which goes into PathToPartitionInfo are always listed status
            // filepath.
            newPath = fs.makeQualified(newPath);

        ExecDriver: is the above intentional?

        MapJoinProcessor:

            public static String genMapJoinLocalWork(MapredWork newWork, MapJoinOperator mapJoinOp,

        Make it private - add more comments.
        Add more comments to the new functions.

            // keep record all the input path for this alias
            HashSet<String> pathSet = new HashSet<String>();
            for (Map.Entry<String, ArrayList<String>> entry2 : pathToAliases.entrySet()) {
              String path = entry2.getKey();
              ArrayList<String> list = entry2.getValue();
              if (list.contains(alias)) {
                // add to path set
                if (!pathSet.contains(path)) {
                  pathSet.add(path);
                }
                // remove this alias from the alias list
                list.remove(alias);
              }
            }

        Don't you need to remove the entry from pathToAliases if the list becomes empty?
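        For illustration, a small sketch of the cleanup being asked about, assuming the same pathToAliases structure as in the snippet above; this is not the committed code.

            import java.util.ArrayList;
            import java.util.Arrays;
            import java.util.HashMap;
            import java.util.Iterator;
            import java.util.Map;

            // Sketch: after removing an alias from each path's alias list, also drop
            // pathToAliases entries whose alias list has become empty.
            public class PathToAliasesCleanupSketch {
              public static void main(String[] args) {
                Map<String, ArrayList<String>> pathToAliases = new HashMap<String, ArrayList<String>>();
                pathToAliases.put("/warehouse/t1", new ArrayList<String>(Arrays.asList("a")));
                pathToAliases.put("/warehouse/t2", new ArrayList<String>(Arrays.asList("a", "b")));

                String alias = "a";
                Iterator<Map.Entry<String, ArrayList<String>>> it = pathToAliases.entrySet().iterator();
                while (it.hasNext()) {
                  ArrayList<String> list = it.next().getValue();
                  list.remove(alias);
                  if (list.isEmpty()) {
                    it.remove();   // remove the whole entry once no alias maps to this path
                  }
                }
                System.out.println(pathToAliases.keySet());   // [/warehouse/t2]
              }
            }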

        Liyin Tang added a comment -

        Thanks for the comments.
        I have updated the patch according to the review comments.

        Liyin Tang added a comment -

        This patch formats the output of local task.

        Namit Jain added a comment -

        hive-default.xml

            <property>
              <name>hive.mapjoin.hashtable.threshold</name>
              <value>100000</value>
              <description>the threshold for the mapjoin hashtable</description>
            </property>

            <property>
              <name>hive.mapjoin.hashtable.loadfactor</name>
              <value>0.75</value>
              <description>the load factor for the mapjoin hashtable</description>
            </property>

            <property>
              <name>hive.mapjoin.smalltable.filesize</name>
              <value>25000000</value>
              <description>The threshold for the input file size of the small tables; if the file size is smaller than this threshold, it will try to concert the common join into map join</description>
            </property>

            <property>
              <name>hive.mapjoin.localtask.max.memory.usage</name>
              <value>0.90</value>
              <description>The max memory usage of the local task for map join</description>
            </property>

        Add more comments for properties 1, 2 and 4.
        Spelling mistake in the third: concert -> convert.

        Uncheckout DriverContext.java.

        Why should the backup task be obtained from the resolver?
        It can be created at task creation time itself?
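        For illustration, a minimal sketch of what wiring the backup at task creation time could look like; the PlanTask class and its backupTask field are assumptions for this sketch, not the actual Task API.

            import java.util.ArrayList;
            import java.util.Arrays;
            import java.util.List;

            // Sketch: when the conditional task is built at compile time, each candidate
            // map-join task is immediately linked to the original common-join task as its
            // backup, so the resolver does not need to hand out backup tasks later.
            public class CompileTimeBackupSketch {

              static class PlanTask {
                final String name;
                PlanTask backupTask;
                PlanTask(String name) { this.name = name; }
              }

              static List<PlanTask> buildMapJoinCandidates(List<String> bigTableAliases, PlanTask commonJoinTask) {
                List<PlanTask> candidates = new ArrayList<PlanTask>();
                for (String alias : bigTableAliases) {
                  PlanTask mapJoin = new PlanTask("map join (" + alias + " as big table)");
                  mapJoin.backupTask = commonJoinTask;   // wired at creation time, not by the resolver
                  candidates.add(mapJoin);
                }
                return candidates;
              }

              public static void main(String[] args) {
                PlanTask commonJoin = new PlanTask("common join");
                for (PlanTask t : buildMapJoinCandidates(Arrays.asList("a", "b", "c"), commonJoin)) {
                  System.out.println(t.name + " -> backup: " + t.backupTask.name);
                }
              }
            }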

        Liyin Tang added a comment -

        Add more descriptions to the configuration files.
        Revert the DriverContext.

        Liyin Tang added a comment -

        Add a more detailed description to the configuration xml file.
        Revert DriverContext.java, since there should be no change to this file.

        Liyin Tang added a comment -

        Remove the getBackupTask interface from all the Conditional Resolvers.

        Namit Jain added a comment -

        ConditionalResolverCommonJoin

        // generate file size to alias mapping; but connot set file size as key,
        // using 2 list to keep mapping

        spelling (connot)

        Namit Jain added a comment -

        ConditionalResolverCommonJoin

            // Iterate the sorted_set to get big/small table file size
            for (int index = 0; index < sortedList.size(); index++) {
              Long key = sortedList.get(index);
              int i = fileSizeList.indexOf(key);
              String alias = aliasList.get(i);

              if (index != (size - 1)) {
                smallTablesFileSizeSum += key.longValue();
              } else {
                bigTableFileSize += key.longValue();
                bigTableFileAlias = alias;
              }
            }

        The lines:

            int i = fileSizeList.indexOf(key);
            String alias = aliasList.get(i);

        are only needed in the 'else' block.
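        For clarity, a sketch of the loop with those lookups moved into the 'else' block as suggested; the surrounding variable declarations are assumed from context and are not the committed code.

            import java.util.ArrayList;
            import java.util.Arrays;
            import java.util.Collections;
            import java.util.List;

            // Sketch of the suggested fix: the alias lookup is only performed for the big table
            // (the last, i.e. largest, entry of the sorted size list).
            public class BigSmallTableSizeSketch {
              public static void main(String[] args) {
                List<Long> fileSizeList = new ArrayList<Long>(Arrays.asList(100L, 5000L, 300L));
                List<String> aliasList = new ArrayList<String>(Arrays.asList("a", "b", "c"));
                List<Long> sortedList = new ArrayList<Long>(fileSizeList);
                Collections.sort(sortedList);

                int size = sortedList.size();
                long smallTablesFileSizeSum = 0;
                long bigTableFileSize = 0;
                String bigTableFileAlias = null;

                for (int index = 0; index < sortedList.size(); index++) {
                  Long key = sortedList.get(index);
                  if (index != (size - 1)) {
                    smallTablesFileSizeSum += key.longValue();
                  } else {
                    // only the big table needs the alias lookup
                    int i = fileSizeList.indexOf(key);
                    bigTableFileAlias = aliasList.get(i);
                    bigTableFileSize += key.longValue();
                  }
                }
                System.out.println(bigTableFileAlias + ": big = " + bigTableFileSize
                    + ", small sum = " + smallTablesFileSizeSum);   // b: big = 5000, small sum = 400
              }
            }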

        Liyin Tang added a comment -

        In Task.java

            public void replaceWithConditionalTask(ConditionalTask cndTsk, PhysicalContext physicalContext) {
              // take care of parent tasks
              ...

              // take care of children tasks
              List<Task<? extends Serializable>> oldChildTasks = this.getChildTasks();
              if (oldChildTasks != null) {
                for (Task<? extends Serializable> tsk : cndTsk.getListTasks()) {
                  if (tsk.equals(this)) {
                    // avoid redundantly add this task again
                    continue;
                  }
                  for (Task<? extends Serializable> oldChild : oldChildTasks) {
                    tsk.addDependentTask(oldChild);
                  }
                }
              }
            }

        Liyin Tang added a comment -

        some minor changes in ConditionalResolverCommonJoin.java

        Liyin Tang added a comment -

        After discussing, we think the function replaceWithConditionalTask is not general enough to be put in the Task class,
        so we moved this function back to the CommonJoinResolver class.

        Liyin Tang added a comment -

        When the local task runs out of memory, do NOT print anything out; just return from this process,
        because calling l4j to print will make it worse.

        Sorry for so many minor changes this afternoon.

        Namit Jain added a comment -

        +1 running tests

        Namit Jain added a comment -

        Committed. Thanks Liyin


          People

          • Assignee:
            Liyin Tang
          • Reporter:
            Namit Jain
          • Votes:
            0
          • Watchers:
            2
