I would like to add the following example description to the web page documentation. The code from
QPID-1595 can be included at the bottom.
Some formatting of headers, lists and source code is required:
Multi-consumer example with Time-To-Live.
There are several client examples in the distribution that focus primarily on the various types of exchanges. However, there are several different consumer patterns that are worth looking at:
- Last Value Queues (LVQ) that allow messages with the latest value to overwrite the previous messages with the same key in a header (think stock values)
- Time-To-Live (TTL), which provides a timeout mechanism so that messages expire
- Ring queues for conserving space and ensuring that queues don't grow to unmanageable sizes that might impact performance (this is more of an implementation detail than a consumer pattern).
In this example we examine how we might use TTL and some other features to create the following consumer pattern:
- Provide several topic-based queues that will be shared among several consumers (subscribers).
- Make sure that all consumers can see all messages published to the queue within a certain timeframe.
- Provide a timeout on published messages to make sure that 'stale' messages don't remain in the queue and that late-arriving subscribers only get 'recent', 'relevant' information.
First we must declare the queues. There are three differences between this example and the pub/sub example:
1. The queues are declared independently of the subscriber (listener) sessions and will therefore outlive the listeners. If the broker restarts, the queues will need to be declared again. (This can be changed by making the queues durable.)
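Each queue needs to be declared and bound as follows:
session.queueDeclare(arg::queue="usa", arg::exclusive=false); // non-exclusive, so several subscribers can share the queue
session.exchangeBind(arg::exchange="amq.topic", arg::queue="usa", arg::bindingKey="usa.#");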
The important part here is that the queue is non-exclusive. Also note that the queue name does not contain any subscriber-specific ID and is therefore easily shared.
2. The topic listener program will subscribe to the queue(s).
The important part here is that the subscriber 'browses' messages (ACQUIRE_MODE_NOT_ACQUIRED) rather than acquiring them, i.e. taking ownership of them. This allows other subscribers to see the same messages too.
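The difference between browsing and acquiring can be sketched with a small in-memory model. This is only an illustration and does not use the Qpid API; the names SharedQueue, browse and acquire are hypothetical, and the per-consumer cursor is an assumption about how a browsing subscriber keeps its place in the queue.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>
#include <vector>

// Toy model of one shared, non-exclusive queue. Not the Qpid API.
class SharedQueue {
    struct Entry {
        std::string body;
        bool acquired;  // true once some consumer has taken ownership
    };
    std::vector<Entry> entries_;
    std::map<std::string, std::size_t> cursors_;  // next unread index per consumer

public:
    void publish(const std::string& body) {
        entries_.push_back(Entry{body, false});
    }

    // Browse: return every message this consumer has not seen yet.
    // The messages stay on the queue, so other consumers still see them.
    std::vector<std::string> browse(const std::string& consumer) {
        assert(!consumer.empty());  // each browsing consumer needs an id
        std::size_t& cursor = cursors_[consumer];  // 0 for a new consumer
        std::vector<std::string> seen;
        for (; cursor < entries_.size(); ++cursor) {
            if (!entries_[cursor].acquired) {
                seen.push_back(entries_[cursor].body);
            }
        }
        return seen;
    }

    // Acquire: take ownership of the oldest unacquired message, hiding it
    // from every other consumer. Returns false if nothing is available.
    bool acquire(std::string& out) {
        for (Entry& e : entries_) {
            if (!e.acquired) {
                e.acquired = true;
                out = e.body;
                return true;
            }
        }
        return false;
    }
};
```

In this model two browsing consumers both receive every published message, whereas a message that has been acquired is no longer visible to anyone else, which is why the listeners in this example must browse.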
3. The topic publisher program will send messages with a set Time-To-Live (TTL), i.e. each message will time out and be purged after the TTL period. The TTL period is in milliseconds.
message_data << "My Message"; // the example put this in a loop
message.setData(message_data.str());
message.getDeliveryProperties().setTtl(ttl_time); // ttl_time is an int, say 3000
// Asynchronous transfer sends messages as quickly as
// possible without waiting for confirmation.
async(session).messageTransfer(arg::content=message, arg::destination="amq.topic");
The important part here is that the setTtl() method is called so that messages can time out and be removed from the queue.
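The effect of the TTL can be pictured with a small standalone model. This is a sketch and not the broker's implementation: TimedMessage and TtlQueue are hypothetical names, the clock is supplied by the caller in milliseconds so nothing actually waits, and it is an assumption of the model that a message counts as expired as soon as the clock reaches its publish time plus its TTL.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <string>
#include <vector>

// Toy model of TTL expiry. Not the Qpid API.
struct TimedMessage {
    std::string body;
    std::uint64_t expiresAtMs;  // publish time + TTL
};

class TtlQueue {
    std::deque<TimedMessage> messages_;

public:
    // Enqueue a message that expires ttlMs milliseconds after nowMs.
    void publish(const std::string& body, std::uint64_t nowMs, std::uint64_t ttlMs) {
        assert(ttlMs > 0);  // this model gives every message a TTL
        messages_.push_back(TimedMessage{body, nowMs + ttlMs});
    }

    // Purge expired messages, then return the bodies still on the queue.
    std::vector<std::string> browse(std::uint64_t nowMs) {
        std::deque<TimedMessage> live;
        for (const TimedMessage& m : messages_) {
            if (nowMs < m.expiresAtMs) {
                live.push_back(m);
            }
        }
        messages_.swap(live);

        std::vector<std::string> bodies;
        for (const TimedMessage& m : messages_) {
            bodies.push_back(m.body);
        }
        return bodies;
    }

    std::size_t size() const { return messages_.size(); }
};
```

With a TTL of 3000, a message published at time 0 is still returned by browse(2999) but is gone at browse(3000), while a message published at time 1000 survives until time 4000.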
Run declare_queues to set up the non-exclusive queues.
Run a topic_listener in one shell, which will wait for messages to be posted.
Run the topic_publisher in another shell and give it a ttl parameter of 4000.
Run another topic_listener in another shell.
Look at the output of both topic_listeners. The first one, started before the publisher, will get all the messages. The second listener may or may not get all/any of the messages depending on how late it joined the queue and started listening.
Another enhancement would be to build a delay into the listener handler, which would slow the listener down so that it may miss some messages.
Again, in certain scenarios it is okay not to get older messages.
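Which messages a late or slow listener ends up with can be estimated with a short simulation. This is a rough sketch under stated assumptions and not a description of the broker's behaviour: the function receivedIndices and its parameters are hypothetical, the listener is assumed to fetch one message at a time and to spend handleMs on each, and a message is assumed to be gone once the clock reaches its publish time plus the TTL.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Returns the indices of the messages a browsing listener receives.
//   publishMs       publish time of each message, in non-decreasing order
//   ttlMs           TTL applied to every message
//   listenerStartMs time at which the listener starts browsing
//   handleMs        time the listener spends handling each message
std::vector<std::size_t> receivedIndices(const std::vector<std::uint64_t>& publishMs,
                                         std::uint64_t ttlMs,
                                         std::uint64_t listenerStartMs,
                                         std::uint64_t handleMs) {
    std::vector<std::size_t> received;
    std::uint64_t freeAt = listenerStartMs;  // when the listener is next idle

    for (std::size_t i = 0; i < publishMs.size(); ++i) {
        assert(i == 0 || publishMs[i - 1] <= publishMs[i]);

        // A message cannot be fetched before it has been published.
        const std::uint64_t fetchAt = std::max(freeAt, publishMs[i]);

        if (fetchAt < publishMs[i] + ttlMs) {
            received.push_back(i);
            freeAt = fetchAt + handleMs;  // busy until handling finishes
        }
        // Otherwise the message expired before the listener reached it.
    }
    return received;
}
```

For five messages published one second apart (at 0, 1000, 2000, 3000 and 4000) with a TTL of 4000, a listener that starts at time 0 with no handling delay receives all five, a listener that starts at 5500 receives only the last three, and a listener that starts at 0 but spends 3000 on each message receives messages 0, 1 and 3.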
QPID-1595 added here excluding the makefile.