Hive / HIVE-964

handle skewed keys for a join in a separate job

    Details

    • Type: Improvement
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.6.0
    • Component/s: Query Processor
    • Labels:
      None
    • Hadoop Flags:
      Reviewed

      Description

      The skewed keys can be written to a temporary table or file, and a follow-up conditional task can be used to perform the join on those keys.
      As a first step, JDBM can be used for those keys.
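
      As a quick illustration of the approach, here is a minimal sketch of the runtime side (class, method, and threshold names are made up for the example and are not Hive's actual implementation): while the join processes a key, rows beyond a per-key threshold are diverted to a skew file, and only those spilled keys are joined by the follow-up conditional task.

        import java.io.IOException;
        import java.io.Writer;
        import java.util.HashMap;
        import java.util.Map;

        class SkewKeySpiller {
          private final long threshold;                      // rows per key before we call it "skewed"
          private final Map<String, Long> keyCounts = new HashMap<>();
          private final Writer skewFile;                     // stand-in for the temporary table/file

          SkewKeySpiller(long threshold, Writer skewFile) {
            this.threshold = threshold;
            this.skewFile = skewFile;
          }

          /** Returns true if the row should be joined in the current job,
           *  false if it was spilled for the follow-up conditional task. */
          boolean offer(String joinKey, String row) throws IOException {
            long seen = keyCounts.merge(joinKey, 1L, Long::sum);
            if (seen <= threshold) {
              return true;                                   // normal path: join in place
            }
            skewFile.write(joinKey + "\t" + row + "\n");     // skewed key: defer to follow-up job
            return false;
          }
        }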

      1. hive-964-2010-01-15-4.patch
        2.24 MB
        He Yongqiang
      2. hive-964-2010-01-14-3.patch
        2.26 MB
        He Yongqiang
      3. hive-964-2010-01-13-2.patch
        221 kB
        He Yongqiang
      4. hive-964-2010-01-08.patch
        293 kB
        He Yongqiang
      5. hive-964-2009-12-29-4.patch
        639 kB
        He Yongqiang
      6. hive-964-2009-12-28-2.patch
        621 kB
        He Yongqiang
      7. hive-964-2009-12-17.txt
        59 kB
        He Yongqiang

        Issue Links

          Activity

          Gavin made changes -
          Link This issue is depended upon by HIVE-562 [ HIVE-562 ]
          Gavin made changes -
          Link This issue blocks HIVE-562 [ HIVE-562 ]
          Carl Steinbach made changes -
          Status Resolved [ 5 ] Closed [ 6 ]
          Namit Jain added a comment -

          I just did a grep in the negative tests directory.
          There are only 28 negative create table statements and 3 create functions.

           A simpler approach might be to change these tests to use the same names - and at the beginning of each test, just drop all of them.
           Kind of like the "pre-drop" that John is suggesting. Instead of making it scan the catalog, we can just drop tables T1, T2, ... Tn and so on,
           assuming all tests follow the same naming convention. The drop can be added in QTestUtil or NegativeCliDriver.

          John Sichi added a comment -

          Regarding the "Output: default@t1" diff:

          I noticed something similar while I was developing negative tests for CREATE VIEW. I think the problem is that if the same object name (like "t1") is used in more than one negative test script, then you'll get different results for DROP TABLE depending on whether the test is run by itself vs as part of a suite. The problem is that negative tests don't get a chance to clean up after themselves, so the execution of the pre-drop in the next test is going to depend on whether a previous test ran leaving behind objects.

          I worked around it by changing my scripts to use unique object names.

          A less brittle approach (which I've used on similar test frameworks in the past) is to automate the "pre-drop" logic to make it generic at the beginning of each test. The test framework scans the catalog and issues a DROP for each existing object (in silent mode). This also means test authors can skip writing both pre-drop and post-drop (regardless of whether it is a positive or negative test), which is nice.
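
           A rough sketch of that generic pre-drop is shown below, under the assumption that the harness can list existing objects; the Catalog interface is a hypothetical stand-in, and the real change would go into QTestUtil (or NegativeCliDriver) before each test script runs.

             import java.util.List;

             interface Catalog {
               List<String> listViews();
               List<String> listTables();
               void runSilently(String hql);   // issue a statement, ignoring "does not exist" errors
             }

             final class PreDrop {
               static void cleanUp(Catalog catalog) {
                 for (String v : catalog.listViews()) {
                   catalog.runSilently("DROP VIEW " + v);    // views first, in case they reference tables
                 }
                 for (String t : catalog.listTables()) {
                   catalog.runSilently("DROP TABLE " + t);
                 }
               }
             }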

          Namit Jain made changes -
          Status Patch Available [ 10002 ] Resolved [ 5 ]
          Hadoop Flags [Reviewed]
          Fix Version/s 0.6.0 [ 12314524 ]
          Resolution Fixed [ 1 ]
          Namit Jain added a comment -

          Committed. Thanks Yongqiang

          Namit Jain added a comment -

          ok, will do

          He Yongqiang added a comment -

           Hi Namit, can you resolve that diff manually? I just tried that testcase, but I did not get the diff.
           Also, can we increase the interval time in another jira? Right now I often get the wrong patch file, so I do not want to risk getting a wrong diff again.

          Namit Jain added a comment -

           [junit] diff -a -I file: -I /tmp/ -I invalidscheme: -I lastUpdateTime -I lastAccessTime -I owner -I transient_lastDdlTime /data/users/njain/hive_commit1/hive_commit1/build/ql/test/logs/clientnegative/load_wrong_fileformat_rc_seq.q.out /data/users/njain/hive_commit1/hive_commit1/ql/src/test/results/clientnegative/load_wrong_fileformat_rc_seq.q.out
           [junit] 11d10
           [junit] < POSTHOOK: Output: default@t1

          Got a diff in NegativeCliDriver

           Also, I got a timeout while running TestCliDriver - can you increase the timeout interval?

          Namit Jain added a comment -

          I am running tests right now

          He Yongqiang added a comment -

           Also I just filed a new follow-up jira, HIVE-1058.
           Namit and Ning, please feel free to file new jiras.

          Ning Zhang added a comment -

           The latest patch (hive-964-... 15-4.patch) looks fine to me. Namit, this is the patch Yongqiang is ready to merge.

          Namit Jain added a comment -

           Can you file follow-up patches - I will try to merge this.

          He Yongqiang made changes -
          Status Open [ 1 ] Patch Available [ 10002 ]
          He Yongqiang made changes -
          Attachment hive-964-2010-01-15-4.patch [ 12430463 ]
          He Yongqiang added a comment -

          These are finished and passed all tests. Will upload a new patch soon. Just can't find my right patch now ...

          Namit Jain added a comment -

          @yongqiang/Ning.

          Do you think we can do these things in follow-up jiras - conditional task output in explain and union struct object inspector ?

          If we want to merge John's patch also by Tuesday, we should get this in.

          Ning Zhang added a comment -

           The current patch does not contain the UnionStructObjectInspector change. Yongqiang is redoing that change now.

          Namit Jain added a comment -

           Don't you think it is useless to output the info twice?

          He Yongqiang added a comment -

           The output of the child tasks of a conditional task is emitted twice (they should not be emitted while
           displaying the conditional task).

           This is controlled by the conditional work; do you think it is really good to remove them from the conditional work?

          Namit Jain added a comment -

           A subtask of a conditional task cannot be a root stage.
           Change "is constitued of" to "consists of".
           The output of the child tasks of a conditional task is emitted twice (they should not be emitted while
           displaying the conditional task).

          Also, can you file follow-up jiras ?

          Otherwise, it is OK for me.

          Ning, can you also take a look again and approve it.

          He Yongqiang made changes -
          Attachment hive-964-2010-01-14-3.patch [ 12430375 ]
          He Yongqiang made changes -
          Attachment hive-964-2010-01-14-3.patch [ 12430366 ]
          He Yongqiang made changes -
          Attachment hive-964-2010-01-14-3.patch [ 12430366 ]
          He Yongqiang added a comment -

          Incorporated comments, and attached the patch.
          Thanks for the detailed comments Namit and Ning.

          Ning Zhang added a comment -

          Some more comments:

           1) RowContainer.java:134 and 207: can you define an enum in HiveConf and use that instead of the string here?
           2) RowContainer.java:147: the if condition should always be true due to the assertion in line 144, so the if should be removed. Also, in setSerDe, dummyRow doesn't need to be set here since it will be passed by the caller (e.g., CommonJoinOperator), which constructs the dummy row and passes it via add(). Please take a look at add(), line 165.
           3) Please move the variable declarations in 171-177 to the beginning of the class, where most variables are declared, and add a brief comment on each of them.
           4) The firstCalled boolean should be cleared in add(); otherwise the following sequence may give wrong results: add, first, add, next, next.
           5) In first(), closeWriter() and closeReader() are called for each first(); this may cause bad performance when the RowContainer is iterated many times and there is no
           6) InputFormat in line 204: it could be very expensive if the RowContainer is iterated many times.
           7) Can you rename the variable originalReadBlock to firstBlock, which is easier to understand?
           8) In nextBlock, Writable val is a new instance of the serde for every new block; can we reuse the serde?
           9) The key is inserted for each row as the first element before spillBlock and after nextBlock. This is too expensive given the row is an ArrayList. Zheng suggested using UnionStructObjectInspector to handle the key and value separately.

          Namit Jain added a comment -

           1. ConditionalTask.java:80 if(DriverContext.isLaunchable(child))
           Shouldn't this be an assert instead?

           2. SkewJoinResolver: shouldn't it check for HIVESKEWJOINKEY and get out if it is not set?

          3. ExplainPlan should also show subtasks of conditional tasks at the top stage

          4. Seems like after the skew join conditional task, the dependency between the original join and the old children will
          still be kept - it can be removed.

           5. The last alias/tag for the join does not need a conditional task - the last alias is the last one in the join order.

          6. Instead of serializing/deserializing mapredWork, it might be a good idea to add a clone to mapredWork - it can be done
          in a followup patch also.

           7. GenMRSkewJoinProcessor.java:253: won't localPlan always be null?

          8. Can there be a fetchWork in the conditional task ?

          9. processSkewJoin: do you think it would be cleaner if you break it up into multiple functions ?

          He Yongqiang made changes -
          Attachment hive-964-2010-01-13-2.patch [ 12430208 ]
          He Yongqiang added a comment -

           hive-964-2010-01-13-2.patch is not the final patch; it is just for early review. Still got one testcase failure.

          He Yongqiang added a comment -

          if (alias == numAliases - 1 && !(this.handleSkewJoin &&
          this.skewJoinKeyContext.skewKeyInCurrentGroup)) {
          JoinOperator.java:

          Do you need the change ? Why do we need to handle skew for the last key ?

           This change is not needed right now. It is a mistake from the last patch. In the last patch, JoinOp directly wrote skew keys into HDFS, and there was no copy in storage once the data was written into HDFS. Since right now we first use local disk to store the data and upload it to HDFS at the end, we can remove this change.

           Will work on the other comments. Thanks for the detailed comments!

          Namit Jain added a comment -

           Instead of making the Driver aware of the conditional task - can we change the code structure?
           Pass a context to execute()/initialize() if needed.

           The current tasks would ignore the context, but the conditional task would add the resulting tasks to the queue which is
           passed in the context.
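
           A hedged sketch of that restructuring (the names below are illustrative, not the committed Hive API): the driver owns a queue of runnable tasks and hands it to each task through a context; ordinary tasks ignore it, while the conditional task resolves its branch at run time and enqueues it instead of executing children itself.

             import java.util.ArrayDeque;
             import java.util.Queue;

             interface Task { int execute(DriverContext ctx); }

             class DriverContext {
               private final Queue<Task> runnable = new ArrayDeque<>();
               void addToRunnable(Task t) { runnable.add(t); }
               Task pollRunnable()        { return runnable.poll(); }   // the driver's main loop drains this
             }

             class ConditionalTask implements Task {
               private final Task skewFollowUp;
               private final Task noOp;
               private final boolean skewDetected;          // stand-in for the real resolver logic

               ConditionalTask(Task skewFollowUp, Task noOp, boolean skewDetected) {
                 this.skewFollowUp = skewFollowUp;
                 this.noOp = noOp;
                 this.skewDetected = skewDetected;
               }

               @Override
               public int execute(DriverContext ctx) {
                 // Pick the branch at run time and queue it; the driver launches it next.
                 ctx.addToRunnable(skewDetected ? skewFollowUp : noOp);
                 return 0;
               }
             }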

          Namit Jain added a comment -

          if (alias == numAliases - 1 && !(this.handleSkewJoin &&
          this.skewJoinKeyContext.skewKeyInCurrentGroup)) {

          JoinOperator.java:

          Do you need the change ? Why do we need to handle skew for the last key ?

           Can we move the changes for skew join into a new file?

          Namit Jain added a comment -

          Did not take a look in great detail, but some high level comments:

           1. The changes in ExecDriver are not needed.
           2. Skew join should be an optimization step - I remember initially we had thought about it and said it might be easy to do it at the end,
           but it makes more sense to plug it into the optimization phase. It can be the last optimization step, and we can assume that map join
           conversions etc. have been done (see the sketch below).
           3. Conditional Task: needs some rework. Since execute is not getting called recursively, the same thing should happen for initialize.
           It would be great if we can fold it into execute though - not sure how.
           4. The number of jobs etc. should be correct - a conditional task is not a single job, but 'n'.
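
           On point 2, a small sketch of what plugging it in as the last optimization step could look like (the Transform/Optimizer shapes below are illustrative stand-ins, not Hive's actual optimizer API):

             import java.util.ArrayList;
             import java.util.List;

             class Plan { /* operator tree + task tree, elided */ }

             interface Transform { Plan transform(Plan p); }

             class Optimizer {
               private final List<Transform> steps = new ArrayList<>();

               void initialize() {
                 steps.add(new MapJoinConversion());   // existing steps run first ...
                 steps.add(new SkewJoinResolver());    // ... the skew-join rewrite goes last, so it
                                                       // can assume map-join conversion already ran
               }

               Plan optimize(Plan p) {
                 for (Transform t : steps) { p = t.transform(p); }
                 return p;
               }
             }

             class MapJoinConversion implements Transform { public Plan transform(Plan p) { return p; } }
             class SkewJoinResolver   implements Transform { public Plan transform(Plan p) { return p; } }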

          He Yongqiang added a comment -

           hive-964-2010-01-08.patch is just for review; I will update the patch to trunk code after 0.5.

          He Yongqiang made changes -
          Attachment hive-964-2010-01-08.patch [ 12429779 ]
          He Yongqiang added a comment -

           CHANGES:
           1) let the row container use a Hadoop file format
           2) let the row container directly upload files to DFS
           3) added logic to handle reducer failure (we should not see results from a failed reducer)
           4) write one file per skew key and use CombineFileInputFormat.

          He Yongqiang added a comment -

           Had an offline discussion with Namit and Ning days ago. Some notes:
           1)
           We can let it not support outer join right now, because we need run-time evaluation (not just flushing data out). Runtime join evaluation is used to see whether a partial join is empty and what the final results will be. For example: "SELECT * FROM T1 src1 LEFT OUTER JOIN T2 src2 ON src1.key+1 = src2.key RIGHT OUTER JOIN T2 src3 ON src2.key = src3.key;". The result of "src1 LEFT OUTER JOIN T2 src2 ON src1.key+1 = src2.key" is empty, so it is actually EMPTY right outer join src3.

           2)
           Right now, after HIVE-963 went in, once a key appears more than "hive.join.cache.size" times, the data is actually written to local disk by the row container.
           We need to let the row container use a Hadoop file format in order to write data to HDFS for another MR job more easily.

          Ning Zhang added a comment -

          Some comments so far:

           1) GenMRSkewJoinProcessor.skewJoinEnabled() checks whether the join is any form of outer join or not. Talked with Namit, and this check seems unnecessary. The reason map join doesn't work with outer join is that the small table contains all keys (not only those that contain a match). In this case, we know exactly which keys will match if they are non-empty. So we can handle outer joins as well.

           2) Can you add some unit tests for outer joins if the above is changed?

          He Yongqiang made changes -
          Attachment hive-964-2009-12-29-4.patch [ 12429120 ]
          He Yongqiang added a comment -

           Attached a new patch.
           Changes include:
           1) update the patch against trunk code.
           2) Following an offline discussion with Namit, Ning, and Ashish, this patch uses the original groupKey in the reducer as the dummy join key for the follow-up map joins. The previous patch just used Java's UUID and a random number generator to generate the dummy join keys.

           HIVE-963 introduced a RowContainer to handle skew join keys; it serializes the value parts into a local file in case of OOM. Right now this patch just gets the value object from the RowContainer and serializes it to HDFS. The bad thing is that, if a row has already been serialized into a local file, it will need to be deserialized to an object and then reserialized to HDFS - that is, two serializations plus a deserialization for that object.
           It would be better if we could directly read the RowContainer's local file and write it to HDFS. This would require the RowContainer to serialize the key part together with the join values. (Right now the RowContainer only serializes the join values.) It would also be better if we could enable this only when skew join is enabled. I suggest doing this in a follow-up jira because the changes for handling skew join are already very complicated. (Some comments to make the code easier to understand: 1) 'tag' is obtained differently on the mapper side and the reducer side, thus they are different in MapJoinOp and JoinOp; 2) the join's tag order is reordered in JoinReorder, but the operator tree is not changed; 3) map join actually does not use the tag order array, but the tag order array is used in MapJoinOp's parent, CommonJoinOp.)

          He Yongqiang made changes -
          Attachment hive-964-2009-12-28-2.patch [ 12429036 ]
          He Yongqiang made changes -
          Attachment hive-964-2009-12-17.txt [ 12428393 ]
          He Yongqiang added a comment -

          first version for comments.

          Namit Jain added a comment -

          Sounds good - btw, after this discussion, I don't think using JDBM is feasible.
          All the directories listed above are hdfs directories.

          He Yongqiang added a comment -

           Here is the idea, according to offline discussions with Namit and Ning.

           1. The number of MR jobs to handle skew keys is the number of tables minus 1 (we can stream the last table, so big keys in the last table will not be a problem).
           2. At runtime in Join, we output the big keys of one table into one corresponding directory, and all the same keys in the other tables into different dirs (one for each table).
           The directories will look like:
           dir-T1-bigkeys (containing big keys in T1), dir-T2-keys (containing keys which are big in T1), dir-T3-keys (containing keys which are big in T1), ...
           dir-T1-keys (containing keys which are big in T2), dir-T2-bigkeys (containing big keys in T2), dir-T3-keys (containing keys which are big in T2), ...
           dir-T1-keys (containing keys which are big in T3), dir-T2-keys (containing keys which are big in T3), dir-T3-bigkeys (containing big keys in T3), ...
           .....
           3. For each table, we launch one mapjoin job, taking the directory containing the big keys of this table and the corresponding dirs of the other tables as input. (Actually one job per row in the above.)

           This strategy helps to make the plan fixed at compile time.
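
           To make the layout concrete, here is a hedged illustration for a 3-way join of T1, T2, T3 (the path strings are only examples, not the names the patch generates): row i of the layout feeds follow-up map-join job i, which map-joins the big keys of Ti against the matching keys copied from the other tables.

             class SkewJoinJobInputs {
               // Two follow-up map-join jobs for a 3-way join: T3 is streamed, so its big keys
               // never need a job of their own (number of tables minus 1).
               static final String[][] INPUT_DIRS = {
                 { "dir-T1-bigkeys", "dir-T2-keys", "dir-T3-keys" },   // job 1: big keys of T1
                 { "dir-T1-keys", "dir-T2-bigkeys", "dir-T3-keys" },   // job 2: big keys of T2
               };
             }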

          He Yongqiang made changes -
          Assignee He Yongqiang [ he yongqiang ]
          Namit Jain made changes -
          Link This issue blocks HIVE-562 [ HIVE-562 ]
          Namit Jain created issue -

            People

            • Assignee:
              He Yongqiang
              Reporter:
              Namit Jain
             • Votes:
               0
               Watchers:
               2

