summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_variable_queue.erl59
1 files changed, 25 insertions, 34 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index f8ced680f5..1319979cf9 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -677,6 +677,12 @@ maybe_write_delivered(false, _SeqId, IndexState) ->
maybe_write_delivered(true, SeqId, IndexState) ->
rabbit_queue_index:deliver(SeqId, IndexState).
+accumulate_ack(SeqId, IsPersistent, Guid, {SeqIdsAcc, Dict}) ->
+ {case IsPersistent of
+ true -> [SeqId | SeqIdsAcc];
+ false -> SeqIdsAcc
+ end, rabbit_misc:dict_cons(find_msg_store(IsPersistent), Guid, Dict)}.
+
record_pending_ack(#msg_status { guid = Guid, seq_id = SeqId,
is_persistent = IsPersistent,
msg_on_disk = MsgOnDisk } = MsgStatus, PA) ->
@@ -689,24 +695,17 @@ record_pending_ack(#msg_status { guid = Guid, seq_id = SeqId,
remove_pending_ack(KeepPersistent,
State = #vqstate { pending_ack = PA,
index_state = IndexState }) ->
- {SeqIds, GuidsByStore, PA1} =
+ {{SeqIds, GuidsByStore}, PA1} =
dict:fold(
- fun (SeqId, {IsPersistent, Guid}, {SeqIdsAcc, Dict, PAN}) ->
- PAN1 = case KeepPersistent andalso IsPersistent of
- true -> PAN;
- false -> dict:erase(SeqId, PAN)
- end,
- case IsPersistent of
- true -> {[SeqId | SeqIdsAcc],
- rabbit_misc:dict_cons(
- ?PERSISTENT_MSG_STORE, Guid, Dict), PAN1};
- false -> {SeqIdsAcc,
- rabbit_misc:dict_cons(
- ?TRANSIENT_MSG_STORE, Guid, Dict), PAN1}
- end;
- (SeqId, #msg_status {}, {SeqIdsAcc, Dict, PAN}) ->
- {SeqIdsAcc, Dict, dict:erase(SeqId, PAN)}
- end, {[], dict:new(), PA}, PA),
+ fun (SeqId, {IsPersistent, Guid}, {Acc, PA2}) ->
+ {accumulate_ack(SeqId, IsPersistent, Guid, Acc),
+ case KeepPersistent andalso IsPersistent of
+ true -> PA2;
+ false -> dict:erase(SeqId, PA2)
+ end};
+ (SeqId, #msg_status {}, {Acc, PA2}) ->
+ {Acc, dict:erase(SeqId, PA2)}
+ end, {{[], dict:new()}, PA}, PA),
case KeepPersistent of
true -> State1 = State #vqstate { pending_ack = PA1 },
case dict:find(?TRANSIENT_MSG_STORE, GuidsByStore) of
@@ -854,21 +853,15 @@ ack(Fun, AckTags, State) ->
{{SeqIds, GuidsByStore}, State1 = #vqstate { index_state = IndexState,
persistent_count = PCount }} =
lists:foldl(
- fun (SeqId, {{SeqIdsAcc, Dict}, State2 = #vqstate {
- pending_ack = PA }}) ->
+ fun (SeqId, {Acc, State2 = #vqstate {pending_ack = PA }}) ->
{ok, AckEntry} = dict:find(SeqId, PA),
{case AckEntry of
#msg_status { index_on_disk = false, %% ASSERTIONS
msg_on_disk = false,
is_persistent = false } ->
- {SeqIdsAcc, Dict};
+ Acc;
{IsPersistent, Guid} ->
- {case IsPersistent of
- true -> [SeqId | SeqIdsAcc];
- false -> SeqIdsAcc
- end,
- rabbit_misc:dict_cons(find_msg_store(IsPersistent),
- Guid, Dict)}
+ accumulate_ack(SeqId, IsPersistent, Guid, Acc)
end, Fun(AckEntry, State2 #vqstate {
pending_ack = dict:erase(SeqId, PA) })}
end, {{[], dict:new()}, State}, AckTags),
@@ -998,14 +991,12 @@ remove_queue_entries1(
is_delivered = IsDelivered, msg_on_disk = MsgOnDisk,
index_on_disk = IndexOnDisk, is_persistent = IsPersistent },
{Count, GuidsByStore, SeqIdsAcc, IndexState}) ->
- GuidsByStore1 =
- case {MsgOnDisk, IsPersistent} of
- {true, true} -> rabbit_misc:dict_cons(?PERSISTENT_MSG_STORE,
- Guid, GuidsByStore);
- {true, false} -> rabbit_misc:dict_cons(?TRANSIENT_MSG_STORE,
- Guid, GuidsByStore);
- {false, _} -> GuidsByStore
- end,
+ GuidsByStore1 = case MsgOnDisk of
+ true -> rabbit_misc:dict_cons(
+ find_msg_store(IsPersistent),
+ Guid, GuidsByStore);
+ false -> GuidsByStore
+ end,
SeqIdsAcc1 = case IndexOnDisk of
true -> [SeqId | SeqIdsAcc];
false -> SeqIdsAcc