diff options
| author | Matthias Radestock <matthias@rabbitmq.com> | 2010-06-12 08:39:35 +0100 |
|---|---|---|
| committer | Matthias Radestock <matthias@rabbitmq.com> | 2010-06-12 08:39:35 +0100 |
| commit | 0fcd1609386fb7b487e00173099d23a118e616b3 (patch) | |
| tree | 383d9776146f66918333907ef5fab8a9327e24b8 /src | |
| parent | 6a7530bc79e343a7a06b6d822256e8810e05bf1b (diff) | |
| download | rabbitmq-server-git-0fcd1609386fb7b487e00173099d23a118e616b3.tar.gz | |
refactor: extract similarities between ack and requeue
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_variable_queue.erl | 124 |
1 files changed, 48 insertions, 76 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index e21b7f6f2d..73bb6e1983 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -546,41 +546,8 @@ fetch(AckRequired, State = #vqstate { q4 = Q4, pending_ack = PA1 }} end. -ack([], State) -> - State; ack(AckTags, State) -> - {SeqIds, GuidsByStore, State1 = #vqstate { index_state = IndexState, - persistent_count = PCount }} = - lists:foldl( - fun (SeqId, {SeqIdsAcc, Dict, State2 = #vqstate { - pending_ack = PA }}) -> - PA1 = dict:erase(SeqId, PA), - State3 = State2 #vqstate { pending_ack = PA1 }, - case dict:find(SeqId, PA) of - {ok, #msg_status { index_on_disk = false, %% ASSERTIONS - msg_on_disk = false, - is_persistent = false }} -> - {SeqIdsAcc, Dict, State3}; - {ok, {IsPersistent, Guid}} -> - MsgStore = find_msg_store(IsPersistent), - SeqIdsAcc1 = case IsPersistent of - true -> [SeqId | SeqIdsAcc]; - false -> SeqIdsAcc - end, - {SeqIdsAcc1, - rabbit_misc:dict_cons(MsgStore, Guid, Dict), State3} - end - end, {[], dict:new(), State}, AckTags), - IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState), - ok = dict:fold(fun (MsgStore, Guids, ok) -> - rabbit_msg_store:remove(MsgStore, Guids) - end, ok, GuidsByStore), - PCount1 = PCount - case dict:find(?PERSISTENT_MSG_STORE, GuidsByStore) of - error -> 0; - {ok, Guids} -> length(Guids) - end, - State1 #vqstate { index_state = IndexState1, - persistent_count = PCount1 }. + ack(fun (_AckEntry, State1) -> State1 end, AckTags, State). tx_publish(Txn, Msg = #basic_message { is_persistent = true, guid = Guid }, @@ -631,48 +598,18 @@ tx_commit(Txn, Fun, State = #vqstate { durable = IsDurable }) -> end}. requeue(AckTags, State) -> - {SeqIds, GuidsByStore, State1 = #vqstate { index_state = IndexState, - persistent_count = PCount }} = - lists:foldl( - fun (SeqId, {SeqIdsAcc, Dict, State2 = #vqstate { - msg_store_clients = MSCState, - pending_ack = PA }}) -> - PA1 = dict:erase(SeqId, PA), - State3 = State2 #vqstate { pending_ack = PA1 }, - case dict:find(SeqId, PA) of - {ok, #msg_status { index_on_disk = false, %% ASSERTIONS - msg_on_disk = false, - is_persistent = false, - msg = Msg }} -> - {_SeqId, State4} = - publish(Msg, true, false, State3), - {SeqIdsAcc, Dict, State4}; - {ok, {IsPersistent, Guid}} -> - {{ok, Msg = #basic_message{}}, MSCState1} = - read_from_msg_store( - MSCState, IsPersistent, Guid), - State4 = State3 #vqstate { - msg_store_clients = MSCState1 }, - {_SeqId, State5} = publish(Msg, true, true, State4), - MsgStore = find_msg_store(IsPersistent), - SeqIdsAcc1 = case IsPersistent of - true -> [SeqId | SeqIdsAcc]; - false -> SeqIdsAcc - end, - {SeqIdsAcc1, - rabbit_misc:dict_cons(MsgStore, Guid, Dict), State5} - end - end, {[], dict:new(), State}, AckTags), - IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState), - ok = dict:fold(fun (MsgStore, Guids, ok) -> - rabbit_msg_store:release(MsgStore, Guids) - end, ok, GuidsByStore), - PCount1 = PCount - case dict:find(?PERSISTENT_MSG_STORE, GuidsByStore) of - error -> 0; - {ok, Guids} -> length(Guids) - end, - State1 #vqstate { index_state = IndexState1, - persistent_count = PCount1 }. + ack(fun (#msg_status { msg = Msg }, State1) -> + {_SeqId, State2} = publish(Msg, true, false, State1), + State2; + ({IsPersistent, Guid}, State1 = #vqstate { + msg_store_clients = MSCState }) -> + {{ok, Msg = #basic_message{}}, MSCState1} = + read_from_msg_store(MSCState, IsPersistent, Guid), + {_SeqId, State2} = publish(Msg, true, true, + State1 #vqstate { + msg_store_clients = MSCState1 }), + State2 + end, AckTags, State). len(#vqstate { len = Len }) -> Len. @@ -936,6 +873,41 @@ should_force_index_to_disk(State = %% Internal major helpers for Public API %%---------------------------------------------------------------------------- +ack(_Fun, [], State) -> + State; +ack(Fun, AckTags, State) -> + {{SeqIds, GuidsByStore}, State1 = #vqstate { index_state = IndexState, + persistent_count = PCount }} = + lists:foldl( + fun (SeqId, {{SeqIdsAcc, Dict}, State2 = #vqstate { + pending_ack = PA }}) -> + {ok, AckEntry} = dict:find(SeqId, PA), + {case AckEntry of + #msg_status { index_on_disk = false, %% ASSERTIONS + msg_on_disk = false, + is_persistent = false } -> + {SeqIdsAcc, Dict}; + {IsPersistent, Guid} -> + {case IsPersistent of + true -> [SeqId | SeqIdsAcc]; + false -> SeqIdsAcc + end, + rabbit_misc:dict_cons(find_msg_store(IsPersistent), + Guid, Dict)} + end, Fun(AckEntry, State2 #vqstate { + pending_ack = dict:erase(SeqId, PA) })} + end, {{[], dict:new()}, State}, AckTags), + IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState), + ok = dict:fold(fun (MsgStore, Guids, ok) -> + rabbit_msg_store:release(MsgStore, Guids) + end, ok, GuidsByStore), + PCount1 = PCount - case dict:find(?PERSISTENT_MSG_STORE, GuidsByStore) of + error -> 0; + {ok, Guids} -> length(Guids) + end, + State1 #vqstate { index_state = IndexState1, + persistent_count = PCount1 }. + msg_store_callback(PersistentGuids, IsTransientPubs, Pubs, AckTags, Fun) -> Self = self(), F = fun () -> rabbit_amqqueue:maybe_run_queue_via_backing_queue( |
