Uploaded image for project: 'ActiveMQ .Net'
  1. ActiveMQ .Net
  2. AMQNET-153

Acknowledgment of Messages consumed from VirtualTopic consumer queues fails

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Closed
    • Blocker
    • Resolution: Fixed
    • 1.1.0
    • 1.1.0
    • ActiveMQ
    • None
    • windows xp 64 ,java1.6 ,Activemq 5.2 NMS1.1 vs2008

    Description

      i try to test virtual topic found out the isssue
      1.create cosumer.a.virtualtopic.test queue.
      2.create cosumer.b.virtualtopic.test queue.
      3.create virtualtopic.test queue
      4.send message into topic
      5 consumer the message and uses the ClientAcknowledge submit the message. occurred the error
      ---msga.Acknowledge();

      the mesga can't submit , i go to NMS source look at the code. and debug the information. i found out the if we uses clientacknowledge to submit the message ,it will create the MessageAck , the MessageAck's info from info class and message class
      in MessageConsumer Class
      protected virtual MessageAck CreateMessageAck(Message message)
      {
      MessageAck ack = new MessageAck();
      ack.AckType = (int) AckType.ConsumedAck;
      ack.ConsumerId = info.ConsumerId;
      ack.Destination = message.Destination;
      ack.FirstMessageId = message.MessageId;
      ack.LastMessageId = message.MessageId;
      ack.MessageCount = 1;

      if(session.Transacted)

      { session.DoStartTransaction(); ack.TransactionId = session.TransactionContext.TransactionId; session.TransactionContext.AddSynchronization( new MessageConsumerSynchronization(this, message)); }

      return ack;
      }

      in there i can saw the message.Destination = topic:
      virtualtopic.test , if i change the Destination to queue:
      Consumer.a.Virtualtopic.test . and continue running the program ,it works . but i think the problem not here.

      i think the problem in the running the Acknowledge between Broker and Client

      NMS------------------running Command-----VirtualTopic.test-------------------->Consumer.A.virtualTopic.test

      i guess the problem maybe in command . because i uses the java client to test it . everything is works.

      other information for Message and Command

      Tracer.Debug("Sending Ack: "+ ack);
      ack =

      {MessageAck[ Destination=topic://VirtualTopic.test TransactionId= ConsumerId=ConsumerId[ ConnectionId=1a8ceea3-e808-473a-98a6-e08906181f64 SessionId=1 Value=1 ] AckType=2 FirstMessageId=MessageId[ ProducerId=ProducerId[ ConnectionId=a84996ea-6e8d-475e-bae1-25eee46b482f Value=1 SessionId=1 ] ProducerSequenceId=1 BrokerSequenceId=8 ] LastMessageId=MessageId[ ProducerId=ProducerId[ ConnectionId=a84996ea-6e8d-475e-bae1-25eee46b482f Value=1 SessionId=1 ] ProducerSequenceId=1 BrokerSequenceId=8 ] MessageCount=1 ]}

      transport.Oneway(command);
      command=

      {MessageAck[ Destination=topic://VirtualTopic.test TransactionId= ConsumerId=ConsumerId[ ConnectionId=1a8ceea3-e808-473a-98a6-e08906181f64 SessionId=1 Value=1 ] AckType=2 FirstMessageId=MessageId[ ProducerId=ProducerId[ ConnectionId=a84996ea-6e8d-475e-bae1-25eee46b482f Value=1 SessionId=1 ] ProducerSequenceId=1 BrokerSequenceId=8 ] LastMessageId=MessageId[ ProducerId=ProducerId[ ConnectionId=a84996ea-6e8d-475e-bae1-25eee46b482f Value=1 SessionId=1 ] ProducerSequenceId=1 BrokerSequenceId=8 ] MessageCount=1 ]}

      \

      2009-03-23 10:30:26,132 [/127.0.0.1:4352] ERROR Service - Async error occurred: java.lang.IllegalArgumentException: The subscription does not exist: 23f5c4b1-511f-4336-a0ce-d428ae28574b:1:1
      java.lang.IllegalArgumentException: The subscription does not exist: 23f5c4b1-511f-4336-a0ce-d428ae28574b:1:1
      at org.apache.activemq.broker.region.AbstractRegion.acknowledge(AbstractRegion.java:364)
      at org.apache.activemq.broker.region.RegionBroker.acknowledge(RegionBroker.java:462)
      at org.apache.activemq.broker.TransactionBroker.acknowledge(TransactionBroker.java:194)
      at org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:74)
      at org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:74)
      at org.apache.activemq.broker.MutableBrokerFilter.acknowledge(MutableBrokerFilter.java:85)
      at org.apache.activemq.broker.TransportConnection.processMessageAck(TransportConnection.java:456)
      at org.apache.activemq.command.MessageAck.visit(MessageAck.java:205)
      at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:305)
      at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:179)
      at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:68)
      at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:143)
      at org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:206)
      at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:84)
      at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:203)
      at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:185)
      at java.lang.Thread.run(Thread.java:619)

      Attachments

        Activity

          People

            tabish Timothy A. Bish
            pclovec xinfang yuan
            Votes:
            0 Vote for this issue
            Watchers:
            1 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: