Description
I'm testing the failover transport on Mono.
I've downloaded the latest revision of the ActiveMQ .NET project (rev. 752559).
I ran the tests on:
- both Windows and Linux
- Mono 1.9.1 and Mono 2.0
- ActiveMQ 5.1 and ActiveMQ 5.2
All tests ended with an exception.
I wrote a simple program that listens on a topic:
using System;
using Apache.NMS;
using Apache.NMS.ActiveMQ;

namespace IssueExample
{
    // Routes all NMS trace output to the console.
    internal class ConsoleTracer : ITrace
    {
        public bool IsDebugEnabled { get { return true; } }
        public bool IsInfoEnabled { get { return true; } }
        public bool IsWarnEnabled { get { return true; } }
        public bool IsErrorEnabled { get { return true; } }
        public bool IsFatalEnabled { get { return true; } }

        public void Debug(string message) { Console.WriteLine("DEBUG:" + message); }
        public void Info(string message) { Console.WriteLine("INFO:" + message); }
        public void Warn(string message) { Console.WriteLine("WARN:" + message); }
        public void Error(string message) { Console.WriteLine("ERROR:" + message); }
        public void Fatal(string message) { Console.WriteLine("FATAL:" + message); }
    }

    class Program
    {
        public static void Main(string[] args)
        {
            if (args.Length < 2)
            {
                // Assumed guard: print usage and exit when arguments are missing.
                Console.WriteLine("Usage: IssueExample.exe <broker-uri> <topic>");
                return;
            }

            Tracer.Trace = new ConsoleTracer();

            // args[0] is the broker URI, args[1] is the topic name.
            IConnectionFactory factory = new ConnectionFactory(args[0]);
            IConnection connection = factory.CreateConnection();
            ISession session = connection.CreateSession();
            IDestination destination = session.GetTopic(args[1]);
            IMessageConsumer consumer = session.CreateConsumer(destination);
            consumer.Listener += onMessage;
            connection.Start();

            Console.WriteLine("Successfully connected to Topic '" + args[1] + "' on '" + args[0] + "' - waiting for any key to be pressed");
            Console.ReadKey();

            consumer.Close();
            connection.Dispose();

            Console.WriteLine("Connected and Disconnected successfully to Topic '" + args[1] + "' on '" + args[0] + "' - Press any key to terminate program.");
            Console.ReadKey();
        }

        // Prints every message received on the topic.
        private static void onMessage(IMessage msg)
        {
            Console.WriteLine("\tRECEIVING: " + msg);
        }
    }
}
The test was run with:
- mono IssueExample.exe failover:(tcp://localhost:61616) test.topic
The program works fine until I test reconnection.
While the program is running, I restart the AMQ server.
For a while I get a loop of:
(...)
DEBUG:Waiting 1280 ms before attempting connection.
DEBUG:Attempting connect to: tcp://localhost:61616/
DEBUG:Opening socket to: localhost on port: 61616
DEBUG:Connect fail to: tcp://localhost:61616/, reason: System.Net.Sockets.SocketException: Connection refused
at System.Net.Sockets.Socket.Connect (System.Net.EndPoint remote_end) [0x00000]
at Apache.NMS.ActiveMQ.Transport.Tcp.TcpTransportFactory.Connect (System.String host, Int32 port) [0x00000]
at Apache.NMS.ActiveMQ.Transport.Tcp.TcpTransportFactory.CompositeConnect (System.Uri location) [0x00000]
at Apache.NMS.ActiveMQ.Transport.TransportFactory.CompositeConnect (System.Uri location) [0x00000]
at Apache.NMS.ActiveMQ.Transport.Failover.FailoverTransport.doReconnect () [0x00000]
(...)
That is expected, but when the server resumes work, I get:
(...)
DEBUG:Waiting 20480 ms before attempting connection.
DEBUG:Parsing type: 2 with: Apache.NMS.ActiveMQ.OpenWire.V2.BrokerInfoMarshaller
DEBUG:Parsing type: 30 with: Apache.NMS.ActiveMQ.OpenWire.V2.ResponseMarshaller
ERROR:Unknown response ID: 0 for response: Response[ CorrelationId=1 ]
DEBUG:Parsing type: 10 with: Apache.NMS.ActiveMQ.OpenWire.V2.KeepAliveInfoMarshaller
INFO:Keep alive message received.
INFO:Waiting for transport to reconnect.
DEBUG:Attempting connect to: tcp://localhost:61616/
DEBUG:Opening socket to: localhost on port: 61616
DEBUG:Parsing type: 1 with: Apache.NMS.ActiveMQ.OpenWire.V1.WireFormatInfoMarshaller
DEBUG:Connect fail to: tcp://localhost:61616/, reason: System.InvalidCastException: Cannot cast from source type to destination type.
at System.Collections.Generic.Dictionary`2+ValueCollection[Apache.NMS.ActiveMQ.Commands.SessionId,Apache.NMS.ActiveMQ.State.SessionState].System.Collections.ICollection.CopyTo (System.Array array, Int32 index) [0x00000]
at System.Collections.ArrayList.InsertRange (Int32 index, ICollection c) [0x00000]
at System.Collections.ArrayList.AddRange (ICollection c) [0x00000]
at System.Collections.ArrayList..ctor (ICollection c) [0x00000]
at Apache.NMS.ActiveMQ.State.SynchronizedCollection`1[Apache.NMS.ActiveMQ.State.SessionState]..ctor (ICollection c) [0x00000]
at Apache.NMS.ActiveMQ.State.SynchronizedDictionary`2[Apache.NMS.ActiveMQ.Commands.SessionId,Apache.NMS.ActiveMQ.State.SessionState].get_Values () [0x00000]
at Apache.NMS.ActiveMQ.State.ConnectionState.get_SessionStates () [0x00000]
at Apache.NMS.ActiveMQ.State.ConnectionStateTracker.DoRestoreSessions (ITransport transport, Apache.NMS.ActiveMQ.State.ConnectionState connectionState) [0x00000]
at Apache.NMS.ActiveMQ.State.ConnectionStateTracker.DoRestore (ITransport transport) [0x00000]
at Apache.NMS.ActiveMQ.Transport.Failover.FailoverTransport.restoreTransport (ITransport t) [0x00000]
at Apache.NMS.ActiveMQ.Transport.Failover.FailoverTransport.doReconnect () [0x00000]
(...)
And this state is permanent.
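
The stack trace fails inside the non-generic ICollection.CopyTo call that runs when an ArrayList is constructed from the Values collection of a generic Dictionary. A possible workaround, given only as an untested sketch and not as the actual NMS code, would be to copy the values by enumerating them so that CopyTo is never called. ValueSnapshot and Copy are hypothetical names, and string/int stand in for SessionId/SessionState.

```csharp
using System;
using System.Collections;
using System.Collections.Generic;

// Hypothetical helper; not part of Apache.NMS.
public static class ValueSnapshot
{
    // Copies the dictionary values one at a time through the generic
    // enumerator, so the ArrayList constructor never calls
    // ICollection.CopyTo on the ValueCollection.
    public static ArrayList Copy<TKey, TValue>(Dictionary<TKey, TValue> source)
    {
        ArrayList result = new ArrayList(source.Count);
        foreach (TValue value in source.Values)
        {
            result.Add(value);
        }
        return result;
    }
}

public static class Demo
{
    public static void Main()
    {
        // string/int are placeholders for SessionId/SessionState.
        Dictionary<string, int> sessions = new Dictionary<string, int>();
        sessions["session-1"] = 1;
        sessions["session-2"] = 2;

        ArrayList copy = ValueSnapshot.Copy(sessions);
        Console.WriteLine("Copied " + copy.Count + " values");
    }
}
```

Whether this avoids the InvalidCastException on Mono 1.9.1 and 2.0 is an assumption; it only sidesteps the code path named in the trace.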