== The Problem ==
We want to be able to handle a large, growing queue without exhausting
the limited supply of memory. To do this we want to make use of the
disk, holding message data there until it is actually needed.
== Overview of Approach ==
My design proposal allows a queue to be configured with 'paging'
enabled. This option cannot at present be used in combination with the
LVQ or priority queue options (due to the specific ordering
requirements of those options).
A queue for which paging is enabled will be backed by a file. This
file will be logically split into fixed size 'pages'. Each page will
hold a contiguous sequence of messages within it. The corresponding
segment of the file for each page may be mapped into memory, allowing
representations of the contained messages to be written to, and read
back from, disk. The recording of message representations to disk for
paging is entirely orthogonal to any persistence of the data for
recovery purposes. This allows the encoded form to be much simpler,
since we don't need to consider recovery after broker failure.
The queue thus comprises a sequence of pages. Only a fixed
number of pages need be loaded at any given time. This frees the
broker from having to store all the messages in memory. When a message
from an unloaded page is required, that page can be reloaded. This may
necessitate unloading some other page to stay within the allowed
number of loaded pages.
New pages can be created as needed, extending the file without
explicit limit (obviously the filesystem has some finite limit). The
sequence of pages that make up the queue need not match the sequence
of segments within the backing file. This means that pages can be
reused when they are emptied of all messages.
== The Design ==
A specific Messages implementation is used to implement a queue
supporting paging in the manner described above.
On a posix system it relies on mmap/munmap/msync for mapping the file
into memory. These calls will (eventually) be hidden behind an
abstraction, allowing platforms that don't support them to supply
alternative implementations.
The central structure in the paged queue is a map of Page instances,
keyed by a sequence number. The key represents the sequence of the
first message contained by the page.
All pages are the same size. Each corresponds to a particular offset
in the file. A Page instance can be in the loaded or unloaded
state. When loaded, the segment of the file that backs the page is
mapped into a region of memory, and the messages it contains are held
in a standard deque from which they can be returned as needed.
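The central structures might be sketched as follows; the type and
field names are assumptions for illustration, not the broker's actual
types.

```cpp
#include <deque>
#include <map>
#include <stdint.h>
#include <string>

// Illustrative sketch: each Page records whether it is loaded; when
// loaded it holds its messages in a deque alongside a pointer to the
// mapped file segment.
struct Page {
    bool loaded;
    char* region;                     // mapped file segment when loaded
    std::deque<std::string> messages; // decoded messages when loaded
    Page() : loaded(false), region(0) {}
};

// The map of pages, keyed by the sequence number of the first message
// each page contains.
typedef std::map<uint32_t, Page> PageMap;
```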
To add messages to a page it must be loaded. When a message is added,
it is pushed onto the deque and also encoded into the region of memory
to which the page's file segment is mapped.
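The add step could look something like the sketch below, assuming a
simple length-prefixed encoding into the mapped region (the actual
encoding is not specified here, and all names are illustrative).

```cpp
#include <cstring>
#include <deque>
#include <stdint.h>
#include <string>

// Sketch of adding a message to a loaded page. 'region' stands in for
// the mmap'd file segment and 'used' tracks the next free offset
// within the page.
struct LoadedPage {
    std::deque<std::string> messages;
    char region[4096];
    size_t used;
    LoadedPage() : used(0) {}

    bool add(const std::string& msg) {
        size_t needed = sizeof(uint32_t) + msg.size();
        if (used + needed > sizeof(region)) return false; // page is full
        uint32_t len = msg.size();
        memcpy(region + used, &len, sizeof(len));             // length prefix
        memcpy(region + used + sizeof(len), msg.data(), len); // payload
        used += needed;
        messages.push_back(msg); // keep the decoded form for fast access
        return true;
    }
};
```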
A page also contains two sequence sets. One tracks all the messages
that are enqueued within the page, the other all the messages which
have been acquired (the latter is a strict subset of the
former). These sequence sets are always in memory. This means each
enqueued message will be tracked in memory and thus the memory will
grow as the queue grows. However the maximum memory required per
message in the unloaded state is two sequence ranges (assuming both
the enqueued set and acquired set are sparse and the message is
recorded in both). In general it is anticipated the memory used will
be even less than this. Additionally there is the memory overhead of
the map of pages, which will grow as the queue grows even though not
all those pages are in the loaded state. The expectation, however, is
that the saving in memory from having most of the pages in a large
queue in the unloaded state, in which they hold not the actual
messages but merely the two sequence sets mentioned above, more than
offsets this overhead.
Having the acquired state held in sequence sets avoids having to
update the file every time a message's state changes. The state of the
message instances can be set based on the sequence sets when the page
is loaded. The sequence sets are also currently updated based on the
message states when the page is unloaded (this is because at present
it is the MessageDistributor that sets the state to acquired, and that
is not done via the Messages instance - that may be worth changing).
When a subscriber moves through a queue (Messages::next()) the
QueueCursor tracks its position. In a paged queue, we can find the
page the next message is in by consulting the map of pages. A message
at a given sequence will be in the last page with a key lower than or
equal to that sequence number. That page can then be loaded if
necessary, and the message instance within the deque found and
returned. Messages to be released, deleted etc. can be located in a
similar manner.
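The lookup described above (the last page with a key lower than or
equal to the sequence number) maps naturally onto
std::map::upper_bound; a sketch, with an int standing in for a real
Page:

```cpp
#include <map>
#include <stdint.h>

// Pages keyed by the sequence number of their first message; the
// mapped int is a stand-in for a real Page.
typedef std::map<uint32_t, int> Pages;

Pages::iterator findPage(Pages& pages, uint32_t seq) {
    Pages::iterator i = pages.upper_bound(seq); // first key > seq
    if (i == pages.begin()) return pages.end(); // seq precedes all pages
    return --i;                                 // last key <= seq
}
```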
== Limitations and Remaining Work ==
This prototype does not handle the case where a message is larger
than a page. The page size is currently that reported by the system
(it needs to be a multiple of this for mmap, but at present the
multiplying factor is always 1).
The selection of a currently loaded page to be 'swapped out' to allow
another page to be loaded is currently very crude and unlikely to be
optimal in many cases. Some refinement of this would be necessary,
likely weighting pages based on their past use (which is a reasonable
indicator of the likelihood of future use).
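One possible refinement in this direction would be a least-recently-
used policy over the loaded pages; a sketch, with illustrative names,
not part of the current prototype:

```cpp
#include <list>
#include <stdint.h>

// Sketch of least-recently-used selection of a page to unload. Keys
// are the page keys from the page map; the loaded set is small, so a
// simple list scan in touched() is acceptable.
class LruTracker {
    std::list<uint32_t> order; // front = most recently used
  public:
    void touched(uint32_t key) {
        order.remove(key);
        order.push_front(key);
    }
    uint32_t evict() { // caller must ensure the tracker is non-empty
        uint32_t key = order.back();
        order.pop_back();
        return key;
    }
};
```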
The Messages::foreach() method is not implemented. I think this is
currently only used by HA when syncing a backup, and I actually think
it could be removed and replaced with a normal cursor based iteration
through the queue (which would also allow the 'visibility' of messages
to be configured, i.e. whether you see acquired messages or not).
Also required are a suite of tests to fully exercise the queue and to
explore the memory and performance characteristics in different
scenarios in order to determine its usefulness and indicate what sorts
of enhancements might be needed.