Uploaded image for project: 'ActiveMQ'
  1. ActiveMQ
  2. AMQ-4489

Newly received messages with higher priority are never consumed, until broker is restarted

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Major
    • Resolution: Fixed
    • 5.5.1
    • 5.11.0
    • Broker, Message Store
    • None
    • ServiceMix 4.4.2, using Camel producers/consumers

    Description

      We configured message prioritization according to the following page :
      http://activemq.apache.org/how-can-i-support-priority-queues.html
      We use a JDBC adapter for message persistence, in an Oracle database.
      Prioritisation is enabled on the queue with the "prioritizedMessages" option, and we also specify a memory limit for the queue (24 MB)
      We use ActiveMQ 5.5.1 within ServiceMix 4.4.2, and use Camel JMS producers/consumers.
      Message can have 2 priorities : 4 (normal) for non-business hours and 9 (high) for business hours.

      The scenario to reproduce the problem is the following :
      1. Enqueue 1000 "normal" and 1000 "high" messages.
      2. All "high" messages are consumed first.
      3. After a few "normal" messages are consumed, enqueue additional 1000 "high" messages.
      4. All "normal" messages" are consumed before "high" messages.
      5. All additional "high" 1000 messages are never consumed.
      6. Restart broker.
      7. All additional "high" 1000 messages start getting consumed.

      In production, we have a producer with high peaks during the night (10,000-100,000 messages/hour), and 6 consumers (about 5,000-10,000 messages/hour), so the queue can reach 100,000-200,000 messages at some periods of the day. Messages are small (200 bytes).

      We enabled SQL query tracing on the broker (with log4jdbc), and we see that the logic with which the "findNextMessagesByPriorityStatement" query is called does not seem correct in the "JDBCMessageStore.recoverNextMessages" method :

      At step 2, we see the following query being executed :
      SELECT ID, MSG FROM ACTIVEMQ_MSGS WHERE CONTAINER='priorityQueue' AND ((ID > 200 AND PRIORITY = 9) OR PRIORITY < 9) ORDER BY PRIORITY DESC, ID

      At step 4, we see the following query being executed :
      SELECT ID, MSG FROM ACTIVEMQ_MSGS WHERE CONTAINER='priorityQueue' AND ((ID > 1200 AND PRIORITY = 4) OR PRIORITY < 4) ORDER BY PRIORITY DESC, ID

      The problem is that the value for the last priority stored in the "lastRecoveredPriority" variable of the JDBCMessageStore stays permanently to 4, until step 6, where it is reset to 9.

      We tried changing the priority to constant '9' in the query. It works OK until step 3, where only 200 messages are consumed

      Our understanding is that there should be one "lastRecoveredSequenceId" variable for each priority level, so that the last "consumed message but not yet removed from the DB" is memorized, and also the priority should probably also be reset to 9 every time the query is executed.

      Can you have a look please ?

      Attachments

        1. MessagePriorityTest.java
          20 kB
          metatech
        2. MessagePriorityTest_workaround.java
          21 kB
          metatech
        3. MessagePriorityTest_frozen.java
          21 kB
          metatech

        Issue Links

          Activity

            People

              Unassigned Unassigned
              metatech metatech
              Votes:
              0 Vote for this issue
              Watchers:
              3 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: