Description
The STOMP connector is throwing away messages when the STOMP client disconnects.
Here is the test scenario I am using.
1) a publisher that writes 50 messages, 1 per second
2) a subscriber that reads 10 messages, 1 per every 2 seconds
2a) subscriber correctly sends stomp disconnect when done
3) rerun subscriber each time it finishes with 10 messages.
So during the first run of the subscriber, it gets 10 messages as expected.
-bash-2.05b$ ./activemq_tester -p 61613 -d sub -s 2 -m 10
Connecting......OK
Sending connect message.OK
Reading Response.Response: CONNECTED,
OK
Sending Subscribe.OK
Reading Subscribed Messsages.
Received: MESSAGE, This is message number 0, persistence flag [true]
message-id ID:lxdm14608.etrade.com-40939-1182371192387-3:0:-1:1:1
Received: MESSAGE, This is message number 1, persistence flag [true]
message-id ID:lxdm14608.etrade.com-40939-1182371192387-3:0:-1:1:2
Received: MESSAGE, This is message number 2, persistence flag [true]
message-id ID:lxdm14608.etrade.com-40939-1182371192387-3:0:-1:1:3
Received: MESSAGE, This is message number 3, persistence flag [true]
message-id ID:lxdm14608.etrade.com-40939-1182371192387-3:0:-1:1:4
Received: MESSAGE, This is message number 4, persistence flag [true]
message-id ID:lxdm14608.etrade.com-40939-1182371192387-3:0:-1:1:5
Received: MESSAGE, This is message number 5, persistence flag [true]
message-id ID:lxdm14608.etrade.com-40939-1182371192387-3:0:-1:1:6
Received: MESSAGE, This is message number 6, persistence flag [true]
message-id ID:lxdm14608.etrade.com-40939-1182371192387-3:0:-1:1:7
Received: MESSAGE, This is message number 7, persistence flag [true]
message-id ID:lxdm14608.etrade.com-40939-1182371192387-3:0:-1:1:8
Received: MESSAGE, This is message number 8, persistence flag [true]
message-id ID:lxdm14608.etrade.com-40939-1182371192387-3:0:-1:1:9
Received: MESSAGE, This is message number 9, persistence flag [true]
message-id ID:lxdm14608.etrade.com-40939-1182371192387-3:0:-1:1:10
OK
Sending Disconnect.OK
Disconnecting...OK
During this time since the publisher is about twice as fast, it has made it to message 22 and is still putting in one per second going to its prescribed 50 input messages.
Now I uparrow and run the subscriber again.....
-bash-2.05b$ ./activemq_tester -p 61613 -d sub -s 2 -m 10
Connecting......OK
Sending connect message.OK
Reading Response.Response: CONNECTED,
OK
Sending Subscribe.OK
Reading Subscribed Messsages.
Received: MESSAGE, This is message number 23, persistence flag [true]
message-id ID:lxdm14608.etrade.com-40939-1182371192387-3:0:-1:1:24
Received: MESSAGE, This is message number 24, persistence flag [true]
message-id ID:lxdm14608.etrade.com-40939-1182371192387-3:0:-1:1:25
Received: MESSAGE, This is message number 25, persistence flag [true]
message-id ID:lxdm14608.etrade.com-40939-1182371192387-3:0:-1:1:26
Received: MESSAGE, This is message number 26, persistence flag [true]
message-id ID:lxdm14608.etrade.com-40939-1182371192387-3:0:-1:1:27
Received: MESSAGE, This is message number 27, persistence flag [true]
message-id ID:lxdm14608.etrade.com-40939-1182371192387-3:0:-1:1:28
Received: MESSAGE, This is message number 28, persistence flag [true]
message-id ID:lxdm14608.etrade.com-40939-1182371192387-3:0:-1:1:29
Received: MESSAGE, This is message number 29, persistence flag [true]
message-id ID:lxdm14608.etrade.com-40939-1182371192387-3:0:-1:1:30
Received: MESSAGE, This is message number 30, persistence flag [true]
message-id ID:lxdm14608.etrade.com-40939-1182371192387-3:0:-1:1:31
Received: MESSAGE, This is message number 31, persistence flag [true]
message-id ID:lxdm14608.etrade.com-40939-1182371192387-3:0:-1:1:32
Received: MESSAGE, This is message number 32, persistence flag [true]
message-id ID:lxdm14608.etrade.com-40939-1182371192387-3:0:-1:1:33
OK
Sending Disconnect.OK
Disconnecting...OK
And as you can see, messages 11..22 did not get read.
and run the client once more since publisher is not done yet.
-bash-2.05b$ ./activemq_tester -p 61613 -d sub -s 2 -m 10
Connecting......OK
Sending connect message.OK
Reading Response.Response: CONNECTED,
OK
Sending Subscribe.OK
Reading Subscribed Messsages.
Received: MESSAGE, This is message number 46, persistence flag [true]
message-id ID:lxdm14608.etrade.com-40939-1182371192387-3:0:-1:1:47
Received: MESSAGE, This is message number 47, persistence flag [true]
message-id ID:lxdm14608.etrade.com-40939-1182371192387-3:0:-1:1:48
Received: MESSAGE, This is message number 48, persistence flag [true]
message-id ID:lxdm14608.etrade.com-40939-1182371192387-3:0:-1:1:49
Received: MESSAGE, This is message number 49, persistence flag [true]
message-id ID:lxdm14608.etrade.com-40939-1182371192387-3:0:-1:1:50
again missing messages 34..46.
But if I check the stats on this Queue, you can see that the broker has delivered all 50 messages.
-bash-2.05b$ bin/activemq query -QQueue=*
ACTIVEMQ_HOME: /home/jschaube/activemq/apache-activemq-4.1-SNAPSHOT
ACTIVEMQ_BASE: /home/jschaube/activemq/apache-activemq-4.1-SNAPSHOT
Type = Queue
DispatchCount = 50
Destination = TEST.FOO
QueueSize = 0
Name = TEST.FOO
DequeueCount = 50
MemoryPercentageUsed = 0
ConsumerCount = 1
MemoryLimit = 9223372036854775807
EnqueueCount = 50
BrokerName = b60000
So what I suspect is that the STOMP connector had prefetched as many as were written, but then when the client sends STOMP "DISCONNECT" command, the connector does not put those messages back but rather just throws them away.
This is not good especially when the messages are flagged persistent and I expect to loose none.
STOMP client is c code, here is the disconnect and tear down of the socket....
fprintf(stdout, "Sending Disconnect.");
fprintf(stdout, "OK\n");
fprintf(stdout, "Disconnecting...");
rc=stomp_disconnect(&connection);
rc==APR_SUCCESS || die(-2, "Could not disconnect", rc);
fprintf(stdout, "OK\n");
Here is part of the config, Notice I attempted to set the prefetch values to "1" to resolve the problem.
But I am unsure of the syntax since it made no difference in the way it behaved.
<transportConnectors>
<transportConnector name="openwire" uri="tcp://localhost:61616?jms.prefetchPolicy.queu
ePrefetch=1" discoveryUri="multicast://default"/>
<transportConnector name="ssl" uri="ssl://localhost:61617"/>
<transportConnector name="stomp" uri="stomp://localhost:61613?jms.prefetchPolicy.que
uePrefetch=1"/>
</transportConnectors>
Joel Schaubert