
Key: AMQ-2233
Type: Bug
Status: Resolved
Resolution: Fixed
Priority: Major
Assignee: Gary Tully
Reporter: Dave Syer
Votes: 1
Watchers: 1
ActiveMQ

After rollback received messages not re-presented

Created: 28/Apr/09 05:08 AM   Updated: 08/Jul/09 02:03 AM
Component/s: None
Affects Version/s: 5.2.0
Fix Version/s: 5.3.0


File Attachments:
RawRollbackSharedConsumerTests.java (Java source file, 3 kB, attached 2009-05-13 04:27 AM by Dave Syer, licensed for inclusion in ASF works)
RawRollbackTests.java (Java source file, 3 kB, attached 2009-04-28 10:20 PM by Dave Syer, licensed for inclusion in ASF works)
Issue Links:
dependent
 

Regression: Regression


Description
After a rollback, received messages are not re-presented. If I receive messages in a transaction and then roll back, the messages should be re-presented in the next transaction. This used to work in 5.1.0, but it is broken in 5.2.0.

You can browse the Queue in JMX after the rollback and see that the messages are still there, but they are not received by a consumer in the same process.
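
To make the expected behaviour concrete, here is a toy in-memory model of a transacted receive. It is not ActiveMQ code and says nothing about how the broker is implemented; the class ToyTransactedQueue and all of its methods are made up for illustration. It only shows the contract I expect: messages received in a transaction go back to the queue on rollback and are delivered again to the next transaction.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical toy model of a transacted queue consumer; not ActiveMQ code. */
public class ToyTransactedQueue {

    // Messages currently available to consumers.
    private final Deque<String> queue = new ArrayDeque<>();
    // Messages received in the open transaction and not yet committed.
    private final List<String> delivered = new ArrayList<>();

    public void send(String msg) {
        queue.addLast(msg);
    }

    /** Returns the next message, or null if the queue is empty. */
    public String receive() {
        String msg = queue.pollFirst();
        if (msg != null) {
            delivered.add(msg);
        }
        return msg;
    }

    /** Commit: the received messages are consumed for good. */
    public void commit() {
        delivered.clear();
    }

    /** Rollback: the received messages return to the head of the queue in their original order. */
    public void rollback() {
        for (int i = delivered.size() - 1; i >= 0; i--) {
            queue.addFirst(delivered.get(i));
        }
        delivered.clear();
    }

    /** Receives everything available, then rolls back or commits. */
    public List<String> drain(boolean rollback) {
        List<String> msgs = new ArrayList<>();
        String next;
        while ((next = receive()) != null) {
            msgs.add(next);
        }
        if (rollback) {
            rollback();
        } else {
            commit();
        }
        return msgs;
    }

    public static void main(String[] args) {
        ToyTransactedQueue q = new ToyTransactedQueue();
        q.send("foo");
        q.send("bar");
        List<String> first = q.drain(true);   // receive both, then roll back
        List<String> second = q.drain(false); // both should be received again
        System.out.println(first + " " + second); // prints "[foo, bar] [foo, bar]"
    }
}
```

In this model the second drain sees both messages again, which is what 5.1.0 did. In 5.2.0 the equivalent second receive comes back empty even though the messages are still on the queue.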

Here's a test case (it fails in checkPostConditions()):

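import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.File;
import java.util.ArrayList;
import java.util.List;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.commons.io.FileUtils; // assumed to be the Apache Commons IO FileUtils
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

public class RawRollbackTests {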
	
	private static ConnectionFactory connectionFactory;
	private static Destination queue;
	private static BrokerService broker;

	@BeforeClass
	public static void clean() throws Exception {
		FileUtils.deleteDirectory(new File("activemq-data"));
		broker = new BrokerService();
		broker.setUseJmx(true);
		broker.start();
		ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
		connectionFactory.setBrokerURL("vm://localhost?async=false");
		RawRollbackTests.connectionFactory = connectionFactory;
		queue = new ActiveMQQueue("queue");
	}

	@AfterClass
	public static void close() throws Exception {
		broker.stop();
	}

	@Before
	public void clearData() throws Exception {
		getMessages(false); // drain queue
		convertAndSend("foo");
		convertAndSend("bar");
	}


	@After
	public void checkPostConditions() throws Exception {

		Thread.sleep(1000L);
		List<String> list = getMessages(false);
		assertEquals(2, list.size());

	}

	@Test
	public void testReceiveMessages() throws Exception {

		List<String> list = getMessages(true);
		assertEquals(2, list.size());
		assertTrue(list.contains("foo"));

	}
	
	private void convertAndSend(String msg) throws Exception {
		Connection connection = connectionFactory.createConnection();
		connection.start();
		Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
		MessageProducer producer = session.createProducer(queue);
		producer.send(session.createTextMessage(msg));
		producer.close();
		session.commit();
		session.close();
		connection.close();
	}

	private List<String> getMessages(boolean rollback) throws Exception {
		Connection connection = connectionFactory.createConnection();
		connection.start();
		// Transacted session: the acknowledge mode argument is ignored when the first argument is true
		Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
		String next = "";
		List<String> msgs = new ArrayList<String>();
		while (next != null) {
			next = (String) receiveAndConvert(session);
			if (next != null)
				msgs.add(next);
		}
		if (rollback) {
			session.rollback();
		} else {
			session.commit();
		}
		session.close();
		connection.close();
		return msgs;
	}

	private String receiveAndConvert(Session session) throws Exception {
		// A fresh consumer is created and closed for every single receive
		MessageConsumer consumer = session.createConsumer(queue);
		Message message = consumer.receive(100L);
		consumer.close();
		if (message==null) {
			return null;
		}
		return ((TextMessage)message).getText();
	}
}

