Details

    • Type: New Feature
    • Status: Resolved
    • Priority: Major
    • Resolution: Incomplete
    • Affects Version/s: M2.1, M4, 0.5
    • Fix Version/s: None
    • Component/s: Java Broker
    • Labels: None

      Description

      Currently, the Java Broker can do one of two things with a message it has to deliver:

      1. Keep transient messages in memory until delivered
      2. Write persistent messages to a message store (like BDB) and keep in memory until delivery complete

      This means that the broker is not able to avoid OutOfMemory errors: send enough messages to the broker, especially while your consumers are inactive, and you can bring the broker down once it exhausts its available heap.

      RG to add more details and design proposal here please

        Activity

        Arnaud Simon added a comment -

        There are several possible heuristics. Let's assume that a message has an in-memory state and a persistent state, so we can:
        1) discard all in-memory states when the memory size reaches a pre-defined threshold.
        2) discard new and/or old transient messages.
        3) create only the persistent state for persistent and transient messages.
        4) throttle publisher speed, up to stopping publishers entirely, when memory size is critical.

        We can obviously combine, for example, 1) and 3). Note that even if a message payload is not kept in memory, accepting a new message still requires creating objects, and hence free memory. So, whatever heuristic we go for, we will eventually have to apply 4).
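        Heuristic 4) could be sketched as below. This is an illustrative sketch only; the class and method names (`PublisherThrottle`, `suggestedDelayMillis`) are hypothetical and do not exist in the Qpid codebase, and the linear delay ramp is just one possible policy.

```java
// Hypothetical sketch of heuristic 4: throttle publishers as memory
// usage rises, and block them entirely once usage is critical.
public class PublisherThrottle {
    private final long thresholdBytes;  // start throttling here
    private final long criticalBytes;   // stop publishers entirely here

    public PublisherThrottle(long thresholdBytes, long criticalBytes) {
        this.thresholdBytes = thresholdBytes;
        this.criticalBytes = criticalBytes;
    }

    /** Delay (in ms) to impose on a publisher at the given memory usage. */
    public long suggestedDelayMillis(long usedBytes) {
        if (usedBytes < thresholdBytes) {
            return 0; // plenty of headroom: no throttling
        }
        if (usedBytes >= criticalBytes) {
            return Long.MAX_VALUE; // critical: block the publisher
        }
        // Scale the delay linearly between the threshold and critical points.
        double fraction = (double) (usedBytes - thresholdBytes)
                / (criticalBytes - thresholdBytes);
        return (long) (fraction * 1000); // up to 1s per message
    }

    public static void main(String[] args) {
        PublisherThrottle t = new PublisherThrottle(800, 1000);
        System.out.println(t.suggestedDelayMillis(500));  // prints 0
        System.out.println(t.suggestedDelayMillis(900));  // prints 500
    }
}
```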

        Marnie McCormack added a comment -

        Moving items not being worked on (AFAIK) out of the M4 Fix Version.

        Marnie McCormack added a comment -

        Re Arnaud's comments: I suspect they may miss the key point of writing to disk, i.e. the movement of data out of the VM into somewhere else for later recovery. I have possibly misunderstood, but I'm commenting to ensure no confusion.

        Aidan Skinner added a comment -

        There are several changes necessary to implement this. Message payload storage should be separated from metadata storage. Persistent messages should be written to disk immediately as they are received.

        When memory usage reaches a threshold, in-memory persistent message payloads could be GC'd from (ideally the back of) queues until pressure has been relieved. These would be reloaded from disk as they were sent. The existing weak-ref stuff would work here.

        If this did not release sufficient memory, transient messages would be written to disk, again ideally from the back of queues, and then reloaded.

        Internally, messages would hold a reference to a file in a shallow, wide hierarchy of files which would contain the message payload. These files would be deleted when the messages are sent or expired.

        On startup, after the metadata had been read to restore the state of the broker, the filesystem would be scanned and files which were not currently referenced would be deleted. This would clean up any transient messages that had been written to disk before the broker stopped running but not yet deleted.
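        The file-per-message payload layout and the startup orphan scan described above could look roughly like the sketch below. All names here (`PayloadFileStore`, `deleteOrphans`, the `.msg` suffix) are hypothetical stand-ins for illustration, not the real broker classes.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Set;

// Illustrative sketch: one file per message payload, named by message id,
// plus a startup scan that deletes files no longer referenced by the
// recovered broker state (e.g. transient messages left by a crash).
public class PayloadFileStore {
    private final Path root;

    public PayloadFileStore(Path root) throws IOException {
        this.root = Files.createDirectories(root);
    }

    /** Write a message payload to its own file, named by message id. */
    public Path write(long messageId, byte[] payload) throws IOException {
        return Files.write(root.resolve(messageId + ".msg"), payload);
    }

    /** Reload a payload that was previously flowed to disk. */
    public byte[] read(long messageId) throws IOException {
        return Files.readAllBytes(root.resolve(messageId + ".msg"));
    }

    /** Delete a payload once the message is sent or expired. */
    public void delete(long messageId) throws IOException {
        Files.deleteIfExists(root.resolve(messageId + ".msg"));
    }

    /** Startup scan: delete files whose id is not in the live set. */
    public int deleteOrphans(Set<Long> liveMessageIds) throws IOException {
        int deleted = 0;
        try (DirectoryStream<Path> files =
                 Files.newDirectoryStream(root, "*.msg")) {
            for (Path file : files) {
                String name = file.getFileName().toString();
                long id = Long.parseLong(name.substring(0, name.length() - 4));
                if (!liveMessageIds.contains(id)) {
                    Files.delete(file);
                    deleted++;
                }
            }
        }
        return deleted;
    }
}
```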

        Rob Godfrey added a comment -

        My thoughts on Flow to disk

        1) The easiest approach to start with is to set capacity limits on queues such that if the total size of messages on a queue exceeds the capacity the messages at the tail of the queue start flowing to disk rather than remaining in memory. To ensure that the broker itself never runs out of memory the total capacity of all queues must not exceed the total capacity of the broker. One can start to get clever here by progressively reducing the capacity of individual queues as the sum of the individual capacities approaches the total capacity.

        2) I strongly believe that we should not confuse the transactional message log (which is used to replay persistent messages after a broker failure) with the random(ish) access store. In particular the current implementation where we try to piggyback flow-to-disk for persistent messages on top of our transaction log is a cause of constant issues for us. It is better (I think) to have a uniform solution across transient and persistent messages for flow-to-disk; and for this to be separated completely from the transaction log.

        3) We should not overcomplicate things for ourselves by trying to be exact in our calculations of memory used. In particular where a message is placed on many queues we should not try to account for the fact that it need be stored only once in memory. Reasoning about memory usage is much easier if we make the simplifying assumption that each message is duplicated on each queue.

        4) Discarding transient messages, killing slow consumers, throttling publishers etc are all separate strategies for dealing with expanding queue size and should be considered separately from flow to disk (they are not alternatives since you may wish to combine all these strategies).

        5) In the first instance we should only consider flowing message header and content to disk, not the structure of the queue itself. This will put an upper bound on the number of messages that can logically be in the broker at any one time - and therefore without implementing other strategies (such as producer throttling) it will still theoretically be possible to make the broker run out of memory. Flowing the queue structure to disk should be part of a second phase of implementation.

        [What do I mean by this? That in the first instance we should keep in memory the list of entries, and the entries may point to an in-memory data structure, or an on-disk data structure. In the second phase the list itself may be partly stored on disk.]

        6) The method of storing message data on disk does not need to be complicated. Data that has been flowed to disk can be deleted on startup (since the transaction log is what preserves durability). Something as simple as a directory per queue, with a file for each message (or separate files for the header and each body chunk) will be sufficient. The file names can be derived from the message id, directory names from the queue name.

        So implementation-wise I would suggest the following:

        1) Remove shared state from the Message class, and move everything into QueueEntries (this allows for a message to be flowed to disk on one queue while staying in memory on another).
        2) Break apart the interface for the store such that it covers only the logging of the durable transactional events (message data arriving, enqueues, dequeues).
        3) Move reference counting into the store/store facade

        (At this point we will have removed our current, fragile flow-to-disk capabilities on persistent messages, and all messages will be held in memory while live.)

        4) Add a capacity property to queues
        5) Update queue entries / message handles so that a queue entry can be explicitly flowed-to-disk, have "flowed-to-disk" as a boolean property, and add the ability to restore a queue entry from disk
        6) Add capabilities to queues to shrink their in-memory profile by flowing queue entries to disk (from the tail upward) until they are under a given notional memory usage

        Then you can start to examine ways of making the broker manage the memory usage of queues more dynamically and actively.

        hope this helps,
        Rob
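        Step 6 of the implementation list above (shrinking a queue by flowing entries to disk from the tail) could be sketched as follows. `FlowToDiskQueue` and its nested `QueueEntry` are simplified stand-ins for the real Qpid classes, and the actual disk write is elided to a flag.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;

// Sketch of flowing queue entries to disk from the tail upward until the
// queue's in-memory footprint is under a target size. Illustrative only.
public class FlowToDiskQueue {
    static class QueueEntry {
        final long size;          // notional in-memory size of the payload
        boolean flowedToDisk;     // stand-in for the real on-disk state
        QueueEntry(long size) { this.size = size; }
    }

    private final Deque<QueueEntry> entries = new ArrayDeque<>();
    private long inMemoryBytes;

    public void enqueue(QueueEntry e) {
        entries.addLast(e);
        inMemoryBytes += e.size;
    }

    /** Flow entries to disk, tail first, until under targetBytes. */
    public void shrinkTo(long targetBytes) {
        // Walk from the tail (newest) toward the head (next to deliver),
        // so the messages closest to delivery stay in memory.
        Iterator<QueueEntry> it = entries.descendingIterator();
        while (inMemoryBytes > targetBytes && it.hasNext()) {
            QueueEntry e = it.next();
            if (!e.flowedToDisk) {
                e.flowedToDisk = true;   // payload would be written to disk here
                inMemoryBytes -= e.size; // only the entry reference remains
            }
        }
    }

    public long inMemoryBytes() { return inMemoryBytes; }
}
```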

        Carl Trieloff added a comment -

        It would be nice to use the same config on queues as we do on the C++ side, if that works for the objectives here.

        It seems as if it would; keep the config the same.

        Martin Ritchie added a comment -

        Hi Rob, can you provide feedback on the full FtD solution please.
        Cheers

        Martin Ritchie added a comment -

        This is where the design for FtD lives: http://cwiki.apache.org/confluence/display/qpid/Java+Broker+Design+-+Flow+to+Disk

        Martin Ritchie added a comment -

        Removing old FtD-related JIRAs. Marking as Incomplete. A new JIRA will be created for the new design and implementation.


          People

          • Assignee: Rob Godfrey
          • Reporter: Marnie McCormack
          • Votes: 0
          • Watchers: 1

            Dates

            • Created:
              Updated:
              Resolved:

              Time Tracking

              Estimated: 83h
              Remaining: 83h
              Logged: Not Specified

                Development