Uploaded image for project: 'Camel'
  1. Camel
  2. CAMEL-7031

RabbitMQ Producer not able to use the default exchange

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Major
    • Resolution: Fixed
    • 2.12.2
    • 2.12.3
    • camel-rabbitmq
    • None
    • Unknown

    Description

      In RabbitMQ, the default exchange is a direct exchange with no name (empty string) and is pre-declared by the broker. It has one special property that makes it very useful for simple applications: every queue that is created is automatically bound to it with a routing key which is the same as the queue name. This is especially useful in RPC style messaging when the producer specifies a REPLY_TO queue name that was created "exclusive" Since RabbitMQ binds that queue onto the default exchange, it makes RPC much simpler.

      However, the camel rabbitmq producer throws an IllegalArgumentException if the exchange name is empty, which prevents this simple RPC exchange. The fix for this is simple, just don't throw that IllegalArgumentException if the exchange name was set to empty string.

      The same problem may exists with the Consumer as well.

      This python script will send an rpc request (from RabbitMQ in Action)

      import time, json, pika
      
      creds_broker = pika.PlainCredentials("guest", "guest")
      conn_params = pika.ConnectionParameters("localhost",
                                               virtual_host = "/",
                                               credentials = creds_broker)
      conn_broker = pika.BlockingConnection(conn_params)
      channel = conn_broker.channel()
      
      msg = json.dumps({"client_name": "RPC Client 1.0",
                        "time" : time.time()})
      result = channel.queue_declare(exclusive=True, auto_delete=True)
      msg_props = pika.BasicProperties()
      msg_props.reply_to = result.method.queue
      msg_props.content_type = "application/json"
      msg_props.correlation_id = "1"
      msg_props.delivery_mode = 2
      
      channel.basic_publish(body=msg,
                            exchange="rpc",
                            properties=msg_props,
                            routing_key="ping")
      
      print "Sent 'Ping' RPC call.  Waiting for reply..."
      
      def reply_callback(channel, method, header, body):
           """Receives RPC server replies."""
           print "RPC Reply --- " + body
           channel.stop_consuming()
      
      channel.basic_consume(reply_callback,
                            queue=result.method.queue,
                            consumer_tag=result.method.queue)
      
      channel.start_consuming()
      

      This route would be what I would want to do when consuming from Rabbit the rpc call and sending back a response:

              from("rabbitmq://192.168.213.130/rpc?queue=ping&routingKey=ping&durable=True&autoDelete=False&autoAck=False&username=guest&password=guest")
                  .log("Incoming Headers: ${headers}")
                  .setHeader("rabbitmq.ROUTING_KEY", header("rabbitmq.REPLY_TO"))
                  .removeHeader("rabbitmq.REPLY_TO")
                  .removeHeader("rabbitmq.EXCHANGE_NAME")
                  .setBody(simple("Pong!"))
                  .to("rabbitmq://192.168.213.130/?username=guest&password=guest");
      
      

      If I remove the illegalargumentexception, the code works as expected.

      Attachments

        Issue Links

          Activity

            People

              njiang Willem Jiang
              jrfoster67 Jason Foster
              Votes:
              0 Vote for this issue
              Watchers:
              4 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: