Hive / HIVE-1641

add map joined table to distributed cache

    Details

    • Type: Improvement
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.7.0
    • Fix Version/s: 0.7.0
    • Component/s: Query Processor
    • Labels:
      None
    • Release Note:
      Split the MapJoin into 2 stages. In stage 1, generate a JDBM file for each small table. In stage 2, load the JDBM file and do the join operation in memory.

      Description

      Currently, the mappers read the map-joined table directly from HDFS, which makes it difficult to scale.
      We end up getting lots of timeouts once the number of mappers grows beyond a few thousand, because of
      the concurrent reads from all those mappers.

      It would be a good idea to put the mapped file into the distributed cache and read it from there instead.
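
      To make the proposal concrete, here is a minimal sketch using Hadoop's DistributedCache API; the file path, key/value handling, and class name are hypothetical and not taken from the patch:

          // Sketch only: register a small-table dump in the distributed cache on the
          // client, and locate the local copy inside a mapper instead of reading HDFS.
          import java.net.URI;
          import org.apache.hadoop.conf.Configuration;
          import org.apache.hadoop.filecache.DistributedCache;
          import org.apache.hadoop.fs.Path;

          public class SmallTableCacheSketch {
            // Client side: add the dumped small-table file before submitting the job.
            public static void addSmallTable(Configuration conf) throws Exception {
              DistributedCache.addCacheFile(new URI("/tmp/smalltable.dump"), conf);
            }

            // Mapper side: find the local copy shipped by the distributed cache.
            public static Path findSmallTable(Configuration conf) throws Exception {
              Path[] localFiles = DistributedCache.getLocalCacheFiles(conf);
              return (localFiles != null && localFiles.length > 0) ? localFiles[0] : null;
            }
          }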

      1. Hive-1641(6).patch
        3 kB
        Liyin Tang
      2. Hive-1641(5).patch
        856 kB
        Liyin Tang
      3. Hive-1641(4).patch
        854 kB
        Liyin Tang
      4. Hive-1641(3).txt
        787 kB
        Liyin Tang
      5. Hive-1641.patch
        194 kB
        Liyin Tang

        Issue Links

          Activity

          Carl Steinbach made changes -
          Status Resolved [ 5 ] Closed [ 6 ]
          Liyin Tang made changes -
          Attachment Hive-1641(6).patch [ 12458217 ]
          Liyin Tang added a comment -

          New diff that removes some print and log statements.

          He Yongqiang made changes -
          Status Open [ 1 ] Resolved [ 5 ]
          Resolution Fixed [ 1 ]
          He Yongqiang added a comment -

          I just committed! Thanks Liyin!

          Liyin Tang added a comment -

          The patch without JDBM is also ready.
          Shall I submit that patch?

          Namit Jain added a comment -

          Let us try to get it in; there are some other big patches waiting, and it will be difficult for you to maintain this one.

          He Yongqiang added a comment -

          Liyin, I saw some diffs from your recent patch. Can you refresh it again? I will test and commit it today.

          And let's optimize the JDBM part in the next JIRA.

          He Yongqiang added a comment -

          Some tests are failing because of the plan change.

          Can you refresh the diff?

          And some more minor comments; you can fix them in the follow-up JIRAs or in your next patch (some of them are just a few lines of change).
          1.
          NOTSKIPBIGTABLE is defined in both AbstractMapJoinOperator and CommonJoinOperator. And let's not use 'static'.

          2.
          In MapJoinObjectKey, metadataTag is always -1, and we serialize and deserialize it for each key. We can avoid that by simply assuming that metadataTag is -1.

          3.
          In JDBMSinkOperator, this code has no effect:

              if (hashTable.cacheSize() > 0) {
                o.setObj(res);
                needNewKey = false;
              }

          Even when hashTable.cacheSize() > 0, needNewKey still ends up true.

          In the following code,

              if (needNewKey) {
                ...
                hashTable.put(keyObj, valueObj);
              }

          the keyObj and valueObj are already in the hashTable, so the put also has no effect except moving the value to the head of the MRU list. But at put time it is already at the head, because of the earlier get().

          So ideally we should put most of the code into the following shape (a fuller sketch appears after this comment):

              if (o == null) {
                ....
                if (metadataValueTag[tag] == -1) {
                  .....
                }
                if (needNewKey) {
                  // this is always true here
                }
              } else {
                res = o.getObj();
                res.add(value);
              }

          This may be beneficial to client performance, which matters now that we are putting all of the processing work for the small tables on the client.

          4.
          In JDBMSinkOperator's close(), call hashTable.close() before uploading the JDBM file, since JDBM may want to do some cleanup work in close() before the file is uploaded.

          5.
          In JDBMSinkOperator, remove getPersistentFilePath(); there is no reference to it.

          6.
          In MapJoinOperator's loadJDBM(), remove the line "int alias;".
          Also in loadJDBM(), remove this dead code:

              for (int i = 0; i < localFiles.length; i++) {
                Path path = localFiles[i];
              }

          7.
          Instead of resolving the file name mapping at runtime, we should do it at compile time. Need to open a follow-up JIRA for this.

          8.
          In MapredLocalTask, remove these lines:

              private MapOperator mo;
              private File jdbmFile;

          Maybe we should also print some progress information in startForward(), so that the client does not look unresponsive.

          AWESOME work! Can you open the follow-up JIRAs for the offline review comments?
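
          A minimal sketch of the restructuring suggested in item 3 of the review above. Only the control flow mirrors the suggestion; the class, types, and helpers are hypothetical stand-ins, not Hive's actual code:

              // Sketch only: do all setup work on the first occurrence of a key,
              // and only append to the existing value object afterwards.
              import java.util.ArrayList;
              import java.util.HashMap;
              import java.util.List;

              public class JdbmSinkPutSketch {

                static class ValueObj {
                  final List<Object> rows = new ArrayList<Object>();
                }

                private final HashMap<Object, ValueObj> hashTable = new HashMap<Object, ValueObj>();
                private final int[] metadataValueTag = new int[] { -1 };

                public void processRow(Object keyObj, Object value, int tag) {
                  ValueObj o = hashTable.get(keyObj);
                  if (o == null) {
                    // first occurrence of this key: do all the setup work here
                    if (metadataValueTag[tag] == -1) {
                      metadataValueTag[tag] = tag; // stand-in for metadata initialization
                    }
                    ValueObj valueObj = new ValueObj();
                    valueObj.rows.add(value);
                    hashTable.put(keyObj, valueObj); // only put on the first occurrence
                  } else {
                    // key already present: just append; no second put is needed
                    o.rows.add(value);
                  }
                }
              }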

          He Yongqiang added a comment -

          Had several offline reviews of the patch with Namit, Joy and Liyin.
          Will go through the diff again and commit it.

          Liyin Tang made changes -
          Attachment Hive-1641(5).patch [ 12457591 ]
          Liyin Tang made changes -
          Attachment Hive-1641(4).patch [ 12457580 ]
          Namit Jain added a comment -

          Diff at https://review.cloudera.org/r/1037/ for review
          Liyin Tang made changes -
          Link This issue incorporates HIVE-1723 [ HIVE-1723 ]
          Liyin Tang made changes -
          Link This issue incorporates HIVE-1722 [ HIVE-1722 ]
          Liyin Tang made changes -
          Attachment Hive-1641(3).txt [ 12457460 ]
          Namit Jain made changes -
          Status Patch Available [ 10002 ] Open [ 1 ]
          Namit Jain added a comment -

          As discussed offline, we need to move the task from a local task to a map-reduce task.

          Liyin Tang made changes -
          Attachment Hive-1641.patch [ 12456644 ]
          Liyin Tang added a comment -

          I have submitted a new patch on the JIRA.
          This new patch adds the JDBM files to the distributed cache and loads them back from the cached files.

          This patch has been tested on the test cluster for all the map join test cases (join25.q – join39.q).
          All the test results match the expected results.

          Liyin Tang made changes -
          Attachment Hive-1641.patch [ 12456643 ]
          Liyin Tang made changes -
          Attachment Hive-1641.patch [ 12456642 ]
          Liyin Tang made changes -
          Attachment Hive-1641.patch [ 12456643 ]
          Liyin Tang added a comment -

          This new patch adds the JDBM files to the distributed cache and loads them back from the cached files.

          This patch has been tested on the test cluster for all the map join test cases (join25.q – join39.q).
          All the test results match the expected results.

          Liyin Tang made changes -
          Attachment Hive-1641.patch [ 12456642 ]
          Liyin Tang added a comment -

          This new patch adds the JDBM files to the distributed cache and loads them back from the cached files.

          This patch has been tested on the test cluster for all the map join test cases (join25.q – join39.q).
          All the test results match the expected results.

          Liyin Tang made changes -
          Attachment Hive-1641.patch [ 12456462 ]
          Liyin Tang made changes -
          Attachment Hive-1641.patch [ 12456565 ]
          He Yongqiang added a comment -

          There are 2 patches with the same name. Can you delete the older one? And when uploading a patch, please rename it to hive-<jiranumber>.<patchnumberordate>.patch.

          Liyin Tang made changes -
          Attachment Hive-1641.patch [ 12456565 ]
          Liyin Tang added a comment -

          New Patch for distributed cache

          Liyin Tang made changes -
          Attachment Hive-1641.patch [ 12456462 ]
          Liyin Tang made changes -
          Status In Progress [ 3 ] Patch Available [ 10002 ]
          Release Note Split the MapJoin into 2 stage. In stage 1, generate the JDBM file for each small table. In stage 2, load the JDBM file and do the Join operation in memory
          Affects Version/s 0.7.0 [ 12315150 ]
          Liyin Tang added a comment -

          The previous assumption is not always true. There might be multiple map join operations in one piece of local work.

          No matter how many map join operators there are in one MapRedTask, each map join operator has one parent operator from the big-table branch and other parent operators from the small-table branches.
          The big-table branch is left alone.

          For each small-table branch, create a new JDBMSinkOperator to replace the current MapJoinOperator. Now the local work shares no common operators with the MapredWork.
          Also create a JDBMDummyOperator to replace the original parent operator of the MapJoinOperator.
          This JDBMDummyOperator helps the MapJoinOperator generate the correct input object inspector at run time.

          At execution time, the LocalTask processes all the local work and generates a JDBM file for each small table.
          When the MapRedTask starts to process the first row for the MapJoinOperator, it loads the JDBM file to build the in-memory hash table.

          In local mode, the JDBM files are simply stored in a local directory. Otherwise, the JDBM files are added to the distributed cache.

          This patch has only been tested in local mode. I will submit another patch after testing against the clusters.
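
          A minimal sketch of the two-stage flow described above, using Java serialization of a HashMap purely as a stand-in for the actual JDBM file format; the class name, file handling, and key/value types are hypothetical:

              // Sketch only. Stage 1 (local task): dump each small table to a file.
              // Stage 2 (mapper): load the file back into an in-memory hash table
              // before processing the first row.
              import java.io.File;
              import java.io.FileInputStream;
              import java.io.FileOutputStream;
              import java.io.IOException;
              import java.io.ObjectInputStream;
              import java.io.ObjectOutputStream;
              import java.util.HashMap;

              public class TwoStageMapJoinSketch {

                // Stage 1: write the small table's key -> value mapping to a local file.
                public static void dumpSmallTable(HashMap<String, String> table, File out)
                    throws IOException {
                  ObjectOutputStream oos = new ObjectOutputStream(new FileOutputStream(out));
                  oos.writeObject(table);
                  oos.close();
                }

                // Stage 2: load the dumped file (shipped via the distributed cache,
                // or read from a local directory in local mode) into memory.
                @SuppressWarnings("unchecked")
                public static HashMap<String, String> loadSmallTable(File in)
                    throws IOException, ClassNotFoundException {
                  ObjectInputStream ois = new ObjectInputStream(new FileInputStream(in));
                  HashMap<String, String> table = (HashMap<String, String>) ois.readObject();
                  ois.close();
                  return table;
                }
              }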

          Liyin Tang added a comment -

          Right now, the local work is only for processing the small tables of a map join operation. Also, one MapredTask can have at most one map join operation, because if one map join is followed by another map join, they are split into 2 tasks. So one MapredTask has at most one piece of local work to do.

          One feasible solution is to create a new type of task, named MapredLocalTask, which does the MapredLocalWork (local work). If a MapredTask has local work to do, create a new MapredLocalTask for that local work, make the current MapredTask depend on this newly generated task, and make the newly generated task depend on the parent tasks of the current task.

          This new MapredLocalTask does the local work only once and generates the mapped file (the JDBM file). The next step is to put the newly generated mapped file into the distributed cache. All the mappers will read this file from the distributed cache and construct the in-memory hash table from it.

          Any comments are welcome.
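
          A minimal sketch of the task-graph rewiring described above. The Task class and its methods here are hypothetical stand-ins for Hive's task hierarchy, not the actual API:

              // Sketch only: splice a local task between a MapredTask and its parents,
              // so the graph becomes  parents -> localTask -> mapredTask.
              import java.util.ArrayList;
              import java.util.List;

              public class LocalTaskRewiringSketch {

                static class Task {
                  final String name;
                  final List<Task> parents = new ArrayList<Task>();
                  final List<Task> children = new ArrayList<Task>();

                  Task(String name) { this.name = name; }

                  void addChild(Task child) {
                    children.add(child);
                    child.parents.add(this);
                  }
                }

                static void splice(Task mapredTask, Task localTask) {
                  // Re-point every parent of the MapredTask at the new local task.
                  for (Task parent : new ArrayList<Task>(mapredTask.parents)) {
                    parent.children.remove(mapredTask);
                    parent.addChild(localTask);
                  }
                  mapredTask.parents.clear();
                  // The MapredTask now depends only on the local task.
                  localTask.addChild(mapredTask);
                }
              }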

          Liyin Tang made changes -
          Status Open [ 1 ] In Progress [ 3 ]
          Namit Jain made changes -
          Field Original Value New Value
          Assignee Liyin Tang [ liyin ]
          Namit Jain created issue -

            People

            • Assignee:
              Liyin Tang
            • Reporter:
              Namit Jain
            • Votes:
              0
            • Watchers:
              5
