
Key: AMQ-696
Type: Bug
Status: Resolved
Resolution: Fixed
Priority: Major
Assignee: Unassigned
Reporter: Craig Day
Votes: 0
Watchers: 1
ActiveMQ

Client: XXX already connected exception when connection started after consumers added

Created: 28/Apr/06 05:00 AM   Updated: 17/Jun/06 08:14 PM
Component/s: Broker
Affects Version/s: 4.0 RC2, 4.0 RC3
Fix Version/s: 4.0

Time Tracking:
Not Specified

Environment: WinXP


Description
While using the new Spring 2.0 DefaultMessageListenerContainer, I can reliably reproduce an exception on the broker side, which usually results in a hang on the client side.

The broker logs the following exception:

INFO Service - Sync error occurred: javax.jms.InvalidClientIDException: Broker: localhost - Client: ID:inspiron-1410-1146192747453-2:1 already connected
javax.jms.InvalidClientIDException: Broker: localhost - Client: ID:inspiron-1410-1146192747453-2:1 already connected
at org.apache.activemq.broker.region.RegionBroker.addConnection(RegionBroker.java:154)
at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:65)
at org.apache.activemq.advisory.AdvisoryBroker.addConnection(AdvisoryBroker.java:69)
at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:65)
at org.apache.activemq.broker.MutableBrokerFilter.addConnection(MutableBrokerFilter.java:77)
at org.apache.activemq.broker.AbstractConnection.processAddConnection(AbstractConnection.java:500)
at org.apache.activemq.broker.jmx.ManagedTransportConnection.processAddConnection(ManagedTransportConnection.java:82)
at org.apache.activemq.command.ConnectionInfo.visit(ConnectionInfo.java:106)
at org.apache.activemq.broker.AbstractConnection.service(AbstractConnection.java:196)
at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:62)
at org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:93)
at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:70)
at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:114)
at org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:122)
at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:87)
at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:139)
at java.lang.Thread.run(Thread.java:595)

I have extrapolated the sequence of calls that DefaultMessageListenerContainer is making and managed to produce a simple test case that reproduces the problem:

import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;

import javax.jms.*;

public class TestActiveMQ extends TestCase {

    public void testConnectionFactory() throws Exception {
        final ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616");
        final ActiveMQQueue queue = new ActiveMQQueue("testqueue");
        final Connection conn = cf.createConnection();

        // Create a session and a consumer on a second thread while the
        // main thread starts the connection.
        Runnable r = new Runnable() {
            public void run() {
                try {
                    Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
                    MessageConsumer consumer = session.createConsumer(queue, null);
                    Message msg = consumer.receive(1000);
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        };
        new Thread(r).start();
        conn.start();

        // Give the consumer thread time to finish before the test returns.
        try {
            synchronized (this) {
                wait(3000);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

Let us know if you need any more information. I don't want to scrub ActiveMQ from my list of candidates if I can help it.

cheers
craig



James Strachan - 01/May/06 09:32 AM
I've added your test case to SVN

https://svn.apache.org/repos/asf/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/CraigsBugTest.java

and unfortunately it works perfectly.

I suspect the bug is caused by Spring reusing a clientID on two JMS connections. Can you get a JUnit test case to fail by itself? I suspect some other node in the network is causing the problem, not this test case.


Craig Day - 01/May/06 11:58 AM
The attached test case fails for me every time. I can get it to not fail by sleeping a bit before the conn.start() call, so it's obviously a problem related to the connection starting concurrently with one of the operations in the other thread. This is against a single broker.

I have analysed the way Spring is using/setting the clientID, and in short, it isn't unless I tell it to. I'm only opening one connection anyway, and you can see just one connection in the test case I provided.

Switching to another JMS provider with the same code/config works fine.


James Strachan - 01/May/06 12:42 PM
I'm wondering if this is a timing issue between one thread calling connection.start() and another calling createSession().

Maybe it only fails on your OS, processor & JVM etc?


Craig Day - 01/May/06 01:19 PM

Some further information for you. I attached a debugger to the broker and the testcase.

Here is the problem:

The testcase calls createSession() resulting in the following stack of calls:

wait():-1, java.lang.Object
wait():474, java.lang.Object
waitFor():267, edu.emory.mathcs.backport.java.util.concurrent.FutureTask
get():117, edu.emory.mathcs.backport.java.util.concurrent.FutureTask
getResult():44, org.apache.activemq.transport.FutureResponse
request():69, org.apache.activemq.transport.ResponseCorrelator
syncSendPacket():1108, org.apache.activemq.ActiveMQConnection
ensureConnectionInfoSent():1196, org.apache.activemq.ActiveMQConnection
createSession():252, org.apache.activemq.ActiveMQConnection
run():28, test.click.TestActiveMQ$1
run():595, java.lang.Thread

on the server this ends up at:

addConnection():148, org.apache.activemq.broker.region.RegionBroker
addConnection():65, org.apache.activemq.broker.BrokerFilter
addConnection():69, org.apache.activemq.advisory.AdvisoryBroker
addConnection():65, org.apache.activemq.broker.BrokerFilter
addConnection():77, org.apache.activemq.broker.MutableBrokerFilter
processAddConnection():500, org.apache.activemq.broker.AbstractConnection
processAddConnection():82, org.apache.activemq.broker.jmx.ManagedTransportConnection
visit():106, org.apache.activemq.command.ConnectionInfo
service():196, org.apache.activemq.broker.AbstractConnection
onCommand():62, org.apache.activemq.broker.TransportConnection$1
onCommand():93, org.apache.activemq.transport.ResponseCorrelator
onCommand():70, org.apache.activemq.transport.TransportFilter
onCommand():114, org.apache.activemq.transport.WireFormatNegotiator
onCommand():122, org.apache.activemq.transport.InactivityMonitor
doConsume():87, org.apache.activemq.transport.TransportSupport
run():139, org.apache.activemq.transport.tcp.TcpTransport
run():595, java.lang.Thread

where RegionBroker adds the generated clientID to the clientIDSet.

Note that I haven't called conn.start() yet!

The main thread then calls conn.start() resulting in the following stack of calls:

wait():-1, java.lang.Object
wait():474, java.lang.Object
waitFor():267, edu.emory.mathcs.backport.java.util.concurrent.FutureTask
get():117, edu.emory.mathcs.backport.java.util.concurrent.FutureTask
getResult():44, org.apache.activemq.transport.FutureResponse
request():69, org.apache.activemq.transport.ResponseCorrelator
syncSendPacket():1108, org.apache.activemq.ActiveMQConnection
ensureConnectionInfoSent():1196, org.apache.activemq.ActiveMQConnection
start():412, org.apache.activemq.ActiveMQConnection
testConnectionFactory():38, test.click.TestActiveMQ
invoke0():-1, sun.reflect.NativeMethodAccessorImpl
invoke():39, sun.reflect.NativeMethodAccessorImpl
invoke():25, sun.reflect.DelegatingMethodAccessorImpl
invoke():585, java.lang.reflect.Method
runTest():154, junit.framework.TestCase

That is, another call to ensureConnectionInfoSent(), which results in the following stack of calls on the broker:

addConnection():148, org.apache.activemq.broker.region.RegionBroker
addConnection():65, org.apache.activemq.broker.BrokerFilter
addConnection():69, org.apache.activemq.advisory.AdvisoryBroker
addConnection():65, org.apache.activemq.broker.BrokerFilter
addConnection():77, org.apache.activemq.broker.MutableBrokerFilter
processAddConnection():500, org.apache.activemq.broker.AbstractConnection
processAddConnection():82, org.apache.activemq.broker.jmx.ManagedTransportConnection
visit():106, org.apache.activemq.command.ConnectionInfo
service():196, org.apache.activemq.broker.AbstractConnection
onCommand():62, org.apache.activemq.broker.TransportConnection$1
onCommand():93, org.apache.activemq.transport.ResponseCorrelator
onCommand():70, org.apache.activemq.transport.TransportFilter
onCommand():114, org.apache.activemq.transport.WireFormatNegotiator
onCommand():122, org.apache.activemq.transport.InactivityMonitor
doConsume():87, org.apache.activemq.transport.TransportSupport
run():139, org.apache.activemq.transport.tcp.TcpTransport
run():595, java.lang.Thread

where the clientID is checked and collides with the clientID deposited from the first call.

It's pretty obvious looking at it that ensureConnectionInfoSent() and the setting of the boolean isConnectionInfoSentToBroker are not thread safe:

/**
 * Send the ConnectionInfo to the Broker
 *
 * @throws JMSException
 */
protected void ensureConnectionInfoSent() throws JMSException {
    // Can we skip sending the ConnectionInfo packet??
    if (isConnectionInfoSentToBroker || closed.get()) {
        return;
    }

    if (info.getClientId() == null || info.getClientId().trim().length() == 0) {
        info.setClientId(clientIdGenerator.generateId());
    }
    syncSendPacket(info);

    this.isConnectionInfoSentToBroker = true;
    // Add a temp destination advisory consumer so that
    // We know what the valid temporary destinations are on the
    // broker without having to do an RPC to the broker.
    ConsumerId consumerId = new ConsumerId(new SessionId(info.getConnectionId(), -1), consumerIdGenerator.getNextSequenceId());
    advisoryConsumer = new AdvisoryConsumer(this, consumerId);
}
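
The check-then-act window described above can be reproduced outside ActiveMQ with a minimal sketch. Everything in it is hypothetical (the class UnsafeConnectionSketch, its infoSent flag, the sends counter), and it is not the ActiveMQ implementation. A latch stands in for the blocking syncSendPacket() round trip, so both callers are forced past the flag check before either one sets the flag.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a connection; illustrates the race only.
class UnsafeConnectionSketch {
    private boolean infoSent = false;                 // plain flag, no locking
    private final AtomicInteger sends = new AtomicInteger();
    // Opens once two callers are inside the simulated send.
    private final CountDownLatch bothInside = new CountDownLatch(2);

    void ensureInfoSent() throws InterruptedException {
        if (infoSent) {                               // check
            return;
        }
        sends.incrementAndGet();                      // "send" the connection info
        bothInside.countDown();
        bothInside.await(2, TimeUnit.SECONDS);        // simulated blocking round trip
        infoSent = true;                              // act, after the other caller already got in
    }

    // Runs two callers, mirroring createSession() and start() on separate threads.
    static int runRace() {
        final UnsafeConnectionSketch conn = new UnsafeConnectionSketch();
        Runnable caller = () -> {
            try {
                conn.ensureInfoSent();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        Thread sessionThread = new Thread(caller, "createSession");
        Thread startThread = new Thread(caller, "start");
        sessionThread.start();
        startThread.start();
        try {
            sessionThread.join();
            startThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return conn.sends.get();
    }

    public static void main(String[] args) {
        System.out.println("sends = " + runRace());
    }
}
```

With the interleaving forced this way, the info is sent twice, which corresponds to the two addConnection() calls seen on the broker for a single client connection.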


James Strachan - 02/May/06 12:29 PM
Great catch Craig - thanks for figuring all that out - I was starting to suspect something like this could be the cause of your error.

I've made the ensureConnectionInfoSent() method synchronized, so this loophole should now be fixed. Could you test that it's working for you now, please? Let us know if it's not and we can reopen this issue.
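
As a rough illustration of why a synchronized method closes this kind of window, here is a minimal sketch. The class SynchronizedConnectionSketch, its infoSent flag and its sends counter are hypothetical names, and the sleep merely simulates a slow round trip to the broker; this is not the committed ActiveMQ change, only the same pattern in isolation.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a connection; illustrates the locking pattern only.
class SynchronizedConnectionSketch {
    private boolean infoSent = false;
    private final AtomicInteger sends = new AtomicInteger();

    // The check and the flag update now happen under the same monitor,
    // so a second caller blocks until the first has set the flag.
    synchronized void ensureInfoSent() {
        if (infoSent) {
            return;
        }
        sends.incrementAndGet();                      // "send" the connection info
        try {
            Thread.sleep(50);                         // simulated slow round trip
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        infoSent = true;
    }

    // Releases all callers at once and reports how many sends happened.
    static int runConcurrently(int callers) {
        final SynchronizedConnectionSketch conn = new SynchronizedConnectionSketch();
        final CountDownLatch go = new CountDownLatch(1);
        Thread[] threads = new Thread[callers];
        for (int i = 0; i < callers; i++) {
            threads[i] = new Thread(() -> {
                try {
                    go.await();
                    conn.ensureInfoSent();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        go.countDown();
        try {
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return conn.sends.get();
    }

    public static void main(String[] args) {
        System.out.println("sends = " + runConcurrently(8));
    }
}
```

The trade-off is that every caller holds the lock for the duration of the simulated send, so concurrent callers wait for the first one to finish rather than returning immediately.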


Sanjiv Jivan - 15/Jun/06 08:22 PM
FYI, I ran into the very same issue with 4.0 RC3. After upgrading to the 4.0 release jar (dated Jun 13, 2006), I no longer see this error.

Sanjiv Jivan - 16/Jun/06 02:08 AM
Oops, looks like I spoke too soon. With ActiveMQ 4.0 RC3, I used to get this exception as soon as the brokers were started. With 4.0 final, the exception does not occur on startup; it shows up when messages are received by the broker. I'm using Spring 2.0 with DefaultMessageListenerContainer as well.

Here's the stack trace:

2006-06-15 22:06:35,017 INFO [org.apache.activemq.broker.AbstractConnection.Service] - <Sync error occurred: javax.jms.InvalidClientIDException: Broker: localhost - Client: ID:weston-2030-1150416833580-1:0 already connected>
javax.jms.InvalidClientIDException: Broker: localhost - Client: ID:weston-2030-1150416833580-1:0 already connected
at org.apache.activemq.broker.region.RegionBroker.addConnection(RegionBroker.java:176)
at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:69)
at org.apache.activemq.advisory.AdvisoryBroker.addConnection(AdvisoryBroker.java:69)
at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:69)
at org.apache.activemq.broker.MutableBrokerFilter.addConnection(MutableBrokerFilter.java:82)
at org.apache.activemq.broker.AbstractConnection.processAddConnection(AbstractConnection.java:507)
at org.apache.activemq.command.ConnectionInfo.visit(ConnectionInfo.java:118)
at org.apache.activemq.broker.AbstractConnection.service(AbstractConnection.java:201)
at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:62)
at org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:97)
at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:63)
at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:114)
at org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:122)
at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:87)
at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:143)
at java.lang.Thread.run(Thread.java:534)


James Strachan - 16/Jun/06 08:23 AM
Sanjiv - can you provide some kind of test case to demonstrate the problem? I'm not sure what you are doing, and I generally don't use the Spring DefaultMessageListenerContainer.

Sanjiv Jivan - 17/Jun/06 08:14 PM
I've created a separate issue, http://issues.apache.org/activemq/browse/AMQ-760, and uploaded a test app to demonstrate this and some other issues as well.