diff options
| author | Matthew Sackman <matthew@rabbitmq.com> | 2010-06-17 13:12:44 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-06-17 13:12:44 +0100 |
| commit | c651b112e4813147bbbf17297057b9f8e6309749 (patch) | |
| tree | d216409f37e39bbdaee2f8f349f7ecc77f526ba9 | |
| parent | db18287dd0f59c553314a189cdc6d87d14320d1d (diff) | |
| parent | eabe30f8d88af2c2cbcbe3202b0ae2bee1c1da0c (diff) | |
| download | rabbitmq-server-git-c651b112e4813147bbbf17297057b9f8e6309749.tar.gz | |
Mergin' heads
| -rw-r--r-- | src/rabbit_variable_queue.erl | 32 |
1 files changed, 18 insertions, 14 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 72add2af7c..4a64d14d9b 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -783,17 +783,18 @@ update_rate(Now, Then, Count, {OThen, OCount}) -> persistent_guids(Pubs) -> [Guid || #basic_message { guid = Guid, is_persistent = true } <- Pubs]. -betas_from_segment_entries(List, TransientThreshold, IndexState) -> - {Filtered, IndexState1} = +betas_from_index_entries(List, TransientThreshold, IndexState) -> + {Filtered, Delivers, Acks} = lists:foldr( fun ({Guid, SeqId, IsPersistent, IsDelivered}, - {FilteredAcc, IndexStateAcc}) -> + {Filtered1, Delivers1, Acks1}) -> case SeqId < TransientThreshold andalso not IsPersistent of - true -> {FilteredAcc, - rabbit_queue_index:ack( - [SeqId], maybe_write_delivered( - not IsDelivered, - SeqId, IndexStateAcc))}; + true -> {Filtered1, + case IsDelivered of + true -> Delivers1; + false -> [SeqId | Delivers1] + end, + [SeqId | Acks1]}; false -> {[#msg_status { msg = undefined, guid = Guid, seq_id = SeqId, @@ -801,11 +802,14 @@ betas_from_segment_entries(List, TransientThreshold, IndexState) -> is_delivered = IsDelivered, msg_on_disk = true, index_on_disk = true - } | FilteredAcc], - IndexStateAcc} + } | Filtered1], + Delivers1, + Acks1} end - end, {[], IndexState}, List), - {bpqueue:from_list([{true, Filtered}]), IndexState1}. + end, {[], [], []}, List), + {bpqueue:from_list([{true, Filtered}]), + rabbit_queue_index:ack(Acks, + rabbit_queue_index:deliver(Delivers, IndexState))}. ensure_binary_properties(Msg = #basic_message { content = Content }) -> Msg #basic_message { @@ -968,7 +972,7 @@ delete1(TransientThreshold, NextSeqId, DeltaSeqId, IndexState) -> IndexState2 = case List of [] -> IndexState1; - _ -> {Q, IndexState3} = betas_from_segment_entries( + _ -> {Q, IndexState3} = betas_from_index_entries( List, TransientThreshold, IndexState1), remove_queue_entries(fun beta_fold/3, Q, IndexState3) end, @@ -1296,7 +1300,7 @@ maybe_deltas_to_betas(State = #vqstate { end_seq_id = DeltaSeqIdEnd } = Delta, {List, DeltaSeqId1, IndexState1} = rabbit_queue_index:read(DeltaSeqId, DeltaSeqIdEnd, IndexState), - {Q3a, IndexState2} = betas_from_segment_entries( + {Q3a, IndexState2} = betas_from_index_entries( List, TransientThreshold, IndexState1), State1 = State #vqstate { index_state = IndexState2 }, case bpqueue:len(Q3a) of |
