Uploaded image for project: 'Qpid'
  1. Qpid
  2. QPID-2652

Performance problems

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Closed
    • Major
    • Resolution: Auto Closed
    • None
    • None
    • Python Client
    • None
    • Debian GNU Linux squeeze, QPID 0.6/trunk

    Description

      When using the python client, I noticed that there's a significant performance difference when using python and c++ services. Using the client below runs ~10s with the c++ request/reponse and ~40s with the python one. A factor 4 slowdown doesn't seem to recommend the python libs.

      C++ request/response:

      #include <qpid/messaging/Address.h>
      #include <qpid/messaging/Connection.h>
      #include <qpid/messaging/Message.h>
      #include <qpid/messaging/Receiver.h>
      #include <qpid/messaging/Sender.h>
      #include <qpid/messaging/Session.h>
      #include <qpid/messaging/Variant.h>

      #include <algorithm>
      #include <cstdlib>
      #include <iostream>
      #include <memory>
      #include <sstream>
      #include <pthread.h>

      using namespace qpid::messaging;

      using std::stringstream;
      using std::string;

      Connection connection;

      void * run(void *conn_ptr) {
      Session session = connection.newSession();
      Receiver receiver = session.createReceiver("requests;

      {create: always}

      ");
      receiver.setCapacity(2);
      while (true) {
      Message request = receiver.fetch();
      try {
      const Address& address = request.getReplyTo();
      if (address)

      { std::string s = request.getContent(); std::cout << "Processing request: " << request.getContent() << " -> " << request.getReplyTo().getName() << std::endl; Sender sender = session.createSender(request.getReplyTo().getName() ); std::transform(s.begin(), s.end(), s.begin(), toupper); Message response(s); sender.send(response); std::cout << "Processed request: " << request.getContent() << " -> " << response.getContent() << std::endl; session.acknowledge(); }

      else

      { std::cerr << "Error: no reply address specified for request: " << request.getContent() << std::endl; session.reject(request); }

      } catch(const std::exception& error)

      { std::cout << "Something went wrong: " << error.what() << std::endl; session.reject(request); }

      }
      }

      int main(int argc, char** argv) {
      const char* url = argc>1 ? argv[1] : "amqp:tcp:127.0.0.1:5672";
      Variant::Map amqp_options;
      amqp_options["username"] = "admin";
      amqp_options["password"] = "secret";

      connection = Connection::open(url, amqp_options);

      int i = 0;
      pthread_t threads[10];
      // Create the threads
      for(i = 0; i<10; i++)

      { pthread_create(&threads[i], NULL, run, (void*) NULL); }

      // Get the threads back
      for(i = 0; i<10; i++)

      { pthread_join(threads[i], NULL); }

      connection.close();
      return 0;
      }


      Python request/response:

      import traceback
      from qpid.messaging import *
      from threading import Thread

      class Processor(Thread):

      def _init_(self, ssn):
      Thread._init_(self)
      self.setDaemon(True)
      self.ssn = ssn

      def run(self):
      while True:
      msg = self.ssn.next_receiver().fetch()
      try:
      to, reply = self.process(msg)
      snd = ssn.sender(to)
      snd.send(reply)
      snd.close()
      self.ssn.acknowledge(msg)
      except:
      traceback.print_exc()
      print "rejecting: %s" % msg
      self.ssn.acknowledge(msg, Disposition(REJECTED))

      def process(self, msg):
      print "processing %s" % msg
      return msg.reply_to, Message("echo %s" % msg.content)

      conn = Connection.establish("localhost", username='admin', password='secret')

      processors = []

      for i in range(10):
      ssn = conn.session()
      rcv = ssn.receiver("requests", capacity=10)
      proc = Processor(ssn)
      proc.start()
      processors.append(proc)

      1. this may look a bit weird, but the timed join keeps python from
      2. swallowing interrupts
        while True:
        for p in processors:
        p.join(3)


      Test-Client:

      import traceback
      from qpid.messaging import *
      from threading import Thread

      conn = Connection.establish("localhost")
      ssn = conn.session(str(uuid4()))
      snd = ssn.sender("requests")
      rcv = ssn.receiver('reply-%s;

      {create:always, delete:always}

      ' % ssn.name)

      for i in range(1000):
      msg = Message("The quick brown fox tests the performance")
      msg.reply_to = 'reply-%s' % ssn.name
      snd.send(msg)
      res = rcv.fetch()
      print res.content

      Attachments

        Activity

          People

            Unassigned Unassigned
            cpollmeier Cajus Pollmeier
            Votes:
            0 Vote for this issue
            Watchers:
            1 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: