Qpid Proton / PROTON-2306

Getting connection inactive error when consuming messages from Azure Service Bus

Details

    • Type: Bug
    • Status: Open
    • Priority: Minor
    • Resolution: Unresolved
    • Affects Version/s: proton-c-0.31.0
    • Fix Version/s: None
    • Component/s: python-binding
    • Labels: None
    • Environment: The service runs as 2 instances (pods) in a Kubernetes cluster hosted on Azure Kubernetes Service.

    Description

      Hi,

      We currently have a simple service that consumes messages from Azure Service Bus indefinitely and, based on the contents of each message, either ignores it or sends a POST request to another internal service endpoint for our app.

      However, occasionally we get the following exception:

      The connection was inactive for more than the allowed 240000 milliseconds and is closed by container '5942ad2288f8495aaea81fe91c935445_G10'.

      This is then shortly followed by:

      amqp:connection:framing-error: SSL Failure: Unknown error

      I've observed that this error occurs almost exactly 4 minutes (i.e. 240000 ms) after the service last processed an incoming message. This suggests to me that, after processing a message, the service somehow stops sending heartbeats to Azure Service Bus to keep the connection open.
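
The timing observation above can be checked against log timestamps with a few lines of Python. This is only a sketch: the function name, the tolerance, and the two timestamps are hypothetical and stand in for values read from the service log; only the 240000 ms figure comes from the error message.

```python
from datetime import datetime, timedelta

# Idle timeout quoted in the Service Bus error message (240000 ms = 4 minutes).
IDLE_TIMEOUT_MS = 240_000


def gap_matches_idle_timeout(last_message_at, closed_at,
                             idle_timeout_ms=IDLE_TIMEOUT_MS,
                             tolerance=timedelta(seconds=5)):
    """Return True if the close happened roughly one idle timeout after the last message."""
    gap = closed_at - last_message_at
    return abs(gap - timedelta(milliseconds=idle_timeout_ms)) <= tolerance


# Hypothetical timestamps, as they might be read from the service log.
last_message_at = datetime(2020, 1, 1, 12, 0, 0)
closed_at = datetime(2020, 1, 1, 12, 4, 1)
print(gap_matches_idle_timeout(last_message_at, closed_at))
```

If this holds for every occurrence, it supports the idea that no frames at all reach the broker after the last message, rather than the connection being dropped at random.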

       

      Any help or pointers in the right direction here would be much appreciated. I've been looking into this for several days without really getting anywhere.

       

      Below is what I believe to be the relevant code:

       

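      import json
      import logging
      
      from proton import Event
      from proton.handlers import MessagingHandler
      from proton.reactor import Container
      
      # health_check, FilterError, EventFilter, Config, do_post_call and
      # start_http_health_check_endpoint are project-specific and not shown here.
      
      class TriggerConsumer(MessagingHandler):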
          def __init__(
              self,
              server: str,
              address: str,
              event_filter,
          ):
              super().__init__()
              self._server = server
              self._address = address
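              self._event_filter = event_filter
              # Read in on_message; where it is updated is not shown in this excerpt.
              self._last_processing_time = None
      
          def on_start(self, event: Event):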
              """Called when the event loop starts. See superclass docs for a (tiny) bit more info.
              We use it here to 'set up' our event receiver from the queue.
              """
              logging.info('Starting consumer...')
              # Source: https://www.johndehavilland.com/blog/2017/07/05/Python-ServiceBus-AMQP.html
              conn = event.container.connect(self._server, allowed_mechs='PLAIN')
              event.container.create_receiver(conn, self._address)
              logging.info('Started consumer')
              health_check.set_ready()
      
          def on_message(self, event: Event):
              logging.info('Receiving message')
              message_id = None
              try:
                  message_id = event.message.id
                  message_content = json.loads(event.message.body)
                  try:
                      self._event_filter.check_event(
                          event_body=message_content, last_processed_event_time=self._last_processing_time
                      )
                  except FilterError as e:
                      logging.info(f"Event with id '{message_id}' rejected by event filter: {e}")
                      return
      
                  # Do POST call to separate service in the cluster here
                  do_post_call()
      
                  logging.info(f'Successfully processed event with id {message_id}')
              # Message is not acked if we raise an exception, so to avoid a vicious loop of crashing on the same
              # message we catch all exceptions here.
              except Exception as e:
                  logging.error(f'Failed to process event with id {message_id}: {e}')
                  logging.exception(e)
                  return
      
      if __name__ == '__main__':
          logging.info('Starting service')
          # Spawns a separate thread with a simple HTTP server exposing health monitoring endpoints to Docker
          start_http_health_check_endpoint()
          consumer = TriggerConsumer(
              # AMQP connection string, usually in the form of 'amqps://{keyname}:{key}@{queue server name}.servicebus.windows.net'.
              server=Config.AMQP_SERVER_CONN,
              address=Config.QUEUE_NAME,
              event_filter=EventFilter()
          )
          Container(consumer).run()
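
For reference, the connection string format described in the comment above could be assembled as follows. This is a minimal sketch, not the code the service actually uses: `build_amqp_url` and the placeholder values are hypothetical, and the percent-encoding is included on the assumption that the key may contain characters such as '/', '+' or '=' that are not valid unescaped in the credentials part of a URL.

```python
from urllib.parse import quote


def build_amqp_url(key_name, key, namespace):
    """Build an amqps:// URL with the credentials percent-encoded."""
    # safe='' also encodes '/', which quote() would otherwise leave as-is.
    user = quote(key_name, safe='')
    password = quote(key, safe='')
    return f'amqps://{user}:{password}@{namespace}.servicebus.windows.net'


# Placeholder values, not real credentials.
print(build_amqp_url('RootManageSharedAccessKey', 'abc/def+ghi=', 'my-namespace'))
```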
      

          People

            Assignee: Unassigned
            Reporter: Martin Husbyn (mhusbynflex)
