CAMEL-15748

Paho consumer never connects if the broker is not reachable at startup

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.5.0, 3.4.4, 3.6.0
    • Fix Version/s: 3.4.5, 3.7.0
    • Component/s: camel-paho
    • Labels:
      None
    • Estimated Complexity:
      Unknown

      Description

      When a route contains a Paho consumer and the broker is not reachable at startup, the Camel context fails fast and shuts down.

      This can be avoided by configuring the SupervisingRouteController. However, even though the Camel context no longer fails, the consumer is never able to establish a connection.

      The reason is that when PahoConsumer starts for the first time, it creates an MqttClient and stores it in the client field; the call to client.connect then throws an exception because the broker is down:

       

       
          @Override
          protected void doStart() throws Exception {
              super.doStart();

              connectOptions = PahoEndpoint.createMqttConnectOptions(getEndpoint().getConfiguration());

              if (client == null) {
                  clientId = getEndpoint().getConfiguration().getClientId();
                  if (clientId == null) {
                      clientId = "camel-" + MqttClient.generateClientId();
                  }
                  stopClient = true;
                  client = new MqttClient(
                          getEndpoint().getConfiguration().getBrokerUrl(),
                          clientId,
                          PahoEndpoint.createMqttClientPersistence(getEndpoint().getConfiguration()));
                  LOG.debug("Connecting client: {} to broker: {}", clientId, getEndpoint().getConfiguration().getBrokerUrl());
                  client.connect(connectOptions);
              }
              
              // other code omitted for brevity
              
              client.subscribe(getEndpoint().getTopic(), getEndpoint().getConfiguration().getQos());

       

      After that, doStop is invoked, but the client field is not set to null because the client is not connected:

       

          @Override
          protected void doStop() throws Exception {
              super.doStop();

              if (stopClient && client != null && client.isConnected()) {
                  String topic = getEndpoint().getTopic();
                  // only unsubscribe if we are not durable
                  if (getEndpoint().getConfiguration().isCleanSession()) {
                      LOG.debug("Unsubscribing client: {} from topic: {}", clientId, topic);
                      client.unsubscribe(topic);
                  } else {
                      LOG.debug("Client: {} is durable so will not unsubscribe from topic: {}", clientId, topic);
                  }
                  LOG.debug("Disconnecting client: {} from broker: {}", clientId, getEndpoint().getConfiguration().getBrokerUrl());
                  client.disconnect();
                  client = null;
              }
          }
      

       

      When the supervisor tries to restart the route, the client instance already exists, so doStart skips client.connect, and the call to client.subscribe fails because the client is not connected.

      Perhaps always setting client to null in doStop would resolve the issue; however, I have no idea whether this solution would have other side effects.
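
      To make the sequence concrete, here is a minimal, self-contained model of the lifecycle described above. It does not use the Paho or Camel classes: FakeClient and Consumer are hypothetical stand-ins that only track whether a client exists and whether it is connected, and the alwaysDiscardClient flag is an assumption used to compare the current doStop behaviour with the "always nullify" idea.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class PahoRestartSketch {

    /** Hypothetical stand-in for MqttClient: models connect/subscribe state only. */
    static class FakeClient {
        private final AtomicBoolean brokerUp;
        private boolean connected;

        FakeClient(AtomicBoolean brokerUp) {
            this.brokerUp = brokerUp;
        }

        void connect() {
            if (!brokerUp.get()) {
                throw new IllegalStateException("broker unreachable");
            }
            connected = true;
        }

        void subscribe() {
            if (!connected) {
                throw new IllegalStateException("client is not connected");
            }
        }

        void disconnect() {
            connected = false;
        }

        boolean isConnected() {
            return connected;
        }
    }

    /** Hypothetical model of the consumer lifecycle from the report. */
    static class Consumer {
        private final AtomicBoolean brokerUp;
        private final boolean alwaysDiscardClient;
        private FakeClient client;

        Consumer(AtomicBoolean brokerUp, boolean alwaysDiscardClient) {
            this.brokerUp = brokerUp;
            this.alwaysDiscardClient = alwaysDiscardClient;
        }

        void doStart() {
            if (client == null) {
                // The field is assigned before connect() can throw.
                client = new FakeClient(brokerUp);
                client.connect();
            }
            client.subscribe();
        }

        void doStop() {
            if (client != null && client.isConnected()) {
                client.disconnect();
                client = null;
            } else if (alwaysDiscardClient) {
                // Proposed change: drop the client even if it never connected.
                client = null;
            }
        }
    }

    /** Returns true if a restart succeeds once the broker becomes reachable. */
    static boolean restartSucceeds(boolean alwaysDiscardClient) {
        AtomicBoolean brokerUp = new AtomicBoolean(false);
        Consumer consumer = new Consumer(brokerUp, alwaysDiscardClient);
        try {
            consumer.doStart();
        } catch (IllegalStateException startupFailure) {
            consumer.doStop();
        }
        brokerUp.set(true); // broker comes back before the supervisor retries
        try {
            consumer.doStart();
            return true;
        } catch (IllegalStateException restartFailure) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("current doStop, restart succeeds: " + restartSucceeds(false));
        System.out.println("always nullify, restart succeeds: " + restartSucceeds(true));
    }
}
```

      In this model the restart fails with the current doStop logic and succeeds when the client is always discarded. It says nothing about side effects in the real component, such as cases where the client is supplied externally and stopClient is false.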

      A better solution may be to handle automatic reconnect in the consumer, like RabbitConsumer does, for example.
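
      As a rough illustration of what consumer-side reconnection could look like, the sketch below retries a "connect and subscribe" action a bounded number of times with a fixed delay. It is not how RabbitConsumer or the eventual fix works; ConnectAction and connectWithRetry are hypothetical names, and a real consumer would likely run this on a background thread rather than block doStart.

```java
public class ReconnectSketch {

    /** Hypothetical abstraction over "connect and subscribe"; not a Camel or Paho type. */
    @FunctionalInterface
    interface ConnectAction {
        void run() throws Exception;
    }

    /**
     * Runs the action until it succeeds or maxAttempts is reached.
     * Returns the number of the attempt that succeeded.
     */
    static int connectWithRetry(ConnectAction action, int maxAttempts, long delayMillis) {
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                action.run();
                return attempt;
            } catch (Exception e) {
                last = e;
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(delayMillis);
                } catch (InterruptedException ie) {
                    // Preserve the interrupt flag and stop retrying.
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while waiting to reconnect", ie);
                }
            }
        }
        throw new IllegalStateException("gave up after " + maxAttempts + " attempts", last);
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated broker that becomes reachable on the third attempt.
        int attempt = connectWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new java.io.IOException("broker unreachable");
            }
        }, 5, 10L);
        System.out.println("connected on attempt " + attempt);
    }
}
```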

        Attachments

        1. PahoConsumerRestartTest.java
          5 kB
          Marco Collovati

            People

            • Assignee: Unassigned
            • Reporter: Marco Collovati (mcollovati)