Uploaded image for project: 'ActiveMQ'
  1. ActiveMQ
  2. AMQ-7350

Inconsistency in Offline Durable Topic Subscribers

    XMLWordPrintableJSON

    Details

    • Type: Bug
    • Status: Open
    • Priority: Blocker
    • Resolution: Unresolved
    • Affects Version/s: 5.15.10
    • Fix Version/s: None
    • Component/s: AMQP, Broker
    • Labels:
      None
    • Flags:
      Important

      Description

      Hello All, 

      • I have a issue with my activemq setup with durable subscription
      • My setup includes a broker with topics and the backend that durably subscribes to these topics.
      • Random mapping of durable subscribers happens on each restart. In order to fix it, I always have to wipe the persistence
      • All the topics are created via activemq.xml
      • With the usage of kahadb/mkahadb, I want to enable persistence as well. In doing so, after the restart of my broker i get inconsistency in my offline durable topic susbcribers.

      I am also attaching the config used by me. As well as an image to describe the issue.
      Config with kahadb

      <beans
          xmlns="http://www.springframework.org/schema/beans"
          xmlns:amq="http://activemq.apache.org/schema/core"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
      			http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
      
        <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
          <property name="locations">
            <value>file:${activemq.conf}/credentials.properties</value>
          </property>
        </bean>
      
        <broker xmlns="http://activemq.apache.org/schema/core" brokerName="MQ-MY-BROKER-NAME" dataDirectory="${activemq.data}" schedulerSupport="false" persistent="true">
          <destinations>
      
          <topic physicalName="Hello.V3"/>
      
          <topic physicalName="Hello.V2"/>
      
          <topic physicalName="Hello.V1"/>
      
        </destinations>
          <destinationPolicy>
            <policyMap>
              <policyEntries>
                <policyEntry topic=">" producerFlowControl="true">
                  <pendingMessageLimitStrategy>
                    <constantPendingMessageLimitStrategy limit="1000"/>
                  </pendingMessageLimitStrategy>
                </policyEntry>
                <policyEntry queue=">" producerFlowControl="true" memoryLimit="20mb">
                  <deadLetterStrategy>
                    <individualDeadLetterStrategy queueSuffix=".DLQ" useQueueForQueueMessages="true"/>
                  </deadLetterStrategy>
                </policyEntry>
              </policyEntries>
            </policyMap>
          </destinationPolicy>
      
          <managementContext>
            <managementContext createConnector="true" connectorPort="1199"/>
          </managementContext>
          
          <persistenceAdapter>
            <kahaDB directory="${activemq.data}" journalMaxFileLength="32mb"/>
          </persistenceAdapter>
      
          <!-- Resource limits for triggering producer flow control -->
          <systemUsage>
            <systemUsage>
              <memoryUsage>
                <memoryUsage percentOfJvmHeap="70"/>
              </memoryUsage>
              <storeUsage>
                <storeUsage limit="10 gb"/>
              </storeUsage>
              <tempUsage>
                <tempUsage limit="1 gb"/>
              </tempUsage>
            </systemUsage>
          </systemUsage>
      
      
          <!-- DOS protection: limit concurrent connections to 1000 and frame size to 100MB -->
          <transportConnectors>
            <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireformat.maxFrameSize=104857600"/>
            <transportConnector name="stomp" uri="stomp://0.0.0.0:61612?maximumConnections=1000&amp;transport.closeAsync=false&amp;connectionTimeout=120000&amp;wireformat.maxFrameSize=104857600"/>
          </transportConnectors>
      
          <!-- destroy the spring context on shutdown to stop jetty and camel -->
          <shutdownHooks>
            <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
          </shutdownHooks>
        </broker>
        <!-- Enable web consoles, REST and Ajax APIs and demos -->
        <import resource="jetty.xml"/>
      </beans>
      

      Config with mkahadb

      <beans
          xmlns="http://www.springframework.org/schema/beans"
          xmlns:amq="http://activemq.apache.org/schema/core"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
      			http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
      
        <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
          <property name="locations">
            <value>file:${activemq.conf}/credentials.properties</value>
          </property>
        </bean>
      
        <broker xmlns="http://activemq.apache.org/schema/core" brokerName="MQ-MY-BROKER-NAME" dataDirectory="${activemq.data}" schedulerSupport="false" persistent="true">
          <destinations>
      
          <topic physicalName="Hello.V3"/>
      
          <topic physicalName="Hello.V2"/>
      
          <topic physicalName="Hello.V1"/>
      
        </destinations>
          <destinationPolicy>
            <policyMap>
              <policyEntries>
                <policyEntry topic=">" producerFlowControl="true">
                  <pendingMessageLimitStrategy>
                    <constantPendingMessageLimitStrategy limit="1000"/>
                  </pendingMessageLimitStrategy>
                </policyEntry>
                <policyEntry queue=">" producerFlowControl="true" memoryLimit="20mb">
                  <deadLetterStrategy>
                    <individualDeadLetterStrategy queueSuffix=".DLQ" useQueueForQueueMessages="true"/>
                  </deadLetterStrategy>
                </policyEntry>
              </policyEntries>
            </policyMap>
          </destinationPolicy>
      
          <managementContext>
            <managementContext createConnector="true" connectorPort="1199"/>
          </managementContext>
          
          <persistenceAdapter>
            <mKahaDB directory="${activemq.data}/mkahadb">
              <filteredPersistenceAdapters>
                <filteredKahaDB perDestination="true">
                  <persistenceAdapter>
                    <kahaDB ignoreMissingJournalfiles="false" checkForCorruptJournalFiles="true" checksumJournalFiles="true"/>
                  </persistenceAdapter>
                </filteredKahaDB>
              </filteredPersistenceAdapters>
            </mKahaDB>
          </persistenceAdapter>
      
          <!-- Resource limits for triggering producer flow control -->
          <systemUsage>
            <systemUsage>
              <memoryUsage>
                <memoryUsage percentOfJvmHeap="70"/>
              </memoryUsage>
              <storeUsage>
                <storeUsage limit="10 gb"/>
              </storeUsage>
              <tempUsage>
                <tempUsage limit="1 gb"/>
              </tempUsage>
            </systemUsage>
          </systemUsage>
      
      
          <!-- DOS protection: limit concurrent connections to 1000 and frame size to 100MB -->
          <transportConnectors>
            <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireformat.maxFrameSize=104857600"/>
            <transportConnector name="stomp" uri="stomp://0.0.0.0:61612?maximumConnections=1000&amp;transport.closeAsync=false&amp;connectionTimeout=120000&amp;wireformat.maxFrameSize=104857600"/>
          </transportConnectors>
      
          <!-- destroy the spring context on shutdown to stop jetty and camel -->
          <shutdownHooks>
            <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
          </shutdownHooks>
        </broker>
        <!-- Enable web consoles, REST and Ajax APIs and demos -->
        <import resource="jetty.xml"/>
      </beans>
      

       

      Activemq community forum link : http://activemq.2283324.n4.nabble.com/offline-Durable-Topic-Subscribers-issue-in-activemq-5-15-8-td4753263.html

        Attachments

        1. image-2020-04-01-16-27-26-201.png
          475 kB
          Ashish
        2. image-2020-04-01-16-30-27-209.png
          443 kB
          Ashish
        3. offline_topic_durable_subscriber.png
          37 kB
          Ashish
        4. setup.tar
          380 kB
          Ashish

          Activity

            People

            • Assignee:
              jbonofre Jean-Baptiste Onofré
              Reporter:
              vaishno.avi Ashish
            • Votes:
              0 Vote for this issue
              Watchers:
              3 Start watching this issue

              Dates

              • Created:
                Updated: