Details
Type: Bug
Status: Closed
Priority: Major
Resolution: Fixed
0.17
None
Description
I can crash the broker with a simple configuration involving multiple auto-delete queues.
The following client pseudocode can trigger the crash:
loc_sess = []
# create a couple of sessions
for i in range(in_loops):
    queue_durability = False
    if i % 2 == 1:
        queue_durability = True
    # create new local session[s]
    lsess = self.connection.session(loc_sess_name % i)
    # delete the queue (if needed)
    self.cleanup(in_queue=loc_q_name % i)
    # declare auto-delete queue[s]
    lsess.queue_declare(queue=loc_q_name % i,
                        auto_delete=True,
                        arguments={"qpid.auto_delete_timeout": q_timeout},
                        durable=queue_durability)
    # check that queue[s] is still available
    result = lsess.queue_query(queue=loc_q_name % i)
    self.assertEqual(loc_q_name % i, result.queue)
    # bind queue to exchange amq.fanout
    lsess.exchange_bind(exchange=e_name,
                        queue=loc_q_name % i,
                        binding_key=f_name)
    # append the session to list
    loc_sess.append(lsess)
# send messages to the queues via amq.fanout
dp = sess.delivery_properties(routing_key=f_name)
msg_cnt = random.randint(*MSG_CNT_RR)
print "setup: in_loops:%d, msg_cnt:%d" % (in_loops, msg_cnt)
for j in range(msg_cnt):
    sess.message_transfer(destination=e_name,
                          message=qpid.datatypes.Message(dp, msg_layout % j))
# check that queues contain correct message count via QMF
self.startQmf()
for i in range(in_loops):
    sq = self.qmf_session.getObjects(_class="queue", name=loc_q_name % i)[0]
    self.assertEqual(sq.msgDepth, msg_cnt)
# receive one (first) message from the queues
for i in range(in_loops):
    loc_sess[i].message_subscribe(destination="dlq", queue=loc_q_name % i)
    loc_sess[i].message_flow(destination="dlq", value=0xFFFFFFFFL,
                             unit=loc_sess[i].credit_unit.message)
    loc_sess[i].message_flow(destination="dlq", value=0xFFFFFFFFL,
                             unit=loc_sess[i].credit_unit.byte)
    dlq = loc_sess[i].incoming("dlq")
    msg = dlq.get(timeout=1)
    self.assertEqual(msg_layout % 0, msg.body)
# check that queues are present at this point (subscription still alive atm)
for i in range(in_loops):  # browse sessions
    result = loc_sess[i].queue_query(queue=loc_q_name % i)
    self.assertEqual(loc_q_name % i, result.queue)
    loc_sess[i].close()
# check that queues are still available (after local sessions JUST closed)
for i in range(in_loops):  # browse sessions
    result = sess.queue_query(queue=loc_q_name % i)
    self.assertEqual(loc_q_name % i, result.queue)
print "sleeping - waiting for queue auto timeout"
time.sleep(q_timeout + AD_TIMEOUT_TOL)
# check whether queue has been deleted (expected to be deleted)
for i in range(in_loops):
    result = sess.queue_query(queue=loc_q_name % i)
    self.assert_(not result.queue)
Analysis:
The ClusterTimer cannot store two timer tasks that have the same name. The Queue code creates a timer task for each auto-delete queue, and all of these tasks share the name "DelayedAutoDeletion". As a result, ClusterTimer::add() throws an exception because it treats the tasks as duplicates.
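
The collision described in the analysis can be illustrated with a small toy model. This is only a sketch and not the broker's actual C++ code: `NameKeyedTimer`, `DuplicateTaskError`, `schedule_auto_delete` and `collides` are hypothetical names, and appending the queue name to make task names unique is an assumed remedy shown for contrast, not necessarily the fix that was applied.

```python
class DuplicateTaskError(Exception):
    """Raised when a task with the same name is already registered."""


class NameKeyedTimer(object):
    """Toy stand-in for a timer that identifies tasks by name only."""

    def __init__(self):
        self.tasks = {}

    def add(self, name, callback):
        # A second task with an existing name is treated as a duplicate.
        if name in self.tasks:
            raise DuplicateTaskError("task already exists: %s" % name)
        self.tasks[name] = callback


def schedule_auto_delete(timer, queue_names, unique_names):
    """Register one delayed-deletion task per auto-delete queue."""
    for q in queue_names:
        name = "DelayedAutoDeletion"
        if unique_names:
            # Assumed remedy: make the task name unique per queue.
            name += ":" + q
        timer.add(name, lambda q=q: q)


def collides(queue_names, unique_names):
    """Return True if scheduling the tasks raises a duplicate error."""
    try:
        schedule_auto_delete(NameKeyedTimer(), queue_names, unique_names)
    except DuplicateTaskError:
        return True
    return False
```

In this model a single auto-delete queue never collides, which matches the report that the crash needs multiple auto-delete queues; with two or more queues the shared name triggers the error unless the names are made distinct.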