Uploaded image for project: 'Camel'
  1. Camel
  2. CAMEL-15748

Paho consumer never connects if the broker is not reachable at startup

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Major
    • Resolution: Fixed
    • 3.5.0, 3.4.4, 3.6.0
    • 3.4.5, 3.7.0
    • camel-paho
    • None
    • Unknown

    Description

      Having a route with a paho consumer in a route, if the broker is not reachable at startup, camel context fail fast and shuts down.

      This can be avoided by setting the SupervisingRouteController, but this way, even if camel context does not fail, the consumer is never able to establish a connection.

      The reason is that when PahoConsumer starts for the first time, it creates a MqttClient and stores it into client field; the call to client.connect throws an exception due to broker down;

       

       
          @Override
          protected void doStart() throws Exception {
              super.doStart();        connectOptions = PahoEndpoint.createMqttConnectOptions(getEndpoint().getConfiguration());        if (client == null) {
                  clientId = getEndpoint().getConfiguration().getClientId();
                  if (clientId == null) {
                      clientId = "camel-" + MqttClient.generateClientId();
                  }
                  stopClient = true;
                  client = new MqttClient(
                          getEndpoint().getConfiguration().getBrokerUrl(),
                          clientId,
                          PahoEndpoint.createMqttClientPersistence(getEndpoint().getConfiguration()));
                  LOG.debug("Connecting client: {} to broker: {}", clientId, getEndpoint().getConfiguration().getBrokerUrl());
                  client.connect(connectOptions);
              }
              
              // other code omitted for brevity
              
              client.subscribe(getEndpoint().getTopic(), getEndpoint().getConfiguration().getQos());

       

      after that doStop is invoked but the client instance is not nullified, because it is not connected

       

          @Override
          protected void doStop() throws Exception {
              super.doStop();        if (stopClient && client != null && client.isConnected()) {
                  String topic = getEndpoint().getTopic();
                  // only unsubscribe if we are not durable
                  if (getEndpoint().getConfiguration().isCleanSession()) {
                      LOG.debug("Unsubscribing client: {} from topic: {}", clientId, topic);
                      client.unsubscribe(topic);
                  } else {
                      LOG.debug("Client: {} is durable so will not unsubscribe from topic: {}", clientId, topic);
                  }
                  LOG.debug("Disconnecting client: {} from broker: {}", clientId, getEndpoint().getConfiguration().getBrokerUrl());
                  client.disconnect();
                  client = null;
              }
          }
      

       

      when the supervisor tries to restart the route, client instance already exists, but the call to client.subscribe fails because the client is not connected.

      Perhaps always nullify client in doStop should resolve the issue; however I have no idea it this solution will impact in other ways.

      A better solution may be to handle automatic reconnect in the consumer, like RabbitConsumer does, for example.

      Attachments

        1. PahoConsumerRestartTest.java
          5 kB
          Marco Collovati

        Activity

          People

            Unassigned Unassigned
            mcollovati Marco Collovati
            Votes:
            0 Vote for this issue
            Watchers:
            2 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: