summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2010-06-12 08:39:35 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2010-06-12 08:39:35 +0100
commit0fcd1609386fb7b487e00173099d23a118e616b3 (patch)
tree383d9776146f66918333907ef5fab8a9327e24b8 /src
parent6a7530bc79e343a7a06b6d822256e8810e05bf1b (diff)
downloadrabbitmq-server-git-0fcd1609386fb7b487e00173099d23a118e616b3.tar.gz
refactor: extract similarities between ack and requeue
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_variable_queue.erl124
1 files changed, 48 insertions, 76 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index e21b7f6f2d..73bb6e1983 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -546,41 +546,8 @@ fetch(AckRequired, State = #vqstate { q4 = Q4,
pending_ack = PA1 }}
end.
-ack([], State) ->
- State;
ack(AckTags, State) ->
- {SeqIds, GuidsByStore, State1 = #vqstate { index_state = IndexState,
- persistent_count = PCount }} =
- lists:foldl(
- fun (SeqId, {SeqIdsAcc, Dict, State2 = #vqstate {
- pending_ack = PA }}) ->
- PA1 = dict:erase(SeqId, PA),
- State3 = State2 #vqstate { pending_ack = PA1 },
- case dict:find(SeqId, PA) of
- {ok, #msg_status { index_on_disk = false, %% ASSERTIONS
- msg_on_disk = false,
- is_persistent = false }} ->
- {SeqIdsAcc, Dict, State3};
- {ok, {IsPersistent, Guid}} ->
- MsgStore = find_msg_store(IsPersistent),
- SeqIdsAcc1 = case IsPersistent of
- true -> [SeqId | SeqIdsAcc];
- false -> SeqIdsAcc
- end,
- {SeqIdsAcc1,
- rabbit_misc:dict_cons(MsgStore, Guid, Dict), State3}
- end
- end, {[], dict:new(), State}, AckTags),
- IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState),
- ok = dict:fold(fun (MsgStore, Guids, ok) ->
- rabbit_msg_store:remove(MsgStore, Guids)
- end, ok, GuidsByStore),
- PCount1 = PCount - case dict:find(?PERSISTENT_MSG_STORE, GuidsByStore) of
- error -> 0;
- {ok, Guids} -> length(Guids)
- end,
- State1 #vqstate { index_state = IndexState1,
- persistent_count = PCount1 }.
+ ack(fun (_AckEntry, State1) -> State1 end, AckTags, State).
tx_publish(Txn,
Msg = #basic_message { is_persistent = true, guid = Guid },
@@ -631,48 +598,18 @@ tx_commit(Txn, Fun, State = #vqstate { durable = IsDurable }) ->
end}.
requeue(AckTags, State) ->
- {SeqIds, GuidsByStore, State1 = #vqstate { index_state = IndexState,
- persistent_count = PCount }} =
- lists:foldl(
- fun (SeqId, {SeqIdsAcc, Dict, State2 = #vqstate {
- msg_store_clients = MSCState,
- pending_ack = PA }}) ->
- PA1 = dict:erase(SeqId, PA),
- State3 = State2 #vqstate { pending_ack = PA1 },
- case dict:find(SeqId, PA) of
- {ok, #msg_status { index_on_disk = false, %% ASSERTIONS
- msg_on_disk = false,
- is_persistent = false,
- msg = Msg }} ->
- {_SeqId, State4} =
- publish(Msg, true, false, State3),
- {SeqIdsAcc, Dict, State4};
- {ok, {IsPersistent, Guid}} ->
- {{ok, Msg = #basic_message{}}, MSCState1} =
- read_from_msg_store(
- MSCState, IsPersistent, Guid),
- State4 = State3 #vqstate {
- msg_store_clients = MSCState1 },
- {_SeqId, State5} = publish(Msg, true, true, State4),
- MsgStore = find_msg_store(IsPersistent),
- SeqIdsAcc1 = case IsPersistent of
- true -> [SeqId | SeqIdsAcc];
- false -> SeqIdsAcc
- end,
- {SeqIdsAcc1,
- rabbit_misc:dict_cons(MsgStore, Guid, Dict), State5}
- end
- end, {[], dict:new(), State}, AckTags),
- IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState),
- ok = dict:fold(fun (MsgStore, Guids, ok) ->
- rabbit_msg_store:release(MsgStore, Guids)
- end, ok, GuidsByStore),
- PCount1 = PCount - case dict:find(?PERSISTENT_MSG_STORE, GuidsByStore) of
- error -> 0;
- {ok, Guids} -> length(Guids)
- end,
- State1 #vqstate { index_state = IndexState1,
- persistent_count = PCount1 }.
+ ack(fun (#msg_status { msg = Msg }, State1) ->
+ {_SeqId, State2} = publish(Msg, true, false, State1),
+ State2;
+ ({IsPersistent, Guid}, State1 = #vqstate {
+ msg_store_clients = MSCState }) ->
+ {{ok, Msg = #basic_message{}}, MSCState1} =
+ read_from_msg_store(MSCState, IsPersistent, Guid),
+ {_SeqId, State2} = publish(Msg, true, true,
+ State1 #vqstate {
+ msg_store_clients = MSCState1 }),
+ State2
+ end, AckTags, State).
len(#vqstate { len = Len }) ->
Len.
@@ -936,6 +873,41 @@ should_force_index_to_disk(State =
%% Internal major helpers for Public API
%%----------------------------------------------------------------------------
+ack(_Fun, [], State) ->
+ State;
+ack(Fun, AckTags, State) ->
+ {{SeqIds, GuidsByStore}, State1 = #vqstate { index_state = IndexState,
+ persistent_count = PCount }} =
+ lists:foldl(
+ fun (SeqId, {{SeqIdsAcc, Dict}, State2 = #vqstate {
+ pending_ack = PA }}) ->
+ {ok, AckEntry} = dict:find(SeqId, PA),
+ {case AckEntry of
+ #msg_status { index_on_disk = false, %% ASSERTIONS
+ msg_on_disk = false,
+ is_persistent = false } ->
+ {SeqIdsAcc, Dict};
+ {IsPersistent, Guid} ->
+ {case IsPersistent of
+ true -> [SeqId | SeqIdsAcc];
+ false -> SeqIdsAcc
+ end,
+ rabbit_misc:dict_cons(find_msg_store(IsPersistent),
+ Guid, Dict)}
+ end, Fun(AckEntry, State2 #vqstate {
+ pending_ack = dict:erase(SeqId, PA) })}
+ end, {{[], dict:new()}, State}, AckTags),
+ IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState),
+ ok = dict:fold(fun (MsgStore, Guids, ok) ->
+ rabbit_msg_store:release(MsgStore, Guids)
+ end, ok, GuidsByStore),
+ PCount1 = PCount - case dict:find(?PERSISTENT_MSG_STORE, GuidsByStore) of
+ error -> 0;
+ {ok, Guids} -> length(Guids)
+ end,
+ State1 #vqstate { index_state = IndexState1,
+ persistent_count = PCount1 }.
+
msg_store_callback(PersistentGuids, IsTransientPubs, Pubs, AckTags, Fun) ->
Self = self(),
F = fun () -> rabbit_amqqueue:maybe_run_queue_via_backing_queue(