summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-05-21 18:01:41 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2010-05-21 18:01:41 +0100
commitc6aedf1bb07b0d651074655d9c934aed8b81d1c9 (patch)
tree395ee2c8f77efd982fa7ea2a146f799ca1ed54c3
parent05b140a3c0820ae62a4201bceb60cd364a3a4eb0 (diff)
downloadrabbitmq-server-git-c6aedf1bb07b0d651074655d9c934aed8b81d1c9.tar.gz
Rip out now-unnecessary logic for setting the persistent store to the transient store: that solution allowed the persistent flag to be honoured in non-durable queues by sending the message to the transient store; we've now decided that we don't want to send non-persistent msgs to any store and are handling that by (effectively) unsetting the is_persistent flag for all msgs arriving in a non-durable queue
-rw-r--r--src/rabbit_variable_queue.erl198
1 files changed, 84 insertions, 114 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index b10703b4ac..9424fab51f 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -189,7 +189,6 @@
len,
on_sync,
msg_store_clients,
- persistent_store,
persistent_count,
transient_threshold,
pending_ack,
@@ -266,7 +265,6 @@
[fun (() -> any())]},
msg_store_clients :: 'undefined' | {{any(), binary()},
{any(), binary()}},
- persistent_store :: pid() | atom(),
persistent_count :: non_neg_integer(),
transient_threshold :: non_neg_integer(),
pending_ack :: dict(),
@@ -304,12 +302,8 @@ start(DurableQueues) ->
Refs, StartFunState]).
init(QueueName, IsDurable, _Recover) ->
- PersistentStore = case IsDurable of
- true -> ?PERSISTENT_MSG_STORE;
- false -> ?TRANSIENT_MSG_STORE
- end,
MsgStoreRecovered =
- rabbit_msg_store:successfully_recovered_state(PersistentStore),
+ rabbit_msg_store:successfully_recovered_state(?PERSISTENT_MSG_STORE),
ContainsCheckFun =
fun (Guid) ->
rabbit_msg_store:contains(?PERSISTENT_MSG_STORE, Guid)
@@ -333,7 +327,11 @@ init(QueueName, IsDurable, _Recover) ->
end_seq_id = NextSeqId }
end,
Now = now(),
- PersistentClient = rabbit_msg_store:client_init(PersistentStore, PRef),
+ PersistentClient =
+ case IsDurable of
+ true -> rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, PRef);
+ false -> undefined
+ end,
TransientClient = rabbit_msg_store:client_init(?TRANSIENT_MSG_STORE, TRef),
State = #vqstate {
q1 = queue:new(),
@@ -359,7 +357,6 @@ init(QueueName, IsDurable, _Recover) ->
on_sync = {[], [], []},
msg_store_clients = {{PersistentClient, PRef},
{TransientClient, TRef}},
- persistent_store = PersistentStore,
persistent_count = DeltaCount1,
transient_threshold = NextSeqId,
pending_ack = dict:new(),
@@ -388,7 +385,6 @@ delete_and_terminate(State) ->
State2 = #vqstate {
index_state = IndexState,
msg_store_clients = {{MSCStateP, PRef}, {MSCStateT, TRef}},
- persistent_store = PersistentStore,
transient_threshold = TransientThreshold } =
remove_pending_ack(false, State1),
%% flushing here is good because it deletes all full segments,
@@ -399,21 +395,22 @@ delete_and_terminate(State) ->
{N, N, IndexState3} ->
IndexState3;
{DeltaSeqId, NextSeqId, IndexState3} ->
- delete1(PersistentStore, TransientThreshold, NextSeqId, DeltaSeqId, IndexState3)
+ delete1(TransientThreshold, NextSeqId, DeltaSeqId, IndexState3)
end,
IndexState5 = rabbit_queue_index:terminate_and_erase(IndexState2),
- rabbit_msg_store:delete_client(PersistentStore, PRef),
+ case MSCStateP of
+ undefined -> ok;
+ _ -> rabbit_msg_store:delete_client(?PERSISTENT_MSG_STORE, PRef),
+ rabbit_msg_store:client_terminate(MSCStateP)
+ end,
rabbit_msg_store:delete_client(?TRANSIENT_MSG_STORE, TRef),
- rabbit_msg_store:client_terminate(MSCStateP),
rabbit_msg_store:client_terminate(MSCStateT),
State2 #vqstate { index_state = IndexState5,
msg_store_clients = undefined }.
-purge(State = #vqstate { q4 = Q4, index_state = IndexState, len = Len,
- persistent_store = PersistentStore }) ->
+purge(State = #vqstate { q4 = Q4, index_state = IndexState, len = Len }) ->
{Q4Count, IndexState1} =
- remove_queue_entries(PersistentStore, fun rabbit_misc:queue_fold/3,
- Q4, IndexState),
+ remove_queue_entries(fun rabbit_misc:queue_fold/3, Q4, IndexState),
{Len, State1} =
purge1(Q4Count, State #vqstate { index_state = IndexState1,
q4 = queue:new() }),
@@ -434,18 +431,17 @@ publish_delivered(true, Msg = #basic_message { guid = Guid,
out_counter = OutCount,
in_counter = InCount,
msg_store_clients = MSCState,
- persistent_store = PersistentStore,
persistent_count = PCount,
pending_ack = PA,
durable = IsDurable }) ->
+ IsPersistent1 = IsDurable andalso IsPersistent,
MsgStatus = #msg_status {
- msg = Msg, guid = Guid, seq_id = SeqId,
- is_persistent = IsDurable andalso IsPersistent,
+ msg = Msg, guid = Guid, seq_id = SeqId, is_persistent = IsPersistent1,
is_delivered = true, msg_on_disk = false, index_on_disk = false },
- {MsgStatus1, MSCState1} = maybe_write_msg_to_disk(PersistentStore, false,
- MsgStatus, MSCState),
+ {MsgStatus1, MSCState1} =
+ maybe_write_msg_to_disk(false, MsgStatus, MSCState),
State1 = State #vqstate { msg_store_clients = MSCState1,
- persistent_count = PCount + case IsPersistent of
+ persistent_count = PCount + case IsPersistent1 of
true -> 1;
false -> 0
end,
@@ -467,7 +463,7 @@ publish_delivered(true, Msg = #basic_message { guid = Guid,
fetch(AckRequired, State =
#vqstate { q4 = Q4, ram_msg_count = RamMsgCount, out_counter = OutCount,
index_state = IndexState, len = Len, persistent_count = PCount,
- persistent_store = PersistentStore, pending_ack = PA }) ->
+ pending_ack = PA }) ->
case queue:out(Q4) of
{empty, _Q4} ->
case fetch_from_q3_or_delta(State) of
@@ -493,7 +489,7 @@ fetch(AckRequired, State =
end,
%% 2. If it's on disk and there's no Ack required, remove it
- MsgStore = find_msg_store(IsPersistent, PersistentStore),
+ MsgStore = find_msg_store(IsPersistent),
IndexState2 =
case MsgOnDisk andalso not AckRequired of
%% Remove from disk now
@@ -547,7 +543,6 @@ ack([], State) ->
State;
ack(AckTags, State = #vqstate { index_state = IndexState,
persistent_count = PCount,
- persistent_store = PersistentStore,
pending_ack = PA }) ->
{GuidsByStore, SeqIds, PA1} =
lists:foldl(
@@ -558,19 +553,20 @@ ack(AckTags, State = #vqstate { index_state = IndexState,
msg_on_disk = false,
is_persistent = false }} ->
{Dict, SeqIds, PAN1};
- {ok, {false, Guid}} ->
- {rabbit_misc:dict_cons(?TRANSIENT_MSG_STORE, Guid,
- Dict), SeqIds, PAN1};
- {ok, {true, Guid}} ->
- {rabbit_misc:dict_cons(PersistentStore, Guid, Dict),
- [SeqId | SeqIds], PAN1}
+ {ok, {IsPersistent, Guid}} ->
+ SeqIds1 = case IsPersistent of
+ true -> [SeqId | SeqIds];
+ false -> SeqIds
+ end,
+ {rabbit_misc:dict_cons(find_msg_store(IsPersistent),
+ Guid, Dict), SeqIds1, PAN1}
end
end, {dict:new(), [], PA}, AckTags),
IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState),
ok = dict:fold(fun (MsgStore, Guids, ok) ->
rabbit_msg_store:remove(MsgStore, Guids)
end, ok, GuidsByStore),
- PCount1 = PCount - case dict:find(PersistentStore, GuidsByStore) of
+ PCount1 = PCount - case dict:find(?PERSISTENT_MSG_STORE, GuidsByStore) of
error -> 0;
{ok, Guids} -> length(Guids)
end,
@@ -580,13 +576,12 @@ ack(AckTags, State = #vqstate { index_state = IndexState,
tx_publish(Txn,
Msg = #basic_message { is_persistent = true, guid = Guid },
State = #vqstate { msg_store_clients = MSCState,
- persistent_store = PersistentStore,
durable = true }) ->
MsgStatus = #msg_status {
msg = Msg, guid = Guid, seq_id = undefined, is_persistent = true,
is_delivered = false, msg_on_disk = false, index_on_disk = false },
{#msg_status { msg_on_disk = true }, MSCState1} =
- maybe_write_msg_to_disk(PersistentStore, false, MsgStatus, MSCState),
+ maybe_write_msg_to_disk(false, MsgStatus, MSCState),
publish_in_tx(Txn, Msg),
State #vqstate { msg_store_clients = MSCState1 };
tx_publish(Txn, Msg, State) ->
@@ -597,19 +592,17 @@ tx_ack(Txn, AckTags, State) ->
ack_in_tx(Txn, AckTags),
State.
-tx_rollback(Txn, State = #vqstate { persistent_store = PersistentStore,
- durable = IsDurable }) ->
+tx_rollback(Txn, State = #vqstate { durable = IsDurable }) ->
#tx { pending_acks = AckTags, pending_messages = Pubs } = lookup_tx(Txn),
erase_tx(Txn),
ok = case IsDurable of
- true -> rabbit_msg_store:remove(PersistentStore,
+ true -> rabbit_msg_store:remove(?PERSISTENT_MSG_STORE,
persistent_guids(Pubs));
false -> ok
end,
{lists:flatten(AckTags), State}.
-tx_commit(Txn, Fun, State = #vqstate { persistent_store = PersistentStore,
- durable = IsDurable }) ->
+tx_commit(Txn, Fun, State = #vqstate { durable = IsDurable }) ->
%% If we are a non-durable queue, or we have no persistent pubs,
%% we can skip the msg_store loop.
#tx { pending_acks = AckTags, pending_messages = Pubs } = lookup_tx(Txn),
@@ -617,10 +610,9 @@ tx_commit(Txn, Fun, State = #vqstate { persistent_store = PersistentStore,
PubsOrdered = lists:reverse(Pubs),
AckTags1 = lists:flatten(AckTags),
PersistentGuids = persistent_guids(PubsOrdered),
- IsTransientPubs = (not IsDurable) orelse [] == PersistentGuids,
+ IsTransientPubs = [] == PersistentGuids,
{AckTags1,
- case IsTransientPubs orelse
- ?TRANSIENT_MSG_STORE == PersistentStore of
+ case (not IsDurable) orelse IsTransientPubs of
true ->
tx_commit_post_msg_store(
IsTransientPubs, PubsOrdered, AckTags1, Fun, State);
@@ -632,7 +624,7 @@ tx_commit(Txn, Fun, State = #vqstate { persistent_store = PersistentStore,
State
end}.
-requeue(AckTags, State = #vqstate { persistent_store = PersistentStore }) ->
+requeue(AckTags, State) ->
{SeqIds, GuidsByStore,
State1 = #vqstate { index_state = IndexState,
persistent_count = PCount }} =
@@ -653,27 +645,24 @@ requeue(AckTags, State = #vqstate { persistent_store = PersistentStore }) ->
{ok, {IsPersistent, Guid}} ->
{{ok, Msg = #basic_message{}}, MSCStateN1} =
read_from_msg_store(
- PersistentStore, MSCStateN, IsPersistent, Guid),
+ MSCStateN, IsPersistent, Guid),
StateN2 = StateN1 #vqstate {
msg_store_clients = MSCStateN1 },
{_SeqId, StateN3} = publish(Msg, true, true, StateN2),
- {SeqIdsAcc1, MsgStore} =
- case IsPersistent of
- true ->
- {[SeqId | SeqIdsAcc], PersistentStore};
- false ->
- {SeqIdsAcc, ?TRANSIENT_MSG_STORE}
- end,
+ MsgStore = find_msg_store(IsPersistent),
+ SeqIdsAcc1 = case IsPersistent of
+ true -> [SeqId | SeqIdsAcc];
+ false -> SeqIdsAcc
+ end,
{SeqIdsAcc1,
- rabbit_misc:dict_cons(MsgStore, Guid, Dict),
- StateN3}
+ rabbit_misc:dict_cons(MsgStore, Guid, Dict), StateN3}
end
end, {[], dict:new(), State}, AckTags),
IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState),
ok = dict:fold(fun (MsgStore, Guids, ok) ->
rabbit_msg_store:release(MsgStore, Guids)
end, ok, GuidsByStore),
- PCount1 = PCount - case dict:find(PersistentStore, GuidsByStore) of
+ PCount1 = PCount - case dict:find(?PERSISTENT_MSG_STORE, GuidsByStore) of
error -> 0;
{ok, Guids} -> length(Guids)
end,
@@ -771,7 +760,6 @@ status(#vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
remove_pending_ack(KeepPersistent,
State = #vqstate { pending_ack = PA,
- persistent_store = PersistentStore,
index_state = IndexState }) ->
{SeqIds, GuidsByStore, PA1} =
dict:fold(
@@ -783,7 +771,7 @@ remove_pending_ack(KeepPersistent,
case IsPersistent of
true -> {[SeqId | SeqIdsAcc],
rabbit_misc:dict_cons(
- PersistentStore, Guid, Dict), PAN1};
+ ?PERSISTENT_MSG_STORE, Guid, Dict), PAN1};
false -> {SeqIdsAcc,
rabbit_misc:dict_cons(
?TRANSIENT_MSG_STORE, Guid, Dict), PAN1}
@@ -952,11 +940,10 @@ msg_store_callback(PersistentGuids, IsTransientPubs, Pubs, AckTags, Fun) ->
tx_commit_post_msg_store(IsTransientPubs, Pubs, AckTags, Fun, State =
#vqstate { on_sync = OnSync = {SAcks, SPubs, SFuns},
- persistent_store = PersistentStore,
- pending_ack = PA }) ->
+ pending_ack = PA, durable = IsDurable }) ->
%% If we are a non-durable queue, or (no persisent pubs, and no
%% persistent acks) then we can skip the queue_index loop.
- case PersistentStore == ?TRANSIENT_MSG_STORE orelse
+ case (not IsDurable) orelse
(IsTransientPubs andalso
lists:foldl(
fun (AckTag, true ) ->
@@ -997,12 +984,10 @@ tx_commit_index(State = #vqstate { on_sync = {SAcks, SPubs, SFuns},
[ Fun() || Fun <- lists:reverse(SFuns) ],
State2 #vqstate { index_state = IndexState1, on_sync = {[], [], []} }.
-delete1(_PersistentStore, _TransientThreshold, NextSeqId, DeltaSeqId,
- IndexState) when DeltaSeqId =:= undefined
- orelse DeltaSeqId >= NextSeqId ->
+delete1(_TransientThreshold, NextSeqId, DeltaSeqId, IndexState)
+ when DeltaSeqId =:= undefined orelse DeltaSeqId >= NextSeqId ->
IndexState;
-delete1(PersistentStore, TransientThreshold, NextSeqId, DeltaSeqId,
- IndexState) ->
+delete1(TransientThreshold, NextSeqId, DeltaSeqId, IndexState) ->
{List, Again, IndexState1} =
rabbit_queue_index:read(DeltaSeqId, NextSeqId, IndexState),
IndexState2 =
@@ -1012,38 +997,33 @@ delete1(PersistentStore, TransientThreshold, NextSeqId, DeltaSeqId,
List, TransientThreshold, IndexState1),
{_Count, IndexState4} =
remove_queue_entries(
- PersistentStore, fun beta_fold_no_index_on_disk/3,
- Q, IndexState3),
+ fun beta_fold_no_index_on_disk/3, Q, IndexState3),
IndexState4
end,
- delete1(PersistentStore, TransientThreshold, NextSeqId, Again,
- IndexState2).
+ delete1(TransientThreshold, NextSeqId, Again, IndexState2).
-purge1(Count, State = #vqstate { q3 = Q3, index_state = IndexState,
- persistent_store = PersistentStore }) ->
+purge1(Count, State = #vqstate { q3 = Q3, index_state = IndexState }) ->
case bpqueue:is_empty(Q3) of
true ->
{Q1Count, IndexState1} =
remove_queue_entries(
- PersistentStore, fun rabbit_misc:queue_fold/3,
- State #vqstate.q1, IndexState),
+ fun rabbit_misc:queue_fold/3, State #vqstate.q1, IndexState),
{Count + Q1Count, State #vqstate { q1 = queue:new(),
index_state = IndexState1 }};
false ->
{Q3Count, IndexState1} =
remove_queue_entries(
- PersistentStore, fun beta_fold_no_index_on_disk/3,
- Q3, IndexState),
+ fun beta_fold_no_index_on_disk/3, Q3, IndexState),
purge1(Count + Q3Count,
maybe_deltas_to_betas(
State #vqstate { index_state = IndexState1,
q3 = bpqueue:new() }))
end.
-remove_queue_entries(PersistentStore, Fold, Q, IndexState) ->
- {_PersistentStore, Count, GuidsByStore, SeqIds, IndexState1} =
+remove_queue_entries(Fold, Q, IndexState) ->
+ {Count, GuidsByStore, SeqIds, IndexState1} =
Fold(fun remove_queue_entries1/2,
- {PersistentStore, 0, dict:new(), [], IndexState}, Q),
+ {0, dict:new(), [], IndexState}, Q),
ok = dict:fold(fun (MsgStore, Guids, ok) ->
rabbit_msg_store:remove(MsgStore, Guids)
end, ok, GuidsByStore),
@@ -1058,11 +1038,11 @@ remove_queue_entries1(
#msg_status { guid = Guid, seq_id = SeqId,
is_delivered = IsDelivered, msg_on_disk = MsgOnDisk,
index_on_disk = IndexOnDisk, is_persistent = IsPersistent },
- {PersistentStore, CountN, GuidsByStore, SeqIdsAcc, IndexStateN}) ->
+ {CountN, GuidsByStore, SeqIdsAcc, IndexStateN}) ->
GuidsByStore1 =
case {MsgOnDisk, IsPersistent} of
{true, true} ->
- rabbit_misc:dict_cons(PersistentStore, Guid, GuidsByStore);
+ rabbit_misc:dict_cons(?PERSISTENT_MSG_STORE, Guid, GuidsByStore);
{true, false} ->
rabbit_misc:dict_cons(?TRANSIENT_MSG_STORE, Guid, GuidsByStore);
{false, _} ->
@@ -1076,15 +1056,14 @@ remove_queue_entries1(
true -> rabbit_queue_index:deliver(SeqId, IndexStateN);
false -> IndexStateN
end,
- {PersistentStore, CountN + 1, GuidsByStore1, SeqIdsAcc1, IndexStateN1}.
+ {CountN + 1, GuidsByStore1, SeqIdsAcc1, IndexStateN1}.
fetch_from_q3_or_delta(State = #vqstate {
q1 = Q1, q2 = Q2,
delta = #delta { count = DeltaCount },
q3 = Q3, q4 = Q4, ram_msg_count = RamMsgCount,
ram_index_count = RamIndexCount,
- msg_store_clients = MSCState,
- persistent_store = PersistentStore }) ->
+ msg_store_clients = MSCState }) ->
case bpqueue:out(Q3) of
{empty, _Q3} ->
0 = DeltaCount, %% ASSERTION
@@ -1096,8 +1075,7 @@ fetch_from_q3_or_delta(State = #vqstate {
is_persistent = IsPersistent }}, Q3a} ->
{{ok, Msg = #basic_message { is_persistent = IsPersistent,
guid = Guid }}, MSCState1} =
- read_from_msg_store(
- PersistentStore, MSCState, IsPersistent, Guid),
+ read_from_msg_store(MSCState, IsPersistent, Guid),
Q4a = queue:in(MsgStatus #msg_status { msg = Msg }, Q4),
RamIndexCount1 = case IndexOnDisk of
true -> RamIndexCount;
@@ -1201,10 +1179,9 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, guid = Guid },
publish(msg, MsgStatus, #vqstate {
index_state = IndexState, ram_msg_count = RamMsgCount,
- msg_store_clients = MSCState,
- persistent_store = PersistentStore } = State) ->
+ msg_store_clients = MSCState } = State) ->
{MsgStatus1, MSCState1} =
- maybe_write_msg_to_disk(PersistentStore, false, MsgStatus, MSCState),
+ maybe_write_msg_to_disk(false, MsgStatus, MSCState),
{MsgStatus2, IndexState1} =
maybe_write_index_to_disk(false, MsgStatus1, IndexState),
State1 = State #vqstate { ram_msg_count = RamMsgCount + 1,
@@ -1213,11 +1190,10 @@ publish(msg, MsgStatus, #vqstate {
store_alpha_entry(MsgStatus2, State1);
publish(index, MsgStatus, #vqstate {
- index_state = IndexState, q1 = Q1,
ram_index_count = RamIndexCount, msg_store_clients = MSCState,
- persistent_store = PersistentStore } = State) ->
+ index_state = IndexState, q1 = Q1 } = State) ->
{MsgStatus1 = #msg_status { msg_on_disk = true }, MSCState1} =
- maybe_write_msg_to_disk(PersistentStore, true, MsgStatus, MSCState),
+ maybe_write_msg_to_disk(true, MsgStatus, MSCState),
ForceIndex = should_force_index_to_disk(State),
{MsgStatus2, IndexState1} =
maybe_write_index_to_disk(ForceIndex, MsgStatus1, IndexState),
@@ -1233,10 +1209,9 @@ publish(index, MsgStatus, #vqstate {
publish(neither, MsgStatus = #msg_status { seq_id = SeqId }, State =
#vqstate { index_state = IndexState, q1 = Q1, q2 = Q2,
- delta = Delta, msg_store_clients = MSCState,
- persistent_store = PersistentStore }) ->
+ delta = Delta, msg_store_clients = MSCState }) ->
{MsgStatus1 = #msg_status { msg_on_disk = true }, MSCState1} =
- maybe_write_msg_to_disk(PersistentStore, true, MsgStatus, MSCState),
+ maybe_write_msg_to_disk(true, MsgStatus, MSCState),
{#msg_status { index_on_disk = true }, IndexState1} =
maybe_write_index_to_disk(true, MsgStatus1, IndexState),
true = queue:is_empty(Q1) andalso bpqueue:is_empty(Q2), %% ASSERTION
@@ -1271,42 +1246,39 @@ store_beta_entry(MsgStatus = #msg_status { msg_on_disk = true,
State #vqstate { q2 = bpqueue:in(IndexOnDisk, MsgStatus1, Q2) }
end.
-find_msg_store(true, PersistentStore) -> PersistentStore;
-find_msg_store(false, _PersistentStore) -> ?TRANSIENT_MSG_STORE.
+find_msg_store(true) -> ?PERSISTENT_MSG_STORE;
+find_msg_store(false) -> ?TRANSIENT_MSG_STORE.
-with_msg_store_state(PersistentStore, {{MSCStateP, PRef}, MSCStateT}, true,
- Fun) ->
- {Result, MSCStateP1} = Fun(PersistentStore, MSCStateP),
+with_msg_store_state({{MSCStateP, PRef}, MSCStateT}, true, Fun) ->
+ {Result, MSCStateP1} = Fun(?PERSISTENT_MSG_STORE, MSCStateP),
{Result, {{MSCStateP1, PRef}, MSCStateT}};
-with_msg_store_state(_PersistentStore, {MSCStateP, {MSCStateT, TRef}}, false,
- Fun) ->
+with_msg_store_state({MSCStateP, {MSCStateT, TRef}}, false, Fun) ->
{Result, MSCStateT1} = Fun(?TRANSIENT_MSG_STORE, MSCStateT),
{Result, {MSCStateP, {MSCStateT1, TRef}}}.
-read_from_msg_store(PersistentStore, MSCState, IsPersistent, Guid) ->
+read_from_msg_store(MSCState, IsPersistent, Guid) ->
with_msg_store_state(
- PersistentStore, MSCState, IsPersistent,
+ MSCState, IsPersistent,
fun (MsgStore, MSCState1) ->
rabbit_msg_store:read(MsgStore, Guid, MSCState1)
end).
-maybe_write_msg_to_disk(_PersistentStore, _Force, MsgStatus =
+maybe_write_msg_to_disk(_Force, MsgStatus =
#msg_status { msg_on_disk = true }, MSCState) ->
{MsgStatus, MSCState};
-maybe_write_msg_to_disk(PersistentStore, Force,
- MsgStatus = #msg_status {
- msg = Msg, guid = Guid,
- is_persistent = IsPersistent }, MSCState)
+maybe_write_msg_to_disk(Force, MsgStatus = #msg_status {
+ msg = Msg, guid = Guid,
+ is_persistent = IsPersistent }, MSCState)
when Force orelse IsPersistent ->
{ok, MSCState1} =
with_msg_store_state(
- PersistentStore, MSCState, IsPersistent,
+ MSCState, IsPersistent,
fun (MsgStore, MSCState2) ->
rabbit_msg_store:write(
MsgStore, Guid, ensure_binary_properties(Msg), MSCState2)
end),
{MsgStatus #msg_status { msg_on_disk = true }, MSCState1};
-maybe_write_msg_to_disk(_PersistentStore, _Force, MsgStatus, MSCState) ->
+maybe_write_msg_to_disk(_Force, MsgStatus, MSCState) ->
{MsgStatus, MSCState}.
maybe_write_index_to_disk(_Force, MsgStatus =
@@ -1465,14 +1437,12 @@ maybe_push_alphas_to_betas(
maybe_push_alphas_to_betas(
Generator, Consumer, Q, State =
#vqstate { ram_msg_count = RamMsgCount, ram_index_count = RamIndexCount,
- index_state = IndexState, msg_store_clients = MSCState,
- persistent_store = PersistentStore }) ->
+ index_state = IndexState, msg_store_clients = MSCState }) ->
case Generator(Q) of
{empty, _Q} -> State;
{{value, MsgStatus}, Qa} ->
{MsgStatus1, MSCState1} =
- maybe_write_msg_to_disk(
- PersistentStore, true, MsgStatus, MSCState),
+ maybe_write_msg_to_disk(true, MsgStatus, MSCState),
ForceIndex = should_force_index_to_disk(State),
{MsgStatus2, IndexState1} =
maybe_write_index_to_disk(ForceIndex, MsgStatus1, IndexState),