summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2012-02-16 19:05:02 +0000
committerAlexandru Scvortov <alexandru@rabbitmq.com>2012-02-16 19:05:02 +0000
commit36498d8ea29cf6dd6fd9b295b7a0ebc62349fdd7 (patch)
tree697c047f70fd1f17676c6c6b6e186d4c84a60fde /src
parent03b335f12f44427637873e557f6399c30ef28151 (diff)
downloadrabbitmq-server-git-36498d8ea29cf6dd6fd9b295b7a0ebc62349fdd7.tar.gz
BQ:ack batches of messages
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl18
1 files changed, 9 insertions, 9 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index a4155420e0..4eecb3bbc1 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -804,19 +804,19 @@ handle_confirm(MsgSeqNos, QPid, State = #q{unconfirmed_mq = UMQ,
unconfirmed_qm = UQM,
backing_queue = BQ,
backing_queue_state = BQS}) ->
- {BQS3, UMQ3} =
+ {AckTags1, UMQ3} =
lists:foldl(
- fun (MsgSeqNo, {BQS1, UMQ1}) ->
+ fun (MsgSeqNo, {AckTags, UMQ1}) ->
{QPids, AckTag} = gb_trees:get(MsgSeqNo, UMQ1),
QPids1 = gb_sets:delete(QPid, QPids),
case gb_sets:is_empty(QPids1) of
- true -> {_Guids, BQS2} =
- BQ:ack([AckTag], undefined, BQS1),
- {BQS2, gb_trees:delete(MsgSeqNo, UMQ1)};
- false -> {BQS1, gb_trees:update(MsgSeqNo,
- {QPids1, AckTag}, UMQ1)}
+ true -> {[AckTag | AckTags],
+ gb_trees:delete(MsgSeqNo, UMQ1)};
+ false -> {AckTags, gb_trees:update(
+ MsgSeqNo, {QPids1, AckTag}, UMQ1)}
end
- end, {BQS, UMQ}, MsgSeqNos),
+ end, {[], UMQ}, MsgSeqNos),
+ {_Guids, BQS1} = BQ:ack(AckTags1, undefined, BQS),
MsgSeqNos1 = gb_sets:difference(gb_trees:get(QPid, UQM),
gb_sets:from_list(MsgSeqNos)),
State1 = case gb_sets:is_empty(MsgSeqNos1) of
@@ -829,7 +829,7 @@ handle_confirm(MsgSeqNos, QPid, State = #q{unconfirmed_mq = UMQ,
gb_trees:delete(QPid, UQM)})
end,
cleanup_after_confirm(State1#q{unconfirmed_mq = UMQ3,
- backing_queue_state = BQS3}).
+ backing_queue_state = BQS1}).
cleanup_after_confirm(State = #q{blocked_ops = Ops,
unconfirmed_mq = UMQ}) ->