Phoenix / PHOENIX-2405

Improve performance and stability of server side sort for ORDER BY

    Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Labels:

      Description

      We currently use memory mapped files to buffer data as it's being sorted in an ORDER BY (see MappedByteBufferQueue). The following types of exceptions have been seen to occur:

      Caused by: java.lang.OutOfMemoryError: Map failed
              at sun.nio.ch.FileChannelImpl.map0(Native Method)
              at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:904)
      

      Andrew Purtell has read that memory-mapped files are not cleaned up very well in Java:

      "Map failed" means the JVM ran out of virtual address space. If you search around stack overflow for suggestions on what to do when your app (in this case Phoenix) encounters this issue when using mapped buffers, the answers tend toward manually cleaning up the mapped buffers or explicitly triggering a full GC. See http://stackoverflow.com/questions/8553158/prevent-outofmemory-when-using-java-nio-mappedbytebuffer for example. There are apparently long standing JVM/JRE problems with reclamation of mapped buffers. I think we may want to explore in Phoenix a different way to achieve what the current code is doing.

      Instead of using memory mapped files, we could use heap memory, or perhaps there are other mechanisms too.
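The "heap memory up to a threshold, then spill to an ordinary file" alternative can be sketched in plain Java. The class and method names below are invented for illustration (this is not Phoenix code); the point is that no FileChannel.map() call is ever made, so no virtual address space is consumed by long-lived mappings.

```java
import java.io.*;
import java.nio.file.Files;

// Toy sketch of a spill-to-disk buffer: hold bytes on the JVM heap until a
// threshold is crossed, then switch to a regular temp file (no mmap).
class SpillingBuffer implements Closeable {
    private final int thresholdBytes;
    private final ByteArrayOutputStream memory = new ByteArrayOutputStream();
    private File spillFile;        // created lazily on first spill
    private OutputStream spillOut; // non-null once we have spilled

    SpillingBuffer(int thresholdBytes) {
        this.thresholdBytes = thresholdBytes;
    }

    void write(byte[] data) throws IOException {
        if (spillOut == null && memory.size() + data.length > thresholdBytes) {
            // Threshold exceeded: move everything buffered so far to a plain file.
            spillFile = File.createTempFile("sort-spill", ".bin");
            spillFile.deleteOnExit();
            spillOut = new BufferedOutputStream(new FileOutputStream(spillFile));
            memory.writeTo(spillOut);
            memory.reset();
        }
        if (spillOut != null) {
            spillOut.write(data);
        } else {
            memory.write(data);
        }
    }

    /** Returns all written bytes, reading back from disk if we spilled. */
    byte[] toByteArray() throws IOException {
        if (spillOut == null) {
            return memory.toByteArray();
        }
        spillOut.flush();
        return Files.readAllBytes(spillFile.toPath());
    }

    boolean spilled() { return spillOut != null; }

    @Override
    public void close() throws IOException {
        if (spillOut != null) spillOut.close();
        if (spillFile != null) spillFile.delete();
    }
}
```

Because the spill path uses an ordinary FileOutputStream, the OS page cache still accelerates I/O, but the JVM never depends on reclaiming mapped buffers.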

        Issue Links

          Activity

          jamestaylor James Taylor added a comment -

          Maryann Xue - any thoughts?

          maryannxue Maryann Xue added a comment -

          We could change it to a "memory + spill to disk" strategy? In that case, would we still use SpoolingResultIterator as the inner iterator?

          jamestaylor James Taylor added a comment -

          What do you think, Andrew Purtell? This would basically mean that we'd be writing files when the sort spills to disk instead of using memory mapped files.

          apurtell Andrew Purtell added a comment - - edited

          This would basically mean that we'd be writing files when the sort spills to disk instead of using memory mapped files.

          I think you're at the mercy of the JVM and OS when using mapped files, and this has already proven to be a fatal route for some users. Manually spilling, instead of in effect letting the OS do it by paging the mmapped file in and out, is a fine solution.

          jamestaylor James Taylor added a comment -

          Yanping Wang - do you think Apache Mnemonic can help us here? I've tagged this as a potential project for GSoC. Would you or Gary Wang be interested in being a mentor for it?

          qichfan Wang, Gang added a comment - - edited

          Thanks James Taylor. Yes, definitely: Apache Mnemonic provides a mechanism for client code to utilize different kinds of devices, e.g. off-heap memory, NVMe, or SSD, as additional memory space; note that performance depends on the underlying device. Furthermore, the memory allocator can be customized as a service for your specific application logic.

          We can use the following code snippets to create a memory pool along with a general-purpose allocator service.

          Main.java
          new SysMemAllocator(1024 * 1024 * 20 /*capacity*/, true); /* on off-heap */
          new BigDataMemAllocator(Utils.getVolatileMemoryAllocatorService("vmem"), 1024 * 1024 * 20 /*capacity*/, "." /*uri*/, true); /* on a volatile storage device */
          new BigDataMemAllocator(Utils.getVolatileMemoryAllocatorService("pmalloc"), 1024 * 1024 * 20, "./example.dat", true); /* on a non-volatile storage device */


          We can then use createChunk(<...>) or createBuffer(<...>) to allocate memory resources; those external memory resources can be reclaimed automatically by the JVM GC or manually by your code.

          The above applies to applications using the volatile block-memory mode. If this sorting operation introduces some huge object graphs, we can use Mnemonic's volatile object mode; there are two corresponding non-volatile modes as well, but I think those might not be helpful for this case.

          Please refer to the example and test-case code of Apache Mnemonic for details. Thanks.

          ywang261 Yanping Wang added a comment -

          As Gary commented, we can use the above code to allocate memory as needed, but we need to add code to Phoenix to make it aware that there is a Mnemonic option for memory allocation.

          qichfan Wang, Gang added a comment - - edited

          After reading the code of the MappedByteBufferQueue class, I think Mnemonic could be used to replace the temporary mapped file when "totalResultSize >= thresholdBytes": the writeBuffer and readBuffer could be obtained directly from an instance of Mnemonic's BigDataMemAllocator. For this case, we could also use the memory-clustering mechanism provided by Mnemonic to maximize average performance across hybrid memory-like resources. Thanks.

          ram_krish ramkrishna.s.vasudevan added a comment -

          I was looking into this issue after some discussion with Maryann, and yes, we identified this as a potential feature for using Mnemonic. I can check this and see what the real benefit of using a Mnemonic-like project here would be.

          jamestaylor James Taylor added a comment -

          Thanks, ramkrishna.s.vasudevan. There's a GSoC student who's interested in this one. Perhaps you can act as his mentor if his proposal gets accepted? WDYT?

          ram_krish ramkrishna.s.vasudevan added a comment -

          If using mmap() is a stability issue, maybe as a first step we can try spilling over to disk. Since these results are ByteBuffers, we can then see whether we really need to use Mnemonic's ByteBuffers.

          stack stack added a comment -

          Yanping Wang

          as Gary commented, we can use above code to allocate memory as needed. but we need to add code into Phoenix to make it aware there is a Mnemonic way for memory allocation.

          What would you need to add, Yanping Wang? The pmalloc native lib and pmem, given the references above to vmem by Wang, Gang?

          Mnemonic looks great, but is it pretty new (I don't know)? (The first commit was at the end of last year.) In the incubator proposal (understandably), there is no mention of how mature/stable Mnemonic is. Do you fellas think it is good to deploy to production?

          ywang261 Yanping Wang added a comment -

          Hi Stack, your concern is certainly valid. Yes, the Mnemonic idea is new; in fact, the entire field of non-volatile computing for upcoming TB-sized persistent memory is new. That is why I said to add code, not to change code: "add" means we can have a switch to use or not use the Mnemonic way of dealing with memory allocation. We did the same for Spark, where we added a non-volatile RDD and tested the Mnemonic approach's performance impact; it does not affect Spark if developers decide not to use it.

          Sure, we need to add more documentation with use cases and examples to the Mnemonic project. We will do that after the code is migrated in.
          We know that allocating system memory in the middle of an application, or spilling data to disk, are very high-cost operations. We can all work together to find a better solution; we are open to and welcome all ideas and suggestions to improve the Mnemonic project.

          stack stack added a comment -

          Add means we can have a switch to use or not use Mnemonic way to deal with memory allocation.

          So, Phoenix would have to provide an alternative for the case where Mnemonic is not available, not properly installed, or missing its native supporting libs? Or does Mnemonic no-op or fall back if a capability is missing; e.g., if you ask for NVRAM but none is installed, what does Mnemonic do? Does it fail to start, crash out, or go a different route? I see I can ask to build without native lib support, but I am wondering whether at runtime you can ask the lib if it is good to go, coherent. These questions belong better on the Mnemonic mailing lists (when they get set up). It sounds like Mnemonic would be a good fit here (or, since Phoenix runs inside the HBase process, and the HBase process already sports a chunking/block store, the supporting environment could provide Phoenix with a Mnemonic interface?), but Phoenix would still need an alternative, and it might be a while yet before Mnemonic is a viable option. Thanks Yanping Wang

          qichfan Wang, Gang added a comment - - edited

          Thanks stack. How about falling back to Mnemonic only if an OutOfMemoryError occurs? We could also add some logic to send signals notifying Phoenix to clean up or enter a safe mode.

          stack stack added a comment -

          Thanks stack, how about fallback to Mnemonic only if OutOfMemoryError occurred ?

          Triggering on an OOME seems a bit dodgy. We could trap on 'OutOfMemoryError: Map failed', but we would have to read the code to ensure there are no other damaging side effects. An OOME comes after a full GC too, no? It would be good if we could avoid a full GC as the trigger.

          Better if we were just all mnemonic, all the time (caveat good perf, etc). Just trying to learn what build and runtime need to look like if mnemonic route. Thanks.
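The narrow trap described here can be sketched as a tiny helper (hypothetical, not existing Phoenix code): it distinguishes the mmap-specific "Map failed" error thrown by FileChannel.map() (virtual address space exhaustion) from a genuine heap OOME, which follows a full GC.

```java
// Hypothetical classifier: react only to the mmap-specific failure,
// not to every OutOfMemoryError.
class MapFailureClassifier {
    static boolean isMapFailure(OutOfMemoryError e) {
        String msg = e.getMessage();
        return msg != null && msg.contains("Map failed");
    }
}
```

A caller wrapping FileChannel.map() could catch OutOfMemoryError, consult isMapFailure, and only then switch to the fallback path; any other OOME would be rethrown.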

          qichfan Wang, Gang added a comment -

          Yes it is. So if we use this fallback mechanism, we have to prepare everything before any fc.map operations and close the temporary file immediately on OOME so that Mnemonic can take over. Or, better, could we trigger this fallback based on Runtime.freeMemory()? Yanping Wang, is that possible?
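A Runtime.freeMemory()-based trigger could look like the sketch below (invented helper names, purely illustrative). One caveat: freeMemory() reports free space in the currently committed heap, which may still grow toward maxMemory(), so a more faithful check derives the headroom from all three Runtime figures.

```java
// Hypothetical guard for deciding when to fall back to an alternative
// allocator, based on remaining heap headroom rather than a raw freeMemory().
class FreeMemoryGate {
    /** Bytes still available before the heap hits its -Xmx ceiling. */
    static long availableHeap(long free, long total, long max) {
        long used = total - free;   // bytes currently occupied
        return max - used;          // headroom up to the -Xmx limit
    }

    /** Fall back when headroom drops below minFreeFraction of max. */
    static boolean shouldFallBack(long free, long total, long max,
                                  double minFreeFraction) {
        return (double) availableHeap(free, total, max) / max < minFreeFraction;
    }
}
```

Typical use: `Runtime rt = Runtime.getRuntime(); if (FreeMemoryGate.shouldFallBack(rt.freeMemory(), rt.totalMemory(), rt.maxMemory(), 0.1)) { /* switch allocator */ }`. This is only a rough signal; a GC can free memory at any moment.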

          ywang261 Yanping Wang added a comment -

          Gary, it is probably better to trigger the fallback after receiving the first "Allocation Failure" message from the GC.
          I agree with Stack: we might need a clean switch, either using Mnemonic all the time (when conditions are met) or not at all.

          RCheungIT Haoran Zhang added a comment -

          Dear James Taylor

          Thanks for your advice; I will add this JIRA to my proposal. I want to make sure: should I apply Apache Mnemonic to resolve this JIRA? Based on the discussion, I find that Apache Mnemonic is a fairly new project and would also introduce a native-lib dependency to Apache Phoenix. I worry that it may cause some stability issues which I can't handle.

          Thanks.

          jamestaylor James Taylor added a comment -

          Thanks, Haoran. For PHOENIX-2405, we'd take the more conservative approach of using our SpoolingResultIterator, which simply holds data in memory up to a threshold and then spills to disk. Then we'd want to perf test this against our current implementation (which uses memory mapped files). We can likely help you with this part if you do the dev and functional-testing side of things.

          James


          RCheungIT Haoran Zhang added a comment - - edited

          Thanks for your advice.

          I have updated my proposal.
          When you have time, please have a review.

          Thank you very much.

          RCheungIT Haoran Zhang added a comment -

          Hi James Taylor,

          When I want to run SortMergeJoinIT, it always starts a local mini cluster first, and this stage is very time-consuming.

          I want to know whether there is any way to run the integration test against an already-deployed Phoenix instance without initializing a mini cluster.
          I tried to find a config option for that but failed.

          Thanks.

          jamestaylor James Taylor added a comment -

          For an end-to-end test, yes, spinning up a mini cluster is necessary. We amortize this cost by spinning up a single one and running lots of test suites on it. You can also write lower-level tests that test at the iterator level. See MergeSortResultIteratorTest, SpoolingResultIteratorTest, and others in the org.apache.phoenix.iterate package. Also, please make sure to @-mention your primary mentor, Maryann Xue.

          maryannxue Maryann Xue added a comment -

          Haoran Zhang, as James Taylor said, I suggest you start with lower-level tests which do not require spinning up the cluster. That way you can 1) significantly accelerate your code debugging and probably maximize test coverage in the shortest time, and 2) isolate the problem without having to worry about interference from other Phoenix components.

          RCheungIT Haoran Zhang added a comment -

          Hi Maryann Xue && James Taylor,

          About this issue, in my opinion, what I should do is modify MappedByteBufferSegmentQueue so that it flushes to a DeferredFileOutputStream instead of a MappedByteBuffer.
          Is that correct?

          Another thing: MappedByteBuffer is a direct NIO ByteBuffer, which allocates memory outside the JVM heap. By contrast, DeferredFileOutputStream buffers in JVM heap memory, which means it will occupy a large amount of heap.
          Is that OK in this case?

          Thanks.
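The heap-versus-direct distinction raised here is visible directly in the NIO API: ByteBuffer.allocate() returns a buffer backed by a heap byte[], while ByteBuffer.allocateDirect() returns one whose memory lives outside the heap (the same category as MappedByteBuffer). The helper wrapper below is illustrative only:

```java
import java.nio.ByteBuffer;

// Illustrative helpers contrasting heap-backed and direct (off-heap) buffers.
class BufferKinds {
    /** Heap buffer: backed by a byte[] that counts against -Xmx. */
    static ByteBuffer heapBuffer(int capacity) {
        return ByteBuffer.allocate(capacity);
    }

    /** Direct buffer: allocated outside the JVM heap, like MappedByteBuffer. */
    static ByteBuffer directBuffer(int capacity) {
        return ByteBuffer.allocateDirect(capacity);
    }

    /** True if the buffer's memory lives outside the JVM heap. */
    static boolean offHeap(ByteBuffer b) {
        return b.isDirect();
    }
}
```

A heap-backed replacement therefore does count against -Xmx, which is why tracking it through an explicit memory manager (rather than relying on the GC alone) makes sense.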

          maryannxue Maryann Xue added a comment -

          Yes, that's correct, Haoran Zhang. And don't forget to call "MemoryManager.allocate()" so that the heap memory usage is tracked by Phoenix's memory manager; that way, a Phoenix InsufficientMemoryException will most likely be thrown rather than an OOM. Sorry for the late reply, Haoran Zhang.

          githubbot ASF GitHub Bot added a comment -

          GitHub user RCheungIT opened a pull request:

          https://github.com/apache/phoenix/pull/171

          Phoenix-2405

          https://issues.apache.org/jira/browse/PHOENIX-2405

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/RCheungIT/phoenix PHOENIX-2405

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/phoenix/pull/171.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #171


          commit 86f134afb1d3e08c49fb49a8d655ab8de04be0d8
          Author: Haoran Zhang <hubert.zhang@outlook.com>
          Date: 2016-04-22T22:41:23Z

          Merge pull request #1 from apache/master

          Merge from master

          commit 4598a1f082cffe709c6124361fba21760fab4d65
          Author: RCheungIT <hubert.zhang@outlook.com>
          Date: 2016-05-27T17:12:41Z

          add deferredByteBufferQueue

          commit 16571f763858f348ea0832a23cc12cdf7cd41273
          Author: RCheungIT <hubert.zhang@outlook.com>
          Date: 2016-05-29T15:43:35Z

          extract interface for buffersegment and bufferqueue

          commit de308b99c4485c56bc543eda7362c2d2d93d07c6
          Author: RCheungIT <hubert.zhang@outlook.com>
          Date: 2016-05-31T15:49:14Z

          add memory management


          RCheungIT Haoran Zhang added a comment -

          Hi Maryann Xue,

          About this JIRA: I just finished an initial version. I tested it manually and it works.
          I'm not sure whether what I have done is what you need. Would you mind reviewing my pull request and giving me feedback?

          Thanks

          maryannxue Maryann Xue added a comment -

          Hi Haoran Zhang, thank you very much for the pull request! Pretty nice. I've made a few comments there on the pull request, and please let me know if you have any questions.

          githubbot ASF GitHub Bot added a comment -

          Github user RCheungIT commented on the issue:

          https://github.com/apache/phoenix/pull/171

          Pull looks good, but please squash all commits into one and prefix the commit message with the JIRA number so we tie the two together: PHOENIX-2405
          Improve performance and stability of server side sort for ORDER BY

          githubbot ASF GitHub Bot added a comment -

          Github user RCheungIT closed the pull request at:

          https://github.com/apache/phoenix/pull/171

          githubbot ASF GitHub Bot added a comment -

          GitHub user RCheungIT opened a pull request:

          https://github.com/apache/phoenix/pull/175

          PHOENIX-2405 Not for merge, just request a review to check whether I'm on the right way

          https://issues.apache.org/jira/browse/PHOENIX-2405

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/RCheungIT/phoenix PHOENIX-2405-HBase-1.2

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/phoenix/pull/175.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #175


          commit 6566ccd5fa4e8fbfa4fea4e743ca56143d17bc48
          Author: RCheungIT <hubert.zhang@outlook.com>
          Date: 2016-05-27T17:12:41Z

          PHOENIX-2405 Improve performance and stability of server side sort for
          ORDER BY


          Show
          githubbot ASF GitHub Bot added a comment - GitHub user RCheungIT opened a pull request: https://github.com/apache/phoenix/pull/175 PHOENIX-2405 Not for merge, just request a review to check whether I'm on the right way https://issues.apache.org/jira/browse/PHOENIX-2405 You can merge this pull request into a Git repository by running: $ git pull https://github.com/RCheungIT/phoenix PHOENIX-2405 -HBase-1.2 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/phoenix/pull/175.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #175 commit 6566ccd5fa4e8fbfa4fea4e743ca56143d17bc48 Author: RCheungIT <hubert.zhang@outlook.com> Date: 2016-05-27T17:12:41Z PHOENIX-2405 Improve performance and stability of server side sort for ORDER BY
          RCheungIT Haoran Zhang added a comment (edited)

          Hi Maryann Xue,

          I just made a pull request; it has been tested manually and seems to work.

          Would you mind taking a rough look at it to check whether I'm on the right track this time?

          Thank you very much.

          githubbot ASF GitHub Bot added a comment -

          Github user maryannxue commented on a diff in the pull request:

          https://github.com/apache/phoenix/pull/175#discussion_r69667743

          — Diff: phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingByteBufferSegmentQueue.java —
          @@ -0,0 +1,357 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.phoenix.iterate;
          +
          +import org.apache.commons.io.input.CountingInputStream;
          +import org.apache.commons.io.output.DeferredFileOutputStream;
          +
          +import org.apache.phoenix.memory.MemoryManager;
          +import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
          +import org.apache.phoenix.query.QueryServices;
          +import org.apache.phoenix.query.QueryServicesOptions;
          +
          +
          +import java.io.*;
          +import java.util.AbstractQueue;
          +import java.util.Iterator;
          +
          +import java.util.concurrent.atomic.AtomicInteger;
          +
          +import static org.apache.phoenix.monitoring.GlobalClientMetrics.*;
          +
          +
          +public abstract class SpoolingByteBufferSegmentQueue<T> extends AbstractQueue<T> {
          — End diff –

          The logic is very confusing here. My idea was to extend or modify the current SpoolingResultIterator so that it can take a ResultEntry and/or a Tuple as a record. That, however, has nothing to do with the XXXQueue classes here: XXXQueue deals with the priority-queue logic, while SpoolingXXX deals with the deferred byte-buffer logic. Let me know whether you understand how it's supposed to work.
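Read literally, the division of labor being suggested might be sketched as follows (a hypothetical, JDK-only sketch with made-up names, not the actual Phoenix classes): the ordering concern lives in a plain priority queue, and anything spool-related stays out of it, to be composed alongside rather than merged in.

```java
import java.util.Comparator;
import java.util.PriorityQueue;

// Hypothetical sketch of the separation: this class owns only the
// priority-queue (ordering) logic; a SpoolingXXX class would own the
// deferred byte-buffer logic and be composed with it, not merged into it.
final class OrderingQueue<T> {
    private final PriorityQueue<T> heap;

    OrderingQueue(Comparator<T> cmp) {
        this.heap = new PriorityQueue<>(cmp);
    }

    void add(T record) { heap.offer(record); }   // ordering only
    T next() { return heap.poll(); }             // smallest element first
    boolean isEmpty() { return heap.isEmpty(); }
}
```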

          githubbot ASF GitHub Bot added a comment -

          Github user maryannxue commented on a diff in the pull request:

          https://github.com/apache/phoenix/pull/175#discussion_r69668023

          — Diff: phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingByteBufferSegmentQueue.java —
          @@ -0,0 +1,357 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.phoenix.iterate;
          +
          +import org.apache.commons.io.input.CountingInputStream;
          +import org.apache.commons.io.output.DeferredFileOutputStream;
          +
          +import org.apache.phoenix.memory.MemoryManager;
          +import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
          +import org.apache.phoenix.query.QueryServices;
          +import org.apache.phoenix.query.QueryServicesOptions;
          +
          +
          +import java.io.*;
          +import java.util.AbstractQueue;
          +import java.util.Iterator;
          +
          +import java.util.concurrent.atomic.AtomicInteger;
          +
          +import static org.apache.phoenix.monitoring.GlobalClientMetrics.*;
          +
          +
          +public abstract class SpoolingByteBufferSegmentQueue<T> extends AbstractQueue<T> {
          +
          + private ResultQueue<T> spoolFrom;
          +
          + private boolean closed ;
          + private boolean flushed;
          + private DeferredFileOutputStream spoolTo;
          + private MemoryChunk chunk;
          + private int size = 0;
          + private long inMemByteSize = 0L;
          + private int index;
          +
          +
          +
          + SpoolingByteBufferSegmentQueue(int index, MemoryManager mm, final int thresholdBytes, String spoolDirectory) {
          +
          + long startTime = System.currentTimeMillis();
          + chunk = mm.allocate(0, thresholdBytes);
          + long waitTime = System.currentTimeMillis() - startTime;
          + GLOBAL_MEMORY_WAIT_TIME.update(waitTime);
          +
          + int size = (int)chunk.getSize();
          + spoolTo = new DeferredFileOutputStream(size, "ResultSpooler",".bin", new File(spoolDirectory)) {
          + @Override
          + protected void thresholdReached() throws IOException {
          + try {
          +     super.thresholdReached();
          + } finally {
          +     chunk.close();
          + }
          + }
          + };
          + }
          +
          + public int index() {
          +     return this.index;
          + }
          +
          + protected abstract InMemoryResultQueue<T> createInMemoryResultQueue(byte[] bytes, MemoryChunk memoryChunk);
          +
          + protected abstract OnDiskResultQueue<T> createOnDiskResultQueue(File file);
          +
          + @Override
          + public boolean offer(T t) {
          +     if (closed || flushed) {
          +         return false;
          +     }
          +     boolean result = writeRecord(t, spoolTo);
          +     if (result) {
          +         if (!spoolTo.isInMemory()) {
          +             flushToDisk();
          +         }
          +         size++;
          +     }
          +     return result;
          + }
          +
          + protected abstract boolean writeRecord(T t, OutputStream outputStream);
          +
          + private void flushToMemory() {
          +     byte[] data = spoolTo.getData();
          +     chunk.resize(data.length);
          +     spoolFrom = createInMemoryResultQueue(data, chunk);
          +     GLOBAL_MEMORY_CHUNK_BYTES.update(data.length);
          +     flushed = true;
          + }
          +
          + private void flushToDisk() {
          +     long sizeOfSpoolFile = spoolTo.getFile().length();
          +     GLOBAL_SPOOL_FILE_SIZE.update(sizeOfSpoolFile);
          +     GLOBAL_SPOOL_FILE_COUNTER.increment();
          +     spoolFrom = createOnDiskResultQueue(spoolTo.getFile());
          +     if (spoolTo.getFile() != null) {
          +         spoolTo.getFile().deleteOnExit();
          +     }
          +     inMemByteSize = 0;
          +     flushed = true;
          + }
          +
          + public boolean isFlushed() {
          +     return flushed;
          + }
          +
          + public T peek() {
          +     if (!flushed) {
          +         flushToMemory();
          +     }
          +     return spoolFrom.peek();
          + }
          +
          + @Override
          + public T poll() {
          +     if (!flushed) {
          +         flushToMemory();
          +     }
          +     return spoolFrom.poll();
          + }
          +
          + public void close() throws IOException {
          +     if (spoolFrom != null) {
          +         spoolFrom.close();
          +     }
          + }
          +
          + @Override
          + public Iterator<T> iterator() {
          +     if (!flushed) {
          +         flushToMemory();
          +     }
          +     return spoolFrom.iterator();
          + }
          +
          + @Override
          + public int size() {
          +     return size;
          + }
          +
          + public long getInMemByteSize() {
          +     return inMemByteSize;
          + };
          +
          + private static abstract class ResultQueue<T> extends AbstractQueue<T> implements Closeable {}
          +
          + protected static abstract class InMemoryResultQueue<T> extends ResultQueue<T> {
          +     private final MemoryChunk memoryChunk;
          +     protected final byte[] bytes;
          +     private T next;
          +     private AtomicInteger offset = new AtomicInteger(0);
          +
          +     protected InMemoryResultQueue(byte[] bytes, MemoryChunk memoryChunk) {
          +         this.bytes = bytes;
          +         this.memoryChunk = memoryChunk;
          +         advance(offset);
          +     }
          +
          +     protected abstract T advance(AtomicInteger offset);
          +
          +     @Override
          +     public boolean offer(T t) {
          +         return false;
          +     }
          +
          +     @Override
          +     public T peek() {
          +         return next;
          +     }
          +
          +     @Override
          +     public T poll() {
          +         T current = next;
          +         next = advance(offset);
          +         return current;
          +     }
          +
          +     public void close() {
          +         memoryChunk.close();
          +     }
          +
          +     @Override
          +     public Iterator<T> iterator() {
          +         return new Iterator<T>() {
          +             AtomicInteger iteratorOffset = new AtomicInteger(offset.get());
          +             private T next = advance(iteratorOffset);
          — End diff –

          One reminder: I said in my previous review that we'd better avoid discarding an old XXXSegmentQueue and starting a new one every time we switch between these segment queues, so we should ultimately remove this offset mechanism.
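For reference, the advance-over-a-byte-array pattern that InMemoryResultQueue.advance(offset) relies on can be illustrated with a trivial fixed-width record type (a hypothetical sketch with int records, not the Phoenix implementation):

```java
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: decode fixed-width int records from a byte[] by
// advancing a cursor, mirroring how advance(offset) walks the buffer.
final class IntRecordReader {
    private final byte[] bytes;

    IntRecordReader(byte[] bytes) { this.bytes = bytes; }

    // Returns the record at the cursor and moves the cursor past it,
    // or null once the buffer is exhausted.
    Integer advance(AtomicInteger offset) {
        int pos = offset.get();
        if (pos + Integer.BYTES > bytes.length) {
            return null;
        }
        int value = ByteBuffer.wrap(bytes, pos, Integer.BYTES).getInt();
        offset.addAndGet(Integer.BYTES);
        return value;
    }
}
```

A fresh AtomicInteger per iterator restarts decoding from its own cursor, which is the per-iterator offset being flagged above.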

          githubbot ASF GitHub Bot added a comment -

          Github user maryannxue commented on a diff in the pull request:

          https://github.com/apache/phoenix/pull/175#discussion_r69668052

          — Diff: phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingByteBufferSegmentQueue.java —
          @@ -0,0 +1,357 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.phoenix.iterate;
          +
          +import org.apache.commons.io.input.CountingInputStream;
          +import org.apache.commons.io.output.DeferredFileOutputStream;
          +
          +import org.apache.phoenix.memory.MemoryManager;
          +import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
          +import org.apache.phoenix.query.QueryServices;
          +import org.apache.phoenix.query.QueryServicesOptions;
          +
          +
          +import java.io.*;
          +import java.util.AbstractQueue;
          +import java.util.Iterator;
          +
          +import java.util.concurrent.atomic.AtomicInteger;
          +
          +import static org.apache.phoenix.monitoring.GlobalClientMetrics.*;
          +
          +
          +public abstract class SpoolingByteBufferSegmentQueue<T> extends AbstractQueue<T> {
          +
          + private ResultQueue<T> spoolFrom;
          +
          + private boolean closed ;
          + private boolean flushed;
          + private DeferredFileOutputStream spoolTo;
          + private MemoryChunk chunk;
          + private int size = 0;
          + private long inMemByteSize = 0L;
          + private int index;
          +
          +
          +
          + SpoolingByteBufferSegmentQueue(int index, MemoryManager mm, final int thresholdBytes, String spoolDirectory) {
          +
          + long startTime = System.currentTimeMillis();
          + chunk = mm.allocate(0, thresholdBytes);
          + long waitTime = System.currentTimeMillis() - startTime;
          + GLOBAL_MEMORY_WAIT_TIME.update(waitTime);
          +
          + int size = (int)chunk.getSize();
          + spoolTo = new DeferredFileOutputStream(size, "ResultSpooler",".bin", new File(spoolDirectory)) {
          + @Override
          + protected void thresholdReached() throws IOException {
          + try {
          +     super.thresholdReached();
          + } finally {
          +     chunk.close();
          + }
          + }
          + };
          + }
          +
          + public int index() {
          +     return this.index;
          + }
          +
          + protected abstract InMemoryResultQueue<T> createInMemoryResultQueue(byte[] bytes, MemoryChunk memoryChunk);
          +
          + protected abstract OnDiskResultQueue<T> createOnDiskResultQueue(File file);
          +
          + @Override
          + public boolean offer(T t) {
          +     if (closed || flushed) {
          +         return false;
          +     }
          +     boolean result = writeRecord(t, spoolTo);
          +     if (result) {
          — End diff –

          I'm pretty lost as to why this "if" block is here; does it mean we flush to disk every time? I'll stop here and leave my other doubts, because I think all of these XXXQueue classes are going to need a rewrite.
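The deferred-buffer behavior the patch builds on (write to memory until a threshold, then transparently spill to a temp file) can be approximated with JDK classes alone. This is a simplified, hypothetical stand-in for Commons IO's DeferredFileOutputStream, not the real class:

```java
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// Hypothetical JDK-only stand-in for DeferredFileOutputStream: buffers in
// memory until thresholdBytes, then replays the buffered bytes and all
// later writes into a temp file.
class ThresholdSpillStream extends OutputStream {
    private final int thresholdBytes;
    private ByteArrayOutputStream memory = new ByteArrayOutputStream();
    private OutputStream current;
    private File spillFile;

    ThresholdSpillStream(int thresholdBytes) {
        this.thresholdBytes = thresholdBytes;
        this.current = memory;
    }

    @Override
    public void write(int b) throws IOException {
        maybeSpill(1);
        current.write(b);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        maybeSpill(len);
        current.write(b, off, len);
    }

    private void maybeSpill(int incoming) throws IOException {
        if (memory != null && memory.size() + incoming > thresholdBytes) {
            spillFile = File.createTempFile("ResultSpooler", ".bin");
            spillFile.deleteOnExit();
            OutputStream disk = new FileOutputStream(spillFile);
            memory.writeTo(disk);  // replay what was buffered so far
            memory = null;         // release the heap buffer
            current = disk;
        }
    }

    boolean isInMemory() { return memory != null; }
    byte[] getData() { return memory == null ? null : memory.toByteArray(); }
    File getFile() { return spillFile; }

    @Override
    public void close() throws IOException { current.close(); }
}
```

With this semantics, a spill happens only when the threshold is actually crossed, so flushing to disk is not unconditional.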

          githubbot ASF GitHub Bot added a comment -

          Github user maryannxue commented on a diff in the pull request:

          https://github.com/apache/phoenix/pull/175#discussion_r69668528

          — Diff: phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java —
          @@ -133,7 +134,10 @@ public PeekingResultIterator newIterator(StatementContext context, ResultIterato
          Expression expression = RowKeyExpression.INSTANCE;
          OrderByExpression orderByExpression = new OrderByExpression(expression, false, true);
          int threshold = services.getProps().getInt(QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);

          - return new OrderedResultIterator(scanner, Collections.<OrderByExpression>singletonList(orderByExpression), threshold);
          + String spoolDirectory = services.getProps().get(QueryServices.SPOOL_DIRECTORY, QueryServicesOptions.DEFAULT_SPOOL_DIRECTORY);
          — End diff –

          In my previous review I may not have made it clear that the idea of moving MemoryManager into the constructors was more of a question than advice. Since I hadn't studied the code carefully, I was asking whether that would be better or worse. It looks like these parameters are now spread all over the place, so perhaps my suggestion was not a good one.
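If the parameter threading is the concern, one common alternative (a hypothetical sketch, not existing Phoenix code) is to read the props once and pass a single immutable holder down the call chain, instead of threading threshold, directory, and manager separately through every constructor:

```java
// Hypothetical immutable holder for spool-related settings; constructors
// would accept one SpoolConfig instead of several loose parameters.
final class SpoolConfig {
    final int thresholdBytes;
    final String spoolDirectory;

    SpoolConfig(int thresholdBytes, String spoolDirectory) {
        this.thresholdBytes = thresholdBytes;
        this.spoolDirectory = spoolDirectory;
    }

    @Override
    public String toString() {
        return "SpoolConfig(" + thresholdBytes + ", " + spoolDirectory + ")";
    }
}
```

The trade-off is the usual one: a holder keeps signatures stable as settings grow, at the cost of one more type.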

          githubbot ASF GitHub Bot added a comment -

          Github user maryannxue commented on a diff in the pull request:

          https://github.com/apache/phoenix/pull/175#discussion_r69668752

          — Diff: phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java —
          @@ -45,10 +45,7 @@
          import org.apache.phoenix.exception.SQLExceptionInfo;
          import org.apache.phoenix.execute.TupleProjector.ProjectedValueTuple;
          import org.apache.phoenix.expression.Expression;
          -import org.apache.phoenix.iterate.DefaultParallelScanGrouper;
          -import org.apache.phoenix.iterate.MappedByteBufferQueue;
          -import org.apache.phoenix.iterate.ParallelScanGrouper;
          -import org.apache.phoenix.iterate.ResultIterator;
          +import org.apache.phoenix.iterate.*;
          — End diff –

          Please always check your patch before you submit. Wildcard imports are not the right coding style, and such changes should never appear in a patch/pull request.

          lhofhansl Lars Hofhansl added a comment -

          Ankit Singhal, what are your thoughts here? 4.8 is getting big; should we delay this to a later release?

          ankit@apache.org Ankit Singhal added a comment -

          Yes, agreed Lars Hofhansl. Enough testing has already been done around query performance for 4.8, so let's move this out of 4.8 to avoid any regression.
          And anyway, with the new release cadence, this good work will be available in a subsequent release in short order.

          githubbot ASF GitHub Bot added a comment -

          Github user RCheungIT commented on a diff in the pull request:

          https://github.com/apache/phoenix/pull/175#discussion_r69728427

          — Diff: phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingByteBufferSegmentQueue.java —
          @@ -0,0 +1,357 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.phoenix.iterate;
          +
          +import org.apache.commons.io.input.CountingInputStream;
          +import org.apache.commons.io.output.DeferredFileOutputStream;
          +
          +import org.apache.phoenix.memory.MemoryManager;
          +import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
          +import org.apache.phoenix.query.QueryServices;
          +import org.apache.phoenix.query.QueryServicesOptions;
          +
          +
          +import java.io.*;
          +import java.util.AbstractQueue;
          +import java.util.Iterator;
          +
          +import java.util.concurrent.atomic.AtomicInteger;
          +
          +import static org.apache.phoenix.monitoring.GlobalClientMetrics.*;
          +
          +
          +public abstract class SpoolingByteBufferSegmentQueue<T> extends AbstractQueue<T> {
          — End diff –

          Hi @maryannxue, I don't think I quite get your idea here. If SpoolingResultIterator is the thing to modify, where should the modified SpoolingResultIterator live? What I did here is rework SpoolingResultIterator into SpoolingByteBufferSegmentQueue, so that SpoolingByteBufferSegmentQueue handles both the priority queue logic and the deferred byte buffer logic at the same time.

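          For context on the "memory + spill to disk" strategy under discussion: commons-io's DeferredFileOutputStream (imported in this diff) keeps writes in a heap buffer until a size threshold and then transparently spills to a temp file, which sidesteps the mapped-buffer address-space exhaustion described in this issue. A minimal stdlib-only sketch of that buffering idea (class and names are illustrative, not Phoenix's actual code):

          ```java
          import java.io.*;
          import java.nio.file.Files;

          /** Buffers bytes on the heap until a threshold, then spills to a temp file. */
          public class SpillableOutputStream extends OutputStream {
              private final int threshold;
              private ByteArrayOutputStream memory = new ByteArrayOutputStream();
              private File spillFile;        // non-null once we have spilled to disk
              private OutputStream current;  // either the heap buffer or the file stream

              public SpillableOutputStream(int threshold) {
                  this.threshold = threshold;
                  this.current = memory;
              }

              @Override
              public void write(int b) throws IOException {
                  write(new byte[] {(byte) b}, 0, 1);
              }

              @Override
              public void write(byte[] b, int off, int len) throws IOException {
                  // Spill once the in-memory buffer would exceed the threshold.
                  if (spillFile == null && memory.size() + len > threshold) {
                      spillFile = Files.createTempFile("spool", ".bin").toFile();
                      spillFile.deleteOnExit();
                      OutputStream fileOut =
                          new BufferedOutputStream(new FileOutputStream(spillFile));
                      memory.writeTo(fileOut);  // copy what was buffered so far
                      memory = null;
                      current = fileOut;
                  }
                  current.write(b, off, len);
              }

              @Override
              public void close() throws IOException {
                  current.close();
              }

              public boolean isInMemory() {
                  return spillFile == null;
              }

              /** Read back everything written, regardless of where it ended up. */
              public InputStream toInputStream() throws IOException {
                  close();
                  return isInMemory()
                      ? new ByteArrayInputStream(memory.toByteArray())
                      : new BufferedInputStream(new FileInputStream(spillFile));
              }
          }
          ```

          Unlike a memory-mapped file, the spill file here is released through ordinary stream close/delete semantics, so no GC of mapped buffers is involved.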
          githubbot ASF GitHub Bot added a comment -

          Github user RCheungIT commented on a diff in the pull request:

          https://github.com/apache/phoenix/pull/175#discussion_r69792221

          — Diff: phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingByteBufferSegmentQueue.java —
          @@ -0,0 +1,357 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.phoenix.iterate;
          +
          +import org.apache.commons.io.input.CountingInputStream;
          +import org.apache.commons.io.output.DeferredFileOutputStream;
          +
          +import org.apache.phoenix.memory.MemoryManager;
          +import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
          +import org.apache.phoenix.query.QueryServices;
          +import org.apache.phoenix.query.QueryServicesOptions;
          +
          +
          +import java.io.*;
          +import java.util.AbstractQueue;
          +import java.util.Iterator;
          +
          +import java.util.concurrent.atomic.AtomicInteger;
          +
          +import static org.apache.phoenix.monitoring.GlobalClientMetrics.*;
          +
          +
          +public abstract class SpoolingByteBufferSegmentQueue<T> extends AbstractQueue<T> {
          — End diff –

          Hi @maryannxue, I think I get your idea now. Do you mean I should not change the in-memory sort logic, and only need to modify the flush logic so it flushes the sorted result to a SpoolingResultIterator? Is that correct? Thanks.

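          The "sort in memory, flush sorted runs, merge on read-back" scheme being converged on here is classic external merge sort: each flush produces one sorted run, and reading back merges the runs with a priority queue keyed on the head element of each run. A generic stdlib-only illustration of the merge step (not Phoenix's actual classes):

          ```java
          import java.util.*;

          public class ExternalMergeSort {
              /** k-way merge of already-sorted runs via a priority queue of run indices. */
              public static <T> List<T> mergeRuns(List<List<T>> runs, Comparator<T> cmp) {
                  int k = runs.size();
                  int[] pos = new int[k];  // next unread position within each run
                  // The heap orders run indices by each run's current head element.
                  PriorityQueue<Integer> heap = new PriorityQueue<>(
                      (a, b) -> cmp.compare(runs.get(a).get(pos[a]), runs.get(b).get(pos[b])));
                  for (int i = 0; i < k; i++) {
                      if (!runs.get(i).isEmpty()) heap.add(i);
                  }
                  List<T> out = new ArrayList<>();
                  while (!heap.isEmpty()) {
                      int i = heap.poll();                 // run with the smallest head element
                      out.add(runs.get(i).get(pos[i]++));
                      if (pos[i] < runs.get(i).size()) {
                          heap.add(i);                     // re-insert with its new head
                      }
                  }
                  return out;
              }
          }
          ```

          Each output row costs O(log k) heap work, so merging stays cheap even when memory pressure forces many small spilled runs.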
          jamestaylor James Taylor added a comment -

          Maryann Xue - looks like Haoran Zhang had a question about a week back. Would you mind taking a look? FWIW, Haoran, if you don't hear back in a day, it's ok to re-ping.

          maryannxue Maryann Xue added a comment -

          James Taylor We actually had a skype meeting right after that. Anyway, I was just gonna check his progress again. Haoran Zhang, any update?

          jamestaylor James Taylor added a comment -

          Great! Thanks for the update, Maryann. Please make sure you keep the JIRA up to date with any offline discussions too.

          RCheungIT Haoran Zhang added a comment -

          Hi Maryann Xue, I'm currently working on it; I'll request a code review before next Monday.
          The only remaining problem is how to get rid of the offset in the iterators.
          As it is not a blocking problem, I plan to discuss it based on the code in the next code review.

          Hi James Taylor, thanks for your advice; I will keep that in mind.

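          On the recurring question of "getting rid of the offset in the iterators": one common way to avoid threading an offset through every queue implementation is a thin wrapping iterator that lazily discards the first N rows. A generic sketch (names are illustrative; this is not Phoenix's ResultIterator API):

          ```java
          import java.util.Iterator;

          /** Wraps an iterator and lazily skips the first {@code offset} elements. */
          public class OffsetIterator<T> implements Iterator<T> {
              private final Iterator<T> delegate;
              private int toSkip;

              public OffsetIterator(Iterator<T> delegate, int offset) {
                  this.delegate = delegate;
                  this.toSkip = offset;
              }

              // Discard rows covered by the offset; runs at most once in total.
              private void skipIfNeeded() {
                  while (toSkip > 0 && delegate.hasNext()) {
                      delegate.next();
                      toSkip--;
                  }
              }

              @Override
              public boolean hasNext() {
                  skipIfNeeded();
                  return delegate.hasNext();
              }

              @Override
              public T next() {
                  skipIfNeeded();
                  return delegate.next();
              }
          }
          ```

          With this shape, the underlying sorted queue stays offset-free, and the skip happens exactly once on the first hasNext()/next() call.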
          githubbot ASF GitHub Bot added a comment -

          GitHub user RCheungIT opened a pull request:

          https://github.com/apache/phoenix/pull/184

          PHOENIX-2405

          https://issues.apache.org/jira/browse/PHOENIX-2405

          Hi @maryannxue, I think this version is closer to what you described.
          I think the threshold in DeferredResultIterator should be different from the threshold in DeferredByteBufferSegmentQueue, but I don't know where to get it from.
          Also, I haven't found a good way to get rid of the offset in the Iterator.
          Would you mind giving me some suggestions?

          Thanks

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/RCheungIT/phoenix PHOENIX-2405-v3

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/phoenix/pull/184.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #184


          commit 27df6878397b6e6c70c44d288f8d89ba35187880
          Author: RCheungIT <hubert.zhang@outlook.com>
          Date: 2016-07-19T15:54:17Z

          PHOENIX-2405 Improve performance and stability of server side sort for ORDER BY


          jamestaylor James Taylor added a comment -

          Any updates here, Maryann Xue & Haoran Zhang? The pull request is a couple weeks old. Reminder: if conversations happen offline, please post summaries here.


            People

            • Assignee: Haoran Zhang (RCheungIT)
            • Reporter: James Taylor (jamestaylor)
            • Votes: 0
            • Watchers: 14