Description
Discovered while authoring a custom SubscriptionRecoveryPolicy.
When function 'recover' is being run, no new connections to the broker can be made. All connection attempts hang until 'recover' returns.
This is a huge problem when recovery takes any amount of time. In my particular case, recovery can take many minutes.
Note that this problem also prevents the addition/removal of consumers and producers in general, so any threads needing to do so are also blocked.
The issue appears to be a lock on RegionBroker.purgeInactiveDestinationsTask. I would attempt to fix, but I am unsure of the consequences of messing with this lock and I am unclear on what is being protected here.
Thread doing work in "recover":
"ActiveMQ Transport: ssl:///10.1.210.140:45407" - Thread t@48
at MyCustomSubscriptionRecoveryPolicy.recover(MyCustomSubscriptionRecoveryPolicy.java:xxx)
at org.apache.activemq.broker.region.Topic.addSubscription(Topic.java:133)
at org.apache.activemq.broker.region.AbstractRegion.addConsumer(AbstractRegion.java:290)
- locked java.lang.Object@62f4a9a3
at org.apache.activemq.broker.region.TopicRegion.addConsumer(TopicRegion.java:111)
at org.apache.activemq.broker.region.RegionBroker.addConsumer(RegionBroker.java:447) - locked org.apache.activemq.broker.region.RegionBroker$1@6d0718b7
at org.apache.activemq.broker.jmx.ManagedRegionBroker.addConsumer(ManagedRegionBroker.java:240)
at org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:89)
at org.apache.activemq.advisory.AdvisoryBroker.addConsumer(AdvisoryBroker.java:91)
at org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:89)
at org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:89)
at org.apache.activemq.broker.MutableBrokerFilter.addConsumer(MutableBrokerFilter.java:95)
at org.apache.activemq.broker.TransportConnection.processAddConsumer(TransportConnection.java:550)
at org.apache.activemq.command.ConsumerInfo.visit(ConsumerInfo.java:349)
at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:311)
at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:185)
at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:69)
at org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:228) - locked org.apache.activemq.transport.InactivityMonitor$1@2208d444
at org.apache.activemq.transport.stomp.StompTransportFilter.sendToActiveMQ(StompTransportFilter.java:81)
at org.apache.activemq.transport.stomp.ProtocolConverter.sendToActiveMQ(ProtocolConverter.java:140)
at org.apache.activemq.transport.stomp.ProtocolConverter.onStompSubscribe(ProtocolConverter.java:425)
at org.apache.activemq.transport.stomp.ProtocolConverter.onStompCommand(ProtocolConverter.java:188)
at org.apache.activemq.transport.stomp.StompTransportFilter.onCommand(StompTransportFilter.java:70)
at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
at org.apache.activemq.transport.tcp.SslTransport.doConsume(SslTransport.java:91)
at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:220)
at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:202)
at java.lang.Thread.run(Thread.java:619)
Locked ownable synchronizers:
- None
Thread attempting to make new connection:
"ActiveMQ Transport: ssl:///10.1.210.140:45408" - Thread t@55
java.lang.Thread.State: BLOCKED on org.apache.activemq.broker.region.RegionBroker$1@6d0718b7 owned by: ActiveMQ Transport: ssl:///10.1.210.140:45407
at org.apache.activemq.broker.region.RegionBroker.addProducer(RegionBroker.java:392)
at org.apache.activemq.broker.BrokerFilter.addProducer(BrokerFilter.java:93)
at org.apache.activemq.advisory.AdvisoryBroker.addProducer(AdvisoryBroker.java:145)
at org.apache.activemq.broker.CompositeDestinationBroker.addProducer(CompositeDestinationBroker.java:56)
at org.apache.activemq.broker.BrokerFilter.addProducer(BrokerFilter.java:93)
at org.apache.activemq.broker.MutableBrokerFilter.addProducer(MutableBrokerFilter.java:99)
at org.apache.activemq.broker.TransportConnection.processAddProducer(TransportConnection.java:511)
at org.apache.activemq.command.ProducerInfo.visit(ProducerInfo.java:105)
at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:311)
at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:185)
at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:69)
at org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:228)
- locked org.apache.activemq.transport.InactivityMonitor$1@53134610
at org.apache.activemq.transport.stomp.StompTransportFilter.sendToActiveMQ(StompTransportFilter.java:81)
at org.apache.activemq.transport.stomp.ProtocolConverter.sendToActiveMQ(ProtocolConverter.java:140)
at org.apache.activemq.transport.stomp.ProtocolConverter$2.onResponse(ProtocolConverter.java:518)
at org.apache.activemq.transport.stomp.ProtocolConverter.onActiveMQCommand(ProtocolConverter.java:579)
at org.apache.activemq.transport.stomp.StompTransportFilter.oneway(StompTransportFilter.java:58)
at org.apache.activemq.transport.InactivityMonitor.oneway(InactivityMonitor.java:255) - locked java.util.concurrent.atomic.AtomicBoolean@165cef0c
at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:40) - locked java.lang.Object@2f52084c
at org.apache.activemq.broker.TransportConnection.dispatch(TransportConnection.java:1249)
at org.apache.activemq.broker.TransportConnection.processDispatch(TransportConnection.java:810)
at org.apache.activemq.broker.TransportConnection.dispatchSync(TransportConnection.java:770)
at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:187)
at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:69)
at org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:228) - locked org.apache.activemq.transport.InactivityMonitor$1@53134610
at org.apache.activemq.transport.stomp.StompTransportFilter.sendToActiveMQ(StompTransportFilter.java:81)
at org.apache.activemq.transport.stomp.ProtocolConverter.sendToActiveMQ(ProtocolConverter.java:140)
at org.apache.activemq.transport.stomp.ProtocolConverter.onStompConnect(ProtocolConverter.java:503)
at org.apache.activemq.transport.stomp.ProtocolConverter.onStompCommand(ProtocolConverter.java:192)
at org.apache.activemq.transport.stomp.StompTransportFilter.onCommand(StompTransportFilter.java:70)
at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
at org.apache.activemq.transport.tcp.SslTransport.doConsume(SslTransport.java:91)
at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:220)
at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:202)
at java.lang.Thread.run(Thread.java:619)
Locked ownable synchronizers:
- None