summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_variable_queue.erl223
1 files changed, 103 insertions, 120 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 3d485106d3..88c2719a36 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -189,10 +189,10 @@
len,
on_sync,
msg_store_clients,
- persistent_store,
persistent_count,
transient_threshold,
- pending_ack
+ pending_ack,
+ durable
}).
-record(msg_status,
@@ -265,10 +265,10 @@
[fun (() -> any())]},
msg_store_clients :: 'undefined' | {{any(), binary()},
{any(), binary()}},
- persistent_store :: pid() | atom(),
persistent_count :: non_neg_integer(),
transient_threshold :: non_neg_integer(),
- pending_ack :: dict()
+ pending_ack :: dict(),
+ durable :: boolean()
}).
-include("rabbit_backing_queue_spec.hrl").
@@ -302,12 +302,8 @@ start(DurableQueues) ->
Refs, StartFunState]).
init(QueueName, IsDurable, _Recover) ->
- PersistentStore = case IsDurable of
- true -> ?PERSISTENT_MSG_STORE;
- false -> ?TRANSIENT_MSG_STORE
- end,
MsgStoreRecovered =
- rabbit_msg_store:successfully_recovered_state(PersistentStore),
+ rabbit_msg_store:successfully_recovered_state(?PERSISTENT_MSG_STORE),
ContainsCheckFun =
fun (Guid) ->
rabbit_msg_store:contains(?PERSISTENT_MSG_STORE, Guid)
@@ -331,7 +327,11 @@ init(QueueName, IsDurable, _Recover) ->
end_seq_id = NextSeqId }
end,
Now = now(),
- PersistentClient = rabbit_msg_store:client_init(PersistentStore, PRef),
+ PersistentClient =
+ case IsDurable of
+ true -> rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, PRef);
+ false -> undefined
+ end,
TransientClient = rabbit_msg_store:client_init(?TRANSIENT_MSG_STORE, TRef),
State = #vqstate {
q1 = queue:new(),
@@ -357,10 +357,10 @@ init(QueueName, IsDurable, _Recover) ->
on_sync = {[], [], []},
msg_store_clients = {{PersistentClient, PRef},
{TransientClient, TRef}},
- persistent_store = PersistentStore,
persistent_count = DeltaCount1,
transient_threshold = NextSeqId,
- pending_ack = dict:new()
+ pending_ack = dict:new(),
+ durable = IsDurable
},
maybe_deltas_to_betas(State).
@@ -385,7 +385,6 @@ delete_and_terminate(State) ->
State2 = #vqstate {
index_state = IndexState,
msg_store_clients = {{MSCStateP, PRef}, {MSCStateT, TRef}},
- persistent_store = PersistentStore,
transient_threshold = TransientThreshold } =
remove_pending_ack(false, State1),
%% flushing here is good because it deletes all full segments,
@@ -396,21 +395,22 @@ delete_and_terminate(State) ->
{N, N, IndexState3} ->
IndexState3;
{DeltaSeqId, NextSeqId, IndexState3} ->
- delete1(PersistentStore, TransientThreshold, NextSeqId, DeltaSeqId, IndexState3)
+ delete1(TransientThreshold, NextSeqId, DeltaSeqId, IndexState3)
end,
IndexState5 = rabbit_queue_index:delete_and_terminate(IndexState2),
- rabbit_msg_store:delete_client(PersistentStore, PRef),
+ case MSCStateP of
+ undefined -> ok;
+ _ -> rabbit_msg_store:delete_client(?PERSISTENT_MSG_STORE, PRef),
+ rabbit_msg_store:client_terminate(MSCStateP)
+ end,
rabbit_msg_store:delete_client(?TRANSIENT_MSG_STORE, TRef),
- rabbit_msg_store:client_terminate(MSCStateP),
rabbit_msg_store:client_terminate(MSCStateT),
State2 #vqstate { index_state = IndexState5,
msg_store_clients = undefined }.
-purge(State = #vqstate { q4 = Q4, index_state = IndexState, len = Len,
- persistent_store = PersistentStore }) ->
+purge(State = #vqstate { q4 = Q4, index_state = IndexState, len = Len }) ->
{Q4Count, IndexState1} =
- remove_queue_entries(PersistentStore, fun rabbit_misc:queue_fold/3,
- Q4, IndexState),
+ remove_queue_entries(fun rabbit_misc:queue_fold/3, Q4, IndexState),
{Len, State1} =
purge1(Q4Count, State #vqstate { index_state = IndexState1,
q4 = queue:new() }),
@@ -431,16 +431,17 @@ publish_delivered(true, Msg = #basic_message { guid = Guid,
out_counter = OutCount,
in_counter = InCount,
msg_store_clients = MSCState,
- persistent_store = PersistentStore,
persistent_count = PCount,
- pending_ack = PA }) ->
+ pending_ack = PA,
+ durable = IsDurable }) ->
+ IsPersistent1 = IsDurable andalso IsPersistent,
MsgStatus = #msg_status {
- msg = Msg, guid = Guid, seq_id = SeqId, is_persistent = IsPersistent,
+ msg = Msg, guid = Guid, seq_id = SeqId, is_persistent = IsPersistent1,
is_delivered = true, msg_on_disk = false, index_on_disk = false },
- {MsgStatus1, MSCState1} = maybe_write_msg_to_disk(PersistentStore, false,
- MsgStatus, MSCState),
+ {MsgStatus1, MSCState1} =
+ maybe_write_msg_to_disk(false, MsgStatus, MSCState),
State1 = State #vqstate { msg_store_clients = MSCState1,
- persistent_count = PCount + case IsPersistent of
+ persistent_count = PCount + case IsPersistent1 of
true -> 1;
false -> 0
end,
@@ -462,7 +463,7 @@ publish_delivered(true, Msg = #basic_message { guid = Guid,
fetch(AckRequired, State =
#vqstate { q4 = Q4, ram_msg_count = RamMsgCount, out_counter = OutCount,
index_state = IndexState, len = Len, persistent_count = PCount,
- persistent_store = PersistentStore, pending_ack = PA }) ->
+ pending_ack = PA }) ->
case queue:out(Q4) of
{empty, _Q4} ->
case fetch_from_q3_or_delta(State) of
@@ -488,7 +489,7 @@ fetch(AckRequired, State =
end,
%% 2. If it's on disk and there's no Ack required, remove it
- MsgStore = find_msg_store(IsPersistent, PersistentStore),
+ MsgStore = find_msg_store(IsPersistent),
IndexState2 =
case MsgOnDisk andalso not AckRequired of
%% Remove from disk now
@@ -542,7 +543,6 @@ ack([], State) ->
State;
ack(AckTags, State = #vqstate { index_state = IndexState,
persistent_count = PCount,
- persistent_store = PersistentStore,
pending_ack = PA }) ->
{GuidsByStore, SeqIds, PA1} =
lists:foldl(
@@ -553,19 +553,20 @@ ack(AckTags, State = #vqstate { index_state = IndexState,
msg_on_disk = false,
is_persistent = false }} ->
{Dict, SeqIds, PAN1};
- {ok, {false, Guid}} ->
- {rabbit_misc:dict_cons(?TRANSIENT_MSG_STORE, Guid,
- Dict), SeqIds, PAN1};
- {ok, {true, Guid}} ->
- {rabbit_misc:dict_cons(PersistentStore, Guid, Dict),
- [SeqId | SeqIds], PAN1}
+ {ok, {IsPersistent, Guid}} ->
+ SeqIds1 = case IsPersistent of
+ true -> [SeqId | SeqIds];
+ false -> SeqIds
+ end,
+ {rabbit_misc:dict_cons(find_msg_store(IsPersistent),
+ Guid, Dict), SeqIds1, PAN1}
end
end, {dict:new(), [], PA}, AckTags),
IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState),
ok = dict:fold(fun (MsgStore, Guids, ok) ->
rabbit_msg_store:remove(MsgStore, Guids)
end, ok, GuidsByStore),
- PCount1 = PCount - case dict:find(PersistentStore, GuidsByStore) of
+ PCount1 = PCount - case dict:find(?PERSISTENT_MSG_STORE, GuidsByStore) of
error -> 0;
{ok, Guids} -> length(Guids)
end,
@@ -575,12 +576,12 @@ ack(AckTags, State = #vqstate { index_state = IndexState,
tx_publish(Txn,
Msg = #basic_message { is_persistent = true, guid = Guid },
State = #vqstate { msg_store_clients = MSCState,
- persistent_store = PersistentStore }) ->
+ durable = true }) ->
MsgStatus = #msg_status {
msg = Msg, guid = Guid, seq_id = undefined, is_persistent = true,
is_delivered = false, msg_on_disk = false, index_on_disk = false },
{#msg_status { msg_on_disk = true }, MSCState1} =
- maybe_write_msg_to_disk(PersistentStore, false, MsgStatus, MSCState),
+ maybe_write_msg_to_disk(false, MsgStatus, MSCState),
publish_in_tx(Txn, Msg),
State #vqstate { msg_store_clients = MSCState1 };
tx_publish(Txn, Msg, State) ->
@@ -591,13 +592,17 @@ tx_ack(Txn, AckTags, State) ->
ack_in_tx(Txn, AckTags),
State.
-tx_rollback(Txn, State = #vqstate { persistent_store = PersistentStore }) ->
+tx_rollback(Txn, State = #vqstate { durable = IsDurable }) ->
#tx { pending_acks = AckTags, pending_messages = Pubs } = lookup_tx(Txn),
erase_tx(Txn),
- ok = rabbit_msg_store:remove(PersistentStore, persistent_guids(Pubs)),
+ ok = case IsDurable of
+ true -> rabbit_msg_store:remove(?PERSISTENT_MSG_STORE,
+ persistent_guids(Pubs));
+ false -> ok
+ end,
{lists:flatten(AckTags), State}.
-tx_commit(Txn, Fun, State = #vqstate { persistent_store = PersistentStore }) ->
+tx_commit(Txn, Fun, State = #vqstate { durable = IsDurable }) ->
%% If we are a non-durable queue, or we have no persistent pubs,
%% we can skip the msg_store loop.
#tx { pending_acks = AckTags, pending_messages = Pubs } = lookup_tx(Txn),
@@ -607,8 +612,7 @@ tx_commit(Txn, Fun, State = #vqstate { persistent_store = PersistentStore }) ->
PersistentGuids = persistent_guids(PubsOrdered),
IsTransientPubs = [] == PersistentGuids,
{AckTags1,
- case IsTransientPubs orelse
- ?TRANSIENT_MSG_STORE == PersistentStore of
+ case (not IsDurable) orelse IsTransientPubs of
true ->
tx_commit_post_msg_store(
IsTransientPubs, PubsOrdered, AckTags1, Fun, State);
@@ -620,7 +624,7 @@ tx_commit(Txn, Fun, State = #vqstate { persistent_store = PersistentStore }) ->
State
end}.
-requeue(AckTags, State = #vqstate { persistent_store = PersistentStore }) ->
+requeue(AckTags, State) ->
{SeqIds, GuidsByStore,
State1 = #vqstate { index_state = IndexState,
persistent_count = PCount }} =
@@ -641,27 +645,24 @@ requeue(AckTags, State = #vqstate { persistent_store = PersistentStore }) ->
{ok, {IsPersistent, Guid}} ->
{{ok, Msg = #basic_message{}}, MSCStateN1} =
read_from_msg_store(
- PersistentStore, MSCStateN, IsPersistent, Guid),
+ MSCStateN, IsPersistent, Guid),
StateN2 = StateN1 #vqstate {
msg_store_clients = MSCStateN1 },
{_SeqId, StateN3} = publish(Msg, true, true, StateN2),
- {SeqIdsAcc1, MsgStore} =
- case IsPersistent of
- true ->
- {[SeqId | SeqIdsAcc], PersistentStore};
- false ->
- {SeqIdsAcc, ?TRANSIENT_MSG_STORE}
- end,
+ MsgStore = find_msg_store(IsPersistent),
+ SeqIdsAcc1 = case IsPersistent of
+ true -> [SeqId | SeqIdsAcc];
+ false -> SeqIdsAcc
+ end,
{SeqIdsAcc1,
- rabbit_misc:dict_cons(MsgStore, Guid, Dict),
- StateN3}
+ rabbit_misc:dict_cons(MsgStore, Guid, Dict), StateN3}
end
end, {[], dict:new(), State}, AckTags),
IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState),
ok = dict:fold(fun (MsgStore, Guids, ok) ->
rabbit_msg_store:release(MsgStore, Guids)
end, ok, GuidsByStore),
- PCount1 = PCount - case dict:find(PersistentStore, GuidsByStore) of
+ PCount1 = PCount - case dict:find(?PERSISTENT_MSG_STORE, GuidsByStore) of
error -> 0;
{ok, Guids} -> length(Guids)
end,
@@ -759,7 +760,6 @@ status(#vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
remove_pending_ack(KeepPersistent,
State = #vqstate { pending_ack = PA,
- persistent_store = PersistentStore,
index_state = IndexState }) ->
{SeqIds, GuidsByStore, PA1} =
dict:fold(
@@ -771,7 +771,7 @@ remove_pending_ack(KeepPersistent,
case IsPersistent of
true -> {[SeqId | SeqIdsAcc],
rabbit_misc:dict_cons(
- PersistentStore, Guid, Dict), PAN1};
+ ?PERSISTENT_MSG_STORE, Guid, Dict), PAN1};
false -> {SeqIdsAcc,
rabbit_misc:dict_cons(
?TRANSIENT_MSG_STORE, Guid, Dict), PAN1}
@@ -940,11 +940,10 @@ msg_store_callback(PersistentGuids, IsTransientPubs, Pubs, AckTags, Fun) ->
tx_commit_post_msg_store(IsTransientPubs, Pubs, AckTags, Fun, State =
#vqstate { on_sync = OnSync = {SAcks, SPubs, SFuns},
- persistent_store = PersistentStore,
- pending_ack = PA }) ->
+ pending_ack = PA, durable = IsDurable }) ->
%% If we are a non-durable queue, or (no persisent pubs, and no
%% persistent acks) then we can skip the queue_index loop.
- case PersistentStore == ?TRANSIENT_MSG_STORE orelse
+ case (not IsDurable) orelse
(IsTransientPubs andalso
lists:foldl(
fun (AckTag, true ) ->
@@ -965,18 +964,18 @@ tx_commit_post_msg_store(IsTransientPubs, Pubs, AckTags, Fun, State =
tx_commit_index(State = #vqstate { on_sync = {_, _, []} }) ->
State;
tx_commit_index(State = #vqstate { on_sync = {SAcks, SPubs, SFuns},
- persistent_store = PersistentStore }) ->
+ durable = IsDurable }) ->
Acks = lists:flatten(SAcks),
State1 = ack(Acks, State),
- IsPersistentStore = ?PERSISTENT_MSG_STORE == PersistentStore,
Pubs = lists:flatten(lists:reverse(SPubs)),
{SeqIds, State2 = #vqstate { index_state = IndexState }} =
lists:foldl(
fun (Msg = #basic_message { is_persistent = IsPersistent },
{SeqIdsAcc, StateN}) ->
{SeqId, StateN1} =
- publish(Msg, false, IsPersistent, StateN),
- {case IsPersistentStore andalso IsPersistent of
+ publish(Msg, false, IsDurable andalso IsPersistent,
+ StateN),
+ {case IsDurable andalso IsPersistent of
true -> [SeqId | SeqIdsAcc];
false -> SeqIdsAcc
end, StateN1}
@@ -985,12 +984,10 @@ tx_commit_index(State = #vqstate { on_sync = {SAcks, SPubs, SFuns},
[ Fun() || Fun <- lists:reverse(SFuns) ],
State2 #vqstate { index_state = IndexState1, on_sync = {[], [], []} }.
-delete1(_PersistentStore, _TransientThreshold, NextSeqId, DeltaSeqId,
- IndexState) when DeltaSeqId =:= undefined
- orelse DeltaSeqId >= NextSeqId ->
+delete1(_TransientThreshold, NextSeqId, DeltaSeqId, IndexState)
+ when DeltaSeqId =:= undefined orelse DeltaSeqId >= NextSeqId ->
IndexState;
-delete1(PersistentStore, TransientThreshold, NextSeqId, DeltaSeqId,
- IndexState) ->
+delete1(TransientThreshold, NextSeqId, DeltaSeqId, IndexState) ->
{List, Again, IndexState1} =
rabbit_queue_index:read(DeltaSeqId, NextSeqId, IndexState),
IndexState2 =
@@ -1000,38 +997,33 @@ delete1(PersistentStore, TransientThreshold, NextSeqId, DeltaSeqId,
List, TransientThreshold, IndexState1),
{_Count, IndexState4} =
remove_queue_entries(
- PersistentStore, fun beta_fold_no_index_on_disk/3,
- Q, IndexState3),
+ fun beta_fold_no_index_on_disk/3, Q, IndexState3),
IndexState4
end,
- delete1(PersistentStore, TransientThreshold, NextSeqId, Again,
- IndexState2).
+ delete1(TransientThreshold, NextSeqId, Again, IndexState2).
-purge1(Count, State = #vqstate { q3 = Q3, index_state = IndexState,
- persistent_store = PersistentStore }) ->
+purge1(Count, State = #vqstate { q3 = Q3, index_state = IndexState }) ->
case bpqueue:is_empty(Q3) of
true ->
{Q1Count, IndexState1} =
remove_queue_entries(
- PersistentStore, fun rabbit_misc:queue_fold/3,
- State #vqstate.q1, IndexState),
+ fun rabbit_misc:queue_fold/3, State #vqstate.q1, IndexState),
{Count + Q1Count, State #vqstate { q1 = queue:new(),
index_state = IndexState1 }};
false ->
{Q3Count, IndexState1} =
remove_queue_entries(
- PersistentStore, fun beta_fold_no_index_on_disk/3,
- Q3, IndexState),
+ fun beta_fold_no_index_on_disk/3, Q3, IndexState),
purge1(Count + Q3Count,
maybe_deltas_to_betas(
State #vqstate { index_state = IndexState1,
q3 = bpqueue:new() }))
end.
-remove_queue_entries(PersistentStore, Fold, Q, IndexState) ->
- {_PersistentStore, Count, GuidsByStore, SeqIds, IndexState1} =
+remove_queue_entries(Fold, Q, IndexState) ->
+ {Count, GuidsByStore, SeqIds, IndexState1} =
Fold(fun remove_queue_entries1/2,
- {PersistentStore, 0, dict:new(), [], IndexState}, Q),
+ {0, dict:new(), [], IndexState}, Q),
ok = dict:fold(fun (MsgStore, Guids, ok) ->
rabbit_msg_store:remove(MsgStore, Guids)
end, ok, GuidsByStore),
@@ -1046,11 +1038,11 @@ remove_queue_entries1(
#msg_status { guid = Guid, seq_id = SeqId,
is_delivered = IsDelivered, msg_on_disk = MsgOnDisk,
index_on_disk = IndexOnDisk, is_persistent = IsPersistent },
- {PersistentStore, CountN, GuidsByStore, SeqIdsAcc, IndexStateN}) ->
+ {CountN, GuidsByStore, SeqIdsAcc, IndexStateN}) ->
GuidsByStore1 =
case {MsgOnDisk, IsPersistent} of
{true, true} ->
- rabbit_misc:dict_cons(PersistentStore, Guid, GuidsByStore);
+ rabbit_misc:dict_cons(?PERSISTENT_MSG_STORE, Guid, GuidsByStore);
{true, false} ->
rabbit_misc:dict_cons(?TRANSIENT_MSG_STORE, Guid, GuidsByStore);
{false, _} ->
@@ -1064,15 +1056,14 @@ remove_queue_entries1(
true -> rabbit_queue_index:deliver(SeqId, IndexStateN);
false -> IndexStateN
end,
- {PersistentStore, CountN + 1, GuidsByStore1, SeqIdsAcc1, IndexStateN1}.
+ {CountN + 1, GuidsByStore1, SeqIdsAcc1, IndexStateN1}.
fetch_from_q3_or_delta(State = #vqstate {
q1 = Q1, q2 = Q2,
delta = #delta { count = DeltaCount },
q3 = Q3, q4 = Q4, ram_msg_count = RamMsgCount,
ram_index_count = RamIndexCount,
- msg_store_clients = MSCState,
- persistent_store = PersistentStore }) ->
+ msg_store_clients = MSCState }) ->
case bpqueue:out(Q3) of
{empty, _Q3} ->
0 = DeltaCount, %% ASSERTION
@@ -1084,8 +1075,7 @@ fetch_from_q3_or_delta(State = #vqstate {
is_persistent = IsPersistent }}, Q3a} ->
{{ok, Msg = #basic_message { is_persistent = IsPersistent,
guid = Guid }}, MSCState1} =
- read_from_msg_store(
- PersistentStore, MSCState, IsPersistent, Guid),
+ read_from_msg_store(MSCState, IsPersistent, Guid),
Q4a = queue:in(MsgStatus #msg_status { msg = Msg }, Q4),
RamIndexCount1 = case IndexOnDisk of
true -> RamIndexCount;
@@ -1172,9 +1162,10 @@ test_keep_msg_in_ram(SeqId, #vqstate { target_ram_msg_count = TargetRamMsgCount,
publish(Msg = #basic_message { is_persistent = IsPersistent, guid = Guid },
IsDelivered, MsgOnDisk, State =
#vqstate { next_seq_id = SeqId, len = Len, in_counter = InCount,
- persistent_count = PCount }) ->
+ persistent_count = PCount, durable = IsDurable }) ->
MsgStatus = #msg_status {
- msg = Msg, guid = Guid, seq_id = SeqId, is_persistent = IsPersistent,
+ msg = Msg, guid = Guid, seq_id = SeqId,
+ is_persistent = IsDurable andalso IsPersistent,
is_delivered = IsDelivered, msg_on_disk = MsgOnDisk,
index_on_disk = false },
PCount1 = PCount + case IsPersistent of
@@ -1188,10 +1179,9 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, guid = Guid },
publish(msg, MsgStatus, #vqstate {
index_state = IndexState, ram_msg_count = RamMsgCount,
- msg_store_clients = MSCState,
- persistent_store = PersistentStore } = State) ->
+ msg_store_clients = MSCState } = State) ->
{MsgStatus1, MSCState1} =
- maybe_write_msg_to_disk(PersistentStore, false, MsgStatus, MSCState),
+ maybe_write_msg_to_disk(false, MsgStatus, MSCState),
{MsgStatus2, IndexState1} =
maybe_write_index_to_disk(false, MsgStatus1, IndexState),
State1 = State #vqstate { ram_msg_count = RamMsgCount + 1,
@@ -1200,11 +1190,10 @@ publish(msg, MsgStatus, #vqstate {
store_alpha_entry(MsgStatus2, State1);
publish(index, MsgStatus, #vqstate {
- index_state = IndexState, q1 = Q1,
ram_index_count = RamIndexCount, msg_store_clients = MSCState,
- persistent_store = PersistentStore } = State) ->
+ index_state = IndexState, q1 = Q1 } = State) ->
{MsgStatus1 = #msg_status { msg_on_disk = true }, MSCState1} =
- maybe_write_msg_to_disk(PersistentStore, true, MsgStatus, MSCState),
+ maybe_write_msg_to_disk(true, MsgStatus, MSCState),
ForceIndex = should_force_index_to_disk(State),
{MsgStatus2, IndexState1} =
maybe_write_index_to_disk(ForceIndex, MsgStatus1, IndexState),
@@ -1220,10 +1209,9 @@ publish(index, MsgStatus, #vqstate {
publish(neither, MsgStatus = #msg_status { seq_id = SeqId }, State =
#vqstate { index_state = IndexState, q1 = Q1, q2 = Q2,
- delta = Delta, msg_store_clients = MSCState,
- persistent_store = PersistentStore }) ->
+ delta = Delta, msg_store_clients = MSCState }) ->
{MsgStatus1 = #msg_status { msg_on_disk = true }, MSCState1} =
- maybe_write_msg_to_disk(PersistentStore, true, MsgStatus, MSCState),
+ maybe_write_msg_to_disk(true, MsgStatus, MSCState),
{#msg_status { index_on_disk = true }, IndexState1} =
maybe_write_index_to_disk(true, MsgStatus1, IndexState),
true = queue:is_empty(Q1) andalso bpqueue:is_empty(Q2), %% ASSERTION
@@ -1258,42 +1246,39 @@ store_beta_entry(MsgStatus = #msg_status { msg_on_disk = true,
State #vqstate { q2 = bpqueue:in(IndexOnDisk, MsgStatus1, Q2) }
end.
-find_msg_store(true, PersistentStore) -> PersistentStore;
-find_msg_store(false, _PersistentStore) -> ?TRANSIENT_MSG_STORE.
+find_msg_store(true) -> ?PERSISTENT_MSG_STORE;
+find_msg_store(false) -> ?TRANSIENT_MSG_STORE.
-with_msg_store_state(PersistentStore, {{MSCStateP, PRef}, MSCStateT}, true,
- Fun) ->
- {Result, MSCStateP1} = Fun(PersistentStore, MSCStateP),
+with_msg_store_state({{MSCStateP, PRef}, MSCStateT}, true, Fun) ->
+ {Result, MSCStateP1} = Fun(?PERSISTENT_MSG_STORE, MSCStateP),
{Result, {{MSCStateP1, PRef}, MSCStateT}};
-with_msg_store_state(_PersistentStore, {MSCStateP, {MSCStateT, TRef}}, false,
- Fun) ->
+with_msg_store_state({MSCStateP, {MSCStateT, TRef}}, false, Fun) ->
{Result, MSCStateT1} = Fun(?TRANSIENT_MSG_STORE, MSCStateT),
{Result, {MSCStateP, {MSCStateT1, TRef}}}.
-read_from_msg_store(PersistentStore, MSCState, IsPersistent, Guid) ->
+read_from_msg_store(MSCState, IsPersistent, Guid) ->
with_msg_store_state(
- PersistentStore, MSCState, IsPersistent,
+ MSCState, IsPersistent,
fun (MsgStore, MSCState1) ->
rabbit_msg_store:read(MsgStore, Guid, MSCState1)
end).
-maybe_write_msg_to_disk(_PersistentStore, _Force, MsgStatus =
+maybe_write_msg_to_disk(_Force, MsgStatus =
#msg_status { msg_on_disk = true }, MSCState) ->
{MsgStatus, MSCState};
-maybe_write_msg_to_disk(PersistentStore, Force,
- MsgStatus = #msg_status {
- msg = Msg, guid = Guid,
- is_persistent = IsPersistent }, MSCState)
+maybe_write_msg_to_disk(Force, MsgStatus = #msg_status {
+ msg = Msg, guid = Guid,
+ is_persistent = IsPersistent }, MSCState)
when Force orelse IsPersistent ->
{ok, MSCState1} =
with_msg_store_state(
- PersistentStore, MSCState, IsPersistent,
+ MSCState, IsPersistent,
fun (MsgStore, MSCState2) ->
rabbit_msg_store:write(
MsgStore, Guid, ensure_binary_properties(Msg), MSCState2)
end),
{MsgStatus #msg_status { msg_on_disk = true }, MSCState1};
-maybe_write_msg_to_disk(_PersistentStore, _Force, MsgStatus, MSCState) ->
+maybe_write_msg_to_disk(_Force, MsgStatus, MSCState) ->
{MsgStatus, MSCState}.
maybe_write_index_to_disk(_Force, MsgStatus =
@@ -1452,14 +1437,12 @@ maybe_push_alphas_to_betas(
maybe_push_alphas_to_betas(
Generator, Consumer, Q, State =
#vqstate { ram_msg_count = RamMsgCount, ram_index_count = RamIndexCount,
- index_state = IndexState, msg_store_clients = MSCState,
- persistent_store = PersistentStore }) ->
+ index_state = IndexState, msg_store_clients = MSCState }) ->
case Generator(Q) of
{empty, _Q} -> State;
{{value, MsgStatus}, Qa} ->
{MsgStatus1, MSCState1} =
- maybe_write_msg_to_disk(
- PersistentStore, true, MsgStatus, MSCState),
+ maybe_write_msg_to_disk(true, MsgStatus, MSCState),
ForceIndex = should_force_index_to_disk(State),
{MsgStatus2, IndexState1} =
maybe_write_index_to_disk(ForceIndex, MsgStatus1, IndexState),