Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.4.0
    • Component/s: None
    • Labels: None

      Description

      Incoming batches are currently queued up and held in memory without limit. If execution on a node is delayed, or is not able to keep up with the rate of incoming batches, this can result in out-of-memory errors. We want to allow spooling incoming batches to disk once the total size of the batches in the queue reaches a threshold.
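
      A minimal sketch of the accounting this implies (all names here are illustrative assumptions, not from the actual patch):

      // Track the total size of queued batches; once a configured threshold is
      // crossed, switch to spooling mode so later batches go to disk instead.
      private long queuedBytes;
      private final long spoolThreshold = 10L * 1024 * 1024; // assumed 10 MB cap
      private boolean spool;

      void onBatchArrival(RawFragmentBatch batch) {
        queuedBytes += batch.getByteCount(); // hypothetical size accessor
        if (!spool && queuedBytes > spoolThreshold) {
          spool = true;
        }
      }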

      Attachments

      1. DRILL-274_2013-11-18_17:12:34.patch
        76 kB
        Steven Phillips
      2. DRILL-274_2013-11-18_17:57:41.patch
        76 kB
        Steven Phillips
      3. DRILL-274.patch
        67 kB
        Steven Phillips
      4. DRILL-274.patch
        67 kB
        Steven Phillips

        Activity

        Steven Phillips added a comment -

        I wrote up a quick description of what I'm thinking of doing.

        class SpoolingRawFragmentBatchBuffer implements RawBatchBuffer {
          private Queue<RawFragmentBatchWrap> buffer;
          private SpoolingManager spoolingManager;
          private boolean spool;

          // If !spool, wrap the batch and add it to the queue. If spool, wrap it,
          // set available = false, and hand it to the spoolingManager.
          public void enqueue(ConnectionThrottle throttle, RawFragmentBatch batch);

          // Get the RFB from the next RFBW on the queue. Blocks if the RFB has
          // not yet been recovered from disk.
          public RawFragmentBatch getNext();

          class RawFragmentBatchWrap {
            private RawFragmentBatch batch;
            private boolean available;

            RawFragmentBatch get();                   // returns batch if available; otherwise blocks until it is
            void writeToStream(OutputStream stream);  // write RFB to the output stream
            void readFromStream(InputStream stream);  // reconstruct RFB from the input stream
          }

          class SpoolingManager {
            private Queue<RawFragmentBatchWrap> incoming;
            private Queue<RawFragmentBatchWrap> outgoing;

            public void addBatch(RawFragmentBatch batch); // add batch to the incoming queue
            private void spool();                         // start spooling batches in incoming, moving them to outgoing
            private void despool();                       // start reconstructing batches in outgoing, removing them when done
          }
        }

        This works similarly to the UnlimitedRawBatchBuffer, but incoming batches will get wrapped and added to the queue even after we've started spooling. Once we've reached a threshold and decide to start spooling, the batches will be wrapped, marked unavailable, and handed to the SpoolingManager, which writes these batches to disk in a separate thread. Once we decide to stop spooling and start reading back, the SpoolingManager will handle this as well: it will close the OutputStream and open an InputStream. I will need to take care to handle the case where we start spooling, start reading back, and then start spooling again before we have finished reading back. I think I will create a new file each time spooling restarts, so I don't have to worry about appending to, or reading from, a file that is still open.
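
        A rough sketch of the new-file-per-cycle idea (directory layout and names here are assumptions for illustration, not the final code):

        import java.io.*;

        // Hypothetical helper: each spooling cycle gets its own file, so we never
        // append to, or read back from, a file that is still open for writing.
        class SpoolFileManager {
          private final File dir;
          private int cycle;

          SpoolFileManager(File dir) { this.dir = dir; }

          // Called when spooling (re)starts: open a fresh file for this cycle.
          OutputStream newSpoolFile() throws IOException {
            return new FileOutputStream(new File(dir, "spool." + cycle++));
          }

          // Called for read-back: the writer has closed its stream by now, so the
          // reader sees a complete, immutable file.
          InputStream openForReadBack(int cycleId) throws IOException {
            return new FileInputStream(new File(dir, "spool." + cycleId));
          }
        }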

        If RawBatchBuffer.getNext() is called and the next RFB is not available, it will block until it is. Hopefully this won't be a problem, because we will start reading back from disk well in advance, so the data is available by the time getNext() is called. But we may not be able to read back fast enough.
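
        The blocking behavior in RawFragmentBatchWrap.get() could be plain wait/notify (a sketch, not the actual implementation):

        // Hypothetical holder: the despooling thread calls set() once the batch is
        // back in memory; any reader blocked in get() then wakes up.
        class BatchHolder<T> {
          private T batch;
          private boolean available;

          synchronized T get() throws InterruptedException {
            while (!available) {
              wait();            // block until despooling completes
            }
            return batch;
          }

          synchronized void set(T batch) {
            this.batch = batch;
            this.available = true;
            notifyAll();         // wake any blocked readers
          }
        }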

        Jacques Nadeau added a comment -

        Looks good. A few thoughts:

        • Ways and whether to effectively cascade down an IterOutcome.NOT_YET on the first fail.
        • ConnectionThrottle was irrelevant when we were just packing unlimited batches in memory. If/when should we actually throttle incoming RPCs through this buffer?
        • I wonder if it might be good to have a buffer set. For example, if we have a set of separate incoming buffers and we're going to be merging them, it would be best to balance memory consumption rather than letting the first stream fill up most of the available buffering memory and spooling nearly all of the other streams.
        • You don't note it here, but we've talked previously about providing an HDFS URI for buffering purposes. I wonder whether we should also allow multiple paths instead of only one, to do pseudo disk striping like MR does (see the sketch below).
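
        A sketch of what round-robin striping across multiple spool paths might look like (illustrative only; the design above uses a single path):

        import java.util.List;
        import java.util.concurrent.atomic.AtomicInteger;

        // Hypothetical: rotate across several configured directories so spool I/O
        // is spread over multiple disks, as MR does with mapred.local.dir.
        class SpoolPathSelector {
          private final List<String> paths;
          private final AtomicInteger next = new AtomicInteger();

          SpoolPathSelector(List<String> paths) { this.paths = paths; }

          String nextPath() {
            return paths.get(Math.floorMod(next.getAndIncrement(), paths.size()));
          }
        }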
        Steven Phillips added a comment -

        Created reviewboard

        Steven Phillips added a comment -

        Created reviewboard https://reviews.apache.org/r/15564/

        Steven Phillips added a comment -

        After discussion with Jacques and Ben, I decided to take a slightly different approach. I still have the RawFragmentBatchWrap, but I no longer have a spooling manager.

        Now, if we start spooling, the enqueue() method does not return until the batch has been written to disk. And if getNext() is called and the next batch has been spooled to disk, the batch is retrieved at that point. There are no longer any queues of batches waiting to be spooled or unspooled.
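
        In outline, the synchronous version might look like this (markSpooled()/isSpooled() are assumed names, not the committed code):

        // Sketch: enqueue() writes through to disk while spooling, and getNext()
        // reads the batch back on demand, so no background spooling thread or
        // extra queues are needed.
        public void enqueue(RawFragmentBatch batch) throws IOException {
          RawFragmentBatchWrap wrap = new RawFragmentBatchWrap(batch);
          if (spool) {
            wrap.writeToStream(out); // does not return until the batch is on disk
            wrap.markSpooled();      // assumed: releases the in-memory copy
          }
          buffer.add(wrap);
        }

        public RawFragmentBatch getNext() throws IOException {
          RawFragmentBatchWrap wrap = buffer.poll();
          if (wrap == null) {
            return null;
          }
          if (wrap.isSpooled()) {
            wrap.readFromStream(in); // retrieve the spooled batch at this point
          }
          return wrap.get();
        }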

        I created a new FileSystem implementation that allows syncing data to the local filesystem. This avoids having to close the output stream in order to read the data back.
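
        The core trick can be sketched as a standalone analogue (this is not Drill's actual FileSystem class):

        import java.io.FileOutputStream;
        import java.io.IOException;
        import java.io.OutputStream;

        // An output stream whose sync() forces written bytes to the device, so a
        // reader can open and read the file while the writer still has it open,
        // without waiting for close().
        class SyncableLocalOutputStream extends OutputStream {
          private final FileOutputStream out;

          SyncableLocalOutputStream(String path) throws IOException {
            out = new FileOutputStream(path);
          }

          @Override
          public void write(int b) throws IOException {
            out.write(b);
          }

          // Flush user-space buffers and force the data to disk.
          public void sync() throws IOException {
            out.flush();
            out.getChannel().force(false);
          }

          @Override
          public void close() throws IOException {
            sync();
            out.close();
          }
        }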

        Steven Phillips added a comment -

        Updated reviewboard

        Steven Phillips added a comment -

        Updated reviewboard https://reviews.apache.org/r/15564/

        Jacques Nadeau added a comment -

        Fixed in 2c811a8


          People

          • Assignee: Steven Phillips
          • Reporter: Steven Phillips
          • Votes: 0
          • Watchers: 2
