Uploaded image for project: 'ActiveMQ Artemis'
  1. ActiveMQ Artemis
  2. ARTEMIS-630

STOMP server quits sending to all subscribers when one client disconnects

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Major
    • Resolution: Cannot Reproduce
    • 1.1.0
    • None
    • STOMP
    • None
    • Wildfly 10.0.0, Windows 7 64 bit, Java 1.8.0_91

    Description

      Multiple clients connected to a JMS topic via STOMP. When one client disconnects from the server then all clients quit receiving messages and cannot send messages. As soon as a new client sends a SUBSCRIBE message then all clients begin receiving messages again.

      Here is the way STOMP and the JMS topic is defined in standalone.xml:
      <subsystem xmlns="urn:jboss:domain:messaging-activemq:1.0">
      <server name="default">
      <security-setting name="#">
      <role name="guest" delete-non-durable-queue="true" create-non-durable-queue="true" consume="true" send="true"/>
      </security-setting>
      <address-setting name="#" message-counter-history-day-limit="10" page-size-bytes="2097152" max-size-bytes="10485760" expiry-address="jms.queue.ExpiryQueue" dead-letter-address="jms.queue.DLQ"/>
      <http-connector name="http-connector" endpoint="http-acceptor" socket-binding="http"/>
      <http-connector name="http-connector-throughput" endpoint="http-acceptor-throughput" socket-binding="http">
      <param name="batch-delay" value="50"/>
      </http-connector>
      <in-vm-connector name="in-vm" server-id="0"/>
      <http-acceptor name="http-acceptor" http-listener="default"/>
      <http-acceptor name="http-acceptor-throughput" http-listener="default">
      <param name="batch-delay" value="50"/>
      <param name="direct-deliver" value="false"/>
      </http-acceptor>
      <in-vm-acceptor name="in-vm" server-id="0"/>
      <acceptor name="stomp-acceptor" factory-class="org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptorFactory">
      <param name="protocols" value="STOMP"/>
      <param name="port" value="61613"/>
      </acceptor>
      <jms-queue name="ExpiryQueue" entries="java:/jms/queue/ExpiryQueue"/>
      <jms-queue name="DLQ" entries="java:/jms/queue/DLQ"/>
      <jms-topic name="ACRS_Exit" entries="java:/jms/topic/ACRS_Exit"/>
      <connection-factory name="InVmConnectionFactory" entries="java:/ConnectionFactory" connectors="in-vm"/>
      <connection-factory name="RemoteConnectionFactory" entries="java:jboss/exported/jms/RemoteConnectionFactory" connectors="http-connector"/>
      <pooled-connection-factory name="activemq-ra" transaction="xa" entries="java:/JmsXA java:jboss/DefaultJMSConnectionFactory" connectors="in-vm"/>
      </server>
      </subsystem>

      Run multiple instances of the program listed below. Stagger the starts by a minute or so. As soon and one instance of the program completes, all other instances will start having problems sending messages to the topic. The server will send a response that says "Destination does not exist\c jms.topic.ACRS_Exit". If you start another instance then all other running instances will start receiving messages and will be able to send messages.

      Here is the code for the sample program:

      import java.io.BufferedReader;
      import java.io.InputStreamReader;
      import java.net.InetSocketAddress;
      import java.net.SocketAddress;
      import java.nio.ByteBuffer;
      import java.nio.channels.AsynchronousSocketChannel;
      import java.nio.channels.CompletionHandler;
      import java.nio.charset.Charset;
      import java.text.DateFormat;
      import java.text.SimpleDateFormat;
      import java.util.Date;
      import java.util.concurrent.Future;

      public class StompClient {
      public static void main(String[] args) throws Exception

      { StompClient foo = new StompClient(); }

      public StompClient() throws Exception

      { AsynchronousSocketChannel channel = AsynchronousSocketChannel.open(); SocketAddress serverAddr = new InetSocketAddress("localhost", 61613); Future<Void> result = channel.connect(serverAddr); result.get(); System.out.println("Socket Connected"); // start two threads, one for reading and one for writing ReaderThread rt = new ReaderThread(channel); Thread readerThread = new Thread(rt); readerThread.start(); Thread writerThread = new Thread(new WriterThread(channel)); writerThread.start(); }

      protected class ReaderThread implements Runnable {
      AsynchronousSocketChannel m_channel;
      public ReaderThread(AsynchronousSocketChannel channel)

      { m_channel = channel; }

      public void run() {
      String outputStr;
      Integer readByteCnt;

      while (true) {
      ByteBuffer buffer = ByteBuffer.allocate(2048);
      buffer.clear();
      Future<Integer> result = m_channel.read(buffer);
      try
      { readByteCnt = (Integer)result.get(); } catch (Exception ex) { readByteCnt = -1; }
      if (readByteCnt > -1)
      {
      // convert the bytes to a string
      try
      { outputStr = new String(buffer.array(), "UTF-8"); outputStr = outputStr.trim(); System.out.println("-----Beginning of Message From Stomp Server:"); System.out.println(outputStr); System.out.println("-----Ending of Message From Stomp Server"); } catch (Exception ex) { System.out.println("ReaderThread Exception:" + ex.getMessage()); }
      }
      }
      }
      }

      protected class WriterThread implements Runnable {
      AsynchronousSocketChannel m_channel;
      public WriterThread(AsynchronousSocketChannel channel) { m_channel = channel; }

      public void run() {
      String topicName = "jms.topic.ACRS_Exit";
      ByteBuffer buffer = ByteBuffer.allocate(2048);
      Charset cs = Charset.forName("UTF-8");
      Integer writtenByteCnt;
      Future<Integer> result;
      byte[] data;
      String msg;

      try
      {
      msg = "CONNECT\n";
      msg = msg + "accept-version:1.2\n";
      msg = msg + "heart-beat:5000,5000\n";
      msg = msg + "login:dynsub\n";
      msg = msg + "passcode:dynsub\n";
      msg = msg + "\n";
      msg = msg + '\0';

      SendMessageToStomp(m_channel, msg);

      java.lang.Thread.sleep(5000);

      msg = "SUBSCRIBE\n";
      msg = msg + "destination:" + topicName + "\n";
      msg = msg + "id:dest1\n";
      msg = msg + "ack:auto\n";
      msg = msg + "\n";
      msg = msg + '\0';

      SendMessageToStomp(m_channel, msg);

      // send a heartbeat message every 5 seconds
      // NOTE: this was changed to send the date and time instead of an empty message
      DateFormat dateFormat = new SimpleDateFormat("yyyy-mm-dd hh:mm:ss");
      for(int i = 0; i < 20; i++)

      { java.lang.Thread.sleep(5000); SendMessageToStomp(m_channel, "\n"); // NOTE: this was changed to also send a message to the topic Date d = new Date(); String dateStr = dateFormat.format(d); msg = "SEND\n"; msg += "destination:" + topicName + "\n"; msg += "content-type:text/plain\n"; msg += "content-length:" + dateStr.length() + "\n"; msg += "\n"; msg += dateStr; msg += '\0'; SendMessageToStomp(m_channel, msg); }

      java.lang.Thread.sleep(5000);

      // now test the unsubscribe
      msg = "UNSUBSCRIBE\n";
      msg = msg + "id:dest1\n";
      msg = msg + "\n";
      msg = msg + '\0';
      SendMessageToStomp(m_channel, msg);

      msg = "DISCONNECT\n";
      msg = msg + '\0';
      SendMessageToStomp(m_channel, msg);

      m_channel.shutdownInput();
      m_channel.shutdownOutput();
      m_channel.close();

      System.exit(0);

      /*
      // if the UNSUBSCRIBE, DISCONNECT, and socket shutdown/close
      // are removed and the following added, the server will
      // continue to send messages to other clients for about 20 seconds.

      // send a heartbeat message every 5 seconds
      while(true)

      { java.lang.Thread.sleep(5000); SendMessageToStomp(m_channel, "\n"); }

      */

      } catch (Exception ex)

      { System.out.println("WriterThread Exception:" + ex.getMessage()); }

      }
      }

      protected void SendMessageToStomp(AsynchronousSocketChannel channel, String msg) throws Exception

      { ByteBuffer buffer = ByteBuffer.allocate(2048); Charset cs = Charset.forName("UTF-8"); Integer writtenByteCnt; Future<Integer> result; byte[] data; data = msg.getBytes(cs); buffer.put(data); buffer.flip(); System.out.println("-----Sending message to Stomp Server:\n" + msg); result = channel.write(buffer); writtenByteCnt = (Integer)result.get(); }

      }

      Attachments

        1. StompClient.java
          6 kB
          Ed Kaltenbach

        Activity

          People

            Unassigned Unassigned
            ekaltenbach Ed Kaltenbach
            Votes:
            0 Vote for this issue
            Watchers:
            2 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: