summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-06-17 13:12:44 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2010-06-17 13:12:44 +0100
commitc651b112e4813147bbbf17297057b9f8e6309749 (patch)
treed216409f37e39bbdaee2f8f349f7ecc77f526ba9
parentdb18287dd0f59c553314a189cdc6d87d14320d1d (diff)
parenteabe30f8d88af2c2cbcbe3202b0ae2bee1c1da0c (diff)
downloadrabbitmq-server-git-c651b112e4813147bbbf17297057b9f8e6309749.tar.gz
Mergin' heads
-rw-r--r--src/rabbit_variable_queue.erl32
1 files changed, 18 insertions, 14 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 72add2af7c..4a64d14d9b 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -783,17 +783,18 @@ update_rate(Now, Then, Count, {OThen, OCount}) ->
persistent_guids(Pubs) ->
[Guid || #basic_message { guid = Guid, is_persistent = true } <- Pubs].
-betas_from_segment_entries(List, TransientThreshold, IndexState) ->
- {Filtered, IndexState1} =
+betas_from_index_entries(List, TransientThreshold, IndexState) ->
+ {Filtered, Delivers, Acks} =
lists:foldr(
fun ({Guid, SeqId, IsPersistent, IsDelivered},
- {FilteredAcc, IndexStateAcc}) ->
+ {Filtered1, Delivers1, Acks1}) ->
case SeqId < TransientThreshold andalso not IsPersistent of
- true -> {FilteredAcc,
- rabbit_queue_index:ack(
- [SeqId], maybe_write_delivered(
- not IsDelivered,
- SeqId, IndexStateAcc))};
+ true -> {Filtered1,
+ case IsDelivered of
+ true -> Delivers1;
+ false -> [SeqId | Delivers1]
+ end,
+ [SeqId | Acks1]};
false -> {[#msg_status { msg = undefined,
guid = Guid,
seq_id = SeqId,
@@ -801,11 +802,14 @@ betas_from_segment_entries(List, TransientThreshold, IndexState) ->
is_delivered = IsDelivered,
msg_on_disk = true,
index_on_disk = true
- } | FilteredAcc],
- IndexStateAcc}
+ } | Filtered1],
+ Delivers1,
+ Acks1}
end
- end, {[], IndexState}, List),
- {bpqueue:from_list([{true, Filtered}]), IndexState1}.
+ end, {[], [], []}, List),
+ {bpqueue:from_list([{true, Filtered}]),
+ rabbit_queue_index:ack(Acks,
+ rabbit_queue_index:deliver(Delivers, IndexState))}.
ensure_binary_properties(Msg = #basic_message { content = Content }) ->
Msg #basic_message {
@@ -968,7 +972,7 @@ delete1(TransientThreshold, NextSeqId, DeltaSeqId, IndexState) ->
IndexState2 =
case List of
[] -> IndexState1;
- _ -> {Q, IndexState3} = betas_from_segment_entries(
+ _ -> {Q, IndexState3} = betas_from_index_entries(
List, TransientThreshold, IndexState1),
remove_queue_entries(fun beta_fold/3, Q, IndexState3)
end,
@@ -1296,7 +1300,7 @@ maybe_deltas_to_betas(State = #vqstate {
end_seq_id = DeltaSeqIdEnd } = Delta,
{List, DeltaSeqId1, IndexState1} =
rabbit_queue_index:read(DeltaSeqId, DeltaSeqIdEnd, IndexState),
- {Q3a, IndexState2} = betas_from_segment_entries(
+ {Q3a, IndexState2} = betas_from_index_entries(
List, TransientThreshold, IndexState1),
State1 = State #vqstate { index_state = IndexState2 },
case bpqueue:len(Q3a) of