diff options
| author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2012-02-16 19:05:02 +0000 |
|---|---|---|
| committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2012-02-16 19:05:02 +0000 |
| commit | 36498d8ea29cf6dd6fd9b295b7a0ebc62349fdd7 (patch) | |
| tree | 697c047f70fd1f17676c6c6b6e186d4c84a60fde /src | |
| parent | 03b335f12f44427637873e557f6399c30ef28151 (diff) | |
| download | rabbitmq-server-git-36498d8ea29cf6dd6fd9b295b7a0ebc62349fdd7.tar.gz | |
BQ:ack batches of messages
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 18 |
1 files changed, 9 insertions, 9 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index a4155420e0..4eecb3bbc1 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -804,19 +804,19 @@ handle_confirm(MsgSeqNos, QPid, State = #q{unconfirmed_mq = UMQ, unconfirmed_qm = UQM, backing_queue = BQ, backing_queue_state = BQS}) -> - {BQS3, UMQ3} = + {AckTags1, UMQ3} = lists:foldl( - fun (MsgSeqNo, {BQS1, UMQ1}) -> + fun (MsgSeqNo, {AckTags, UMQ1}) -> {QPids, AckTag} = gb_trees:get(MsgSeqNo, UMQ1), QPids1 = gb_sets:delete(QPid, QPids), case gb_sets:is_empty(QPids1) of - true -> {_Guids, BQS2} = - BQ:ack([AckTag], undefined, BQS1), - {BQS2, gb_trees:delete(MsgSeqNo, UMQ1)}; - false -> {BQS1, gb_trees:update(MsgSeqNo, - {QPids1, AckTag}, UMQ1)} + true -> {[AckTag | AckTags], + gb_trees:delete(MsgSeqNo, UMQ1)}; + false -> {AckTags, gb_trees:update( + MsgSeqNo, {QPids1, AckTag}, UMQ1)} end - end, {BQS, UMQ}, MsgSeqNos), + end, {[], UMQ}, MsgSeqNos), + {_Guids, BQS1} = BQ:ack(AckTags1, undefined, BQS), MsgSeqNos1 = gb_sets:difference(gb_trees:get(QPid, UQM), gb_sets:from_list(MsgSeqNos)), State1 = case gb_sets:is_empty(MsgSeqNos1) of @@ -829,7 +829,7 @@ handle_confirm(MsgSeqNos, QPid, State = #q{unconfirmed_mq = UMQ, gb_trees:delete(QPid, UQM)}) end, cleanup_after_confirm(State1#q{unconfirmed_mq = UMQ3, - backing_queue_state = BQS3}). + backing_queue_state = BQS1}). cleanup_after_confirm(State = #q{blocked_ops = Ops, unconfirmed_mq = UMQ}) -> |
