diff options
| author | Matthew Sackman <matthew@rabbitmq.com> | 2010-05-25 11:20:38 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-05-25 11:20:38 +0100 |
| commit | 9220fbd071539d20dcd1df1fd7d32901060a16af (patch) | |
| tree | 20023fd013e8e5e17ff46ce6f232c8a3ff5d45c5 /src | |
| parent | b54129b04098ec48124f635ba42d68db8807af88 (diff) | |
| parent | c6aedf1bb07b0d651074655d9c934aed8b81d1c9 (diff) | |
| download | rabbitmq-server-git-9220fbd071539d20dcd1df1fd7d32901060a16af.tar.gz | |
Merging bug 22752(bug21673) onto bug 21673
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_variable_queue.erl | 223 |
1 files changed, 103 insertions, 120 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 3d485106d3..88c2719a36 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -189,10 +189,10 @@ len, on_sync, msg_store_clients, - persistent_store, persistent_count, transient_threshold, - pending_ack + pending_ack, + durable }). -record(msg_status, @@ -265,10 +265,10 @@ [fun (() -> any())]}, msg_store_clients :: 'undefined' | {{any(), binary()}, {any(), binary()}}, - persistent_store :: pid() | atom(), persistent_count :: non_neg_integer(), transient_threshold :: non_neg_integer(), - pending_ack :: dict() + pending_ack :: dict(), + durable :: boolean() }). -include("rabbit_backing_queue_spec.hrl"). @@ -302,12 +302,8 @@ start(DurableQueues) -> Refs, StartFunState]). init(QueueName, IsDurable, _Recover) -> - PersistentStore = case IsDurable of - true -> ?PERSISTENT_MSG_STORE; - false -> ?TRANSIENT_MSG_STORE - end, MsgStoreRecovered = - rabbit_msg_store:successfully_recovered_state(PersistentStore), + rabbit_msg_store:successfully_recovered_state(?PERSISTENT_MSG_STORE), ContainsCheckFun = fun (Guid) -> rabbit_msg_store:contains(?PERSISTENT_MSG_STORE, Guid) @@ -331,7 +327,11 @@ init(QueueName, IsDurable, _Recover) -> end_seq_id = NextSeqId } end, Now = now(), - PersistentClient = rabbit_msg_store:client_init(PersistentStore, PRef), + PersistentClient = + case IsDurable of + true -> rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, PRef); + false -> undefined + end, TransientClient = rabbit_msg_store:client_init(?TRANSIENT_MSG_STORE, TRef), State = #vqstate { q1 = queue:new(), @@ -357,10 +357,10 @@ init(QueueName, IsDurable, _Recover) -> on_sync = {[], [], []}, msg_store_clients = {{PersistentClient, PRef}, {TransientClient, TRef}}, - persistent_store = PersistentStore, persistent_count = DeltaCount1, transient_threshold = NextSeqId, - pending_ack = dict:new() + pending_ack = dict:new(), + durable = IsDurable }, maybe_deltas_to_betas(State). @@ -385,7 +385,6 @@ delete_and_terminate(State) -> State2 = #vqstate { index_state = IndexState, msg_store_clients = {{MSCStateP, PRef}, {MSCStateT, TRef}}, - persistent_store = PersistentStore, transient_threshold = TransientThreshold } = remove_pending_ack(false, State1), %% flushing here is good because it deletes all full segments, @@ -396,21 +395,22 @@ delete_and_terminate(State) -> {N, N, IndexState3} -> IndexState3; {DeltaSeqId, NextSeqId, IndexState3} -> - delete1(PersistentStore, TransientThreshold, NextSeqId, DeltaSeqId, IndexState3) + delete1(TransientThreshold, NextSeqId, DeltaSeqId, IndexState3) end, IndexState5 = rabbit_queue_index:delete_and_terminate(IndexState2), - rabbit_msg_store:delete_client(PersistentStore, PRef), + case MSCStateP of + undefined -> ok; + _ -> rabbit_msg_store:delete_client(?PERSISTENT_MSG_STORE, PRef), + rabbit_msg_store:client_terminate(MSCStateP) + end, rabbit_msg_store:delete_client(?TRANSIENT_MSG_STORE, TRef), - rabbit_msg_store:client_terminate(MSCStateP), rabbit_msg_store:client_terminate(MSCStateT), State2 #vqstate { index_state = IndexState5, msg_store_clients = undefined }. -purge(State = #vqstate { q4 = Q4, index_state = IndexState, len = Len, - persistent_store = PersistentStore }) -> +purge(State = #vqstate { q4 = Q4, index_state = IndexState, len = Len }) -> {Q4Count, IndexState1} = - remove_queue_entries(PersistentStore, fun rabbit_misc:queue_fold/3, - Q4, IndexState), + remove_queue_entries(fun rabbit_misc:queue_fold/3, Q4, IndexState), {Len, State1} = purge1(Q4Count, State #vqstate { index_state = IndexState1, q4 = queue:new() }), @@ -431,16 +431,17 @@ publish_delivered(true, Msg = #basic_message { guid = Guid, out_counter = OutCount, in_counter = InCount, msg_store_clients = MSCState, - persistent_store = PersistentStore, persistent_count = PCount, - pending_ack = PA }) -> + pending_ack = PA, + durable = IsDurable }) -> + IsPersistent1 = IsDurable andalso IsPersistent, MsgStatus = #msg_status { - msg = Msg, guid = Guid, seq_id = SeqId, is_persistent = IsPersistent, + msg = Msg, guid = Guid, seq_id = SeqId, is_persistent = IsPersistent1, is_delivered = true, msg_on_disk = false, index_on_disk = false }, - {MsgStatus1, MSCState1} = maybe_write_msg_to_disk(PersistentStore, false, - MsgStatus, MSCState), + {MsgStatus1, MSCState1} = + maybe_write_msg_to_disk(false, MsgStatus, MSCState), State1 = State #vqstate { msg_store_clients = MSCState1, - persistent_count = PCount + case IsPersistent of + persistent_count = PCount + case IsPersistent1 of true -> 1; false -> 0 end, @@ -462,7 +463,7 @@ publish_delivered(true, Msg = #basic_message { guid = Guid, fetch(AckRequired, State = #vqstate { q4 = Q4, ram_msg_count = RamMsgCount, out_counter = OutCount, index_state = IndexState, len = Len, persistent_count = PCount, - persistent_store = PersistentStore, pending_ack = PA }) -> + pending_ack = PA }) -> case queue:out(Q4) of {empty, _Q4} -> case fetch_from_q3_or_delta(State) of @@ -488,7 +489,7 @@ fetch(AckRequired, State = end, %% 2. If it's on disk and there's no Ack required, remove it - MsgStore = find_msg_store(IsPersistent, PersistentStore), + MsgStore = find_msg_store(IsPersistent), IndexState2 = case MsgOnDisk andalso not AckRequired of %% Remove from disk now @@ -542,7 +543,6 @@ ack([], State) -> State; ack(AckTags, State = #vqstate { index_state = IndexState, persistent_count = PCount, - persistent_store = PersistentStore, pending_ack = PA }) -> {GuidsByStore, SeqIds, PA1} = lists:foldl( @@ -553,19 +553,20 @@ ack(AckTags, State = #vqstate { index_state = IndexState, msg_on_disk = false, is_persistent = false }} -> {Dict, SeqIds, PAN1}; - {ok, {false, Guid}} -> - {rabbit_misc:dict_cons(?TRANSIENT_MSG_STORE, Guid, - Dict), SeqIds, PAN1}; - {ok, {true, Guid}} -> - {rabbit_misc:dict_cons(PersistentStore, Guid, Dict), - [SeqId | SeqIds], PAN1} + {ok, {IsPersistent, Guid}} -> + SeqIds1 = case IsPersistent of + true -> [SeqId | SeqIds]; + false -> SeqIds + end, + {rabbit_misc:dict_cons(find_msg_store(IsPersistent), + Guid, Dict), SeqIds1, PAN1} end end, {dict:new(), [], PA}, AckTags), IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState), ok = dict:fold(fun (MsgStore, Guids, ok) -> rabbit_msg_store:remove(MsgStore, Guids) end, ok, GuidsByStore), - PCount1 = PCount - case dict:find(PersistentStore, GuidsByStore) of + PCount1 = PCount - case dict:find(?PERSISTENT_MSG_STORE, GuidsByStore) of error -> 0; {ok, Guids} -> length(Guids) end, @@ -575,12 +576,12 @@ ack(AckTags, State = #vqstate { index_state = IndexState, tx_publish(Txn, Msg = #basic_message { is_persistent = true, guid = Guid }, State = #vqstate { msg_store_clients = MSCState, - persistent_store = PersistentStore }) -> + durable = true }) -> MsgStatus = #msg_status { msg = Msg, guid = Guid, seq_id = undefined, is_persistent = true, is_delivered = false, msg_on_disk = false, index_on_disk = false }, {#msg_status { msg_on_disk = true }, MSCState1} = - maybe_write_msg_to_disk(PersistentStore, false, MsgStatus, MSCState), + maybe_write_msg_to_disk(false, MsgStatus, MSCState), publish_in_tx(Txn, Msg), State #vqstate { msg_store_clients = MSCState1 }; tx_publish(Txn, Msg, State) -> @@ -591,13 +592,17 @@ tx_ack(Txn, AckTags, State) -> ack_in_tx(Txn, AckTags), State. -tx_rollback(Txn, State = #vqstate { persistent_store = PersistentStore }) -> +tx_rollback(Txn, State = #vqstate { durable = IsDurable }) -> #tx { pending_acks = AckTags, pending_messages = Pubs } = lookup_tx(Txn), erase_tx(Txn), - ok = rabbit_msg_store:remove(PersistentStore, persistent_guids(Pubs)), + ok = case IsDurable of + true -> rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, + persistent_guids(Pubs)); + false -> ok + end, {lists:flatten(AckTags), State}. -tx_commit(Txn, Fun, State = #vqstate { persistent_store = PersistentStore }) -> +tx_commit(Txn, Fun, State = #vqstate { durable = IsDurable }) -> %% If we are a non-durable queue, or we have no persistent pubs, %% we can skip the msg_store loop. #tx { pending_acks = AckTags, pending_messages = Pubs } = lookup_tx(Txn), @@ -607,8 +612,7 @@ tx_commit(Txn, Fun, State = #vqstate { persistent_store = PersistentStore }) -> PersistentGuids = persistent_guids(PubsOrdered), IsTransientPubs = [] == PersistentGuids, {AckTags1, - case IsTransientPubs orelse - ?TRANSIENT_MSG_STORE == PersistentStore of + case (not IsDurable) orelse IsTransientPubs of true -> tx_commit_post_msg_store( IsTransientPubs, PubsOrdered, AckTags1, Fun, State); @@ -620,7 +624,7 @@ tx_commit(Txn, Fun, State = #vqstate { persistent_store = PersistentStore }) -> State end}. -requeue(AckTags, State = #vqstate { persistent_store = PersistentStore }) -> +requeue(AckTags, State) -> {SeqIds, GuidsByStore, State1 = #vqstate { index_state = IndexState, persistent_count = PCount }} = @@ -641,27 +645,24 @@ requeue(AckTags, State = #vqstate { persistent_store = PersistentStore }) -> {ok, {IsPersistent, Guid}} -> {{ok, Msg = #basic_message{}}, MSCStateN1} = read_from_msg_store( - PersistentStore, MSCStateN, IsPersistent, Guid), + MSCStateN, IsPersistent, Guid), StateN2 = StateN1 #vqstate { msg_store_clients = MSCStateN1 }, {_SeqId, StateN3} = publish(Msg, true, true, StateN2), - {SeqIdsAcc1, MsgStore} = - case IsPersistent of - true -> - {[SeqId | SeqIdsAcc], PersistentStore}; - false -> - {SeqIdsAcc, ?TRANSIENT_MSG_STORE} - end, + MsgStore = find_msg_store(IsPersistent), + SeqIdsAcc1 = case IsPersistent of + true -> [SeqId | SeqIdsAcc]; + false -> SeqIdsAcc + end, {SeqIdsAcc1, - rabbit_misc:dict_cons(MsgStore, Guid, Dict), - StateN3} + rabbit_misc:dict_cons(MsgStore, Guid, Dict), StateN3} end end, {[], dict:new(), State}, AckTags), IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState), ok = dict:fold(fun (MsgStore, Guids, ok) -> rabbit_msg_store:release(MsgStore, Guids) end, ok, GuidsByStore), - PCount1 = PCount - case dict:find(PersistentStore, GuidsByStore) of + PCount1 = PCount - case dict:find(?PERSISTENT_MSG_STORE, GuidsByStore) of error -> 0; {ok, Guids} -> length(Guids) end, @@ -759,7 +760,6 @@ status(#vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, remove_pending_ack(KeepPersistent, State = #vqstate { pending_ack = PA, - persistent_store = PersistentStore, index_state = IndexState }) -> {SeqIds, GuidsByStore, PA1} = dict:fold( @@ -771,7 +771,7 @@ remove_pending_ack(KeepPersistent, case IsPersistent of true -> {[SeqId | SeqIdsAcc], rabbit_misc:dict_cons( - PersistentStore, Guid, Dict), PAN1}; + ?PERSISTENT_MSG_STORE, Guid, Dict), PAN1}; false -> {SeqIdsAcc, rabbit_misc:dict_cons( ?TRANSIENT_MSG_STORE, Guid, Dict), PAN1} @@ -940,11 +940,10 @@ msg_store_callback(PersistentGuids, IsTransientPubs, Pubs, AckTags, Fun) -> tx_commit_post_msg_store(IsTransientPubs, Pubs, AckTags, Fun, State = #vqstate { on_sync = OnSync = {SAcks, SPubs, SFuns}, - persistent_store = PersistentStore, - pending_ack = PA }) -> + pending_ack = PA, durable = IsDurable }) -> %% If we are a non-durable queue, or (no persisent pubs, and no %% persistent acks) then we can skip the queue_index loop. - case PersistentStore == ?TRANSIENT_MSG_STORE orelse + case (not IsDurable) orelse (IsTransientPubs andalso lists:foldl( fun (AckTag, true ) -> @@ -965,18 +964,18 @@ tx_commit_post_msg_store(IsTransientPubs, Pubs, AckTags, Fun, State = tx_commit_index(State = #vqstate { on_sync = {_, _, []} }) -> State; tx_commit_index(State = #vqstate { on_sync = {SAcks, SPubs, SFuns}, - persistent_store = PersistentStore }) -> + durable = IsDurable }) -> Acks = lists:flatten(SAcks), State1 = ack(Acks, State), - IsPersistentStore = ?PERSISTENT_MSG_STORE == PersistentStore, Pubs = lists:flatten(lists:reverse(SPubs)), {SeqIds, State2 = #vqstate { index_state = IndexState }} = lists:foldl( fun (Msg = #basic_message { is_persistent = IsPersistent }, {SeqIdsAcc, StateN}) -> {SeqId, StateN1} = - publish(Msg, false, IsPersistent, StateN), - {case IsPersistentStore andalso IsPersistent of + publish(Msg, false, IsDurable andalso IsPersistent, + StateN), + {case IsDurable andalso IsPersistent of true -> [SeqId | SeqIdsAcc]; false -> SeqIdsAcc end, StateN1} @@ -985,12 +984,10 @@ tx_commit_index(State = #vqstate { on_sync = {SAcks, SPubs, SFuns}, [ Fun() || Fun <- lists:reverse(SFuns) ], State2 #vqstate { index_state = IndexState1, on_sync = {[], [], []} }. -delete1(_PersistentStore, _TransientThreshold, NextSeqId, DeltaSeqId, - IndexState) when DeltaSeqId =:= undefined - orelse DeltaSeqId >= NextSeqId -> +delete1(_TransientThreshold, NextSeqId, DeltaSeqId, IndexState) + when DeltaSeqId =:= undefined orelse DeltaSeqId >= NextSeqId -> IndexState; -delete1(PersistentStore, TransientThreshold, NextSeqId, DeltaSeqId, - IndexState) -> +delete1(TransientThreshold, NextSeqId, DeltaSeqId, IndexState) -> {List, Again, IndexState1} = rabbit_queue_index:read(DeltaSeqId, NextSeqId, IndexState), IndexState2 = @@ -1000,38 +997,33 @@ delete1(PersistentStore, TransientThreshold, NextSeqId, DeltaSeqId, List, TransientThreshold, IndexState1), {_Count, IndexState4} = remove_queue_entries( - PersistentStore, fun beta_fold_no_index_on_disk/3, - Q, IndexState3), + fun beta_fold_no_index_on_disk/3, Q, IndexState3), IndexState4 end, - delete1(PersistentStore, TransientThreshold, NextSeqId, Again, - IndexState2). + delete1(TransientThreshold, NextSeqId, Again, IndexState2). -purge1(Count, State = #vqstate { q3 = Q3, index_state = IndexState, - persistent_store = PersistentStore }) -> +purge1(Count, State = #vqstate { q3 = Q3, index_state = IndexState }) -> case bpqueue:is_empty(Q3) of true -> {Q1Count, IndexState1} = remove_queue_entries( - PersistentStore, fun rabbit_misc:queue_fold/3, - State #vqstate.q1, IndexState), + fun rabbit_misc:queue_fold/3, State #vqstate.q1, IndexState), {Count + Q1Count, State #vqstate { q1 = queue:new(), index_state = IndexState1 }}; false -> {Q3Count, IndexState1} = remove_queue_entries( - PersistentStore, fun beta_fold_no_index_on_disk/3, - Q3, IndexState), + fun beta_fold_no_index_on_disk/3, Q3, IndexState), purge1(Count + Q3Count, maybe_deltas_to_betas( State #vqstate { index_state = IndexState1, q3 = bpqueue:new() })) end. -remove_queue_entries(PersistentStore, Fold, Q, IndexState) -> - {_PersistentStore, Count, GuidsByStore, SeqIds, IndexState1} = +remove_queue_entries(Fold, Q, IndexState) -> + {Count, GuidsByStore, SeqIds, IndexState1} = Fold(fun remove_queue_entries1/2, - {PersistentStore, 0, dict:new(), [], IndexState}, Q), + {0, dict:new(), [], IndexState}, Q), ok = dict:fold(fun (MsgStore, Guids, ok) -> rabbit_msg_store:remove(MsgStore, Guids) end, ok, GuidsByStore), @@ -1046,11 +1038,11 @@ remove_queue_entries1( #msg_status { guid = Guid, seq_id = SeqId, is_delivered = IsDelivered, msg_on_disk = MsgOnDisk, index_on_disk = IndexOnDisk, is_persistent = IsPersistent }, - {PersistentStore, CountN, GuidsByStore, SeqIdsAcc, IndexStateN}) -> + {CountN, GuidsByStore, SeqIdsAcc, IndexStateN}) -> GuidsByStore1 = case {MsgOnDisk, IsPersistent} of {true, true} -> - rabbit_misc:dict_cons(PersistentStore, Guid, GuidsByStore); + rabbit_misc:dict_cons(?PERSISTENT_MSG_STORE, Guid, GuidsByStore); {true, false} -> rabbit_misc:dict_cons(?TRANSIENT_MSG_STORE, Guid, GuidsByStore); {false, _} -> @@ -1064,15 +1056,14 @@ remove_queue_entries1( true -> rabbit_queue_index:deliver(SeqId, IndexStateN); false -> IndexStateN end, - {PersistentStore, CountN + 1, GuidsByStore1, SeqIdsAcc1, IndexStateN1}. + {CountN + 1, GuidsByStore1, SeqIdsAcc1, IndexStateN1}. fetch_from_q3_or_delta(State = #vqstate { q1 = Q1, q2 = Q2, delta = #delta { count = DeltaCount }, q3 = Q3, q4 = Q4, ram_msg_count = RamMsgCount, ram_index_count = RamIndexCount, - msg_store_clients = MSCState, - persistent_store = PersistentStore }) -> + msg_store_clients = MSCState }) -> case bpqueue:out(Q3) of {empty, _Q3} -> 0 = DeltaCount, %% ASSERTION @@ -1084,8 +1075,7 @@ fetch_from_q3_or_delta(State = #vqstate { is_persistent = IsPersistent }}, Q3a} -> {{ok, Msg = #basic_message { is_persistent = IsPersistent, guid = Guid }}, MSCState1} = - read_from_msg_store( - PersistentStore, MSCState, IsPersistent, Guid), + read_from_msg_store(MSCState, IsPersistent, Guid), Q4a = queue:in(MsgStatus #msg_status { msg = Msg }, Q4), RamIndexCount1 = case IndexOnDisk of true -> RamIndexCount; @@ -1172,9 +1162,10 @@ test_keep_msg_in_ram(SeqId, #vqstate { target_ram_msg_count = TargetRamMsgCount, publish(Msg = #basic_message { is_persistent = IsPersistent, guid = Guid }, IsDelivered, MsgOnDisk, State = #vqstate { next_seq_id = SeqId, len = Len, in_counter = InCount, - persistent_count = PCount }) -> + persistent_count = PCount, durable = IsDurable }) -> MsgStatus = #msg_status { - msg = Msg, guid = Guid, seq_id = SeqId, is_persistent = IsPersistent, + msg = Msg, guid = Guid, seq_id = SeqId, + is_persistent = IsDurable andalso IsPersistent, is_delivered = IsDelivered, msg_on_disk = MsgOnDisk, index_on_disk = false }, PCount1 = PCount + case IsPersistent of @@ -1188,10 +1179,9 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, guid = Guid }, publish(msg, MsgStatus, #vqstate { index_state = IndexState, ram_msg_count = RamMsgCount, - msg_store_clients = MSCState, - persistent_store = PersistentStore } = State) -> + msg_store_clients = MSCState } = State) -> {MsgStatus1, MSCState1} = - maybe_write_msg_to_disk(PersistentStore, false, MsgStatus, MSCState), + maybe_write_msg_to_disk(false, MsgStatus, MSCState), {MsgStatus2, IndexState1} = maybe_write_index_to_disk(false, MsgStatus1, IndexState), State1 = State #vqstate { ram_msg_count = RamMsgCount + 1, @@ -1200,11 +1190,10 @@ publish(msg, MsgStatus, #vqstate { store_alpha_entry(MsgStatus2, State1); publish(index, MsgStatus, #vqstate { - index_state = IndexState, q1 = Q1, ram_index_count = RamIndexCount, msg_store_clients = MSCState, - persistent_store = PersistentStore } = State) -> + index_state = IndexState, q1 = Q1 } = State) -> {MsgStatus1 = #msg_status { msg_on_disk = true }, MSCState1} = - maybe_write_msg_to_disk(PersistentStore, true, MsgStatus, MSCState), + maybe_write_msg_to_disk(true, MsgStatus, MSCState), ForceIndex = should_force_index_to_disk(State), {MsgStatus2, IndexState1} = maybe_write_index_to_disk(ForceIndex, MsgStatus1, IndexState), @@ -1220,10 +1209,9 @@ publish(index, MsgStatus, #vqstate { publish(neither, MsgStatus = #msg_status { seq_id = SeqId }, State = #vqstate { index_state = IndexState, q1 = Q1, q2 = Q2, - delta = Delta, msg_store_clients = MSCState, - persistent_store = PersistentStore }) -> + delta = Delta, msg_store_clients = MSCState }) -> {MsgStatus1 = #msg_status { msg_on_disk = true }, MSCState1} = - maybe_write_msg_to_disk(PersistentStore, true, MsgStatus, MSCState), + maybe_write_msg_to_disk(true, MsgStatus, MSCState), {#msg_status { index_on_disk = true }, IndexState1} = maybe_write_index_to_disk(true, MsgStatus1, IndexState), true = queue:is_empty(Q1) andalso bpqueue:is_empty(Q2), %% ASSERTION @@ -1258,42 +1246,39 @@ store_beta_entry(MsgStatus = #msg_status { msg_on_disk = true, State #vqstate { q2 = bpqueue:in(IndexOnDisk, MsgStatus1, Q2) } end. -find_msg_store(true, PersistentStore) -> PersistentStore; -find_msg_store(false, _PersistentStore) -> ?TRANSIENT_MSG_STORE. +find_msg_store(true) -> ?PERSISTENT_MSG_STORE; +find_msg_store(false) -> ?TRANSIENT_MSG_STORE. -with_msg_store_state(PersistentStore, {{MSCStateP, PRef}, MSCStateT}, true, - Fun) -> - {Result, MSCStateP1} = Fun(PersistentStore, MSCStateP), +with_msg_store_state({{MSCStateP, PRef}, MSCStateT}, true, Fun) -> + {Result, MSCStateP1} = Fun(?PERSISTENT_MSG_STORE, MSCStateP), {Result, {{MSCStateP1, PRef}, MSCStateT}}; -with_msg_store_state(_PersistentStore, {MSCStateP, {MSCStateT, TRef}}, false, - Fun) -> +with_msg_store_state({MSCStateP, {MSCStateT, TRef}}, false, Fun) -> {Result, MSCStateT1} = Fun(?TRANSIENT_MSG_STORE, MSCStateT), {Result, {MSCStateP, {MSCStateT1, TRef}}}. -read_from_msg_store(PersistentStore, MSCState, IsPersistent, Guid) -> +read_from_msg_store(MSCState, IsPersistent, Guid) -> with_msg_store_state( - PersistentStore, MSCState, IsPersistent, + MSCState, IsPersistent, fun (MsgStore, MSCState1) -> rabbit_msg_store:read(MsgStore, Guid, MSCState1) end). -maybe_write_msg_to_disk(_PersistentStore, _Force, MsgStatus = +maybe_write_msg_to_disk(_Force, MsgStatus = #msg_status { msg_on_disk = true }, MSCState) -> {MsgStatus, MSCState}; -maybe_write_msg_to_disk(PersistentStore, Force, - MsgStatus = #msg_status { - msg = Msg, guid = Guid, - is_persistent = IsPersistent }, MSCState) +maybe_write_msg_to_disk(Force, MsgStatus = #msg_status { + msg = Msg, guid = Guid, + is_persistent = IsPersistent }, MSCState) when Force orelse IsPersistent -> {ok, MSCState1} = with_msg_store_state( - PersistentStore, MSCState, IsPersistent, + MSCState, IsPersistent, fun (MsgStore, MSCState2) -> rabbit_msg_store:write( MsgStore, Guid, ensure_binary_properties(Msg), MSCState2) end), {MsgStatus #msg_status { msg_on_disk = true }, MSCState1}; -maybe_write_msg_to_disk(_PersistentStore, _Force, MsgStatus, MSCState) -> +maybe_write_msg_to_disk(_Force, MsgStatus, MSCState) -> {MsgStatus, MSCState}. maybe_write_index_to_disk(_Force, MsgStatus = @@ -1452,14 +1437,12 @@ maybe_push_alphas_to_betas( maybe_push_alphas_to_betas( Generator, Consumer, Q, State = #vqstate { ram_msg_count = RamMsgCount, ram_index_count = RamIndexCount, - index_state = IndexState, msg_store_clients = MSCState, - persistent_store = PersistentStore }) -> + index_state = IndexState, msg_store_clients = MSCState }) -> case Generator(Q) of {empty, _Q} -> State; {{value, MsgStatus}, Qa} -> {MsgStatus1, MSCState1} = - maybe_write_msg_to_disk( - PersistentStore, true, MsgStatus, MSCState), + maybe_write_msg_to_disk(true, MsgStatus, MSCState), ForceIndex = should_force_index_to_disk(State), {MsgStatus2, IndexState1} = maybe_write_index_to_disk(ForceIndex, MsgStatus1, IndexState), |
