
Key: AMQ-2233
Type: Bug
Status: Resolved
Resolution: Fixed
Priority: Major
Assignee: Gary Tully
Reporter: Dave Syer
Votes: 1
Watchers: 1
ActiveMQ

After rollback received messages not re-presented

Created: 28/Apr/09 05:08 AM   Updated: 08/Jul/09 02:03 AM
Component/s: None
Affects Version/s: 5.2.0
Fix Version/s: 5.3.0

Time Tracking:
Not Specified

File Attachments:
RawRollbackSharedConsumerTests.java (Java source, 3 kB, licensed for inclusion in ASF works), attached 2009-05-13 04:27 AM by Dave Syer
RawRollbackTests.java (Java source, 3 kB, licensed for inclusion in ASF works), attached 2009-04-28 10:20 PM by Dave Syer
Issue Links:
dependent
 

Regression: Regression


Description
After a rollback, received messages are not re-presented. If I receive in a transaction and then roll back, the messages should be re-presented in the next transaction. This used to work in 5.1.0, but is broken in 5.2.0.

You can browse the Queue in JMX after the rollback and see that the messages are still there, but they are not received by a consumer in the same process.

Here's a test case (fails on the checkPostConditions()):

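import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.File;
import java.util.ArrayList;
import java.util.List;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.commons.io.FileUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

// Receives two messages in a transaction, rolls back, then checks in @After
// that both messages can be received again.
public class RawRollbackTests {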
	
	private static ConnectionFactory connectionFactory;
	private static Destination queue;
	private static BrokerService broker;

	@BeforeClass
	public static void clean() throws Exception {
		FileUtils.deleteDirectory(new File("activemq-data"));
		broker = new BrokerService();
		broker.setUseJmx(true);
		broker.start();
		ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
		connectionFactory.setBrokerURL("vm://localhost?async=false");
		RawRollbackTests.connectionFactory = connectionFactory;
		queue = new ActiveMQQueue("queue");
	}

	@AfterClass
	public static void close() throws Exception {
		broker.stop();
	}

	@Before
	public void clearData() throws Exception {
		getMessages(false); // drain queue
		convertAndSend("foo");
		convertAndSend("bar");
	}


	@After
	public void checkPostConditions() throws Exception {

		Thread.sleep(1000L);
		List<String> list = getMessages(false);
		assertEquals(2, list.size());

	}

	@Test
	public void testReceiveMessages() throws Exception {

		List<String> list = getMessages(true);
		assertEquals(2, list.size());
		assertTrue(list.contains("foo"));

	}
	
	private void convertAndSend(String msg) throws Exception {
		Connection connection = connectionFactory.createConnection();
		connection.start();
		Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
		MessageProducer producer = session.createProducer(queue);
		producer.send(session.createTextMessage(msg));
		producer.close();
		session.commit();
		session.close();
		connection.close();
	}

	private List<String> getMessages(boolean rollback) throws Exception {
		Connection connection = connectionFactory.createConnection();
		connection.start();
		Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
		String next = "";
		List<String> msgs = new ArrayList<String>();
		while (next != null) {
			next = (String) receiveAndConvert(session);
			if (next != null)
				msgs.add(next);
		}
		if (rollback) {
			session.rollback();
		} else {
			session.commit();
		}
		session.close();
		connection.close();
		return msgs;
	}

	private String receiveAndConvert(Session session) throws Exception {
		MessageConsumer consumer = session.createConsumer(queue);
		Message message = consumer.receive(100L);
		consumer.close();
		if (message==null) {
			return null;
		}
		return ((TextMessage)message).getText();
	}
}


Gary Tully added a comment - 28/Apr/09 09:31 AM - edited
Can you attach your test case (see the attach file link) and make a license grant so that your test can be added to the ActiveMQ suite?

This is an interesting test case. The change of behavior is around consumer.close() and transactions. When in a transaction, the close is deferred till the transaction completes, so with a prefetch > 0, messages dispatched to that consumer will not be available till the transaction commits.

I think your test case will work with a prefetch of 0: connectionFactory.setBrokerURL("vm://localhost?async=false&waitForStart=5000&jms.prefetchPolicy.all=0");


Gary Tully added a comment - 28/Apr/09 09:46 AM
For a managed case, the XA case was made the general case. A close is deferred if in any transaction.

Dave Syer added a comment - 28/Apr/09 10:20 PM
Attached test case as file.

Dave Syer added a comment - 02/May/09 12:44 AM
Thanks for the response. The prefetch hint didn't have any effect on the test case. I didn't follow the comment about the deferred close.

Gary Tully added a comment - 12/May/09 04:36 AM
Added your test case with the prefetch workaround: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/RawRollbackTests.java?view=markup

The issue is the consumer.close call in receiveAndConvert. The close occurs in a transaction and is deferred till the transaction completes. The default prefetch value of 100 ensures that the consumer will have both messages queued up for dispatch. The undispatched messages are only redispatched after the close. But we don't know if the messages are to be acked till the transaction completes. Thus the additional prefetched "bar" message is not available to a second consumer.
Changing the prefetch value to 0 (note the changed connectionURL) ensures that each consumer gets a single message from a call to receive so only that message is outstanding until the transaction completes.


Dave Syer added a comment - 12/May/09 05:37 AM
Thanks for the update. I tried (as previously indicated) the prefetch hint and it didn't change anything. If I understand the comment correctly maybe you misunderstood the test case: it doesn't fail because "bar" is unavailable to a second consumer - in fact it is, as the assertion assertEquals(2, list.size()) in the test method shows.

It fails because neither "foo" nor "bar" is available to any consumer whatsoever, after the rollback has occurred. The transaction has certainly completed by the time the test fails (the comment seems to imply the opposite, but maybe I misunderstood).

It still fails for you, right? (I only ran it against 5.2.0.)


Gary Tully added a comment - 12/May/09 08:42 AM
So the test (your test) I committed to trunk works. It uses &jms.prefetchPolicy.all=0

Without the prefetch=0 config, the assertion fails for me with list.size() == 1.
In the rollback case, receiveAndConvert is called twice so two consumers are created. The first consumer gets both messages dispatched to it and consumes one before closing. The second consumer gets nothing. So list.size() == 1.
The problem is that the consumer.close is deferred till the transaction completes (commits or rolls back).

The workaround is to only deliver messages on demand, when receive is called; prefetch=0 gives this.
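
A toy model may help make the dispatch behaviour described in this comment concrete. It is not ActiveMQ code: the class PrefetchModel and all of its members are hypothetical names invented for this sketch. It assumes, as the comment states, that a consumer closed inside a transaction keeps everything dispatched to it until the transaction completes, and that prefetch=0 dispatches one message per receive call. On rollback it models the intended behaviour (held messages become available again), not the 5.2.0 regression.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model only: none of these names exist in ActiveMQ.
class PrefetchModel {
    // Messages the broker can still dispatch to a newly created consumer.
    final Deque<String> queue = new ArrayDeque<>();
    // Messages held by consumers whose close is deferred until the transaction ends.
    final List<String> heldByDeferredClose = new ArrayList<>();
    final int prefetch;

    PrefetchModel(int prefetch, String... messages) {
        this.prefetch = prefetch;
        for (String m : messages) {
            queue.add(m);
        }
    }

    // Mirrors receiveAndConvert() in the test: create a consumer, receive once, close.
    String receiveAndConvert() {
        // Assumption: prefetch 0 means pull, so one message is dispatched per receive.
        int toDispatch = (prefetch == 0) ? 1 : prefetch;
        Deque<String> buffer = new ArrayDeque<>();
        while (buffer.size() < toDispatch && !queue.isEmpty()) {
            buffer.add(queue.poll());
        }
        String received = buffer.poll(); // null if nothing was dispatched
        // The close is deferred, so everything dispatched to this consumer,
        // delivered or not, stays unavailable to later consumers.
        if (received != null) {
            heldByDeferredClose.add(received);
        }
        heldByDeferredClose.addAll(buffer);
        return received;
    }

    // Mirrors the loop in getMessages(): receive until null.
    List<String> drain() {
        List<String> msgs = new ArrayList<>();
        for (String next = receiveAndConvert(); next != null; next = receiveAndConvert()) {
            msgs.add(next);
        }
        return msgs;
    }

    // Completing the transaction by rollback runs the deferred closes,
    // so all held messages return to the queue.
    void rollback() {
        queue.addAll(heldByDeferredClose);
        heldByDeferredClose.clear();
    }

    public static void main(String[] args) {
        System.out.println(new PrefetchModel(100, "foo", "bar").drain()); // [foo]
        PrefetchModel pull = new PrefetchModel(0, "foo", "bar");
        System.out.println(pull.drain()); // [foo, bar]
        pull.rollback();
        System.out.println(pull.drain()); // [foo, bar]
    }
}
```

With a prefetch of 100 the first consumer takes both messages and the second sees an empty queue, which matches the list.size() == 1 result reported above. With a prefetch of 0 each consumer holds only the message it received.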


Dave Syer added a comment - 12/May/09 05:42 PM
Your failure is not the same as mine (in 5.2.0). I am failing in the @After, and it sounds like you are failing (before the fetch size hint) in @Test.

I will try it when a new snapshot appears. The Apache snapshots repo has a latest version in mid April; any news on that, or am I looking in the wrong place?

The original test (with or without the fetch size) still fails with 5.2.0 and passes with 5.1.0. It also fails if I use a single consumer to consume all messages in a Session. So I'd be interested to know what broke in the meantime there. 5.1.0 seems correct as far as the JMS spec goes.


Gary Tully added a comment - 13/May/09 01:22 AM
The nightly snapshot is available at https://repository.apache.org/content/repositories/snapshots/org/apache/activemq/apache-activemq/5.3-SNAPSHOT/

I think the key change (on trunk) relates to consumer.close when there is a transaction. The close is deferred to a synchronisation afterCompletion so the effect of the close is not visible till the transaction completes. From what I can see, the test depends on the visibility of predispatched messages before the transaction completes.

Can you provide your test variant that uses a single consumer so I can include it?
W.r.t. the spec, the whole prefetch issue is outside the spec; prefetch=0 gets it back in line.
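
The idea of deferring a close to an afterCompletion synchronisation can be sketched in a few lines. This is only an illustration of the general pattern, assuming a transaction that runs registered callbacks once it commits or rolls back. SketchTransaction, SketchConsumer and their methods are hypothetical names and do not correspond to ActiveMQ classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo; run it to see when the close takes effect.
class DeferredCloseDemo {
    public static void main(String[] args) {
        SketchTransaction tx = new SketchTransaction();
        SketchConsumer consumer = new SketchConsumer(tx);
        consumer.close();
        System.out.println(consumer.isClosed()); // false: close is deferred
        tx.rollback();
        System.out.println(consumer.isClosed()); // true: ran after completion
    }
}

// Minimal stand-in for a transaction that supports after-completion callbacks.
class SketchTransaction {
    private final List<Runnable> afterCompletion = new ArrayList<>();
    private boolean active = true;

    boolean isActive() {
        return active;
    }

    void addSynchronization(Runnable callback) {
        afterCompletion.add(callback);
    }

    void commit() {
        complete();
    }

    void rollback() {
        complete();
    }

    // Both outcomes end the transaction and then run the deferred work.
    private void complete() {
        active = false;
        for (Runnable callback : afterCompletion) {
            callback.run();
        }
        afterCompletion.clear();
    }
}

// Minimal stand-in for a consumer whose close is deferred while in a transaction.
class SketchConsumer {
    private final SketchTransaction tx;
    private boolean closed;

    SketchConsumer(SketchTransaction tx) {
        this.tx = tx;
    }

    void close() {
        if (tx.isActive()) {
            // Defer: the effect of close is not visible until the transaction completes.
            tx.addSynchronization(this::doClose);
        } else {
            doClose();
        }
    }

    private void doClose() {
        closed = true;
    }

    boolean isClosed() {
        return closed;
    }
}
```

In this sketch, anything that depends on the consumer actually being closed, such as returning its prefetched messages, cannot happen before commit or rollback.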


Dave Syer added a comment - 13/May/09 04:27 AM
Added shared consumer test case. This passes in 5.3-SNAPSHOT even without the fetch size hint as predicted. But it also fails in 5.2.0, which still seems like a bug to me, considering what you said above. What do you think?

Gary Tully added a comment - 15/May/09 02:30 AM
Yes, I think that is a bug with 5.2.0. All is good on trunk now though. I committed that shared test to protect against regression, thanks. Are you happy to see this issue closed?

Dave Syer added a comment - 19/May/09 03:01 AM
Not abundantly happy, really, no. I think it's a regression from 5.1.0, and hiding behind the strict letter of the law regarding the specification is a weak position to take. Even if prefetch is a proprietary feature, having to set it to a non-default value to get a system to behave as expected makes it very hard for users to guess or understand what is happening. Also, prefetch is a good feature that I probably want to use, so switching it off just to get messages redelivered to another consumer in the same process seems like it must be a workaround for a bug.

Dave Syer added a comment - 15/Jun/09 10:12 AM
I am seeing this issue in production environments now - unacked messages sit on the broker forever until it is restarted. Is there some way I can help to fix it so the behaviour goes back to the way it was in 5.1?

Dave Syer added a comment - 08/Jul/09 12:36 AM
This issue is now apparently fixed in 5.3-SNAPSHOT (my original test case passes), presumably as a result of fixes for other issues?

Gary Tully added a comment - 08/Jul/09 02:03 AM
Resolving as per Dave's comment; the current snapshot has a fix.