diff options
| author | Matthias Radestock <matthias@rabbitmq.com> | 2010-07-16 11:51:30 +0100 |
|---|---|---|
| committer | Matthias Radestock <matthias@rabbitmq.com> | 2010-07-16 11:51:30 +0100 |
| commit | 805372c092366e678005c258851310e7d90c97ab (patch) | |
| tree | 9ac4742eede9907f58c69636c29e943f6c48d514 | |
| parent | 4fb7de5a232e4fccef5fdc40d635bb22c2a560db (diff) | |
| download | rabbitmq-server-git-805372c092366e678005c258851310e7d90c97ab.tar.gz | |
don't forget to process transactional acks
...when the tx doesn't involve any persistent messages, thus plugging
a leak in pending_ack and qi.
To reproduce:
rr(amqp_connection).
Conn = amqp_connection:start_network(),
Ch = amqp_connection:open_channel(Conn),
Q = <<"Q">>,
amqp_channel:call(Ch, #'queue.declare'{queue = Q, durable = true, exclusive = true}),
amqp_channel:call(Ch, #'basic.publish'{routing_key = Q}, #amqp_msg{}),
amqp_channel:call(Ch, #'basic.get'{queue = Q}),
amqp_channel:call(Ch, #'tx.select'{}),
amqp_channel:call(Ch, #'basic.ack'{delivery_tag = 1}),
amqp_channel:call(Ch, #'tx.commit'{}),
ok.
and then
$ ./scripts/rabbitmqctl list_queues name messages_unacknowledged backing_queue_status
Listing queues ...
Q 0 [{q1,0},{q2,0},{delta,{delta,undefined,0,undefined}},{q3,0},{q4,0},{len,0},{pending_acks,1},{outstanding_txns,0},{target_ram_msg_count,infinity},{ram_msg_count,0},{ram_index_count,0},{avg_egress_rate,0.0},{avg_ingress_rate,0.0},{next_seq_id,1},{persistent_count,0}]
...done.
Note the pending_acks of 1.
| -rw-r--r-- | src/rabbit_variable_queue.erl | 6 |
1 files changed, 4 insertions, 2 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 15509b3c35..3378c435e6 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -854,8 +854,10 @@ tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags, Fun, true -> State #vqstate { on_sync = { [AckTags | SAcks], [Pubs | SPubs], [Fun | SFuns] }}; - false -> State1 = tx_commit_index(State #vqstate { - on_sync = {[], [Pubs], [Fun]} }), + false -> State1 = tx_commit_index( + State #vqstate { on_sync = { [AckTags], + [Pubs], + [Fun]} }), State1 #vqstate { on_sync = OnSync } end. |
