Apache Jena / JENA-44

Support external sorting of bindings in ARQ

    Details

    • Type: New Feature
    • Status: Closed
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: ARQ
    • Labels:
      None

      Description

      In QueryIterSort, the sorting of the contents of an Iterator<Binding> is done in memory, using Arrays.sort. This can be problematic where the set to be sorted is large. A possible solution could be to use an external, disk-backed algorithm. A hybrid approach may be better, whereby we attempt the in-memory sort, but when the number of bindings encountered goes over a certain number, resort to the disk-backed variant.
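A minimal sketch of the hybrid approach described above, under stated assumptions: plain strings without line breaks stand in for bindings, the threshold is at least 1, and the class name `HybridSorter` and its methods are hypothetical, not part of ARQ or of any attached patch. Items are buffered in memory; once the buffer reaches the threshold it is sorted and written out as a sorted run, and the runs are merged at the end.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical sketch, not the JENA-44 patch: strings stand in for bindings.
final class HybridSorter {
    private final int threshold;                        // max items held in memory
    private final List<String> buffer = new ArrayList<>();
    private final List<Path> runs = new ArrayList<>();  // sorted runs spilled to disk

    HybridSorter(int threshold) { this.threshold = threshold; }

    void add(String item) {
        buffer.add(item);
        if (buffer.size() >= threshold) spill();
    }

    int spilledRuns() { return runs.size(); }

    // Sort the in-memory buffer and write it to a temporary file as one run.
    private void spill() {
        Collections.sort(buffer);
        try {
            Path run = Files.createTempFile("sort-run", ".tmp");
            Files.write(run, buffer, StandardCharsets.UTF_8);
            runs.add(run);
            buffer.clear();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Returns all items in sorted order and deletes the temporary files.
    List<String> sorted() {
        Collections.sort(buffer);
        if (runs.isEmpty()) return new ArrayList<>(buffer); // pure in-memory path
        if (!buffer.isEmpty()) spill();                      // treat the tail as a run
        List<String> out = new ArrayList<>();
        List<BufferedReader> readers = new ArrayList<>();
        // Queue entries are {head line, reader index}, ordered by head line.
        PriorityQueue<String[]> heads =
            new PriorityQueue<>((a, b) -> a[0].compareTo(b[0]));
        try {
            for (Path run : runs) {
                BufferedReader r = Files.newBufferedReader(run, StandardCharsets.UTF_8);
                readers.add(r);
                String line = r.readLine();
                if (line != null)
                    heads.add(new String[] { line, String.valueOf(readers.size() - 1) });
            }
            while (!heads.isEmpty()) {                       // k-way merge
                String[] head = heads.poll();
                out.add(head[0]);
                String next = readers.get(Integer.parseInt(head[1])).readLine();
                if (next != null) heads.add(new String[] { next, head[1] });
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            for (BufferedReader r : readers) {
                try { r.close(); } catch (IOException ignored) { }
            }
            for (Path run : runs) {
                try { Files.deleteIfExists(run); } catch (IOException ignored) { }
            }
            runs.clear();
        }
        return out;
    }
}
```

For brevity the merged result is collected into a list; an implementation meant for large inputs would more likely expose the merge as an iterator so that the output is never held in memory all at once.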

      1. JENA-44-ARQ_r1165687.patch
        32 kB
        Stephen Allen
      2. JENA-44_ARQ_r1165123.patch
        25 kB
        Paolo Castagna
      3. JENA-44-Depends-on-JENA-99-r1157891.patch
        5 kB
        Stephen Allen
      4. JENA-44_ARQ_r1156212.patch
        40 kB
        Paolo Castagna
      5. JENA-44_ARQ_r8724.patch
        52 kB
        Sam Tunnicliffe
      6. JENA-44_ARQ_r8531.patch
        47 kB
        Paolo Castagna
      7. JENA-44-0.patch
        47 kB
        Sam Tunnicliffe

        Issue Links

          Activity

          Sam Tunnicliffe added a comment -

          Candidate implementation of the hybrid memory/disk approach. The core sort class is adapted from one found in Project Voldemort. This is Apache 2 licensed, and has been modified somewhat here, so I've just added an additional copyright statement to the existing source header - advice on whether this is the correct approach would be welcomed.

          Paolo Castagna added a comment -

          For people wanting to test the behaviour and performance of external sorting, patched ARQ and TDB snapshots are available here:
          http://oss.talisplatform.com/content/repositories/talis-snapshots/com/hp/hpl/jena/arq/2.8.8-JENA_44-SNAPSHOT/
          http://oss.talisplatform.com/content/repositories/talis-snapshots/com/hp/hpl/jena/tdb/0.8.10-JENA_44-SNAPSHOT/
          Let us know if you have any issues with those.

          Andy Seaborne added a comment -

          Will JENA-29 (query cancellation) interact with this? (subtext - if not, why not?!) QueryIterSort has changed.

          It does not say explicitly that it's originally ASF licensed, just "Copyright 2008-2009 LinkedIn" adjacent to a copy of the BSD license. To be clear, could the patch please have a mention of the ASL and a trace back to the origin? This wouldn't be such an issue if we had incorporated the code into ASF SVN, but we haven't.

          Thanks - Andy

          Paolo Castagna added a comment -

          Indeed, it would be beneficial to be able to cancel a large sort (even when it is an external sort).
          JENA-29 adds cancel, but if cancel is called while ARQ is performing a large in-memory sort, the cancel will not take effect until the sort has finished (or the machine has crashed or become unresponsive doing GC). We have experienced these sorts of problems.
          Therefore, JENA-29 and JENA-44 are complementary and they could benefit from each other.
          We are already seeing the benefit of JENA-44, since in practice it puts a cap on the amount of memory used by ARQ to do large sorts.
          Moreover, JENA-44 could use the same mechanism to serialize/deserialize bindings as JENA-45, if that is going to go into trunk before JENA-44.

          Sam Tunnicliffe added a comment -

          I'll merge in the changes from JENA-29 now.

          I'll update the incorrect license text too. Sorry, it appears that in moving it to the bottom of the file, I pasted the wrong text in. The original source [1] is ASL licensed.

          [1] https://github.com/voldemort/voldemort/blob/2d6f68b09c3bdc23dcf3ae1f91c9285fbd668820/src/java/voldemort/store/readonly/ExternalSorter.java

          Paolo Castagna added a comment - - edited

          > I'll merge in the changes from JENA-29 now.

          Great, then you could also experience the thrill of jena-patcher [1] script and update the JENA-44 SNAPSHOTs for people to try/test this (nice) patch.

          [1] https://github.com/castagna/jena-patcher

          Paolo Castagna added a comment -

          This is an updated patch with changes from JENA-29 (just one minor conflict which was easy to get around).

          Ideally, we should now stop a large external sort when cancel is called.

          Paolo Castagna added a comment -

          QueryIterSort extends QueryIterPlainWrapper which extends QueryIter which extends QueryIteratorBase which implements QueryIterator.
          QueryIteratorBase has the machinery to deal with query cancellation (i.e. every time hasNext() is called it checks whether the query has been cancelled).
          Moreover, QueryIterSort overrides requestCancel() in the same way as QueryIterGroup does.
          Therefore, this patch should play nicely with query cancellation (i.e. JENA-29).
          JENA-44 is not using the serialization for bindings proposed by JENA-45. However, we can commit this, and when JENA-45 gets committed we would need to change the BindingSerializer to use that, so we have just one way to serialize bindings.

          If this is the case, are there any technical reasons holding this back from going into trunk?
          If not, I am happy to go ahead and commit this.

          We are already using a patched ARQ version with JENA-44 and we have not seen problems with it.
          Committing this would greatly help us, since we would no longer need to manage separate SNAPSHOTs with this patch applied.

          Paolo Castagna added a comment -

          Currently, a large external sort cannot be cancelled. More precisely, even if cancel() is invoked or a timeout is set, the external sort cannot be interrupted once it has started.
          We should make it check a flag which is set when requestCancel() is called, and stop the sorting as soon as possible once the flag has been set.
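The flag-checking idea can be sketched as follows. This is only an illustration under stated assumptions: `CancellableChunkSort` and its methods are hypothetical names, not ARQ classes; an in-memory array of ints stands in for runs on disk; and `chunkSize` is assumed to be at least 1. The sort polls a volatile flag between sorted runs and between merge steps, so a cancel request is noticed after at most one bounded unit of work.

```java
import java.util.Arrays;
import java.util.concurrent.CancellationException;

// Hypothetical sketch: a run-based sort that polls a cancellation flag.
final class CancellableChunkSort {
    private volatile boolean cancelled = false;

    // May be called from any thread; it only sets the flag.
    void requestCancel() { cancelled = true; }

    private void checkCancelled() {
        if (cancelled) throw new CancellationException("sort cancelled");
    }

    int[] sort(int[] input, int chunkSize) {
        int[] a = input.clone();
        // Phase 1: sort fixed-size runs, polling the flag before each run.
        for (int from = 0; from < a.length; from += chunkSize) {
            checkCancelled();
            Arrays.sort(a, from, Math.min(from + chunkSize, a.length));
        }
        // Phase 2: merge runs pairwise, doubling the run width each pass.
        int[] src = a;
        int[] dst = new int[a.length];
        for (int width = chunkSize; width < a.length; width *= 2) {
            for (int lo = 0; lo < a.length; lo += 2 * width) {
                checkCancelled();
                int mid = Math.min(lo + width, a.length);
                int hi = Math.min(lo + 2 * width, a.length);
                merge(src, lo, mid, hi, dst);
            }
            int[] tmp = src; src = dst; dst = tmp;
        }
        return src;
    }

    // Merge the sorted ranges [lo, mid) and [mid, hi) of src into dst.
    private static void merge(int[] src, int lo, int mid, int hi, int[] dst) {
        int i = lo, j = mid, k = lo;
        while (i < mid && j < hi) dst[k++] = (src[i] <= src[j]) ? src[i++] : src[j++];
        while (i < mid) dst[k++] = src[i++];
        while (j < hi) dst[k++] = src[j++];
    }
}
```

The granularity of the checks is a design choice: polling once per run or per merge step keeps the overhead negligible, at the cost of a cancel taking effect only after the current unit of work completes.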

          Sam Tunnicliffe added a comment -

          Updated patch following release of 2.8.8.

          Andy Seaborne added a comment -

          a few questions:

          1 - what can be shared between JENA-45 (spill to disk update)? Code for to/from disk?

          JENA-45 uses SSE for tuple parsing (but what about bnodes?)
          JENA-44 has BindingSerializer.

          Is there anything in common?

          What about the deferred buffering?

          2 - Control of resources (part 1)

          JENA-44 reads a symbol ; JENA-45 has a ThresholdPolicyCount. Do we need a combination?

          3 - Generally, RAM is a system-wide resource used for caching in TDB/32. Do we need to have something adaptive?

          Stephen Allen added a comment -

          Some answers with respect to JENA-45:

          1) In JENA-45, bnodes are encoded with NodeFmtLib.safeBNodeLabel(String) and decoded with a new method NodeFmtLib.decodeSafeBNodeLabel(String). This outputs 'B' followed by the internal blank node label encoded in hexadecimal. This allows us to deserialize back to the proper Jena bnode.

          2) I think we need to add a way to make ThresholdPolicyCount configurable externally. Alternatively, we can change this to actual memory usage by looking at the contents of each Binding object as it is added.

          3) Ultimately, having a unified memory manager (perhaps with a preset limit for each query) would be ideal to manage operators that need it. Looking at [1] (from the Hadoop project) gives an indication of how this might be approached.

          [1] http://pig.apache.org/docs/r0.8.1/api/org/apache/pig/data/DataBag.html
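The bnode encoding in point 1 ('B' followed by the label in hexadecimal) can be illustrated with a small round-trip sketch. The class `SafeLabelSketch` and its methods are hypothetical and are not the NodeFmtLib methods named above; in particular, encoding the label's UTF-8 bytes as lowercase hex is an assumption made here for illustration, and the actual encoding in JENA-45 may differ in detail.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical sketch of a reversible "safe" blank node label encoding.
final class SafeLabelSketch {
    // Encode: 'B' followed by two hex digits per UTF-8 byte of the label.
    static String encode(String label) {
        StringBuilder sb = new StringBuilder("B");
        for (byte b : label.getBytes(StandardCharsets.UTF_8)) {
            sb.append(String.format("%02x", b & 0xff));
        }
        return sb.toString();
    }

    // Decode: strip the 'B' prefix and turn each hex pair back into a byte.
    static String decode(String safe) {
        if (safe.isEmpty() || safe.charAt(0) != 'B' || safe.length() % 2 != 1) {
            throw new IllegalArgumentException("not an encoded label: " + safe);
        }
        byte[] bytes = new byte[(safe.length() - 1) / 2];
        for (int i = 0; i < bytes.length; i++) {
            bytes[i] = (byte) Integer.parseInt(safe.substring(1 + 2 * i, 3 + 2 * i), 16);
        }
        return new String(bytes, StandardCharsets.UTF_8);
    }
}
```

Because the encoded form contains only 'B' and hex digits, it stays a single token whatever characters the internal label contains (colons, dashes and so on), which is what makes the round trip back to the same label possible.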

          Paolo Castagna added a comment - - edited

          @Andy

          1) BindingSerializer uses NodeFmtLib.serialize(Node node) to serialize a Node into a byte[], but it uses a ByteBuffer internally (which needs to be expanded as necessary) and then makes a copy onto a byte[]. To deserialize a byte[] into a Node it uses a Node decode(byte[] bytes) method taken from TDB com.hp.hpl.jena.tdb.nodetable.NodecSSE.

          2) JENA-44 uses a symbol since that is the common way to configure internal parameters within ARQ and it allows the value to be changed by users. Isn't it?

          3) We could have something adaptive, but I would have this as a further improvement.

          We are currently using a patched version of ARQ with JENA-44 and this has solved some of the problems we were experiencing on our servers.

          Paolo Castagna added a comment -

          @Sam and in relation to (de)serializing bindings, see: http://markmail.org/message/zadahd4wwwvxahd2 from jena-users mailing list.

          Paolo Castagna added a comment - - edited

          New ARQ and TDB SNAPSHOTs with the latest patch (i.e. https://issues.apache.org/jira/secure/attachment/12477870/JENA-44_ARQ_r8724.patch) applied are here:
          http://oss.talisplatform.com/content/repositories/talis-snapshots/com/hp/hpl/jena/arq/2.8.9-JENA_44-SNAPSHOT/
          http://oss.talisplatform.com/content/repositories/talis-snapshots/com/hp/hpl/jena/tdb/0.8.11-JENA_44-SNAPSHOT/
          If you try them, please report back any problems.

          Paolo Castagna added a comment -

          Updated patch, now using the new Binding I/O (kudos to Andy).

          Stephen Allen added a comment -

          Attached a patch using the implementation in JENA-99.

          Paolo Castagna added a comment -

          Hi Stephen, thanks. Your patch for JENA-44 is great, as is the work you have done for JENA-99. It made this tiny and trivial, which is good.
          However, I am not completely clear on what happens if cancel is called on a QueryIterSort.
          I saw that there is a TODO "Port the tests below to use the new DataBags"; however, those tests come from the tests for ExternalBindingSort, which had a cancellation flag. Should we have a Canceable interface and should DataBag<T> extend Canceable? Or is this not necessary, and we can just catch QueryCancelledException and call close() as you have done in SortedBindingIterator?
          I've tried to port the tests in TestSortedDataBag to use the new DataBags, but I struggled because there is no notion of cancellation for DataBags.

          Stephen Allen added a comment -

          I did not include a cancellation mechanism in the DataBags themselves because it was not clear to me that it would be necessary.

          The only point at which a significant amount of time can be spent in the DataBag code is in the add() method right as a spill is occurring. The program execution may be in Arrays.sort() (SortedDataBag and DistinctDataBag) or it may be in the process of serializing tuples to disk. Given anticipated spill thresholds (1,000-100,000 tuples or memory in the 10-100 MB range), and the fact that disk I/O is sequential (and thus fast), it seemed like an unnecessary complication to support cancellation since those operations would complete in the tens of seconds range. Any physical query operator using the DataBag would then be able to cancel immediately after the spill finished (QueryIterSort passes the cancel request to its embedded iterator, which will then throw the QueryCancelledException on the next iteration).

          After the add phase is complete, and the QueryIterSort starts returning results, cancellation will be handled by the super class (QueryIteratorBase).

          Porting the tests meant that they would test the QueryIterSort with the embedded DataBag to be sure that the temporary files were cleaned up when the iterator was cancelled. So it's not really testing cancellation on the DataBag per se, but rather the new QueryIterSort.

          Stephen Allen added a comment -

          If we wanted to add support for cancelling during tuple serialization, I don't think it would be too difficult.

          However, I'm not sure if we can do much during the Arrays.sort(). Does Thread.interrupt() cancel that method? If so, how would we get a handle to the proper thread/threads (we need to be careful about lock contention if registering them somewhere)?

          Paolo Castagna added a comment -

          QueryIterSort now reads the threshold value from a Symbol in the ExecutionContext (default is Integer.MAX_VALUE == no spilling).

          On requestCancel() we call cancel on the embedded iterator, close on the SortedDataBag and super.requestCancel().

          I added a TestQueryIterSort to make sure we delete temporary files on all the possible cancellation scenarios.

          @Andy, @Stephen, what do you think?

          I'd like to commit this and close this issue to move onto JENA-45.

          Stephen Allen added a comment -

          I made a few changes:

          1) I'm now using -1 to shut off the spill to disk behavior
          2) Changed spillOnDiskSortingThreshold to be a Long instead of an Integer
          3) Removed the db.close() in the requestCancel()
          4) Added a few more tests

          For 3) the requestCancel() method is potentially called by another thread, and thus cannot call the non-thread safe DataBag.close() method. Instead, we will get proper cancellation from either the embedded iterator or the super class (depending on which phase we are in, adding to or iterating over the databag).
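The threading argument in 3) can be sketched like this. The class `OwnerClosesSketch` is hypothetical and only mirrors the shape of the reasoning, not the actual QueryIterSort code: the method that another thread may call only sets a volatile flag, and the consuming thread, the only one that touches the non-thread-safe resources, closes them when it next observes the flag.

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.CancellationException;

// Hypothetical sketch: cancellation is requested by any thread,
// but cleanup always runs on the consuming thread.
final class OwnerClosesSketch {
    private final Iterator<String> source;
    private volatile boolean cancelRequested = false; // written by any thread
    private boolean closed = false;                   // touched only by the consumer

    OwnerClosesSketch(List<String> items) { this.source = items.iterator(); }

    // Safe to call from another thread: never touches non-thread-safe state.
    void requestCancel() { cancelRequested = true; }

    boolean isClosed() { return closed; }

    // Called by the consuming thread, which is the only one allowed to close.
    boolean hasNext() {
        if (cancelRequested) {
            close();
            throw new CancellationException("query cancelled");
        }
        if (closed) return false;
        if (!source.hasNext()) {
            close();
            return false;
        }
        return true;
    }

    String next() {
        if (!hasNext()) throw new NoSuchElementException();
        return source.next();
    }

    private void close() {
        closed = true; // a real implementation would release temporary files here
    }
}
```

With this split, the cancelling thread never needs a lock on the data structure, at the cost of cleanup being deferred until the consumer's next call.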

          Paolo Castagna added a comment -

          Thanks Stephen, it is always good to have another pair (or more) of eyes to look at my stuff.
          I wasn't sure about 3) but you convinced me it's not necessary. I put it there thinking it was necessary in case .cancel() is called before SortedBindingIterator is initialized, but at that point we cannot have open iterators or open files which need to be closed.

          Pending a "thumbs up" from Andy, I am ready to commit this (it has been a long journey, but in the end I think we addressed all of Andy's concerns and with your great help we have a good quality improvement to spill stuff on disk for better scalability).

          Andy Seaborne added a comment -

          Shouldn't the Symbol spillOnDiskSortingThreshold be in ARQ?

          What should go in ChangeLog.txt?


            People

            • Assignee:
              Paolo Castagna
              Reporter:
              Sam Tunnicliffe
            • Votes:
              4
              Watchers:
              4
