diff options
| author | Matthias Radestock <matthias@rabbitmq.com> | 2010-07-15 07:54:25 +0100 |
|---|---|---|
| committer | Matthias Radestock <matthias@rabbitmq.com> | 2010-07-15 07:54:25 +0100 |
| commit | bf34fd55f2159c524a37159a7a3d3cd1540f8f93 (patch) | |
| tree | 30ecc25d2e6e847d5e4988f4e3cdc733d20b3001 /src | |
| parent | a3d42df9d459b1afad4bc03be9ae2d27ef94843f (diff) | |
| download | rabbitmq-server-git-bf34fd55f2159c524a37159a7a3d3cd1540f8f93.tar.gz | |
cosmetic
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_variable_queue.erl | 152 |
1 files changed, 78 insertions, 74 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 8799fff31a..84db32466e 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -735,51 +735,6 @@ maybe_write_delivered(false, _SeqId, IndexState) -> maybe_write_delivered(true, SeqId, IndexState) -> rabbit_queue_index:deliver([SeqId], IndexState). -accumulate_ack(SeqId, IsPersistent, Guid, {SeqIdsAcc, Dict}) -> - {case IsPersistent of - true -> [SeqId | SeqIdsAcc]; - false -> SeqIdsAcc - end, rabbit_misc:dict_cons(find_msg_store(IsPersistent), Guid, Dict)}. - -record_pending_ack(#msg_status { guid = Guid, seq_id = SeqId, - is_persistent = IsPersistent, - msg_on_disk = MsgOnDisk } = MsgStatus, PA) -> - AckEntry = case MsgOnDisk of - true -> {IsPersistent, Guid}; - false -> MsgStatus - end, - dict:store(SeqId, AckEntry, PA). - -remove_pending_ack(KeepPersistent, - State = #vqstate { pending_ack = PA, - index_state = IndexState }) -> - {{SeqIds, GuidsByStore}, PA1} = - dict:fold( - fun (SeqId, {IsPersistent, Guid}, {Acc, PA2}) -> - {accumulate_ack(SeqId, IsPersistent, Guid, Acc), - case KeepPersistent andalso IsPersistent of - true -> PA2; - false -> dict:erase(SeqId, PA2) - end}; - (SeqId, #msg_status {}, {Acc, PA2}) -> - {Acc, dict:erase(SeqId, PA2)} - end, {{[], dict:new()}, PA}, PA), - case KeepPersistent of - true -> State1 = State #vqstate { pending_ack = PA1 }, - case dict:find(?TRANSIENT_MSG_STORE, GuidsByStore) of - error -> State1; - {ok, Guids} -> ok = rabbit_msg_store:remove( - ?TRANSIENT_MSG_STORE, Guids), - State1 - end; - false -> IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState), - ok = dict:fold(fun (MsgStore, Guids, ok) -> - rabbit_msg_store:remove(MsgStore, Guids) - end, ok, GuidsByStore), - State #vqstate { pending_ack = dict:new(), - index_state = IndexState1 } - end. - lookup_tx(Txn) -> case get({txn, Txn}) of undefined -> #tx { pending_messages = [], pending_acks = [] }; @@ -859,35 +814,6 @@ beta_fold(Fun, Init, Q) -> %% Internal major helpers for Public API %%---------------------------------------------------------------------------- -ack(_MsgStoreFun, _Fun, [], State) -> - State; -ack(MsgStoreFun, Fun, AckTags, State) -> - {{SeqIds, GuidsByStore}, State1 = #vqstate { index_state = IndexState, - persistent_count = PCount }} = - lists:foldl( - fun (SeqId, {Acc, State2 = #vqstate { pending_ack = PA }}) -> - {ok, AckEntry} = dict:find(SeqId, PA), - {case AckEntry of - #msg_status { index_on_disk = false, %% ASSERTIONS - msg_on_disk = false, - is_persistent = false } -> - Acc; - {IsPersistent, Guid} -> - accumulate_ack(SeqId, IsPersistent, Guid, Acc) - end, Fun(AckEntry, State2 #vqstate { - pending_ack = dict:erase(SeqId, PA) })} - end, {{[], dict:new()}, State}, AckTags), - IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState), - ok = dict:fold(fun (MsgStore, Guids, ok) -> - MsgStoreFun(MsgStore, Guids) - end, ok, GuidsByStore), - PCount1 = PCount - case dict:find(?PERSISTENT_MSG_STORE, GuidsByStore) of - error -> 0; - {ok, Guids} -> length(Guids) - end, - State1 #vqstate { index_state = IndexState1, - persistent_count = PCount1 }. - msg_store_callback(PersistentGuids, IsTransientPubs, Pubs, AckTags, Fun) -> Self = self(), F = fun () -> rabbit_amqqueue:maybe_run_queue_via_backing_queue( @@ -1111,6 +1037,84 @@ maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus, msg_store_clients = MSCState1 }}. %%---------------------------------------------------------------------------- +%% Internal gubbins for acks +%%---------------------------------------------------------------------------- + +record_pending_ack(#msg_status { guid = Guid, seq_id = SeqId, + is_persistent = IsPersistent, + msg_on_disk = MsgOnDisk } = MsgStatus, PA) -> + AckEntry = case MsgOnDisk of + true -> {IsPersistent, Guid}; + false -> MsgStatus + end, + dict:store(SeqId, AckEntry, PA). + +remove_pending_ack(KeepPersistent, + State = #vqstate { pending_ack = PA, + index_state = IndexState }) -> + {{SeqIds, GuidsByStore}, PA1} = + dict:fold( + fun (SeqId, {IsPersistent, Guid}, {Acc, PA2}) -> + {accumulate_ack(SeqId, IsPersistent, Guid, Acc), + case KeepPersistent andalso IsPersistent of + true -> PA2; + false -> dict:erase(SeqId, PA2) + end}; + (SeqId, #msg_status {}, {Acc, PA2}) -> + {Acc, dict:erase(SeqId, PA2)} + end, {{[], dict:new()}, PA}, PA), + case KeepPersistent of + true -> State1 = State #vqstate { pending_ack = PA1 }, + case dict:find(?TRANSIENT_MSG_STORE, GuidsByStore) of + error -> State1; + {ok, Guids} -> ok = rabbit_msg_store:remove( + ?TRANSIENT_MSG_STORE, Guids), + State1 + end; + false -> IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState), + ok = dict:fold(fun (MsgStore, Guids, ok) -> + rabbit_msg_store:remove(MsgStore, Guids) + end, ok, GuidsByStore), + State #vqstate { pending_ack = dict:new(), + index_state = IndexState1 } + end. + +ack(_MsgStoreFun, _Fun, [], State) -> + State; +ack(MsgStoreFun, Fun, AckTags, State) -> + {{SeqIds, GuidsByStore}, State1 = #vqstate { index_state = IndexState, + persistent_count = PCount }} = + lists:foldl( + fun (SeqId, {Acc, State2 = #vqstate { pending_ack = PA }}) -> + {ok, AckEntry} = dict:find(SeqId, PA), + {case AckEntry of + #msg_status { index_on_disk = false, %% ASSERTIONS + msg_on_disk = false, + is_persistent = false } -> + Acc; + {IsPersistent, Guid} -> + accumulate_ack(SeqId, IsPersistent, Guid, Acc) + end, Fun(AckEntry, State2 #vqstate { + pending_ack = dict:erase(SeqId, PA) })} + end, {{[], dict:new()}, State}, AckTags), + IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState), + ok = dict:fold(fun (MsgStore, Guids, ok) -> + MsgStoreFun(MsgStore, Guids) + end, ok, GuidsByStore), + PCount1 = PCount - case dict:find(?PERSISTENT_MSG_STORE, GuidsByStore) of + error -> 0; + {ok, Guids} -> length(Guids) + end, + State1 #vqstate { index_state = IndexState1, + persistent_count = PCount1 }. + +accumulate_ack(SeqId, IsPersistent, Guid, {SeqIdsAcc, Dict}) -> + {case IsPersistent of + true -> [SeqId | SeqIdsAcc]; + false -> SeqIdsAcc + end, rabbit_misc:dict_cons(find_msg_store(IsPersistent), Guid, Dict)}. + +%%---------------------------------------------------------------------------- %% Phase changes %%---------------------------------------------------------------------------- |
