Qpid / QPID-6836

qpid.messaging hangs in child forked process

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Not A Problem
    • Affects Version/s: 0.32
    • Fix Version/s: None
    • Component/s: Python Client
    • Labels: None

      Description

      A Python process that uses qpid.messaging and then forks can cause qpid.messaging calls in the child process to hang intermittently. To reproduce:

      1. In a Python process, instantiate a qpid.messaging client and call open().
      2. Call os.fork().
      3. In the child, try to instantiate another qpid.messaging client and call open().
      4. Observe that the child process waits on a file descriptor shared with the parent. That descriptor never becomes ready, so the child never wakes up.
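
The report does not say why the shared descriptor never becomes ready. One possible explanation, stated here as an assumption rather than a confirmed diagnosis, is that the client depends on a background thread started in the parent, and os.fork() carries only the calling thread into the child. The sketch below uses no qpid code; `thread_counts_across_fork` and its helper thread are hypothetical names that only illustrate this general behaviour of fork on POSIX systems.

```python
import os
import threading


def thread_counts_across_fork():
    """Return (threads seen in the parent, threads seen in the forked child)."""
    stop = threading.Event()

    # Hypothetical stand-in for a library's background I/O thread.
    helper = threading.Thread(target=stop.wait)
    helper.daemon = True
    helper.start()

    parent_count = threading.active_count()

    # Pipe used only to report the child's thread count back to the parent.
    read_fd, write_fd = os.pipe()

    pid = os.fork()
    if pid == 0:
        # Child: report how many threads exist on this side of the fork.
        os.close(read_fd)
        os.write(write_fd, str(threading.active_count()).encode("ascii"))
        os._exit(0)

    # Parent: collect the child's report, then clean up.
    os.close(write_fd)
    child_count = int(os.read(read_fd, 16).decode("ascii"))
    os.close(read_fd)
    os.waitpid(pid, 0)

    stop.set()
    helper.join()
    return parent_count, child_count


if __name__ == "__main__":
    parent_count, child_count = thread_counts_across_fork()
    print("threads in parent: %d, threads in child: %d" % (parent_count, child_count))
```

The child is expected to report a single thread, because the helper exists only in the parent. If such a thread were the only thing that would ever signal a descriptor the child is waiting on, the child would block indefinitely, which would be consistent with step 4.
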

      Here is a potential reproducer with actual python code (from kgiusti):

      #!/usr/bin/env python
      #
      # Licensed to the Apache Software Foundation (ASF) under one
      # or more contributor license agreements.  See the NOTICE file
      # distributed with this work for additional information
      # regarding copyright ownership.  The ASF licenses this file
      # to you under the Apache License, Version 2.0 (the
      # "License"); you may not use this file except in compliance
      # with the License.  You may obtain a copy of the License at
      # 
      #   http://www.apache.org/licenses/LICENSE-2.0
      # 
      # Unless required by applicable law or agreed to in writing,
      # software distributed under the License is distributed on an
      # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
      # KIND, either express or implied.  See the License for the
      # specific language governing permissions and limitations
      # under the License.
      #
      
      import optparse, time
      from qpid.messaging import *
      from qpid.util import URL
      from qpid.log import enable, DEBUG, WARN
      import os
      import sys
      
      def nameval(st):
        idx = st.find("=")
        if idx >= 0:
          name = st[0:idx]
          value = st[idx+1:]
        else:
          name = st
          value = None
        return name, value
      
      parser = optparse.OptionParser(usage="usage: %prog [options] ADDRESS [ CONTENT ... ]",
                                     description="Send messages to the supplied address.")
      parser.add_option("-b", "--broker", default="localhost",
                        help="connect to specified BROKER (default %default)")
      parser.add_option("-r", "--reconnect", action="store_true",
                        help="enable auto reconnect")
      parser.add_option("-i", "--reconnect-interval", type="float", default=3,
                        help="interval between reconnect attempts")
      parser.add_option("-l", "--reconnect-limit", type="int",
                        help="maximum number of reconnect attempts")
      parser.add_option("-c", "--count", type="int", default=1,
                        help="stop after count messages have been sent, zero disables (default %default)")
      parser.add_option("-d", "--durable", action="store_true",
                        help="make the message persistent")
      parser.add_option("-t", "--timeout", type="float", default=None,
                        help="exit after the specified time")
      parser.add_option("-I", "--id", help="use the supplied id instead of generating one")
      parser.add_option("-S", "--subject", help="specify a subject")
      parser.add_option("-R", "--reply-to", help="specify reply-to address")
      parser.add_option("-P", "--property", dest="properties", action="append", default=[],
                        metavar="NAME=VALUE", help="specify message property")
      parser.add_option("-M", "--map", dest="entries", action="append", default=[],
                        metavar="KEY=VALUE",
                        help="specify map entry for message body")
      parser.add_option("-v", dest="verbose", action="store_true",
                        help="enable logging")
      
      opts, args = parser.parse_args()
      
      if opts.verbose:
        enable("qpid", DEBUG)
      else:
        enable("qpid", WARN)
      
      if opts.id is None:
        spout_id = str(uuid4())
      else:
        spout_id = opts.id
      if args:
        addr = args.pop(0)
      else:
        parser.error("address is required")
      
      content = None
      content_type = None
      
      if args:
        text = " ".join(args)
      else:
        text = None
      
      if opts.entries:
        content = {}
        if text:
          content["text"] = text
        for e in opts.entries:
          name, val = nameval(e)
          content[name] = val
      else:
        content = text
        # no entries were supplied, so assume text/plain for
        # compatibility with java (and other) clients
        content_type = "text/plain"
      
      import pdb
      conn = Connection(opts.broker,
                        reconnect=opts.reconnect,
                        reconnect_interval=opts.reconnect_interval,
                        reconnect_limit=opts.reconnect_limit)
      conn.open()
      print("Conn open")
      pid = os.fork()
      if not pid:
          #child
          try:
              #pdb.set_trace()
              ssn = conn.session()
              snd = ssn.sender(addr)
      
              count = 0
              start = time.time()
              while (opts.count == 0 or count < opts.count) and \
                    (opts.timeout is None or time.time() - start < opts.timeout):
                  msg = Message(subject=opts.subject,
                                reply_to=opts.reply_to,
                                content=content)
                  if opts.durable:
                      msg.durable = True
      
                  if content_type is not None:
                      msg.content_type = content_type
      
                  msg.properties["spout-id"] = "%s:%s" % (spout_id, count)
                  for p in opts.properties:
                      name, val = nameval(p)
                      msg.properties[name] = val
      
                  snd.send(msg)
                  count += 1
                  print(msg)
          except Exception as e:
              # report what failed instead of hiding it
              print("booboo: %r" % (e,))
              os._exit(1)
          print("buh-bye")
          os._exit(0)
      else:
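          # os.wait() returns (pid, status); status is an encoded wait status, not an exit code
          status = os.wait()[1]
          #pdb.set_trace()
          conn.close()
          # decode the status so the child's exit code is propagated correctly
          sys.exit(os.WEXITSTATUS(status) if os.WIFEXITED(status) else 1)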
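
The ticket was closed as Not A Problem, which may suggest that sharing client state across a fork is not a supported pattern. A minimal sketch of one way to restructure such a program, assuming the hang comes from client state created before the fork: fork first, and let only the child create and use the connection. `run_in_child`, `open_connection`, and `work` are hypothetical names, not part of qpid.messaging.

```python
import os


def run_in_child(open_connection, work):
    """Fork, open a fresh connection inside the child, and run work(conn).

    Returns the child's exit code: 0 on success, 1 on any failure.
    """
    pid = os.fork()
    if pid == 0:
        code = 1
        try:
            # Created after the fork, so nothing here is inherited from the parent.
            conn = open_connection()
            try:
                work(conn)
                code = 0
            finally:
                conn.close()
        except Exception:
            code = 1
        finally:
            # Leave the child immediately without running the parent's cleanup handlers.
            os._exit(code)

    # Parent: wait for the child and decode its wait status into an exit code.
    _, status = os.waitpid(pid, 0)
    return os.WEXITSTATUS(status) if os.WIFEXITED(status) else 1
```

With qpid.messaging, `open_connection` would be a function that builds a Connection and calls open(), as the reproducer does before forking. This assumes the parent has not used the client before the fork, and it has not been verified against version 0.32.
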
      

            People

            • Assignee: Unassigned
            • Reporter: Brian Bouterse (bmbouter)
            • Votes: 0
            • Watchers: 2
