Details
- Bug
- Status: Open
- Minor
- Resolution: Unresolved
- 0.24
- None
Description
There is a memory leak when legacystore raises RHM_IORES_ENQCAPTHRESH: "Enqueue capacity threshold exceeded on queue ..". To reproduce, send durable messages in a loop to a queue with a tiny journal.
Valgrind reports:
==632== 2,288 (208 direct, 2,080 indirect) bytes in 2 blocks are definitely lost in loss record 115 of 116
==632== at 0x4A075BC: operator new(unsigned long) (vg_replace_malloc.c:298)
==632== by 0x60D76AB: mrg::msgstore::MessageStoreImpl::store(qpid::broker::PersistableQueue const*, mrg::msgstore::TxnCtxt*, boost::intrusive_ptr<qpid::broker::PersistableMessage> const&, bool) (in /data_xfs/qpid-trunk/cpp/BLD/src/legacystore.so)
==632== by 0x60D7165: mrg::msgstore::MessageStoreImpl::enqueue(qpid::broker::TransactionContext*, boost::intrusive_ptr<qpid::broker::PersistableMessage> const&, qpid::broker::PersistableQueue const&) (in /data_xfs/qpid-trunk/cpp/BLD/src/legacystore.so)
==632== by 0x5023568: qpid::broker::MessageStoreModule::enqueue(qpid::broker::TransactionContext*, boost::intrusive_ptr<qpid::broker::PersistableMessage> const&, qpid::broker::PersistableQueue const&) (in /data_xfs/qpid-trunk/cpp/BLD/src/libqpidbroker.so.2.0.0)
==632== by 0x4F9BAC8: qpid::broker::Queue::enqueue(qpid::broker::TransactionContext*, qpid::broker::Message&) (in /data_xfs/qpid-trunk/cpp/BLD/src/libqpidbroker.so.2.0.0)
Further debugging showed that the "new" call is on this line:
void MessageStoreImpl::store(..
..
if (queue) {
boost::intrusive_ptr<DataTokenImpl> dtokp(new DataTokenImpl);
dtokp->addRef();
..
I tried to fix the leak, but I see nothing wrong in the code that could trigger it:
1) dtokp is a local variable declared there, and its content is not copied or referenced anywhere later on
2) even catching StoreException and explicitly calling "dtokp->reset(); dtokp = boost::intrusive_ptr<DataTokenImpl>();" does not prevent the memory leak
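One angle on point 2), offered as a minimal sketch rather than a diagnosis: the explicit addRef() right after construction gives the token a second reference that the local intrusive_ptr does not own, so destroying or reassigning the pointer alone would leave the count at 1. The model below uses hypothetical stand-ins (Token, TokenPtr, liveTokensAfterScope) instead of DataTokenImpl and boost::intrusive_ptr. It is only an assumption that the real extra reference is meant to be dropped by a matching release() somewhere else.

```cpp
#include <cassert>

// Hypothetical stand-in for DataTokenImpl: an intrusively counted object.
struct Token {
    static int live;   // number of Token objects currently alive
    int refs = 0;      // intrusive reference count
    Token() { ++live; }
    ~Token() { --live; }
    void addRef() { ++refs; }
    void release() { if (--refs == 0) delete this; }
};
int Token::live = 0;

// Hypothetical minimal intrusive pointer: holds exactly one reference.
class TokenPtr {
public:
    explicit TokenPtr(Token* t) : t_(t) { if (t_) t_->addRef(); }
    ~TokenPtr() { if (t_) t_->release(); }
    TokenPtr(const TokenPtr&) = delete;
    TokenPtr& operator=(const TokenPtr&) = delete;
    Token* operator->() const { return t_; }
private:
    Token* t_;
};

// Returns how many tokens created here are still alive after the local
// pointer has gone out of scope.
int liveTokensAfterScope(bool extraAddRef, bool balancingRelease) {
    const int before = Token::live;
    {
        TokenPtr dtokp(new Token);               // refs == 1
        assert(Token::live == before + 1);
        if (extraAddRef) dtokp->addRef();        // refs == 2
        // Only balance a reference that was actually added.
        if (extraAddRef && balancingRelease) dtokp->release();  // refs == 1
    }   // ~TokenPtr drops the one reference owned by the pointer
    return Token::live - before;
}
```

In this model liveTokensAfterScope(true, false) leaves one token alive, while the calls without the extra addRef(), or with a balancing release(), leave none.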
This is what is executed when RHM_IORES_ENQCAPTHRESH is raised within the MessageStoreImpl::store call:
1) ./lib/MessageStoreImpl.cpp:
MessageStoreImpl::store
boost::intrusive_ptr<DataTokenImpl> dtokp(new DataTokenImpl);
dtokp->addRef();
dtokp->setSourceMessage(message);
dtokp->set_external_rid(true);
dtokp->set_rid(message->getPersistenceId()); // set the messageID into the Journal header (record-id)
JournalImpl* jc = static_cast<JournalImpl*>(queue->getExternalQueueStore());
if (txn->getXid().empty()) {
if (message->isContentReleased())
..
else
{ jc->enqueue_data_record(&buff[0], size, size, dtokp.get(), !message->isPersistent()); }
2) enqueue_data_record called from:
./lib/JournalImpl.cpp
JournalImpl::enqueue_data_record
JournalImpl::enqueue_data_record(const void* const data_buff, const size_t tot_data_len,
const size_t this_data_len, data_tok* dtokp, const bool transient)
{
handleIoResult(jcntl::enqueue_data_record(data_buff, tot_data_len, this_data_len, dtokp, transient));
3) nested enqueue_data_record called from:
./lib/jrnl/jcntl.cpp:
jcntl::enqueue_data_record(const void* const data_buff, const std::size_t tot_data_len,
const std::size_t this_data_len, data_tok* dtokp, const bool transient)
while (handle_aio_wait(_wmgr.enqueue(data_buff, tot_data_len, this_data_len, dtokp, 0, 0, transient, false), r,
dtokp)) ;
4) _wmgr.enqueue called from:
./lib/jrnl/wmgr.cpp:
wmgr::enqueue(const void* const data_buff, const std::size_t tot_data_len,
const std::size_t this_data_len, data_tok* dtokp, const void* const xid_ptr,
const std::size_t xid_len, const bool transient, const bool external)
iores res = pre_write_check(WMGR_ENQUEUE, dtokp, xid_len, tot_data_len, external);
if (res != RHM_IORES_SUCCESS)
return res;
5) pre_write_check called from ./lib/jrnl/wmgr.cpp as well:
wmgr::pre_write_check(const _op_type op, const data_tok* const dtokp,
const std::size_t xidsize, const std::size_t dsize, const bool external
) const
if (!_wrfc.is_wr_reset())
{ if (!_wrfc.wr_reset()) return RHM_IORES_FULL; }
// Check status of current page is ok for writing
if (_page_cb_arr[_pg_index]._state != IN_USE)
..
switch (op)
{
case WMGR_ENQUEUE:
{
// Check for enqueue reaching cutoff threshold
u_int32_t size_dblks = jrec::size_dblks(enq_rec::rec_size(xidsize, dsize,
external));
if (!_enq_busy && _wrfc.enq_threshold(_cached_offset_dblks + size_dblks))
return RHM_IORES_ENQCAPTHRESH;
6) return to 4), from there return to 3), and execute handle_aio_wait:
./lib/jrnl/jcntl.cpp:
jcntl::handle_aio_wait(const iores res, iores& resout, const data_tok* dtp)
(returns false)
7) return to 2), where the return value is used in:
./lib/JournalImpl.cpp:
JournalImpl::handleIoResult(const iores r):
writeActivityFlag = true;
switch (r)
{
case mrg::journal::RHM_IORES_SUCCESS:
return;
case mrg::journal::RHM_IORES_ENQCAPTHRESH:
In short, the data_tok* object is not copied or referenced anywhere in a way that could prevent the memory from being freed.
Nevertheless, the memory leak is there: the trivial reproducer causes the broker to consume more and more memory (checked in the output of the "ps" and "qpid-stat -m" commands).
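The seven steps above can be condensed into a small stand-alone model of the control flow. This is a sketch under assumptions, not the store's code: the function names are flattened versions of the ones in the trace, the overThreshold flag replaces the real threshold calculation, handle_aio_wait is reduced to the "return false" case, and throwing std::runtime_error for RHM_IORES_ENQCAPTHRESH is assumed (the excerpt in step 7 stops at the case label).

```cpp
#include <cassert>
#include <stdexcept>

// Simplified result codes; only the names come from the report.
enum iores { RHM_IORES_SUCCESS, RHM_IORES_ENQCAPTHRESH };

struct data_tok {};  // opaque stand-in for the journal's data token

// Step 5: the threshold check receives the token but does not store it.
iores pre_write_check(const data_tok* /*dtokp*/, bool overThreshold) {
    return overThreshold ? RHM_IORES_ENQCAPTHRESH : RHM_IORES_SUCCESS;
}

// Step 4: a failing pre-check is returned to the caller unchanged.
iores wmgr_enqueue(data_tok* dtokp, bool overThreshold) {
    iores res = pre_write_check(dtokp, overThreshold);
    if (res != RHM_IORES_SUCCESS)
        return res;
    return RHM_IORES_SUCCESS;  // the actual write is elided in this model
}

// Step 6: passes the result through; "false" means "do not retry".
bool handle_aio_wait(iores res, iores& resout) {
    resout = res;
    return false;
}

// Step 3: retry loop around the write manager call.
iores jcntl_enqueue_data_record(data_tok* dtokp, bool overThreshold) {
    iores r = RHM_IORES_SUCCESS;
    while (handle_aio_wait(wmgr_enqueue(dtokp, overThreshold), r)) ;
    return r;
}

// Step 7: the result code becomes an exception (assumed behaviour).
void handleIoResult(iores r) {
    switch (r) {
    case RHM_IORES_SUCCESS:
        return;
    case RHM_IORES_ENQCAPTHRESH:
        throw std::runtime_error("Enqueue capacity threshold exceeded");
    }
}

// Step 2: the journal wrapper feeds the result into handleIoResult.
void journal_enqueue_data_record(data_tok* dtokp, bool overThreshold) {
    handleIoResult(jcntl_enqueue_data_record(dtokp, overThreshold));
}

// Step 1 (caller side): reports whether the exception reaches the caller.
bool enqueueThrows(bool overThreshold) {
    data_tok tok;
    try {
        journal_enqueue_data_record(&tok, overThreshold);
    } catch (const std::runtime_error&) {
        return true;
    }
    return false;
}
```

In this model the token only travels down the chain as a raw pointer and no callee keeps a copy, which matches the observation that nothing on this path takes ownership of it.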
Issue Links
- is related to: QPID-5641 [legacystore] Valgrind reports memory leaks on older store tests (Closed)