Description
If a CMS consumer client reconnects frequently, a segv reliably occurs (for us it is guaranteed within a hundred reconnections or so). Additionally, while researching this we seem to have stumbled across a related memory leak.
When we shut down our consumer in the following order, as suggested by the sample code:
- delete destination
- delete consumer
- close session
- close connection
- delete session
- delete connection
...a segv will occur while the connection is being closed (thread #1 below is inside ActiveMQConnection::close()), due to effects observable in these thread stack traces:
Thread #1:
> activemq-cppd.dll!activemq::transport::IOTransport::close() Line 158 C++
activemq-cppd.dll!activemq::transport::TransportFilter::close() Line 222 + 0x1b bytes C++
activemq-cppd.dll!activemq::transport::filters::TcpTransport::close() Line 147 + 0x8 bytes C++
activemq-cppd.dll!activemq::transport::filters::ResponseCorrelator::close() Line 290 + 0x19 bytes C++
activemq-cppd.dll!activemq::connector::openwire::OpenWireFormatNegotiator::close() Line 257 + 0x1b bytes C++
activemq-cppd.dll!activemq::connector::openwire::OpenWireConnector::close() Line 178 + 0x19 bytes C++
activemq-cppd.dll!activemq::core::ActiveMQConnectionData::close() Line 89 + 0x19 bytes C++
activemq-cppd.dll!activemq::core::ActiveMQConnectionData::~ActiveMQConnectionData() Line 77 C++
activemq-cppd.dll!activemq::core::ActiveMQConnectionData::`vector deleting destructor'() + 0x54 bytes C++
activemq-cppd.dll!activemq::core::ActiveMQConnection::close() Line 160 + 0x37 bytes C++
Thread #2:
activemq-cppd.dll!decaf::net::SocketInputStream::read(unsigned char * buffer=0x08b4dcb8, unsigned int offset=0, unsigned int bufferSize=1) Line 170 + 0x18 bytes C++
> activemq-cppd.dll!decaf::io::BufferedInputStream::bufferData() Line 260 + 0x30 bytes C++
activemq-cppd.dll!decaf::io::BufferedInputStream::read(unsigned char * targetBuffer=0x0a80f7d8, unsigned int offset=0, unsigned int targetBufferSize=4) Line 181 + 0x8 bytes C++
activemq-cppd.dll!decaf::io::DataInputStream::readAllData(unsigned char * buffer=0x0a80f7d8, unsigned int length=4) Line 360 + 0x25 bytes C++
activemq-cppd.dll!decaf::io::DataInputStream::readInt() Line 167 C++
activemq-cppd.dll!activemq::connector::openwire::OpenWireFormat::unmarshal(decaf::io::DataInputStream * dis=0x08bd66d8) Line 227 + 0xf bytes C++
activemq-cppd.dll!activemq::connector::openwire::OpenWireCommandReader::readCommand() Line 71 + 0x1c bytes C++
activemq-cppd.dll!activemq::transport::IOTransport::run() Line 188 + 0x15 bytes C++
activemq-cppd.dll!decaf::lang::Thread::runCallback(apr_thread_t * self=0x08bb3ea0, void * param=0x08bd6828) Line 113 + 0x15 bytes C++
libapr-1.dll!dummy_worker(void * opaque=0x08bb3ea0) Line 80 C
...where, in thread #2, the SocketInputStream interior to the BufferedInputStream (the inputStream member) has been NULL'ed between the two method calls shown below, available() and read(), ending at bufferedinputstream.cpp:260:
// Get the number of bytes currently available on the input stream
// that could be read without blocking.
std::size_t available = inputStream->available();
// Calculate the number of bytes that we can read. Always >= 1 byte!
std::size_t bytesToRead = max( (std::size_t)1, min( available, getUnusedBytes() ) );
// Read the bytes from the input stream.
int bytesRead = inputStream->read( getTail(), 0, bytesToRead );
...as a result of the stream close() call in thread #1. The close() call is in place to break any waiting IO so the executor thread may terminate in an orderly fashion, but the stream and/or IOTransport code does not anticipate the case where in-stream processing is underway at the moment the stream's internal state is reset.
This suggests either:
(1) more careful synchronization of the compound stream's state
(2) realigning the compound stream's member data
...I would vote for the former, and could suggest some options.
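One way option (1) could look, as a minimal sketch only: the reader takes a reference-counted snapshot of the inner stream under a lock, and close() detaches the member under the same lock before closing it. The names FakeSource and GuardedBufferedStream are hypothetical stand-ins, not decaf classes, and the sketch assumes C++11 facilities (std::shared_ptr, std::mutex) that the library may not be able to use as-is.

```cpp
#include <algorithm>
#include <atomic>
#include <cstddef>
#include <memory>
#include <mutex>
#include <utility>

// Hypothetical stand-in for the wrapped socket stream (not the decaf class).
class FakeSource {
public:
    std::size_t available() const { return closed_.load() ? 0 : 4; }

    // Fills dst with a marker byte; returns -1 once the source is closed.
    int read(unsigned char* dst, std::size_t count) {
        if (closed_.load()) return -1;
        std::fill(dst, dst + count, static_cast<unsigned char>(0x2A));
        return static_cast<int>(count);
    }

    void close() { closed_.store(true); }

private:
    std::atomic<bool> closed_{false};
};

// Hypothetical buffered wrapper illustrating the synchronization idea.
class GuardedBufferedStream {
public:
    explicit GuardedBufferedStream(std::shared_ptr<FakeSource> source)
        : source_(std::move(source)) {}

    // Reader side: snapshot the pointer under the lock, then use only the
    // snapshot, so a concurrent close() cannot null it between the
    // available() and read() calls.
    int bufferData(unsigned char* dst, std::size_t capacity) {
        std::shared_ptr<FakeSource> src;
        {
            std::lock_guard<std::mutex> lock(mutex_);
            src = source_;
        }
        if (!src) return -1;  // already closed

        std::size_t available = src->available();
        // Always attempt at least one byte, as in the original excerpt.
        std::size_t bytesToRead =
            std::max<std::size_t>(1, std::min(available, capacity));
        return src->read(dst, bytesToRead);
    }

    // Closer side: detach the member under the lock, then close outside it
    // so a blocked reader can still be woken by the underlying close().
    void close() {
        std::shared_ptr<FakeSource> src;
        {
            std::lock_guard<std::mutex> lock(mutex_);
            src.swap(source_);
        }
        if (src) src->close();
    }

private:
    std::mutex mutex_;
    std::shared_ptr<FakeSource> source_;
};
```

The lock is held only while copying or swapping the pointer, never across the read itself, so close() can still interrupt a waiting read. A reader that already holds a snapshot sees a closed stream rather than a NULL pointer.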
Additionally, when reviewing the code in thread #1 we found the following, beginning at iotransport.cpp:150:
// We have to close the input stream before
// we stop the thread. this will force us to
// wake up the thread if it's stuck in a read
// (which is likely). Otherwise, the join that
// follows will block forever.
if( inputStream != NULL )
// Wait for the thread to die.
if( thread != NULL )
// Close the output stream.
if( outputStream != NULL )
...our familiarity with decaf is limited, but it appears inputStream and outputStream are conventional (raw) pointers, so they should be deleted before being set to NULL to avoid leaking memory. To minimize other potential segv edge cases, we would suggest making the close() calls before completing the thread join(), then deleting and NULL'ing the pointers afterwards.
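The suggested ordering could be sketched roughly as follows. This is an illustration under stated assumptions, not the actual IOTransport code: SketchStream and SketchTransport are hypothetical names, std::thread stands in for the decaf thread class, and the sketch assumes the transport owns both streams, which we have not confirmed.

```cpp
#include <atomic>
#include <chrono>
#include <cstddef>
#include <thread>

// Hypothetical stream; stands in for the decaf input/output streams.
struct SketchStream {
    std::atomic<bool> closed{false};
    void close() { closed.store(true); }
};

// Hypothetical transport that is ASSUMED to own its streams and its thread.
class SketchTransport {
public:
    SketchTransport()
        : inputStream(new SketchStream),
          outputStream(new SketchStream),
          thread(NULL) {}

    SketchTransport(const SketchTransport&) = delete;
    SketchTransport& operator=(const SketchTransport&) = delete;

    ~SketchTransport() { close(); }

    // Starts a reader loop that polls until the input stream is closed.
    void start() {
        SketchStream* in = inputStream;
        thread = new std::thread([in] {
            while (!in->closed.load()) {
                std::this_thread::sleep_for(std::chrono::milliseconds(1));
            }
        });
    }

    void close() {
        // 1. Close both streams first; this wakes the reader thread, but
        //    the objects stay alive while the thread may still touch them.
        if (inputStream != NULL) inputStream->close();
        if (outputStream != NULL) outputStream->close();

        // 2. Wait for the reader thread to finish.
        if (thread != NULL) {
            thread->join();
            delete thread;
            thread = NULL;
        }

        // 3. Only now delete the streams and NULL the pointers.
        delete inputStream;
        inputStream = NULL;
        delete outputStream;
        outputStream = NULL;
    }

    bool isClosed() const {
        return inputStream == NULL && outputStream == NULL && thread == NULL;
    }

private:
    SketchStream* inputStream;
    SketchStream* outputStream;
    std::thread* thread;
};
```

Because the pointers are deleted and reset only after the join, the reader thread never sees a NULL or freed stream, and a second close() call is a no-op.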