ActiveMQ
  1. ActiveMQ
  2. AMQ-3844

NullPointerException when removing connection info

    Details

    • Type: Bug Bug
    • Status: Resolved
    • Priority: Major Major
    • Resolution: Fixed
    • Affects Version/s: 5.4.3, 5.5.1
    • Fix Version/s: 5.6.0
    • Component/s: Broker
    • Labels:
      None
    • Environment:

      Linux 2.6.18-274.12.1.el5 #1 SMP Tue Nov 29 13:37:35 EST 2011 i686 athlon i386 GNU/Linux, Java(TM) SE Runtime Environment (build 1.6.0_29-b11)

      Description

      Sometimes off and on the ActiveMQ server, the KahaDB maintains some old transactions that try to recovery. So at start up these transactions are added to the xaTransactions map inside TransactionBroker with a null ConnectionId.

      This is the stack trace of the recovery at startup:

       
      	TransactionBroker.beginTransaction(ConnectionContext, TransactionId) line: 152	
      	TransactionBroker$1.recover(XATransactionId, Message[], MessageAck[]) line: 92	
      	KahaDBTransactionStore.recover(TransactionRecoveryListener) line: 317	
      	TransactionBroker.start() line: 89	
      	BrokerService$3.start() line: 1781	
      	XBeanBrokerService(BrokerService).start() line: 489	
      	XBeanBrokerService.afterPropertiesSet() line: 60	
      	NativeMethodAccessorImpl.invoke0(Method, Object, Object[]) line: not available [native method]	
      	NativeMethodAccessorImpl.invoke(Object, Object[]) line: 39	
      	DelegatingMethodAccessorImpl.invoke(Object, Object[]) line: 25	
      	Method.invoke(Object, Object...) line: 597	
      	DefaultListableBeanFactory(AbstractAutowireCapableBeanFactory).invokeCustomInitMethod(String, Object, RootBeanDefinition) line: 1536	
      	DefaultListableBeanFactory(AbstractAutowireCapableBeanFactory).invokeInitMethods(String, Object, RootBeanDefinition) line: 1477	
      	DefaultListableBeanFactory(AbstractAutowireCapableBeanFactory).initializeBean(String, Object, RootBeanDefinition) line: 1409	
      	DefaultListableBeanFactory(AbstractAutowireCapableBeanFactory).doCreateBean(String, RootBeanDefinition, Object[]) line: 519	
      	DefaultListableBeanFactory(AbstractAutowireCapableBeanFactory).createBean(String, RootBeanDefinition, Object[]) line: 456	
      	AbstractBeanFactory$1.getObject() line: 291	
      	DefaultListableBeanFactory(DefaultSingletonBeanRegistry).getSingleton(String, ObjectFactory) line: 222	
      	DefaultListableBeanFactory(AbstractBeanFactory).doGetBean(String, Class<T>, Object[], boolean) line: 288	
      	DefaultListableBeanFactory(AbstractBeanFactory).getBean(String) line: 190	
      	DefaultListableBeanFactory.preInstantiateSingletons() line: 574	
      	XBeanBrokerFactory$1(AbstractApplicationContext).finishBeanFactoryInitialization(ConfigurableListableBeanFactory) line: 895	
      	XBeanBrokerFactory$1(AbstractApplicationContext).refresh() line: 425	
      	XBeanBrokerFactory$1(ResourceXmlApplicationContext).<init>(Resource, List) line: 64	
      	XBeanBrokerFactory$1(ResourceXmlApplicationContext).<init>(Resource) line: 52	
      	XBeanBrokerFactory$1.<init>(XBeanBrokerFactory, Resource) line: 115	
      	XBeanBrokerFactory.createApplicationContext(String) line: 115	
      	XBeanBrokerFactory.createBroker(URI) line: 71	
      	BrokerFactory.createBroker(URI, boolean) line: 71	
      	BrokerFactory.createBroker(URI) line: 54	
      	StartCommand.startBroker(URI) line: 115	
      	StartCommand.runTask(List<String>) line: 74	
      	StartCommand(AbstractCommand).execute(List<String>) line: 57	
      	ShellCommand.runTask(List<String>) line: 143	
      	ShellCommand(AbstractCommand).execute(List<String>) line: 57	
      	ShellCommand.main(String[], InputStream, PrintStream) line: 85	
      	NativeMethodAccessorImpl.invoke0(Method, Object, Object[]) line: not available [native method]	
      	NativeMethodAccessorImpl.invoke(Object, Object[]) line: 39	
      	DelegatingMethodAccessorImpl.invoke(Object, Object[]) line: 25	
      	Method.invoke(Object, Object...) line: 597	
      	Main.runTaskClass(List<String>) line: 251	
      	Main.main(String[]) line: 107	 
      

      During the runtime the client tries to add and remove connections; sometimes the removeConnection throws a NPE due to these transactions without ConnectionID.
      Take a look to the code fragment from TransactionBroker:

          public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
              for (Iterator<Transaction> iter = context.getTransactions().values().iterator(); iter.hasNext();) {
                  try {
                      Transaction transaction = iter.next();
                      transaction.rollback();
                  } catch (Exception e) {
                      LOG.warn("ERROR Rolling back disconnected client's transactions: ", e);
                  }
                  iter.remove();
              }
      
              synchronized (xaTransactions) {
                  // first find all txs that belongs to the connection
                  ArrayList<XATransaction> txs = new ArrayList<XATransaction>();
                  for (XATransaction tx : xaTransactions.values()) {
                      if (tx.getConnectionId().equals(info.getConnectionId()) && !tx.isPrepared()) {
                          txs.add(tx);
                      }
                  }
      
                  // then remove them
                  // two steps needed to avoid ConcurrentModificationException, from removeTransaction()
                  for (XATransaction tx : txs) {
                      try {
                          tx.rollback();
                      } catch (Exception e) {
                          LOG.warn("ERROR Rolling back disconnected client's xa transactions: ", e);
                      }
                  }
      
              }
              next.removeConnection(context, info, error);
          }
      

      as you can see inside the loop there is a check for tx.getConnectionId().equals(info.getConnectionId()) that throws the NPE. When this occurs the connection isn't removed. This information isn't shared with the client that believes the opposite, so the next time that try to resend client information to server obtain (under jBoss) this error javax.transaction.xa.XAException: Broker: AMQ - Client: ID:srv001-47592-1336730655955-64:2 already connected from /127.0.0.1:49806
      that can be bound to the former server fails.
      This scenario can be found inside the attached logs.

      1. jBoss_server.log
        30 kB
        Antonio D'Errico
      2. ActiveMQ_server.log
        7 kB
        Antonio D'Errico

        Activity

        Antonio D'Errico created issue -
        Antonio D'Errico made changes -
        Field Original Value New Value
        Description Sometimes off and on the KahaDB maintains some old transactions that try to recovery. So at start up these transactions are added to the {{xaTransactions}} inside {{TransactionBroker}} with a null {{ConnectionId}}.

        This is the stack trace of the recovery
        {code:none}
        TransactionBroker.beginTransaction(ConnectionContext, TransactionId) line: 152
        TransactionBroker$1.recover(XATransactionId, Message[], MessageAck[]) line: 92
        KahaDBTransactionStore.recover(TransactionRecoveryListener) line: 317
        TransactionBroker.start() line: 89
        BrokerService$3.start() line: 1781
        XBeanBrokerService(BrokerService).start() line: 489
        XBeanBrokerService.afterPropertiesSet() line: 60
        NativeMethodAccessorImpl.invoke0(Method, Object, Object[]) line: not available [native method]
        NativeMethodAccessorImpl.invoke(Object, Object[]) line: 39
        DelegatingMethodAccessorImpl.invoke(Object, Object[]) line: 25
        Method.invoke(Object, Object...) line: 597
        DefaultListableBeanFactory(AbstractAutowireCapableBeanFactory).invokeCustomInitMethod(String, Object, RootBeanDefinition) line: 1536
        DefaultListableBeanFactory(AbstractAutowireCapableBeanFactory).invokeInitMethods(String, Object, RootBeanDefinition) line: 1477
        DefaultListableBeanFactory(AbstractAutowireCapableBeanFactory).initializeBean(String, Object, RootBeanDefinition) line: 1409
        DefaultListableBeanFactory(AbstractAutowireCapableBeanFactory).doCreateBean(String, RootBeanDefinition, Object[]) line: 519
        DefaultListableBeanFactory(AbstractAutowireCapableBeanFactory).createBean(String, RootBeanDefinition, Object[]) line: 456
        AbstractBeanFactory$1.getObject() line: 291
        DefaultListableBeanFactory(DefaultSingletonBeanRegistry).getSingleton(String, ObjectFactory) line: 222
        DefaultListableBeanFactory(AbstractBeanFactory).doGetBean(String, Class<T>, Object[], boolean) line: 288
        DefaultListableBeanFactory(AbstractBeanFactory).getBean(String) line: 190
        DefaultListableBeanFactory.preInstantiateSingletons() line: 574
        XBeanBrokerFactory$1(AbstractApplicationContext).finishBeanFactoryInitialization(ConfigurableListableBeanFactory) line: 895
        XBeanBrokerFactory$1(AbstractApplicationContext).refresh() line: 425
        XBeanBrokerFactory$1(ResourceXmlApplicationContext).<init>(Resource, List) line: 64
        XBeanBrokerFactory$1(ResourceXmlApplicationContext).<init>(Resource) line: 52
        XBeanBrokerFactory$1.<init>(XBeanBrokerFactory, Resource) line: 115
        XBeanBrokerFactory.createApplicationContext(String) line: 115
        XBeanBrokerFactory.createBroker(URI) line: 71
        BrokerFactory.createBroker(URI, boolean) line: 71
        BrokerFactory.createBroker(URI) line: 54
        StartCommand.startBroker(URI) line: 115
        StartCommand.runTask(List<String>) line: 74
        StartCommand(AbstractCommand).execute(List<String>) line: 57
        ShellCommand.runTask(List<String>) line: 143
        ShellCommand(AbstractCommand).execute(List<String>) line: 57
        ShellCommand.main(String[], InputStream, PrintStream) line: 85
        NativeMethodAccessorImpl.invoke0(Method, Object, Object[]) line: not available [native method]
        NativeMethodAccessorImpl.invoke(Object, Object[]) line: 39
        DelegatingMethodAccessorImpl.invoke(Object, Object[]) line: 25
        Method.invoke(Object, Object...) line: 597
        Main.runTaskClass(List<String>) line: 251
        Main.main(String[]) line: 107
        {code}

        after during execution the client add and remove connection and sometimes the removeConnection throws a NPE because there are these transactions without ConnectionID

        {code:java}
            public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
                for (Iterator<Transaction> iter = context.getTransactions().values().iterator(); iter.hasNext();) {
                    try {
                        Transaction transaction = iter.next();
                        transaction.rollback();
                    } catch (Exception e) {
                        LOG.warn("ERROR Rolling back disconnected client's transactions: ", e);
                    }
                    iter.remove();
                }

                synchronized (xaTransactions) {
                    // first find all txs that belongs to the connection
                    ArrayList<XATransaction> txs = new ArrayList<XATransaction>();
                    for (XATransaction tx : xaTransactions.values()) {
                        if (tx.getConnectionId().equals(info.getConnectionId()) && !tx.isPrepared()) {
                            txs.add(tx);
                        }
                    }

                    // then remove them
                    // two steps needed to avoid ConcurrentModificationException, from removeTransaction()
                    for (XATransaction tx : txs) {
                        try {
                            tx.rollback();
                        } catch (Exception e) {
                            LOG.warn("ERROR Rolling back disconnected client's xa transactions: ", e);
                        }
                    }

                }
                next.removeConnection(context, info, error);
            }
        {code}

        as you can see inside the loop there is a check for {{tx.getConnectionId().equals(info.getConnectionId())}} that throws the NPE so the connection isn't removed but this information isn't shared with the client that the next time that try to resend client information to server obtain (under jBoss) this error {{javax.transaction.xa.XAException: Broker: AMQ - Client: ID:srv001-47592-1336730655955-64:2 already connected from /127.0.0.1:49806}}
        that can be bound to the server fails.

        Sometimes off and on the ActiveMQ server, the KahaDB maintains some old transactions that try to recovery. So at start up these transactions are added to the {{xaTransactions}} map inside {{TransactionBroker}} with a null {{ConnectionId}}.

        This is the stack trace of the recovery at startup:

        {code:none}
        TransactionBroker.beginTransaction(ConnectionContext, TransactionId) line: 152
        TransactionBroker$1.recover(XATransactionId, Message[], MessageAck[]) line: 92
        KahaDBTransactionStore.recover(TransactionRecoveryListener) line: 317
        TransactionBroker.start() line: 89
        BrokerService$3.start() line: 1781
        XBeanBrokerService(BrokerService).start() line: 489
        XBeanBrokerService.afterPropertiesSet() line: 60
        NativeMethodAccessorImpl.invoke0(Method, Object, Object[]) line: not available [native method]
        NativeMethodAccessorImpl.invoke(Object, Object[]) line: 39
        DelegatingMethodAccessorImpl.invoke(Object, Object[]) line: 25
        Method.invoke(Object, Object...) line: 597
        DefaultListableBeanFactory(AbstractAutowireCapableBeanFactory).invokeCustomInitMethod(String, Object, RootBeanDefinition) line: 1536
        DefaultListableBeanFactory(AbstractAutowireCapableBeanFactory).invokeInitMethods(String, Object, RootBeanDefinition) line: 1477
        DefaultListableBeanFactory(AbstractAutowireCapableBeanFactory).initializeBean(String, Object, RootBeanDefinition) line: 1409
        DefaultListableBeanFactory(AbstractAutowireCapableBeanFactory).doCreateBean(String, RootBeanDefinition, Object[]) line: 519
        DefaultListableBeanFactory(AbstractAutowireCapableBeanFactory).createBean(String, RootBeanDefinition, Object[]) line: 456
        AbstractBeanFactory$1.getObject() line: 291
        DefaultListableBeanFactory(DefaultSingletonBeanRegistry).getSingleton(String, ObjectFactory) line: 222
        DefaultListableBeanFactory(AbstractBeanFactory).doGetBean(String, Class<T>, Object[], boolean) line: 288
        DefaultListableBeanFactory(AbstractBeanFactory).getBean(String) line: 190
        DefaultListableBeanFactory.preInstantiateSingletons() line: 574
        XBeanBrokerFactory$1(AbstractApplicationContext).finishBeanFactoryInitialization(ConfigurableListableBeanFactory) line: 895
        XBeanBrokerFactory$1(AbstractApplicationContext).refresh() line: 425
        XBeanBrokerFactory$1(ResourceXmlApplicationContext).<init>(Resource, List) line: 64
        XBeanBrokerFactory$1(ResourceXmlApplicationContext).<init>(Resource) line: 52
        XBeanBrokerFactory$1.<init>(XBeanBrokerFactory, Resource) line: 115
        XBeanBrokerFactory.createApplicationContext(String) line: 115
        XBeanBrokerFactory.createBroker(URI) line: 71
        BrokerFactory.createBroker(URI, boolean) line: 71
        BrokerFactory.createBroker(URI) line: 54
        StartCommand.startBroker(URI) line: 115
        StartCommand.runTask(List<String>) line: 74
        StartCommand(AbstractCommand).execute(List<String>) line: 57
        ShellCommand.runTask(List<String>) line: 143
        ShellCommand(AbstractCommand).execute(List<String>) line: 57
        ShellCommand.main(String[], InputStream, PrintStream) line: 85
        NativeMethodAccessorImpl.invoke0(Method, Object, Object[]) line: not available [native method]
        NativeMethodAccessorImpl.invoke(Object, Object[]) line: 39
        DelegatingMethodAccessorImpl.invoke(Object, Object[]) line: 25
        Method.invoke(Object, Object...) line: 597
        Main.runTaskClass(List<String>) line: 251
        Main.main(String[]) line: 107
        {code}

        During the runtime the client tries to add and remove connections; sometimes the removeConnection throws a NPE due to these transactions without ConnectionID.
        Take a look to the code fragment from {{TransactionBroker}}:

        {code:java}
            public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
                for (Iterator<Transaction> iter = context.getTransactions().values().iterator(); iter.hasNext();) {
                    try {
                        Transaction transaction = iter.next();
                        transaction.rollback();
                    } catch (Exception e) {
                        LOG.warn("ERROR Rolling back disconnected client's transactions: ", e);
                    }
                    iter.remove();
                }

                synchronized (xaTransactions) {
                    // first find all txs that belongs to the connection
                    ArrayList<XATransaction> txs = new ArrayList<XATransaction>();
                    for (XATransaction tx : xaTransactions.values()) {
                        if (tx.getConnectionId().equals(info.getConnectionId()) && !tx.isPrepared()) {
                            txs.add(tx);
                        }
                    }

                    // then remove them
                    // two steps needed to avoid ConcurrentModificationException, from removeTransaction()
                    for (XATransaction tx : txs) {
                        try {
                            tx.rollback();
                        } catch (Exception e) {
                            LOG.warn("ERROR Rolling back disconnected client's xa transactions: ", e);
                        }
                    }

                }
                next.removeConnection(context, info, error);
            }
        {code}

        as you can see inside the loop there is a check for {{tx.getConnectionId().equals(info.getConnectionId())}} that throws the NPE. When this occurs the connection isn't removed. This information isn't shared with the client that believes the opposite, so the next time that try to resend client information to server obtain (under jBoss) this error {{javax.transaction.xa.XAException: Broker: AMQ - Client: ID:srv001-47592-1336730655955-64:2 already connected from /127.0.0.1:49806}}
        that can be bound to the former server fails.
        This scenario can be found inside the attached logs.
        Antonio D'Errico made changes -
        Attachment ActiveMQ_server.log [ 12527602 ]
        Attachment jBoss_server.log [ 12527603 ]
        Gary Tully made changes -
        Status Open [ 1 ] Resolved [ 5 ]
        Assignee Gary Tully [ gtully ]
        Fix Version/s 5.6.0 [ 12317974 ]
        Resolution Fixed [ 1 ]

          People

          • Assignee:
            Gary Tully
            Reporter:
            Antonio D'Errico
          • Votes:
            0 Vote for this issue
            Watchers:
            2 Start watching this issue

            Dates

            • Created:
              Updated:
              Resolved:

              Development