summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_variable_queue.erl152
1 files changed, 78 insertions, 74 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 8799fff31a..84db32466e 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -735,51 +735,6 @@ maybe_write_delivered(false, _SeqId, IndexState) ->
maybe_write_delivered(true, SeqId, IndexState) ->
rabbit_queue_index:deliver([SeqId], IndexState).
-accumulate_ack(SeqId, IsPersistent, Guid, {SeqIdsAcc, Dict}) ->
- {case IsPersistent of
- true -> [SeqId | SeqIdsAcc];
- false -> SeqIdsAcc
- end, rabbit_misc:dict_cons(find_msg_store(IsPersistent), Guid, Dict)}.
-
-record_pending_ack(#msg_status { guid = Guid, seq_id = SeqId,
- is_persistent = IsPersistent,
- msg_on_disk = MsgOnDisk } = MsgStatus, PA) ->
- AckEntry = case MsgOnDisk of
- true -> {IsPersistent, Guid};
- false -> MsgStatus
- end,
- dict:store(SeqId, AckEntry, PA).
-
-remove_pending_ack(KeepPersistent,
- State = #vqstate { pending_ack = PA,
- index_state = IndexState }) ->
- {{SeqIds, GuidsByStore}, PA1} =
- dict:fold(
- fun (SeqId, {IsPersistent, Guid}, {Acc, PA2}) ->
- {accumulate_ack(SeqId, IsPersistent, Guid, Acc),
- case KeepPersistent andalso IsPersistent of
- true -> PA2;
- false -> dict:erase(SeqId, PA2)
- end};
- (SeqId, #msg_status {}, {Acc, PA2}) ->
- {Acc, dict:erase(SeqId, PA2)}
- end, {{[], dict:new()}, PA}, PA),
- case KeepPersistent of
- true -> State1 = State #vqstate { pending_ack = PA1 },
- case dict:find(?TRANSIENT_MSG_STORE, GuidsByStore) of
- error -> State1;
- {ok, Guids} -> ok = rabbit_msg_store:remove(
- ?TRANSIENT_MSG_STORE, Guids),
- State1
- end;
- false -> IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState),
- ok = dict:fold(fun (MsgStore, Guids, ok) ->
- rabbit_msg_store:remove(MsgStore, Guids)
- end, ok, GuidsByStore),
- State #vqstate { pending_ack = dict:new(),
- index_state = IndexState1 }
- end.
-
lookup_tx(Txn) -> case get({txn, Txn}) of
undefined -> #tx { pending_messages = [],
pending_acks = [] };
@@ -859,35 +814,6 @@ beta_fold(Fun, Init, Q) ->
%% Internal major helpers for Public API
%%----------------------------------------------------------------------------
-ack(_MsgStoreFun, _Fun, [], State) ->
- State;
-ack(MsgStoreFun, Fun, AckTags, State) ->
- {{SeqIds, GuidsByStore}, State1 = #vqstate { index_state = IndexState,
- persistent_count = PCount }} =
- lists:foldl(
- fun (SeqId, {Acc, State2 = #vqstate { pending_ack = PA }}) ->
- {ok, AckEntry} = dict:find(SeqId, PA),
- {case AckEntry of
- #msg_status { index_on_disk = false, %% ASSERTIONS
- msg_on_disk = false,
- is_persistent = false } ->
- Acc;
- {IsPersistent, Guid} ->
- accumulate_ack(SeqId, IsPersistent, Guid, Acc)
- end, Fun(AckEntry, State2 #vqstate {
- pending_ack = dict:erase(SeqId, PA) })}
- end, {{[], dict:new()}, State}, AckTags),
- IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState),
- ok = dict:fold(fun (MsgStore, Guids, ok) ->
- MsgStoreFun(MsgStore, Guids)
- end, ok, GuidsByStore),
- PCount1 = PCount - case dict:find(?PERSISTENT_MSG_STORE, GuidsByStore) of
- error -> 0;
- {ok, Guids} -> length(Guids)
- end,
- State1 #vqstate { index_state = IndexState1,
- persistent_count = PCount1 }.
-
msg_store_callback(PersistentGuids, IsTransientPubs, Pubs, AckTags, Fun) ->
Self = self(),
F = fun () -> rabbit_amqqueue:maybe_run_queue_via_backing_queue(
@@ -1111,6 +1037,84 @@ maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus,
msg_store_clients = MSCState1 }}.
%%----------------------------------------------------------------------------
+%% Internal gubbins for acks
+%%----------------------------------------------------------------------------
+
+record_pending_ack(#msg_status { guid = Guid, seq_id = SeqId,
+ is_persistent = IsPersistent,
+ msg_on_disk = MsgOnDisk } = MsgStatus, PA) ->
+ AckEntry = case MsgOnDisk of
+ true -> {IsPersistent, Guid};
+ false -> MsgStatus
+ end,
+ dict:store(SeqId, AckEntry, PA).
+
+remove_pending_ack(KeepPersistent,
+ State = #vqstate { pending_ack = PA,
+ index_state = IndexState }) ->
+ {{SeqIds, GuidsByStore}, PA1} =
+ dict:fold(
+ fun (SeqId, {IsPersistent, Guid}, {Acc, PA2}) ->
+ {accumulate_ack(SeqId, IsPersistent, Guid, Acc),
+ case KeepPersistent andalso IsPersistent of
+ true -> PA2;
+ false -> dict:erase(SeqId, PA2)
+ end};
+ (SeqId, #msg_status {}, {Acc, PA2}) ->
+ {Acc, dict:erase(SeqId, PA2)}
+ end, {{[], dict:new()}, PA}, PA),
+ case KeepPersistent of
+ true -> State1 = State #vqstate { pending_ack = PA1 },
+ case dict:find(?TRANSIENT_MSG_STORE, GuidsByStore) of
+ error -> State1;
+ {ok, Guids} -> ok = rabbit_msg_store:remove(
+ ?TRANSIENT_MSG_STORE, Guids),
+ State1
+ end;
+ false -> IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState),
+ ok = dict:fold(fun (MsgStore, Guids, ok) ->
+ rabbit_msg_store:remove(MsgStore, Guids)
+ end, ok, GuidsByStore),
+ State #vqstate { pending_ack = dict:new(),
+ index_state = IndexState1 }
+ end.
+
+ack(_MsgStoreFun, _Fun, [], State) ->
+ State;
+ack(MsgStoreFun, Fun, AckTags, State) ->
+ {{SeqIds, GuidsByStore}, State1 = #vqstate { index_state = IndexState,
+ persistent_count = PCount }} =
+ lists:foldl(
+ fun (SeqId, {Acc, State2 = #vqstate { pending_ack = PA }}) ->
+ {ok, AckEntry} = dict:find(SeqId, PA),
+ {case AckEntry of
+ #msg_status { index_on_disk = false, %% ASSERTIONS
+ msg_on_disk = false,
+ is_persistent = false } ->
+ Acc;
+ {IsPersistent, Guid} ->
+ accumulate_ack(SeqId, IsPersistent, Guid, Acc)
+ end, Fun(AckEntry, State2 #vqstate {
+ pending_ack = dict:erase(SeqId, PA) })}
+ end, {{[], dict:new()}, State}, AckTags),
+ IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState),
+ ok = dict:fold(fun (MsgStore, Guids, ok) ->
+ MsgStoreFun(MsgStore, Guids)
+ end, ok, GuidsByStore),
+ PCount1 = PCount - case dict:find(?PERSISTENT_MSG_STORE, GuidsByStore) of
+ error -> 0;
+ {ok, Guids} -> length(Guids)
+ end,
+ State1 #vqstate { index_state = IndexState1,
+ persistent_count = PCount1 }.
+
+accumulate_ack(SeqId, IsPersistent, Guid, {SeqIdsAcc, Dict}) ->
+ {case IsPersistent of
+ true -> [SeqId | SeqIdsAcc];
+ false -> SeqIdsAcc
+ end, rabbit_misc:dict_cons(find_msg_store(IsPersistent), Guid, Dict)}.
+
+%%----------------------------------------------------------------------------
%% Phase changes
%%----------------------------------------------------------------------------