summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2010-07-16 11:51:30 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2010-07-16 11:51:30 +0100
commit805372c092366e678005c258851310e7d90c97ab (patch)
tree9ac4742eede9907f58c69636c29e943f6c48d514
parent4fb7de5a232e4fccef5fdc40d635bb22c2a560db (diff)
downloadrabbitmq-server-git-805372c092366e678005c258851310e7d90c97ab.tar.gz
don't forget to process transactional acks
...when the tx doesn't involve any persistent messages, thus plugging a leak in pending_ack and qi. To reproduce: rr(amqp_connection). Conn = amqp_connection:start_network(), Ch = amqp_connection:open_channel(Conn), Q = <<"Q">>, amqp_channel:call(Ch, #'queue.declare'{queue = Q, durable = true, exclusive = true}), amqp_channel:call(Ch, #'basic.publish'{routing_key = Q}, #amqp_msg{}), amqp_channel:call(Ch, #'basic.get'{queue = Q}), amqp_channel:call(Ch, #'tx.select'{}), amqp_channel:call(Ch, #'basic.ack'{delivery_tag = 1}), amqp_channel:call(Ch, #'tx.commit'{}), ok. and then $ ./scripts/rabbitmqctl list_queues name messages_unacknowledged backing_queue_status Listing queues ... Q 0 [{q1,0},{q2,0},{delta,{delta,undefined,0,undefined}},{q3,0},{q4,0},{len,0},{pending_acks,1},{outstanding_txns,0},{target_ram_msg_count,infinity},{ram_msg_count,0},{ram_index_count,0},{avg_egress_rate,0.0},{avg_ingress_rate,0.0},{next_seq_id,1},{persistent_count,0}] ...done. Note the pending_acks of 1.
-rw-r--r--src/rabbit_variable_queue.erl6
1 files changed, 4 insertions, 2 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 15509b3c35..3378c435e6 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -854,8 +854,10 @@ tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags, Fun,
true -> State #vqstate { on_sync = { [AckTags | SAcks],
[Pubs | SPubs],
[Fun | SFuns] }};
- false -> State1 = tx_commit_index(State #vqstate {
- on_sync = {[], [Pubs], [Fun]} }),
+ false -> State1 = tx_commit_index(
+ State #vqstate { on_sync = { [AckTags],
+ [Pubs],
+ [Fun]} }),
State1 #vqstate { on_sync = OnSync }
end.