Issue Details (XML | Word | Printable)

Key: AMQ-731
Type: Bug Bug
Status: Resolved Resolved
Resolution: Fixed
Priority: Major Major
Assignee: James Strachan
Reporter: Christopher G. Stach II
Votes: 0
Watchers: 1
Operations

If you were logged in you would be able to see more operations.
ActiveMQ

Redeliveries don't work with resource adapter and Jencks

Created: 30/May/06 07:20 PM   Updated: 09/Jun/06 06:45 AM
Return to search
Component/s: Connector
Affects Version/s: 4.0
Fix Version/s: 4.0.1, 4.1.0

Time Tracking:
Not Specified

File Attachments:
  Size
Text File Licensed for inclusion in ASF works amq-txcontext.patch 2006-06-05 08:27 PM Christopher G. Stach II 0.9 kB
Environment: Sun JDK 1.5.0_06, Jencks 1.1.3, AMQ 4.0, Resin Pro 3.0.14


 Description  « Hide
During a rollback in a JTA transaction, this exception is generated a few times:

org.apache.activemq.broker.AbstractConnection.serviceException Async
error occurred: javax.jms.JMSException: Could not correlate
acknowledgment with dispatched message: MessageAck {commandId = 137, responseRequired = false, ackType = 1, consumerId = ID:xxx-2276-1148335783189-2:5:-1:2, firstMessageId = ID:xxx-2276-1148335783189-2:1:1:1:2, lastMessageId = ID:xxx-2276-1148335783189-2:1:1:1:2, destination = queue://xxxQueue, transactionId = null, messageCount = 1}
javax.jms.JMSException: Could not correlate acknowledgment with
dispatched message: MessageAck {commandId = 137, responseRequired = false, ackType = 1, consumerId = ID:xxx-2276-1148335783189-2:5:-1:2, firstMessageId = ID:xxx-2276-1148335783189-2:1:1:1:2, lastMessageId = ID:xxx-2276-1148335783189-2:1:1:1:2, destination = queue://xxxQueue, transactionId = null, messageCount = 1}
at
org.apache.activemq.broker.region.PrefetchSubscription.acknowledge(PrefetchSubscription.java:175)
at
org.apache.activemq.broker.region.AbstractRegion.acknowledge(AbstractRegion.java:233)
at
org.apache.activemq.broker.region.RegionBroker.acknowledge(RegionBroker.java:362)
at
org.apache.activemq.broker.TransactionBroker.acknowledge(TransactionBroker.java:176)
at
org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:65)
at
org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:65)
at
org.apache.activemq.broker.MutableBrokerFilter.acknowledge(MutableBrokerFilter.java:78)
at
org.apache.activemq.broker.AbstractConnection.processMessageAck(AbstractConnection.java:356)
at org.apache.activemq.command.MessageAck.visit(MessageAck.java:178)
at
org.apache.activemq.broker.AbstractConnection.service(AbstractConnection.java:201)
at
org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:62)
at
org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:97)
at
org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:63)
at
org.apache.activemq.transport.vm.VMTransport.oneway(VMTransport.java:76)
at
org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:44)
at
org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
at
org.apache.activemq.ActiveMQConnection.asyncSendPacket(ActiveMQConnection.java:1111)
at
org.apache.activemq.ActiveMQSession.asyncSendPacket(ActiveMQSession.java:1655)
at
org.apache.activemq.ActiveMQSession$2.afterRollback(ActiveMQSession.java:720)
at
org.apache.activemq.TransactionContext.afterRollback(TransactionContext.java:134)
at
org.apache.activemq.TransactionContext.rollback(TransactionContext.java:409)
at
org.apache.activemq.ra.LocalAndXATransaction.rollback(LocalAndXATransaction.java:126)
at
org.apache.geronimo.transaction.manager.WrapperNamedXAResource.rollback(WrapperNamedXAResource.java:78)
at
com.caucho.transaction.TransactionImpl.rollbackInt(TransactionImpl.java:787)
at
com.caucho.transaction.TransactionImpl.commit(TransactionImpl.java:560)
at org.jencks.XAEndpoint.afterDelivery(XAEndpoint.java:103)
at
org.apache.activemq.ra.MessageEndpointProxy$MessageEndpointAlive.afterDelivery(MessageEndpointProxy.java:125)
at
org.apache.activemq.ra.MessageEndpointProxy.afterDelivery(MessageEndpointProxy.java:64)
at
org.apache.activemq.ra.ServerSessionImpl.afterDelivery(ServerSessionImpl.java:214)
at org.apache.activemq.ActiveMQSession.run(ActiveMQSession.java:751)
at
org.apache.activemq.ra.ServerSessionImpl.run(ServerSessionImpl.java:163)
at com.caucho.jca.WorkThread.run(WorkThread.java:99)
at com.caucho.util.ThreadPool.runTasks(ThreadPool.java:490)
at com.caucho.util.ThreadPool.run(ThreadPool.java:423)
at java.lang.Thread.run(Thread.java:595)

Note that the ackType is a poison ack. This shouldn't be the case as the configuration (below) on the resource adapter and the managed connection factory both specify 9 redeliveries. Redelivery does not happen.

Broker configuration:

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://activemq.org/config/1.0">

<broker persistent="false" useJmx="false">
<transportConnectors>
<transportConnector uri="vm://localhost" />
</transportConnectors>

<persistenceAdapter>
<memoryPersistenceAdapter />
</persistenceAdapter>
</broker>

</beans>

Spring configuration:

<bean id="jms.connectionFactory"
class="org.springframework.jca.support.LocalConnectionFactoryBean">
<property name="connectionManager">
<bean class="com.xxx.jca.ResinConnectionManagerFactoryBean">
<property name="localTransaction">
<value>false</value>
</property>

<property name="localTransactionOptimization">
<value>false</value>
</property>

<property name="managedConnectionFactory">
<ref local="jms.managedConnectionFactory" />
</property>

<property name="xaTransaction">
<value>true</value>
</property>
</bean>
</property>

<property name="managedConnectionFactory">
<ref local="jms.managedConnectionFactory" />
</property>
</bean>

<bean id="jms.jcaContainer" class="org.jencks.JCAContainer">
<property name="bootstrapContext">
<bean class="com.xxx.jca.ResinBootstrapContextFactoryBean" />
</property>

<property name="resourceAdapter">
<ref local="jms.resourceAdapter" />
</property>
</bean>

<bean id="jms.managedConnectionFactory"
class="org.apache.activemq.ra.ActiveMQManagedConnectionFactory">
<property name="allPrefetchValues">
<value>1</value>
</property>

<property name="initialRedeliveryDelay">
<value>2000</value>
</property>

<property name="maximumRedeliveries">
<value>9</value>
</property>

<property name="redeliveryBackOffMultiplier">
<value>2</value>
</property>

<property name="redeliveryUseExponentialBackOff">
<value>true</value>
</property>

<property name="resourceAdapter">
<ref local="jms.resourceAdapter" />
</property>
</bean>

<bean id="jms.broker" class="org.apache.activemq.xbean.BrokerFactoryBean">
<property name="config">
<value>classpath:activemq.xml</value>
</property>

<property name="start">
<value>true</value>
</property>
</bean>

<bean id="jms.resourceAdapter"
class="org.apache.activemq.ra.ActiveMQResourceAdapter"
depends-on="jms.broker">
<property name="allPrefetchValues">
<value>1</value>
</property>

<property name="initialRedeliveryDelay">
<value>2000</value>
</property>

<property name="maximumRedeliveries">
<value>9</value>
</property>

<property name="redeliveryBackOffMultiplier">
<value>2</value>
</property>

<property name="redeliveryUseExponentialBackOff">
<value>true</value>
</property>

<property name="serverUrl">
<value>vm://localhost</value>
</property>
</bean>

I have written a test case using Geronimo's TM, but it succeeds. The configuration is very different, so I don't think it can be attributed to just a simple change in TM. A lot of other configuration changes had to be made.



 All   Comments   Work Log   Change History   Subversion Commits   FishEye   Crucible      Sort Order: Ascending order - Click to sort in descending order
Christopher G. Stach II added a comment - 02/Jun/06 06:32 PM
Okay, here's a bit more information. While stepping through a rollback, I've found l.elementData in TransactionContext.rollback(Xid) to contain 24 entries for the same TransactionContext. The loop at line 407 iterates over all of these, calling afterRollback on each. afterRollback calls Synchronization.afterRollback at ActiveMQSession line 706 where redelivery is evaluated. Since max redeliveries is set at 9 and there are 24 entries, the redeliveries are exhausted before this loop completes.

So, the question is, why 24 entries?


Christopher G. Stach II added a comment - 02/Jun/06 06:47 PM
Oh, I should add, that once the redeliveries are exhausted, it ends up getting a ConnectionClosedException and jumping down to TransactionContext line 414.

Christopher G. Stach II added a comment - 02/Jun/06 07:36 PM
Okay, this is the stack trace with a breakpoint put at TransactionContext line 560. It happens over and over and over, each time the transaction is suspended.

Thread [resin-30] (Suspended (breakpoint at line 560 in TransactionContext))
TransactionContext.setXid(Xid) line: 560
TransactionContext.end(Xid, int) line: 327
LocalAndXATransaction.end(Xid, int) line: 89
WrapperNamedXAResource.end(Xid, int) line: 51
TransactionImpl.suspend() line: 448
TransactionManagerImpl.suspend() line: 201
TreeCache.suspend() line: 101
TreeCache.put(Object, Object) line: 71
UpdateTimestampsCache.preinvalidate(Serializable[]) line: 54
ActionQueue.execute(Executable) line: 244
DefaultSaveOrUpdateEventListener(AbstractSaveEventListener).performSaveOrReplicate(Object, EntityKey, EntityPersister, boolean, Object, EventSource, boolean) line: 290
DefaultSaveOrUpdateEventListener(AbstractSaveEventListener).performSave(Object, Serializable, EntityPersister, boolean, Object, EventSource, boolean) line: 180
DefaultSaveOrUpdateEventListener(AbstractSaveEventListener).saveWithGeneratedId(Object, String, Object, EventSource, boolean) line: 108
DefaultSaveOrUpdateEventListener.saveWithGeneratedOrRequestedId(SaveOrUpdateEvent) line: 186
DefaultSaveOrUpdateEventListener.entityIsTransient(SaveOrUpdateEvent) line: 175
DefaultSaveOrUpdateEventListener.performSaveOrUpdate(SaveOrUpdateEvent) line: 98
DefaultSaveOrUpdateEventListener.onSaveOrUpdate(SaveOrUpdateEvent) line: 70
SessionImpl.fireSaveOrUpdate(SaveOrUpdateEvent) line: 509
SessionImpl.saveOrUpdate(String, Object) line: 501
SessionImpl.saveOrUpdate(Object) line: 497
HibernateTemplate$18.doInHibernate(Session) line: 693
HibernateTemplate.execute(HibernateCallback, boolean) line: 366
HibernateTemplate.saveOrUpdate(Object) line: 690
LeadDAOImpl(AbstractDAOImpl<E>).saveOrUpdate(E) line: 281
NativeMethodAccessorImpl.invoke0(Method, Object, Object[]) line: not available [native method]
NativeMethodAccessorImpl.invoke(Object, Object[]) line: 39
DelegatingMethodAccessorImpl.invoke(Object, Object[]) line: 25
Method.invoke(Object, Object...) line: 585
AopUtils.invokeJoinpointUsingReflection(Object, Method, Object[]) line: 287
ReflectiveMethodInvocation.invokeJoinpoint() line: 181
ReflectiveMethodInvocation.proceed() line: 148
TransactionInterceptor.invoke(MethodInvocation) line: 96
ReflectiveMethodInvocation.proceed() line: 170
JdkDynamicAopProxy.invoke(Object, Method, Object[]) line: 176
$Proxy46.saveOrUpdate(Object) line: not available
RefinanceLeadImpl(LeadImpl).save() line: 545
ProcessorImpl.processLead(DistributionContext, Lead) line: 126
NativeMethodAccessorImpl.invoke0(Method, Object, Object[]) line: not available [native method]
NativeMethodAccessorImpl.invoke(Object, Object[]) line: 39
DelegatingMethodAccessorImpl.invoke(Object, Object[]) line: 25
Method.invoke(Object, Object...) line: 585
AopUtils.invokeJoinpointUsingReflection(Object, Method, Object[]) line: 287
ReflectiveMethodInvocation.invokeJoinpoint() line: 181
ReflectiveMethodInvocation.proceed() line: 148
TransactionInterceptor.invoke(MethodInvocation) line: 96
ReflectiveMethodInvocation.proceed() line: 170
JdkDynamicAopProxy.invoke(Object, Method, Object[]) line: 176
$Proxy74.processLead(DistributionContext, Lead) line: not available
ProcessorFacadeImpl$1.run() line: 92
ProcessorFacadeImpl(DistributionContextWrapper).wrap(DistributionContextWrapper$DistributionContextRunnable) line: 124
ProcessorFacadeImpl.processLead(Lead) line: 89
NativeMethodAccessorImpl.invoke0(Method, Object, Object[]) line: not available [native method]
NativeMethodAccessorImpl.invoke(Object, Object[]) line: 39
DelegatingMethodAccessorImpl.invoke(Object, Object[]) line: 25
Method.invoke(Object, Object...) line: 585
AopUtils.invokeJoinpointUsingReflection(Object, Method, Object[]) line: 287
ReflectiveMethodInvocation.invokeJoinpoint() line: 181
ReflectiveMethodInvocation.proceed() line: 148
TransactionInterceptor.invoke(MethodInvocation) line: 96
ReflectiveMethodInvocation.proceed() line: 170
JdkDynamicAopProxy.invoke(Object, Method, Object[]) line: 176
$Proxy75.processLead(Lead) line: not available
NativeMethodAccessorImpl.invoke0(Method, Object, Object[]) line: not available [native method]
NativeMethodAccessorImpl.invoke(Object, Object[]) line: 39
DelegatingMethodAccessorImpl.invoke(Object, Object[]) line: 25
Method.invoke(Object, Object...) line: 585
AopUtils.invokeJoinpointUsingReflection(Object, Method, Object[]) line: 287
ReflectiveMethodInvocation.invokeJoinpoint() line: 181
ReflectiveMethodInvocation.proceed() line: 148
RemoteInvocationTraceInterceptor.invoke(MethodInvocation) line: 68
ReflectiveMethodInvocation.proceed() line: 170
JdkDynamicAopProxy.invoke(Object, Method, Object[]) line: 176
$Proxy0.processLead(Lead) line: not available
NativeMethodAccessorImpl.invoke0(Method, Object, Object[]) line: not available [native method]
NativeMethodAccessorImpl.invoke(Object, Object[]) line: 39
DelegatingMethodAccessorImpl.invoke(Object, Object[]) line: 25
Method.invoke(Object, Object...) line: 585
LingoInvocation(RemoteInvocation).invoke(Object) line: 179
DefaultRemoteInvocationExecutor.invoke(RemoteInvocation, Object) line: 33
JmsServiceExporterMessageListener(RemoteInvocationBasedExporter).invoke(RemoteInvocation, Object) line: 76
JmsServiceExporterMessageListener(RemoteInvocationBasedExporter).invokeAndCreateResult(RemoteInvocation, Object) line: 112
JmsServiceExporterMessageListener(JmsServiceExporterSupport).onMessage(Message) line: 85
XAEndpoint.onMessage(Message) line: 126
MessageEndpointProxy$MessageEndpointAlive.onMessage(MessageEndpointProxy, Message) line: 120
MessageEndpointProxy.onMessage(Message) line: 60
ActiveMQSession.run() line: 692
ServerSessionImpl.run() line: 163
WorkThread.run() line: 99
ThreadPool.runTasks() line: 490
ThreadPool.run() line: 423
Thread.run() line: 595


Christopher G. Stach II added a comment - 02/Jun/06 07:38 PM
So, with that being said, should endedXATransactionContexts actually be a Map of LinkedHashSets?

Christopher G. Stach II added a comment - 05/Jun/06 04:47 PM
Comparing Resin's and Geronmio's TransactionManagerImpl suspend methods, Geronimo's doesn't even suspend the transaction. The test case I have could probably never work, and it's no wonder that none of the AMQ developers have come across this if testing is only done with Geronimo.

Christopher G. Stach II added a comment - 05/Jun/06 05:12 PM
Resin's TransactionImpl.suspend() method, which doesn't exist in Geronimo, calls XAResource.end(Xid, XAResource.TMSUSPEND), WrapperNamedXAResource.end(Xid, int) line: 51 in the above stack trace.

David Jencks added a comment - 05/Jun/06 05:36 PM
Resin's tx implementation is wrong. suspend is only supposed to remove the thread/tx association, not change what resources are enrolled in the tx.

Christopher G. Stach II added a comment - 05/Jun/06 06:19 PM
How would a single connection be used for multiple transaction branches if end is never called with TMSUSPEND?

David Jencks added a comment - 05/Jun/06 07:42 PM
I assume you mean mutliple jta transactions rather than multiple branches of a single jta transaction. In this case something else such as the connector framework is responsible for delisting the connection(s) from the first tx and enlisting them in the 2nd tx. IIRC this is stated rather unclearly in the jta spec and slightly more clearly but in terms of c programming in the xa spec.

In geronimo we normally don't do this. The only time you can change transactions with suspend/resume is going into an ejb RequiresNew method and in this case we just give you new connections. There is an experimental way of delisting connections and reusing them in the new transaction, but AFAIK no one has ever used it. What are the circumstances that lead Resin to suspend/resume?


Christopher G. Stach II added a comment - 05/Jun/06 07:51 PM
It could be multiple transactions or multiple branches. For example, a single database connection can support X number of concurrent transactions as well as Y number of branches within each transaction. I assume that since Resin is the application server, the transaction manager, and the JCA container, it is calling end on the enlisted XAResources.

Check out the stack trace above. JBoss TreeCache is calling suspend on the TM to do some work outside of the transaction (TreeCache.suspend() line: 101). We use Hibernate's connection after_statement release mode, which is appropriate for connections using JTA.


Christopher G. Stach II added a comment - 05/Jun/06 08:27 PM
This simple patch fixes the whole problem.

James Strachan added a comment - 09/Jun/06 06:45 AM
Patch applied with thanks!