  1. Qpid
  2. QPID-5214

[C++ broker] Memory leak in legacystore when raising RHM_IORES_ENQCAPTHRESH



    • Type: Bug
    • Status: Open
    • Priority: Minor
    • Resolution: Unresolved
    • Affects Version/s: 0.24
    • Fix Version/s: None
    • Component/s: C++ Broker
    • Labels:


      There is a memory leak when legacystore raises RHM_IORES_ENQCAPTHRESH ("Enqueue capacity threshold exceeded on queue .."). To reproduce, send durable messages in a loop to a queue with a tiny journal.

      Valgrind showed me:

      ==632== 2,288 (208 direct, 2,080 indirect) bytes in 2 blocks are definitely lost in loss record 115 of 116
      ==632== at 0x4A075BC: operator new(unsigned long) (vg_replace_malloc.c:298)
      ==632== by 0x60D76AB: mrg::msgstore::MessageStoreImpl::store(qpid::broker::PersistableQueue const*, mrg::msgstore::TxnCtxt*, boost::intrusive_ptr<qpid::broker::PersistableMessage> const&, bool) (in /data_xfs/qpid-trunk/cpp/BLD/src/legacystore.so)
      ==632== by 0x60D7165: mrg::msgstore::MessageStoreImpl::enqueue(qpid::broker::TransactionContext*, boost::intrusive_ptr<qpid::broker::PersistableMessage> const&, qpid::broker::PersistableQueue const&) (in /data_xfs/qpid-trunk/cpp/BLD/src/legacystore.so)
      ==632== by 0x5023568: qpid::broker::MessageStoreModule::enqueue(qpid::broker::TransactionContext*, boost::intrusive_ptr<qpid::broker::PersistableMessage> const&, qpid::broker::PersistableQueue const&) (in /data_xfs/qpid-trunk/cpp/BLD/src/libqpidbroker.so.2.0.0)
      ==632== by 0x4F9BAC8: qpid::broker::Queue::enqueue(qpid::broker::TransactionContext*, qpid::broker::Message&) (in /data_xfs/qpid-trunk/cpp/BLD/src/libqpidbroker.so.2.0.0)

      Further debugging showed that the line with the "new" call is:

      void MessageStoreImpl::store(..
      if (queue) {
      boost::intrusive_ptr<DataTokenImpl> dtokp(new DataTokenImpl);

      I tried to fix the leak, but I see nothing wrong in the code that could trigger it, because:
      1) dtokp is a local variable declared there, and its content is not copied or referenced anywhere later on
      2) even catching StoreException and explicitly calling "dtokp->reset(); dtokp = boost::intrusive_ptr<DataTokenImpl>();" does not prevent the memory leak
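      One way to test the reasoning in points 1) and 2) is an instance counter on a reference-counted token. The following is a minimal sketch, not the legacystore code: Token, Handle, storeLike and leakedAfter are hypothetical stand-ins, and the manual addRef() on the error path is an assumption used only to show how a smart-pointer-held object can still leak when an exception unwinds the stack.

```cpp
#include <cassert>
#include <stdexcept>

// Hypothetical stand-in for a reference-counted token (NOT the real DataTokenImpl).
struct Token {
    static int live;   // number of Token objects currently alive
    int refs = 0;      // intrusive reference count
    Token() { ++live; }
    ~Token() { --live; }
    void addRef() { ++refs; }
    void release() {
        assert(refs > 0);
        if (--refs == 0) delete this;
    }
};
int Token::live = 0;

// Minimal intrusive-style handle: takes one reference, drops it on destruction.
class Handle {
    Token* p_;
public:
    explicit Handle(Token* p) : p_(p) { p_->addRef(); }
    ~Handle() { p_->release(); }
    Handle(const Handle&) = delete;
    Handle& operator=(const Handle&) = delete;
    Token* get() const { return p_; }
};

// Simulates a store call that always fails with an exception.
// With extraRef == true, a manual reference is taken first and is never
// released on the error path (an assumed scenario, for illustration only).
void storeLike(bool extraRef) {
    Handle h(new Token);
    if (extraRef) h.get()->addRef();
    throw std::runtime_error("Enqueue capacity threshold exceeded");
}

// Returns how many tokens are still alive after `attempts` failing calls.
int leakedAfter(int attempts, bool extraRef) {
    const int before = Token::live;
    for (int i = 0; i < attempts; ++i) {
        try {
            storeLike(extraRef);
        } catch (const std::runtime_error&) {
            // The handle has already been destroyed by stack unwinding here.
        }
    }
    return Token::live - before;
}
```

      In this sketch, a handle that owns the only reference frees the token during unwinding, so leakedAfter(n, false) is 0. One unmatched addRef() per call keeps every token alive, which would look like a "definitely lost" block per failed enqueue. Whether something similar happens in MessageStoreImpl::store is not established by this report.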

      This is what is executed within the MessageStoreImpl::store call when RHM_IORES_ENQCAPTHRESH is raised:

      1) ./lib/MessageStoreImpl.cpp:

      boost::intrusive_ptr<DataTokenImpl> dtokp(new DataTokenImpl);
      dtokp->set_rid(message->getPersistenceId()); // set the messageID into the Journal header (record-id)

      JournalImpl* jc = static_cast<JournalImpl*>(queue->getExternalQueueStore());
      if (txn->getXid().empty()) {
      if (message->isContentReleased())
      {
          jc->enqueue_extern_data_record(size, dtokp.get(), !message->isPersistent());
      }
      else
      {
          jc->enqueue_data_record(&buff[0], size, size, dtokp.get(), !message->isPersistent());
      }

      2) enqueue_data_record called from:

      JournalImpl::enqueue_data_record(const void* const data_buff, const size_t tot_data_len,
      const size_t this_data_len, data_tok* dtokp, const bool transient)
      handleIoResult(jcntl::enqueue_data_record(data_buff, tot_data_len, this_data_len, dtokp, transient));

      3) nested enqueue_data_record called from:
      jcntl::enqueue_data_record(const void* const data_buff, const std::size_t tot_data_len,
      const std::size_t this_data_len, data_tok* dtokp, const bool transient)

      while (handle_aio_wait(_wmgr.enqueue(data_buff, tot_data_len, this_data_len, dtokp, 0, 0, transient, false), r,
      dtokp)) ;

      4) _wmgr.enqueue called from:
      wmgr::enqueue(const void* const data_buff, const std::size_t tot_data_len,
      const std::size_t this_data_len, data_tok* dtokp, const void* const xid_ptr,
      const std::size_t xid_len, const bool transient, const bool external)

      iores res = pre_write_check(WMGR_ENQUEUE, dtokp, xid_len, tot_data_len, external);
      if (res != RHM_IORES_SUCCESS)
      return res;

      5) pre_write_check called from ./lib/jrnl/wmgr.cpp as well:

      wmgr::pre_write_check(const _op_type op, const data_tok* const dtokp,
      const std::size_t xidsize, const std::size_t dsize, const bool external
      ) const

      if (!_wrfc.is_wr_reset())
      {
          if (!_wrfc.wr_reset())
              return RHM_IORES_FULL;
      }

      // Check status of current page is ok for writing
      if (_page_cb_arr[_pg_index]._state != IN_USE)
      {
          if (_page_cb_arr[_pg_index]._state == UNUSED)
              _page_cb_arr[_pg_index]._state = IN_USE;
      }

      switch (op)
      case WMGR_ENQUEUE:
      // Check for enqueue reaching cutoff threshold
      u_int32_t size_dblks = jrec::size_dblks(enq_rec::rec_size(xidsize, dsize,
      if (!_enq_busy && _wrfc.enq_threshold(_cached_offset_dblks + size_dblks))

      6) return to 4, from there return to 3, and execute handle_aio_wait:
      jcntl::handle_aio_wait(const iores res, iores& resout, const data_tok* dtp)

      (return false)

      7) return to 2, where the return value is used in:
      JournalImpl::handleIoResult(const iores r):
      writeActivityFlag = true;
      switch (r)
      case mrg::journal::RHM_IORES_SUCCESS:
      case mrg::journal::RHM_IORES_ENQCAPTHRESH:

      {
          std::ostringstream oss;
          oss << "Enqueue capacity threshold exceeded on queue \"" << _jid << "\".";
          log(LOG_WARN, oss.str());
          if (_agent != 0)
              _agent->raiseEvent(qmf::com::redhat::rhm::store::EventEnqThresholdExceeded(_jid, "Journal enqueue capacity threshold exceeded"), qpid::management::ManagementAgent::SEV_WARN);
          THROW_STORE_FULL_EXCEPTION(oss.str());
      }
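      The control flow of steps 1) to 7) can be condensed into a small model: a pre-write check returns a result code, a retry loop gives up on anything other than "wait", and the outermost layer turns the code into an exception. This is a rough sketch under assumptions. IoRes, StoreFull, TinyJournal and enqueueUntilFull are hypothetical names, and the threshold rule (used + requested > threshold) is invented for illustration rather than taken from wmgr::pre_write_check.

```cpp
#include <cstddef>
#include <stdexcept>

// Hypothetical result codes; the names echo the report, the set is an assumption.
enum class IoRes { Success, AioWait, EnqCapThresh };

// Hypothetical exception standing in for the store-full exception.
struct StoreFull : std::runtime_error {
    using std::runtime_error::runtime_error;
};

class TinyJournal {
public:
    explicit TinyJournal(std::size_t thresholdBlocks) : threshold_(thresholdBlocks) {}

    // Steps 4-5: run the pre-write check and hand a failure code straight back.
    IoRes write(std::size_t blocks) {
        if (used_ + blocks > threshold_) return IoRes::EnqCapThresh;
        used_ += blocks;
        return IoRes::Success;
    }

    // Steps 3 and 6: retry only while the result asks to wait.
    // This model never returns AioWait, so the loop body runs once.
    IoRes enqueueRaw(std::size_t blocks) {
        IoRes r;
        do {
            r = write(blocks);
        } while (r == IoRes::AioWait);
        return r;
    }

    // Steps 2 and 7: translate the threshold code into an exception.
    void enqueue(std::size_t blocks) {
        if (enqueueRaw(blocks) == IoRes::EnqCapThresh)
            throw StoreFull("Enqueue capacity threshold exceeded");
    }

    std::size_t used() const { return used_; }

private:
    std::size_t threshold_;
    std::size_t used_ = 0;
};

// Returns how many enqueues succeeded before the first StoreFull (or maxAttempts).
int enqueueUntilFull(TinyJournal& journal, std::size_t blocks, int maxAttempts) {
    int ok = 0;
    try {
        for (; ok < maxAttempts; ++ok) journal.enqueue(blocks);
    } catch (const StoreFull&) {
        // The caller sees the failure only as an exception, as in step 7.
    }
    return ok;
}
```

      In this model the failing call changes no journal state before the exception propagates, so any object allocated by the caller for that call has to be cleaned up by the caller's own unwinding.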

      In short, no data_tok* object used here is copied or referenced in a way that could prevent the memory from being freed.

      Nevertheless, the memory leak is there: the trivial reproducer causes the broker to consume more and more memory (checked in the output of the "ps" and "qpid-stat -m" commands).




            • Assignee:
              pmoravec Pavel Moravec


              • Created: