diff options
| author | Matthew Sackman <matthew@rabbitmq.com> | 2010-05-21 18:01:41 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-05-21 18:01:41 +0100 |
| commit | c6aedf1bb07b0d651074655d9c934aed8b81d1c9 (patch) | |
| tree | 395ee2c8f77efd982fa7ea2a146f799ca1ed54c3 /src | |
| parent | 05b140a3c0820ae62a4201bceb60cd364a3a4eb0 (diff) | |
| download | rabbitmq-server-git-c6aedf1bb07b0d651074655d9c934aed8b81d1c9.tar.gz | |
Rip out now-unnecessary logic for setting the persistent store to the transient store: that solution allowed the persistent flag to be honoured in non-durable queues by sending the message to the transient store; we've now decided that we don't want to send non-persistent msgs to any store and are handling that by (effectively) unsetting the is_persistent flag for all msgs arriving in a non-durable queue
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_variable_queue.erl | 198 |
1 files changed, 84 insertions, 114 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index b10703b4ac..9424fab51f 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -189,7 +189,6 @@ len, on_sync, msg_store_clients, - persistent_store, persistent_count, transient_threshold, pending_ack, @@ -266,7 +265,6 @@ [fun (() -> any())]}, msg_store_clients :: 'undefined' | {{any(), binary()}, {any(), binary()}}, - persistent_store :: pid() | atom(), persistent_count :: non_neg_integer(), transient_threshold :: non_neg_integer(), pending_ack :: dict(), @@ -304,12 +302,8 @@ start(DurableQueues) -> Refs, StartFunState]). init(QueueName, IsDurable, _Recover) -> - PersistentStore = case IsDurable of - true -> ?PERSISTENT_MSG_STORE; - false -> ?TRANSIENT_MSG_STORE - end, MsgStoreRecovered = - rabbit_msg_store:successfully_recovered_state(PersistentStore), + rabbit_msg_store:successfully_recovered_state(?PERSISTENT_MSG_STORE), ContainsCheckFun = fun (Guid) -> rabbit_msg_store:contains(?PERSISTENT_MSG_STORE, Guid) @@ -333,7 +327,11 @@ init(QueueName, IsDurable, _Recover) -> end_seq_id = NextSeqId } end, Now = now(), - PersistentClient = rabbit_msg_store:client_init(PersistentStore, PRef), + PersistentClient = + case IsDurable of + true -> rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, PRef); + false -> undefined + end, TransientClient = rabbit_msg_store:client_init(?TRANSIENT_MSG_STORE, TRef), State = #vqstate { q1 = queue:new(), @@ -359,7 +357,6 @@ init(QueueName, IsDurable, _Recover) -> on_sync = {[], [], []}, msg_store_clients = {{PersistentClient, PRef}, {TransientClient, TRef}}, - persistent_store = PersistentStore, persistent_count = DeltaCount1, transient_threshold = NextSeqId, pending_ack = dict:new(), @@ -388,7 +385,6 @@ delete_and_terminate(State) -> State2 = #vqstate { index_state = IndexState, msg_store_clients = {{MSCStateP, PRef}, {MSCStateT, TRef}}, - persistent_store = PersistentStore, transient_threshold = TransientThreshold } = remove_pending_ack(false, State1), %% flushing here is good because it deletes all full segments, @@ -399,21 +395,22 @@ delete_and_terminate(State) -> {N, N, IndexState3} -> IndexState3; {DeltaSeqId, NextSeqId, IndexState3} -> - delete1(PersistentStore, TransientThreshold, NextSeqId, DeltaSeqId, IndexState3) + delete1(TransientThreshold, NextSeqId, DeltaSeqId, IndexState3) end, IndexState5 = rabbit_queue_index:terminate_and_erase(IndexState2), - rabbit_msg_store:delete_client(PersistentStore, PRef), + case MSCStateP of + undefined -> ok; + _ -> rabbit_msg_store:delete_client(?PERSISTENT_MSG_STORE, PRef), + rabbit_msg_store:client_terminate(MSCStateP) + end, rabbit_msg_store:delete_client(?TRANSIENT_MSG_STORE, TRef), - rabbit_msg_store:client_terminate(MSCStateP), rabbit_msg_store:client_terminate(MSCStateT), State2 #vqstate { index_state = IndexState5, msg_store_clients = undefined }. -purge(State = #vqstate { q4 = Q4, index_state = IndexState, len = Len, - persistent_store = PersistentStore }) -> +purge(State = #vqstate { q4 = Q4, index_state = IndexState, len = Len }) -> {Q4Count, IndexState1} = - remove_queue_entries(PersistentStore, fun rabbit_misc:queue_fold/3, - Q4, IndexState), + remove_queue_entries(fun rabbit_misc:queue_fold/3, Q4, IndexState), {Len, State1} = purge1(Q4Count, State #vqstate { index_state = IndexState1, q4 = queue:new() }), @@ -434,18 +431,17 @@ publish_delivered(true, Msg = #basic_message { guid = Guid, out_counter = OutCount, in_counter = InCount, msg_store_clients = MSCState, - persistent_store = PersistentStore, persistent_count = PCount, pending_ack = PA, durable = IsDurable }) -> + IsPersistent1 = IsDurable andalso IsPersistent, MsgStatus = #msg_status { - msg = Msg, guid = Guid, seq_id = SeqId, - is_persistent = IsDurable andalso IsPersistent, + msg = Msg, guid = Guid, seq_id = SeqId, is_persistent = IsPersistent1, is_delivered = true, msg_on_disk = false, index_on_disk = false }, - {MsgStatus1, MSCState1} = maybe_write_msg_to_disk(PersistentStore, false, - MsgStatus, MSCState), + {MsgStatus1, MSCState1} = + maybe_write_msg_to_disk(false, MsgStatus, MSCState), State1 = State #vqstate { msg_store_clients = MSCState1, - persistent_count = PCount + case IsPersistent of + persistent_count = PCount + case IsPersistent1 of true -> 1; false -> 0 end, @@ -467,7 +463,7 @@ publish_delivered(true, Msg = #basic_message { guid = Guid, fetch(AckRequired, State = #vqstate { q4 = Q4, ram_msg_count = RamMsgCount, out_counter = OutCount, index_state = IndexState, len = Len, persistent_count = PCount, - persistent_store = PersistentStore, pending_ack = PA }) -> + pending_ack = PA }) -> case queue:out(Q4) of {empty, _Q4} -> case fetch_from_q3_or_delta(State) of @@ -493,7 +489,7 @@ fetch(AckRequired, State = end, %% 2. If it's on disk and there's no Ack required, remove it - MsgStore = find_msg_store(IsPersistent, PersistentStore), + MsgStore = find_msg_store(IsPersistent), IndexState2 = case MsgOnDisk andalso not AckRequired of %% Remove from disk now @@ -547,7 +543,6 @@ ack([], State) -> State; ack(AckTags, State = #vqstate { index_state = IndexState, persistent_count = PCount, - persistent_store = PersistentStore, pending_ack = PA }) -> {GuidsByStore, SeqIds, PA1} = lists:foldl( @@ -558,19 +553,20 @@ ack(AckTags, State = #vqstate { index_state = IndexState, msg_on_disk = false, is_persistent = false }} -> {Dict, SeqIds, PAN1}; - {ok, {false, Guid}} -> - {rabbit_misc:dict_cons(?TRANSIENT_MSG_STORE, Guid, - Dict), SeqIds, PAN1}; - {ok, {true, Guid}} -> - {rabbit_misc:dict_cons(PersistentStore, Guid, Dict), - [SeqId | SeqIds], PAN1} + {ok, {IsPersistent, Guid}} -> + SeqIds1 = case IsPersistent of + true -> [SeqId | SeqIds]; + false -> SeqIds + end, + {rabbit_misc:dict_cons(find_msg_store(IsPersistent), + Guid, Dict), SeqIds1, PAN1} end end, {dict:new(), [], PA}, AckTags), IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState), ok = dict:fold(fun (MsgStore, Guids, ok) -> rabbit_msg_store:remove(MsgStore, Guids) end, ok, GuidsByStore), - PCount1 = PCount - case dict:find(PersistentStore, GuidsByStore) of + PCount1 = PCount - case dict:find(?PERSISTENT_MSG_STORE, GuidsByStore) of error -> 0; {ok, Guids} -> length(Guids) end, @@ -580,13 +576,12 @@ ack(AckTags, State = #vqstate { index_state = IndexState, tx_publish(Txn, Msg = #basic_message { is_persistent = true, guid = Guid }, State = #vqstate { msg_store_clients = MSCState, - persistent_store = PersistentStore, durable = true }) -> MsgStatus = #msg_status { msg = Msg, guid = Guid, seq_id = undefined, is_persistent = true, is_delivered = false, msg_on_disk = false, index_on_disk = false }, {#msg_status { msg_on_disk = true }, MSCState1} = - maybe_write_msg_to_disk(PersistentStore, false, MsgStatus, MSCState), + maybe_write_msg_to_disk(false, MsgStatus, MSCState), publish_in_tx(Txn, Msg), State #vqstate { msg_store_clients = MSCState1 }; tx_publish(Txn, Msg, State) -> @@ -597,19 +592,17 @@ tx_ack(Txn, AckTags, State) -> ack_in_tx(Txn, AckTags), State. -tx_rollback(Txn, State = #vqstate { persistent_store = PersistentStore, - durable = IsDurable }) -> +tx_rollback(Txn, State = #vqstate { durable = IsDurable }) -> #tx { pending_acks = AckTags, pending_messages = Pubs } = lookup_tx(Txn), erase_tx(Txn), ok = case IsDurable of - true -> rabbit_msg_store:remove(PersistentStore, + true -> rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, persistent_guids(Pubs)); false -> ok end, {lists:flatten(AckTags), State}. -tx_commit(Txn, Fun, State = #vqstate { persistent_store = PersistentStore, - durable = IsDurable }) -> +tx_commit(Txn, Fun, State = #vqstate { durable = IsDurable }) -> %% If we are a non-durable queue, or we have no persistent pubs, %% we can skip the msg_store loop. #tx { pending_acks = AckTags, pending_messages = Pubs } = lookup_tx(Txn), @@ -617,10 +610,9 @@ tx_commit(Txn, Fun, State = #vqstate { persistent_store = PersistentStore, PubsOrdered = lists:reverse(Pubs), AckTags1 = lists:flatten(AckTags), PersistentGuids = persistent_guids(PubsOrdered), - IsTransientPubs = (not IsDurable) orelse [] == PersistentGuids, + IsTransientPubs = [] == PersistentGuids, {AckTags1, - case IsTransientPubs orelse - ?TRANSIENT_MSG_STORE == PersistentStore of + case (not IsDurable) orelse IsTransientPubs of true -> tx_commit_post_msg_store( IsTransientPubs, PubsOrdered, AckTags1, Fun, State); @@ -632,7 +624,7 @@ tx_commit(Txn, Fun, State = #vqstate { persistent_store = PersistentStore, State end}. -requeue(AckTags, State = #vqstate { persistent_store = PersistentStore }) -> +requeue(AckTags, State) -> {SeqIds, GuidsByStore, State1 = #vqstate { index_state = IndexState, persistent_count = PCount }} = @@ -653,27 +645,24 @@ requeue(AckTags, State = #vqstate { persistent_store = PersistentStore }) -> {ok, {IsPersistent, Guid}} -> {{ok, Msg = #basic_message{}}, MSCStateN1} = read_from_msg_store( - PersistentStore, MSCStateN, IsPersistent, Guid), + MSCStateN, IsPersistent, Guid), StateN2 = StateN1 #vqstate { msg_store_clients = MSCStateN1 }, {_SeqId, StateN3} = publish(Msg, true, true, StateN2), - {SeqIdsAcc1, MsgStore} = - case IsPersistent of - true -> - {[SeqId | SeqIdsAcc], PersistentStore}; - false -> - {SeqIdsAcc, ?TRANSIENT_MSG_STORE} - end, + MsgStore = find_msg_store(IsPersistent), + SeqIdsAcc1 = case IsPersistent of + true -> [SeqId | SeqIdsAcc]; + false -> SeqIdsAcc + end, {SeqIdsAcc1, - rabbit_misc:dict_cons(MsgStore, Guid, Dict), - StateN3} + rabbit_misc:dict_cons(MsgStore, Guid, Dict), StateN3} end end, {[], dict:new(), State}, AckTags), IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState), ok = dict:fold(fun (MsgStore, Guids, ok) -> rabbit_msg_store:release(MsgStore, Guids) end, ok, GuidsByStore), - PCount1 = PCount - case dict:find(PersistentStore, GuidsByStore) of + PCount1 = PCount - case dict:find(?PERSISTENT_MSG_STORE, GuidsByStore) of error -> 0; {ok, Guids} -> length(Guids) end, @@ -771,7 +760,6 @@ status(#vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, remove_pending_ack(KeepPersistent, State = #vqstate { pending_ack = PA, - persistent_store = PersistentStore, index_state = IndexState }) -> {SeqIds, GuidsByStore, PA1} = dict:fold( @@ -783,7 +771,7 @@ remove_pending_ack(KeepPersistent, case IsPersistent of true -> {[SeqId | SeqIdsAcc], rabbit_misc:dict_cons( - PersistentStore, Guid, Dict), PAN1}; + ?PERSISTENT_MSG_STORE, Guid, Dict), PAN1}; false -> {SeqIdsAcc, rabbit_misc:dict_cons( ?TRANSIENT_MSG_STORE, Guid, Dict), PAN1} @@ -952,11 +940,10 @@ msg_store_callback(PersistentGuids, IsTransientPubs, Pubs, AckTags, Fun) -> tx_commit_post_msg_store(IsTransientPubs, Pubs, AckTags, Fun, State = #vqstate { on_sync = OnSync = {SAcks, SPubs, SFuns}, - persistent_store = PersistentStore, - pending_ack = PA }) -> + pending_ack = PA, durable = IsDurable }) -> %% If we are a non-durable queue, or (no persisent pubs, and no %% persistent acks) then we can skip the queue_index loop. - case PersistentStore == ?TRANSIENT_MSG_STORE orelse + case (not IsDurable) orelse (IsTransientPubs andalso lists:foldl( fun (AckTag, true ) -> @@ -997,12 +984,10 @@ tx_commit_index(State = #vqstate { on_sync = {SAcks, SPubs, SFuns}, [ Fun() || Fun <- lists:reverse(SFuns) ], State2 #vqstate { index_state = IndexState1, on_sync = {[], [], []} }. -delete1(_PersistentStore, _TransientThreshold, NextSeqId, DeltaSeqId, - IndexState) when DeltaSeqId =:= undefined - orelse DeltaSeqId >= NextSeqId -> +delete1(_TransientThreshold, NextSeqId, DeltaSeqId, IndexState) + when DeltaSeqId =:= undefined orelse DeltaSeqId >= NextSeqId -> IndexState; -delete1(PersistentStore, TransientThreshold, NextSeqId, DeltaSeqId, - IndexState) -> +delete1(TransientThreshold, NextSeqId, DeltaSeqId, IndexState) -> {List, Again, IndexState1} = rabbit_queue_index:read(DeltaSeqId, NextSeqId, IndexState), IndexState2 = @@ -1012,38 +997,33 @@ delete1(PersistentStore, TransientThreshold, NextSeqId, DeltaSeqId, List, TransientThreshold, IndexState1), {_Count, IndexState4} = remove_queue_entries( - PersistentStore, fun beta_fold_no_index_on_disk/3, - Q, IndexState3), + fun beta_fold_no_index_on_disk/3, Q, IndexState3), IndexState4 end, - delete1(PersistentStore, TransientThreshold, NextSeqId, Again, - IndexState2). + delete1(TransientThreshold, NextSeqId, Again, IndexState2). -purge1(Count, State = #vqstate { q3 = Q3, index_state = IndexState, - persistent_store = PersistentStore }) -> +purge1(Count, State = #vqstate { q3 = Q3, index_state = IndexState }) -> case bpqueue:is_empty(Q3) of true -> {Q1Count, IndexState1} = remove_queue_entries( - PersistentStore, fun rabbit_misc:queue_fold/3, - State #vqstate.q1, IndexState), + fun rabbit_misc:queue_fold/3, State #vqstate.q1, IndexState), {Count + Q1Count, State #vqstate { q1 = queue:new(), index_state = IndexState1 }}; false -> {Q3Count, IndexState1} = remove_queue_entries( - PersistentStore, fun beta_fold_no_index_on_disk/3, - Q3, IndexState), + fun beta_fold_no_index_on_disk/3, Q3, IndexState), purge1(Count + Q3Count, maybe_deltas_to_betas( State #vqstate { index_state = IndexState1, q3 = bpqueue:new() })) end. -remove_queue_entries(PersistentStore, Fold, Q, IndexState) -> - {_PersistentStore, Count, GuidsByStore, SeqIds, IndexState1} = +remove_queue_entries(Fold, Q, IndexState) -> + {Count, GuidsByStore, SeqIds, IndexState1} = Fold(fun remove_queue_entries1/2, - {PersistentStore, 0, dict:new(), [], IndexState}, Q), + {0, dict:new(), [], IndexState}, Q), ok = dict:fold(fun (MsgStore, Guids, ok) -> rabbit_msg_store:remove(MsgStore, Guids) end, ok, GuidsByStore), @@ -1058,11 +1038,11 @@ remove_queue_entries1( #msg_status { guid = Guid, seq_id = SeqId, is_delivered = IsDelivered, msg_on_disk = MsgOnDisk, index_on_disk = IndexOnDisk, is_persistent = IsPersistent }, - {PersistentStore, CountN, GuidsByStore, SeqIdsAcc, IndexStateN}) -> + {CountN, GuidsByStore, SeqIdsAcc, IndexStateN}) -> GuidsByStore1 = case {MsgOnDisk, IsPersistent} of {true, true} -> - rabbit_misc:dict_cons(PersistentStore, Guid, GuidsByStore); + rabbit_misc:dict_cons(?PERSISTENT_MSG_STORE, Guid, GuidsByStore); {true, false} -> rabbit_misc:dict_cons(?TRANSIENT_MSG_STORE, Guid, GuidsByStore); {false, _} -> @@ -1076,15 +1056,14 @@ remove_queue_entries1( true -> rabbit_queue_index:deliver(SeqId, IndexStateN); false -> IndexStateN end, - {PersistentStore, CountN + 1, GuidsByStore1, SeqIdsAcc1, IndexStateN1}. + {CountN + 1, GuidsByStore1, SeqIdsAcc1, IndexStateN1}. fetch_from_q3_or_delta(State = #vqstate { q1 = Q1, q2 = Q2, delta = #delta { count = DeltaCount }, q3 = Q3, q4 = Q4, ram_msg_count = RamMsgCount, ram_index_count = RamIndexCount, - msg_store_clients = MSCState, - persistent_store = PersistentStore }) -> + msg_store_clients = MSCState }) -> case bpqueue:out(Q3) of {empty, _Q3} -> 0 = DeltaCount, %% ASSERTION @@ -1096,8 +1075,7 @@ fetch_from_q3_or_delta(State = #vqstate { is_persistent = IsPersistent }}, Q3a} -> {{ok, Msg = #basic_message { is_persistent = IsPersistent, guid = Guid }}, MSCState1} = - read_from_msg_store( - PersistentStore, MSCState, IsPersistent, Guid), + read_from_msg_store(MSCState, IsPersistent, Guid), Q4a = queue:in(MsgStatus #msg_status { msg = Msg }, Q4), RamIndexCount1 = case IndexOnDisk of true -> RamIndexCount; @@ -1201,10 +1179,9 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, guid = Guid }, publish(msg, MsgStatus, #vqstate { index_state = IndexState, ram_msg_count = RamMsgCount, - msg_store_clients = MSCState, - persistent_store = PersistentStore } = State) -> + msg_store_clients = MSCState } = State) -> {MsgStatus1, MSCState1} = - maybe_write_msg_to_disk(PersistentStore, false, MsgStatus, MSCState), + maybe_write_msg_to_disk(false, MsgStatus, MSCState), {MsgStatus2, IndexState1} = maybe_write_index_to_disk(false, MsgStatus1, IndexState), State1 = State #vqstate { ram_msg_count = RamMsgCount + 1, @@ -1213,11 +1190,10 @@ publish(msg, MsgStatus, #vqstate { store_alpha_entry(MsgStatus2, State1); publish(index, MsgStatus, #vqstate { - index_state = IndexState, q1 = Q1, ram_index_count = RamIndexCount, msg_store_clients = MSCState, - persistent_store = PersistentStore } = State) -> + index_state = IndexState, q1 = Q1 } = State) -> {MsgStatus1 = #msg_status { msg_on_disk = true }, MSCState1} = - maybe_write_msg_to_disk(PersistentStore, true, MsgStatus, MSCState), + maybe_write_msg_to_disk(true, MsgStatus, MSCState), ForceIndex = should_force_index_to_disk(State), {MsgStatus2, IndexState1} = maybe_write_index_to_disk(ForceIndex, MsgStatus1, IndexState), @@ -1233,10 +1209,9 @@ publish(index, MsgStatus, #vqstate { publish(neither, MsgStatus = #msg_status { seq_id = SeqId }, State = #vqstate { index_state = IndexState, q1 = Q1, q2 = Q2, - delta = Delta, msg_store_clients = MSCState, - persistent_store = PersistentStore }) -> + delta = Delta, msg_store_clients = MSCState }) -> {MsgStatus1 = #msg_status { msg_on_disk = true }, MSCState1} = - maybe_write_msg_to_disk(PersistentStore, true, MsgStatus, MSCState), + maybe_write_msg_to_disk(true, MsgStatus, MSCState), {#msg_status { index_on_disk = true }, IndexState1} = maybe_write_index_to_disk(true, MsgStatus1, IndexState), true = queue:is_empty(Q1) andalso bpqueue:is_empty(Q2), %% ASSERTION @@ -1271,42 +1246,39 @@ store_beta_entry(MsgStatus = #msg_status { msg_on_disk = true, State #vqstate { q2 = bpqueue:in(IndexOnDisk, MsgStatus1, Q2) } end. -find_msg_store(true, PersistentStore) -> PersistentStore; -find_msg_store(false, _PersistentStore) -> ?TRANSIENT_MSG_STORE. +find_msg_store(true) -> ?PERSISTENT_MSG_STORE; +find_msg_store(false) -> ?TRANSIENT_MSG_STORE. -with_msg_store_state(PersistentStore, {{MSCStateP, PRef}, MSCStateT}, true, - Fun) -> - {Result, MSCStateP1} = Fun(PersistentStore, MSCStateP), +with_msg_store_state({{MSCStateP, PRef}, MSCStateT}, true, Fun) -> + {Result, MSCStateP1} = Fun(?PERSISTENT_MSG_STORE, MSCStateP), {Result, {{MSCStateP1, PRef}, MSCStateT}}; -with_msg_store_state(_PersistentStore, {MSCStateP, {MSCStateT, TRef}}, false, - Fun) -> +with_msg_store_state({MSCStateP, {MSCStateT, TRef}}, false, Fun) -> {Result, MSCStateT1} = Fun(?TRANSIENT_MSG_STORE, MSCStateT), {Result, {MSCStateP, {MSCStateT1, TRef}}}. -read_from_msg_store(PersistentStore, MSCState, IsPersistent, Guid) -> +read_from_msg_store(MSCState, IsPersistent, Guid) -> with_msg_store_state( - PersistentStore, MSCState, IsPersistent, + MSCState, IsPersistent, fun (MsgStore, MSCState1) -> rabbit_msg_store:read(MsgStore, Guid, MSCState1) end). -maybe_write_msg_to_disk(_PersistentStore, _Force, MsgStatus = +maybe_write_msg_to_disk(_Force, MsgStatus = #msg_status { msg_on_disk = true }, MSCState) -> {MsgStatus, MSCState}; -maybe_write_msg_to_disk(PersistentStore, Force, - MsgStatus = #msg_status { - msg = Msg, guid = Guid, - is_persistent = IsPersistent }, MSCState) +maybe_write_msg_to_disk(Force, MsgStatus = #msg_status { + msg = Msg, guid = Guid, + is_persistent = IsPersistent }, MSCState) when Force orelse IsPersistent -> {ok, MSCState1} = with_msg_store_state( - PersistentStore, MSCState, IsPersistent, + MSCState, IsPersistent, fun (MsgStore, MSCState2) -> rabbit_msg_store:write( MsgStore, Guid, ensure_binary_properties(Msg), MSCState2) end), {MsgStatus #msg_status { msg_on_disk = true }, MSCState1}; -maybe_write_msg_to_disk(_PersistentStore, _Force, MsgStatus, MSCState) -> +maybe_write_msg_to_disk(_Force, MsgStatus, MSCState) -> {MsgStatus, MSCState}. maybe_write_index_to_disk(_Force, MsgStatus = @@ -1465,14 +1437,12 @@ maybe_push_alphas_to_betas( maybe_push_alphas_to_betas( Generator, Consumer, Q, State = #vqstate { ram_msg_count = RamMsgCount, ram_index_count = RamIndexCount, - index_state = IndexState, msg_store_clients = MSCState, - persistent_store = PersistentStore }) -> + index_state = IndexState, msg_store_clients = MSCState }) -> case Generator(Q) of {empty, _Q} -> State; {{value, MsgStatus}, Qa} -> {MsgStatus1, MSCState1} = - maybe_write_msg_to_disk( - PersistentStore, true, MsgStatus, MSCState), + maybe_write_msg_to_disk(true, MsgStatus, MSCState), ForceIndex = should_force_index_to_disk(State), {MsgStatus2, IndexState1} = maybe_write_index_to_disk(ForceIndex, MsgStatus1, IndexState), |
