ActiveMQ
  1. ActiveMQ
  2. AMQ-2019

Stomp client hang after a long time pending

    Details

    • Type: Bug Bug
    • Status: Closed
    • Priority: Minor Minor
    • Resolution: Fixed
    • Affects Version/s: 5.2.0
    • Fix Version/s: 5.2.0
    • Component/s: None
    • Labels:
      None
    • Environment:

      linux with perl 5.8.8 and stomp

      Description

      i have a program which use stomp to receive messages from activemq,
      when i start it and wait for several minutes before close it, everything is ok.
      but if i wait for one hour, for example, when i close it, then i find from the management page, the two consumers are still there.
      and if i restart the program with these tow hanging consumers, then i could not receive any message.
      i do not know whether it is a bug or just my program problem, sorry for disturb you here for an analyse, please.
      here is my program:
      #!/usr/bin/perl -w
      use strict;
      use Switch;
      use Time::HiRes qw(usleep ualarm gettimeofday tv_interval);

      use Net::Stomp;

      use Term::ANSIColor;
      #for multithread
      use threads;
      use threads::shared;

      #for ActiveMQ connection
      my $aq_host = "10.255.1.130";
      my $aq_port = '61613';
      my $sSender;
      my $sReceiver;
      my $aReceiver;

      my $queue1 = "queue1";
      my $queue2 = "queue2";

      my $running : shared = 1;

      my $thread_th1;
      my $thread_th2;

      sub cleanup()
      {
      print colored ("\n.STOP received! \n", 'red');
      $running=0;
      print ".Stopping Thread1 ...... ";
      my $r1 = $thread_th1->join();
      print colored ("$r1\n", 'green');

      print ".Stopping Thread2 ...... ";
      my $r2 = $thread_th2->join();
      print colored ("$r2\n", 'green');

      print ".Disconnecting ActiveMQ connector ...... ";
      $sReceiver->disconnect();
      $aReceiver->disconnect();
      print colored ("OK\n", 'green');

      print "\n";
      print colored ("ALL Nicely Stopped, see you!\n\n", 'yellow');
      }

      sub th1()
      {
      $sReceiver->subscribe(

      { destination => "/queue/$queue1", 'ack' => 'client', 'activemq.prefetchSize' => 1}

      );
      while($running){

      if ($sReceiver->can_read(

      {timeout=>'1'}))
      {
      my $frame = $sReceiver->receive_frame;
      my $body = $frame->body;

      #print "get message from queue 1: $body \n";
      ptMsg($body);
      #Ack msg for deleting after the procedure
      $sReceiver->ack({ frame => $frame});
      }
      }

      $sReceiver->unsubscribe({ destination => "/queue/$queue1"});
      return "OK";
      }

      sub th2()
      {
      $aReceiver->subscribe({ destination => "/queue/$queue2", 'ack' => 'client', 'activemq.prefetchSize' => 1});
      while($running)
      {
      #get one message from queue
      if ($aReceiver->can_read({timeout=>'1'}

      ))
      {
      my $frame = $aReceiver->receive_frame;
      my $body = $frame->body;
      #print "get message from queue 2: $body";
      ptMsg($body);
      $aReceiver->ack(

      { frame => $frame}

      );
      }
      }
      #when received cleanup message
      $aReceiver->unsubscribe(

      { destination => "/queue/$queue2"}

      );
      return "OK";
      }

      sub ptMsg
      {
      my $msg = shift;
      print "received message $msg \n";
      }
      #################################################################

      1. main function start
        #################################################################

      print ".Creating ActiveMQ connector ...... ";
      $sReceiver = Net::Stomp->new(

      { hostname => $aq_host, port => $aq_port});
      $sReceiver->connect();
      $aReceiver = Net::Stomp->new({ hostname => $aq_host, port => $aq_port}

      );
      $aReceiver->connect();
      print colored ("OK\n",'green');

      $SIG

      {INT}

      = "cleanup";

      print ".Starting Thread1 ...... ";
      $thread_th1 = threads->create(\&th1);
      print colored ("OK\n", 'green');

      print ".Starting Thread2 ...... ";
      $thread_th2 = threads->create(\&th2);
      print colored ("OK\n", 'green');

      #main process waiting for signal clean
      print colored ("->running ......\n", 'green');
      print "\n";

      while ($running) {
      usleep(5000000);
      }

        Activity

        Jeff Turner made changes -
        Project Import Fri Nov 26 22:32:02 EST 2010 [ 1290828722158 ]
        Hongwu LU made changes -
        Resolution Fixed [ 1 ]
        Status Open [ 1 ] Closed [ 6 ]
        Fix Version/s 5.2.0 [ 11841 ]
        Hide
        Hongwu LU added a comment -

        without keep alive support of stomp, we could send some period msg to simulate keep alive.

        Show
        Hongwu LU added a comment - without keep alive support of stomp, we could send some period msg to simulate keep alive.
        Hide
        Dejan Bosanac added a comment -

        It looks like a socket timeout problem. You can try increasing that time, but generally the better solution is to have a kind of a "keep alive" protocol. It is planed for Stomp v1.1. Until then you can send some dummy messages periodically to keep connection alive. Hope this helps.

        Show
        Dejan Bosanac added a comment - It looks like a socket timeout problem. You can try increasing that time, but generally the better solution is to have a kind of a "keep alive" protocol. It is planed for Stomp v1.1. Until then you can send some dummy messages periodically to keep connection alive. Hope this helps.
        Hongwu LU made changes -
        Field Original Value New Value
        Description i have a program which use stomp to receive messages from activemq,
        when i start it and wait for several minutes before close it, everything is ok.
        but if i wait for one hour, for example, when i close it, then i find from the management page, the two consumers are still there.
        and if i restart the program with these tow hanging consumers, then i could not receive any message.
        i do not know whether it is a bug or just my program problem, sorry for disturb you here for an analyse, please.
        here is my program:
        #!/usr/bin/perl -w
        use strict;
        use Switch;
        use Time::HiRes qw(usleep ualarm gettimeofday tv_interval);

        use Net::Stomp;

        use Term::ANSIColor;
        #for multithread
        use threads;
        use threads::shared;

        #for ActiveMQ connection
        my $aq_host = "10.255.1.130";
        my $aq_port = '61613';
        my $sSender;
        my $sReceiver;
        my $aReceiver;

        my $queue1 = "queue1";
        my $queue2 = "queue2";

        my $running : shared = 1;

        my $thread_th1;
        my $thread_th2;

        sub cleanup()
        {
        print colored ("\n.STOP received! \n", 'red');
        $running=0;

        print ".Stopping Thread1 ...... ";
        my $r1 = $thread_th1->join();
        print colored ("$r1\n", 'green');

        print ".Stopping Thread2 ...... ";
        my $r2 = $thread_th2->join();
        print colored ("$r2\n", 'green');

        print ".Disconnecting ActiveMQ connector ...... ";
        $sReceiver->disconnect();
        $aReceiver->disconnect();
        print colored ("OK\n", 'green');

        print "\n";
        print colored ("ALL Nicely Stopped, see you!\n\n", 'yellow');
        }


        sub th1()
        {
        $sReceiver->subscribe({ destination => "/queue/$queue1",
        'ack' => 'client',
        'activemq.prefetchSize' => 1});
        while($running){

        if ($sReceiver->can_read({timeout=>'1'}))
        {
        my $frame = $sReceiver->receive_frame;
        my $body = $frame->body;

        #print "get message from queue 1: $body \n";
        ptMsg($body);
        #Ack msg for deleting after the procedure
        $sReceiver->ack({ frame => $frame});
        }
        }

        $sReceiver->unsubscribe({ destination => "/queue/$queue1"});
        return "OK";
        }

        sub th2()
        {
        $aReceiver->subscribe({ destination => "/queue/$queue2",
        'ack' => 'client',
        'activemq.prefetchSize' => 1});
        while($running)
        {
        #get one message from queue
        if ($aReceiver->can_read({timeout=>'1'}))
        {
        my $frame = $aReceiver->receive_frame;
        my $body = $frame->body;
        #print "get message from queue 2: $body";
        ptMsg($body);
        $aReceiver->ack({ frame => $frame});
        }
        }
        #when received cleanup message
        $aReceiver->unsubscribe({ destination => "/queue/$queue2"});
        return "OK";
        }

        sub ptMsg
        {
        my $msg = shift;
        print "received message $msg \n";
        }
        #################################################################
        # main function start
        #################################################################

        print ".Creating ActiveMQ connector ...... ";
        $sReceiver = Net::Stomp->new({ hostname => $aq_host, port => $aq_port});
        $sReceiver->connect();
        $aReceiver = Net::Stomp->new({ hostname => $aq_host, port => $aq_port});
        $aReceiver->connect();
        print colored ("OK\n",'green');

        $SIG{INT} = "cleanup";

        print ".Starting Thread1 ...... ";
        $thread_th1 = threads->create(\&th1);
        print colored ("OK\n", 'green');

        print ".Starting Thread2 ...... ";
        $thread_th2 = threads->create(\&th2);
        print colored ("OK\n", 'green');

        #main process waiting for signal clean
        print colored ("->running ......\n", 'green');
        print "\n";

        while ($running) {
        usleep(5000000);
        }
         
        i have a program which use stomp to receive messages from activemq,
        when i start it and wait for several minutes before close it, everything is ok.
        but if i wait for one hour, for example, when i close it, then i find from the management page, the two consumers are still there.
        and if i restart the program with these tow hanging consumers, then i could not receive any message.
        i do not know whether it is a bug or just my program problem, sorry for disturb you here for an analyse, please.
        here is my program:
        #!/usr/bin/perl -w
        use strict;
        use Switch;
        use Time::HiRes qw(usleep ualarm gettimeofday tv_interval);

        use Net::Stomp;

        use Term::ANSIColor;
        #for multithread
        use threads;
        use threads::shared;

        #for ActiveMQ connection
        my $aq_host = "10.255.1.130";
        my $aq_port = '61613';
        my $sSender;
        my $sReceiver;
        my $aReceiver;

        my $queue1 = "queue1";
        my $queue2 = "queue2";

        my $running : shared = 1;

        my $thread_th1;
        my $thread_th2;

        sub cleanup()
        {
        print colored ("\n.STOP received! \n", 'red');
        $running=0;
        print ".Stopping Thread1 ...... ";
        my $r1 = $thread_th1->join();
        print colored ("$r1\n", 'green');

        print ".Stopping Thread2 ...... ";
        my $r2 = $thread_th2->join();
        print colored ("$r2\n", 'green');

        print ".Disconnecting ActiveMQ connector ...... ";
        $sReceiver->disconnect();
        $aReceiver->disconnect();
        print colored ("OK\n", 'green');

        print "\n";
        print colored ("ALL Nicely Stopped, see you!\n\n", 'yellow');
        }

        sub th1()
        {
        $sReceiver->subscribe({ destination => "/queue/$queue1",
        'ack' => 'client',
        'activemq.prefetchSize' => 1});
        while($running){

        if ($sReceiver->can_read({timeout=>'1'}))
        {
        my $frame = $sReceiver->receive_frame;
        my $body = $frame->body;

        #print "get message from queue 1: $body \n";
        ptMsg($body);
        #Ack msg for deleting after the procedure
        $sReceiver->ack({ frame => $frame});
        }
        }

        $sReceiver->unsubscribe({ destination => "/queue/$queue1"});
        return "OK";
        }

        sub th2()
        {
        $aReceiver->subscribe({ destination => "/queue/$queue2",
        'ack' => 'client',
        'activemq.prefetchSize' => 1});
        while($running)
        {
        #get one message from queue
        if ($aReceiver->can_read({timeout=>'1'}))
        {
        my $frame = $aReceiver->receive_frame;
        my $body = $frame->body;
        #print "get message from queue 2: $body";
        ptMsg($body);
        $aReceiver->ack({ frame => $frame});
        }
        }
        #when received cleanup message
        $aReceiver->unsubscribe({ destination => "/queue/$queue2"});
        return "OK";
        }

        sub ptMsg
        {
        my $msg = shift;
        print "received message $msg \n";
        }
        #################################################################
        # main function start
        #################################################################

        print ".Creating ActiveMQ connector ...... ";
        $sReceiver = Net::Stomp->new({ hostname => $aq_host, port => $aq_port});
        $sReceiver->connect();
        $aReceiver = Net::Stomp->new({ hostname => $aq_host, port => $aq_port});
        $aReceiver->connect();
        print colored ("OK\n",'green');

        $SIG{INT} = "cleanup";

        print ".Starting Thread1 ...... ";
        $thread_th1 = threads->create(\&th1);
        print colored ("OK\n", 'green');

        print ".Starting Thread2 ...... ";
        $thread_th2 = threads->create(\&th2);
        print colored ("OK\n", 'green');

        #main process waiting for signal clean
        print colored ("->running ......\n", 'green');
        print "\n";

        while ($running) {
        usleep(5000000);
        }
        Hongwu LU created issue -

          People

          • Assignee:
            Unassigned
            Reporter:
            Hongwu LU
          • Votes:
            0 Vote for this issue
            Watchers:
            0 Start watching this issue

            Dates

            • Created:
              Updated:
              Resolved:

              Development