diff options
| author | Matthew Sackman <matthew@lshift.net> | 2010-04-03 20:56:46 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2010-04-03 20:56:46 +0100 |
| commit | ca1181e79d77b89ead1bb210d5744e6301cd2000 (patch) | |
| tree | 41b4b60803f34aad62e25d8a919094ceca6d76d2 /src | |
| parent | a133685fb5097a6c5fbef9a1c6afb9a78fdbf069 (diff) | |
| download | rabbitmq-server-git-ca1181e79d77b89ead1bb210d5744e6301cd2000.tar.gz | |
Introduced rabbit_misc:dict_cons/3 which ends up being used in 3 places. Also fixed a bug which I'd sleepily introduced in vq:requeue where a msg_store:release had accidentally become a msg_store:remove (no idea how the tests managed to pass after that enough to convince me to commit - certainly had the tests failing today due to that one). Finally, persistent msgs in a non-durable queue should be sent to the transient msg_store, not the persistent msg_store. Thus they will survive a crash of the queue, but not a restart of the server.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 10 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_misc.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_router.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 288 |
6 files changed, 166 insertions, 149 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 1394f9db75..c9add5b2d5 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -623,10 +623,14 @@ i(Item, _) -> handle_call(init_variable_queue, From, State = #q{variable_queue_state = undefined, - q = #amqqueue{name = QName}}) -> + q = #amqqueue{name = QName, durable = IsDurable}}) -> gen_server2:reply(From, ok), - noreply( - State #q { variable_queue_state = rabbit_variable_queue:init(QName) }); + PersistentStore = case IsDurable of + true -> ?PERSISTENT_MSG_STORE; + false -> ?TRANSIENT_MSG_STORE + end, + noreply(State #q { variable_queue_state = + rabbit_variable_queue:init(QName, PersistentStore) }); handle_call(init_variable_queue, _From, State) -> reply(ok, State); diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index be120c2ec8..c8733ed197 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -970,10 +970,7 @@ fold_per_queue(F, Acc0, UAQ) -> %% lists:reverse in handle_message({recover, true}, %% ...). However, it is significantly slower when %% going beyond a few thousand elements. - dict:update(QPid, - fun (MsgIds) -> [MsgId | MsgIds] end, - [MsgId], - D) + rabbit_misc:dict_cons(QPid, MsgId, D) end, dict:new(), UAQ), dict:fold(fun (QPid, MsgIds, Acc) -> F(QPid, MsgIds, Acc) end, Acc0, D). diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 3bc35ca2be..cd2e7fbc83 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -59,7 +59,7 @@ -export([sort_field_table/1]). -export([pid_to_string/1, string_to_pid/1]). -export([version_compare/2, version_compare/3]). --export([recursive_delete/1]). +-export([recursive_delete/1, dict_cons/3]). -import(mnesia). -import(lists). @@ -135,6 +135,7 @@ -spec(pid_to_string/1 :: (pid()) -> string()). -spec(string_to_pid/1 :: (string()) -> pid()). -spec(recursive_delete/1 :: (string()) -> 'ok' | {'error', any()}). +-spec(dict_cons/3 :: (any(), any(), dict()) -> dict()). -endif. @@ -625,3 +626,6 @@ recursive_delete(Path) -> {error, {Path, Error}} end end. + +dict_cons(Key, Value, Dict) -> + dict:update(Key, fun (List) -> [Value | List] end, [Value], Dict). diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index 884ea4ab5c..96337b42e7 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -78,9 +78,7 @@ deliver(QPids, Delivery) -> dict:to_list( lists:foldl( fun (QPid, D) -> - dict:update(node(QPid), - fun (QPids1) -> [QPid | QPids1] end, - [QPid], D) + rabbit_misc:dict_cons(node(QPid), QPid, D) end, dict:new(), QPids)), Delivery). diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 474afbcafb..75c66693e3 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1363,7 +1363,7 @@ assert_prop(List, Prop, Value) -> fresh_variable_queue() -> stop_msg_store(), ok = empty_test_queue(), - VQ = rabbit_variable_queue:init(test_queue()), + VQ = rabbit_variable_queue:init(test_queue(), ?PERSISTENT_MSG_STORE), S0 = rabbit_variable_queue:status(VQ), assert_prop(S0, len, 0), assert_prop(S0, q1, 0), diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index b9714f535b..37c6b22ef8 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -31,7 +31,7 @@ -module(rabbit_variable_queue). --export([init/1, terminate/1, publish/2, publish_delivered/2, +-export([init/2, terminate/1, publish/2, publish_delivered/2, set_queue_ram_duration_target/2, remeasure_rates/1, ram_duration/1, fetch/1, ack/2, len/1, is_empty/1, purge/1, delete_and_terminate/1, requeue/2, tx_publish/2, tx_rollback/2, @@ -154,7 +154,8 @@ rate_timestamp, len, on_sync, - msg_store_clients + msg_store_clients, + persistent_store }). -include("rabbit.hrl"). @@ -186,7 +187,7 @@ -type(bpqueue() :: any()). -type(msg_id() :: binary()). -type(seq_id() :: non_neg_integer()). --type(ack() :: {'ack_index_and_store', msg_id(), seq_id(), boolean()} +-type(ack() :: {'ack_index_and_store', msg_id(), seq_id(), atom() | pid()} | 'ack_not_on_disk'). -type(vqstate() :: #vqstate { q1 :: queue(), @@ -210,10 +211,11 @@ rate_timestamp :: {integer(), integer(), integer()}, len :: non_neg_integer(), on_sync :: {[ack()], [msg_id()], [{pid(), any()}]}, - msg_store_clients :: {any(), any()} + msg_store_clients :: {any(), any()}, + persistent_store :: pid() | atom() }). --spec(init/1 :: (queue_name()) -> vqstate()). +-spec(init/2 :: (queue_name(), pid() | atom()) -> vqstate()). -spec(terminate/1 :: (vqstate()) -> vqstate()). -spec(publish/2 :: (basic_message(), vqstate()) -> {seq_id(), vqstate()}). @@ -253,7 +255,7 @@ %% Public API %%---------------------------------------------------------------------------- -init(QueueName) -> +init(QueueName, PersistentStore) -> {DeltaCount, IndexState} = rabbit_queue_index:init(QueueName), {DeltaSeqId, NextSeqId, IndexState1} = @@ -285,9 +287,10 @@ init(QueueName) -> rate_timestamp = Now, len = DeltaCount, on_sync = {[], [], []}, - msg_store_clients = {rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE), - rabbit_msg_store:client_init(?TRANSIENT_MSG_STORE)} - }, + msg_store_clients = {rabbit_msg_store:client_init(PersistentStore), + rabbit_msg_store:client_init(?TRANSIENT_MSG_STORE)}, + persistent_store = PersistentStore + }, maybe_deltas_to_betas(State). terminate(State = #vqstate { index_state = IndexState, @@ -306,19 +309,22 @@ publish_delivered(Msg = #basic_message { guid = MsgId, next_seq_id = SeqId, out_counter = OutCount, in_counter = InCount, - msg_store_clients = MSCState }) -> + msg_store_clients = MSCState, + persistent_store = PersistentStore }) -> State1 = State #vqstate { out_counter = OutCount + 1, in_counter = InCount + 1 }, MsgStatus = #msg_status { msg = Msg, msg_id = MsgId, seq_id = SeqId, is_persistent = IsPersistent, is_delivered = true, msg_on_disk = false, index_on_disk = false }, - {MsgStatus1, MSCState1} = maybe_write_msg_to_disk(false, MsgStatus, MSCState), + {MsgStatus1, MSCState1} = maybe_write_msg_to_disk(PersistentStore, false, + MsgStatus, MSCState), State2 = State1 #vqstate { msg_store_clients = MSCState1 }, case MsgStatus1 #msg_status.msg_on_disk of true -> {#msg_status { index_on_disk = true }, IndexState1} = maybe_write_index_to_disk(false, MsgStatus1, IndexState), - {{ack_index_and_store, MsgId, SeqId, IsPersistent}, + {{ack_index_and_store, MsgId, SeqId, + find_msg_store(IsPersistent, PersistentStore)}, State2 #vqstate { index_state = IndexState1, next_seq_id = SeqId + 1 }}; false -> @@ -378,7 +384,8 @@ ram_duration(#vqstate { avg_egress_rate = AvgEgressRate, fetch(State = #vqstate { q4 = Q4, ram_msg_count = RamMsgCount, out_counter = OutCount, - index_state = IndexState, len = Len }) -> + index_state = IndexState, len = Len, + persistent_store = PersistentStore }) -> case queue:out(Q4) of {empty, _Q4} -> fetch_from_q3_or_delta(State); @@ -387,7 +394,7 @@ fetch(State = is_persistent = IsPersistent, is_delivered = IsDelivered, msg_on_disk = MsgOnDisk, index_on_disk = IndexOnDisk }}, Q4a} -> - {IndexState1, IndexOnDisk1} = + {IndexState1, IsPersistent} = case IndexOnDisk of true -> IndexState2 = @@ -404,13 +411,15 @@ fetch(State = false -> {IndexState, false} end, + MsgStore = find_msg_store(IsPersistent, PersistentStore), AckTag = - case IndexOnDisk1 of - true -> true = IsPersistent, %% ASSERTION - true = MsgOnDisk, %% ASSERTION - {ack_index_and_store, MsgId, SeqId, IsPersistent}; - false -> ok = case MsgOnDisk andalso not IsPersistent of - true -> rabbit_msg_store:remove(find_msg_store(IsPersistent), [MsgId]); + case IsPersistent of + true -> true = MsgOnDisk, %% ASSERTION + {ack_index_and_store, MsgId, SeqId, MsgStore}; + false -> ok = case MsgOnDisk of + true -> + rabbit_msg_store:remove( + MsgStore, [MsgId]); false -> ok end, ack_not_on_disk @@ -423,26 +432,19 @@ fetch(State = end. ack(AckTags, State = #vqstate { index_state = IndexState }) -> - {MsgIdsPersistent, MsgIdsTransient, SeqIds} = + {MsgIdsByStore, SeqIds} = lists:foldl( fun (ack_not_on_disk, Acc) -> Acc; - ({ack_index_and_store, MsgId, SeqId, true}, {MsgIdsP, MsgIdsT, SeqIds}) -> - {[MsgId | MsgIdsP], MsgIdsT, [SeqId | SeqIds]}; - ({ack_index_and_store, MsgId, SeqId, false}, {MsgIdsP, MsgIdsT, SeqIds}) -> - {MsgIdsP, [MsgId | MsgIdsT], [SeqId | SeqIds]} - end, {[], [], []}, AckTags), + ({ack_index_and_store, MsgId, SeqId, MsgStore}, {Dict, SeqIds}) -> + {rabbit_misc:dict_cons(MsgStore, MsgId, Dict), [SeqId | SeqIds]} + end, {dict:new(), []}, AckTags), IndexState1 = case SeqIds of [] -> IndexState; _ -> rabbit_queue_index:write_acks(SeqIds, IndexState) end, - ok = case MsgIdsPersistent of - [] -> ok; - _ -> rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, MsgIdsPersistent) - end, - ok = case MsgIdsTransient of - [] -> ok; - _ -> rabbit_msg_store:remove(?TRANSIENT_MSG_STORE, MsgIdsTransient) - end, + ok = dict:fold(fun (MsgStore, MsgIds, ok) -> + rabbit_msg_store:remove(MsgStore, MsgIds) + end, ok, MsgIdsByStore), State #vqstate { index_state = IndexState1 }. len(#vqstate { len = Len }) -> @@ -451,9 +453,11 @@ len(#vqstate { len = Len }) -> is_empty(State) -> 0 == len(State). -purge(State = #vqstate { q4 = Q4, index_state = IndexState, len = Len }) -> +purge(State = #vqstate { q4 = Q4, index_state = IndexState, len = Len, + persistent_store = PersistentStore }) -> {Q4Count, IndexState1} = - remove_queue_entries(fun rabbit_misc:queue_fold/3, Q4, IndexState), + remove_queue_entries(PersistentStore, fun rabbit_misc:queue_fold/3, + Q4, IndexState), {Len, State1} = purge1(Q4Count, State #vqstate { index_state = IndexState1, q4 = queue:new() }), @@ -463,7 +467,8 @@ purge(State = #vqstate { q4 = Q4, index_state = IndexState, len = Len }) -> %% needs to delete everything that's been delivered and not ack'd. delete_and_terminate(State) -> {_PurgeCount, State1 = #vqstate { index_state = IndexState, - msg_store_clients = {MSCStateP, MSCStateT} }} = + msg_store_clients = {MSCStateP, MSCStateT}, + persistent_store = PersistentStore }} = purge(State), IndexState1 = case rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id( @@ -472,7 +477,8 @@ delete_and_terminate(State) -> IndexState2; {DeltaSeqId, NextSeqId, IndexState2} -> {_DeleteCount, IndexState3} = - delete1(NextSeqId, 0, DeltaSeqId, IndexState2), + delete1(PersistentStore, NextSeqId, 0, DeltaSeqId, + IndexState2), IndexState3 end, IndexState4 = rabbit_queue_index:terminate_and_erase(IndexState1), @@ -490,64 +496,59 @@ delete_and_terminate(State) -> %% msg_store:release so that the cache isn't held full of msgs which %% are now at the tail of the queue. requeue(MsgsWithAckTags, State) -> - {SeqIds, MsgIdsPersistent, MsgIdsTransient, + {SeqIds, MsgIdsByStore, State1 = #vqstate { index_state = IndexState }} = lists:foldl( fun ({Msg = #basic_message { guid = MsgId }, AckTag}, - {SeqIdsAcc, MsgIdsP, MsgIdsT, StateN}) -> - {SeqIdsAcc1, MsgIdsP1, MsgIdsT1, MsgOnDisk} = + {SeqIdsAcc, Dict, StateN}) -> + {SeqIdsAcc1, Dict1, MsgOnDisk} = case AckTag of ack_not_on_disk -> - {SeqIdsAcc, MsgIdsP, MsgIdsT, false}; - {ack_index_and_store, MsgId, SeqId, true} -> - {[SeqId | SeqIdsAcc], [MsgId | MsgIdsP], MsgIdsT, true}; - {ack_index_and_store, MsgId, SeqId, false} -> - {[SeqId | SeqIdsAcc], MsgIdsP, [MsgId | MsgIdsT], true} + {SeqIdsAcc, Dict, false}; + {ack_index_and_store, MsgId, SeqId, MsgStore} -> + {[SeqId | SeqIdsAcc], + rabbit_misc:dict_cons(MsgStore, MsgId, Dict), + true} end, {_SeqId, StateN1} = publish(Msg, true, MsgOnDisk, StateN), - {SeqIdsAcc1, MsgIdsP1, MsgIdsT1, StateN1} - end, {[], [], [], State}, MsgsWithAckTags), + {SeqIdsAcc1, Dict1, StateN1} + end, {[], dict:new(), State}, MsgsWithAckTags), IndexState1 = case SeqIds of [] -> IndexState; _ -> rabbit_queue_index:write_acks(SeqIds, IndexState) end, - ok = case MsgIdsPersistent of - [] -> ok; - _ -> rabbit_msg_store:release(?PERSISTENT_MSG_STORE, MsgIdsPersistent) - end, - ok = case MsgIdsTransient of - [] -> ok; - _ -> rabbit_msg_store:release(?TRANSIENT_MSG_STORE, MsgIdsTransient) - end, + ok = dict:fold(fun (MsgStore, MsgIds, ok) -> + rabbit_msg_store:release(MsgStore, MsgIds) + end, ok, MsgIdsByStore), State1 #vqstate { index_state = IndexState1 }. tx_publish(Msg = #basic_message { is_persistent = true, guid = MsgId }, - State = #vqstate { msg_store_clients = MSCState }) -> + State = #vqstate { msg_store_clients = MSCState, + persistent_store = PersistentStore }) -> MsgStatus = #msg_status { msg = Msg, msg_id = MsgId, seq_id = undefined, is_persistent = true, is_delivered = false, msg_on_disk = false, index_on_disk = false }, {#msg_status { msg_on_disk = true }, MSCState1} = - maybe_write_msg_to_disk(false, MsgStatus, MSCState), + maybe_write_msg_to_disk(PersistentStore, false, MsgStatus, MSCState), State #vqstate { msg_store_clients = MSCState1 }; tx_publish(_Msg, State) -> State. -tx_rollback(Pubs, State) -> +tx_rollback(Pubs, State = #vqstate { persistent_store = PersistentStore }) -> ok = case persistent_msg_ids(Pubs) of [] -> ok; - PP -> rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, PP) + PP -> rabbit_msg_store:remove(PersistentStore, PP) end, State. -tx_commit(Pubs, AckTags, From, State) -> +tx_commit(Pubs, AckTags, From, State = #vqstate { persistent_store = PersistentStore }) -> case persistent_msg_ids(Pubs) of [] -> {true, tx_commit_from_msg_store(Pubs, AckTags, From, State)}; PersistentMsgIds -> Self = self(), ok = rabbit_msg_store:sync( - ?PERSISTENT_MSG_STORE, - PersistentMsgIds, + PersistentStore, PersistentMsgIds, fun () -> ok = rabbit_amqqueue:tx_commit_msg_store_callback( Self, Pubs, AckTags, From) end), @@ -696,51 +697,53 @@ should_force_index_to_disk(State = %% Internal major helpers for Public API %%---------------------------------------------------------------------------- -delete1(NextSeqId, Count, DeltaSeqId, IndexState) +delete1(_PersistentStore, NextSeqId, Count, DeltaSeqId, IndexState) when DeltaSeqId >= NextSeqId -> {Count, IndexState}; -delete1(NextSeqId, Count, DeltaSeqId, IndexState) -> +delete1(PersistentStore, NextSeqId, Count, DeltaSeqId, IndexState) -> Delta1SeqId = DeltaSeqId + rabbit_queue_index:segment_size(), case rabbit_queue_index:read_segment_entries(DeltaSeqId, IndexState) of {[], IndexState1} -> - delete1(NextSeqId, Count, Delta1SeqId, IndexState1); + delete1(PersistentStore, NextSeqId, Count, Delta1SeqId, + IndexState1); {List, IndexState1} -> Q = betas_from_segment_entries(List, Delta1SeqId), {QCount, IndexState2} = - remove_queue_entries(fun beta_fold_no_index_on_disk/3, - Q, IndexState1), - delete1(NextSeqId, Count + QCount, Delta1SeqId, IndexState2) + remove_queue_entries( + PersistentStore, fun beta_fold_no_index_on_disk/3, + Q, IndexState1), + delete1(PersistentStore, NextSeqId, Count + QCount, Delta1SeqId, + IndexState2) end. -purge1(Count, State = #vqstate { q3 = Q3, index_state = IndexState }) -> +purge1(Count, State = #vqstate { q3 = Q3, index_state = IndexState, + persistent_store = PersistentStore }) -> case bpqueue:is_empty(Q3) of true -> {Q1Count, IndexState1} = - remove_queue_entries(fun rabbit_misc:queue_fold/3, - State #vqstate.q1, IndexState), + remove_queue_entries( + PersistentStore, fun rabbit_misc:queue_fold/3, + State #vqstate.q1, IndexState), {Count + Q1Count, State #vqstate { q1 = queue:new(), index_state = IndexState1 }}; false -> {Q3Count, IndexState1} = - remove_queue_entries(fun beta_fold_no_index_on_disk/3, - Q3, IndexState), + remove_queue_entries( + PersistentStore, fun beta_fold_no_index_on_disk/3, + Q3, IndexState), purge1(Count + Q3Count, maybe_deltas_to_betas( State #vqstate { index_state = IndexState1, q3 = bpqueue:new() })) end. -remove_queue_entries(Fold, Q, IndexState) -> - {Count, MsgIdsPersistent, MsgIdsTransient, SeqIds, IndexState1} = - Fold(fun remove_queue_entries1/2, {0, [], [], [], IndexState}, Q), - ok = case MsgIdsPersistent of - [] -> ok; - _ -> rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, MsgIdsPersistent) - end, - ok = case MsgIdsTransient of - [] -> ok; - _ -> rabbit_msg_store:remove(?TRANSIENT_MSG_STORE, MsgIdsTransient) - end, +remove_queue_entries(PersistentStore, Fold, Q, IndexState) -> + {_PersistentStore, Count, MsgIdsByStore, SeqIds, IndexState1} = + Fold(fun remove_queue_entries1/2, + {PersistentStore, 0, dict:new(), [], IndexState}, Q), + ok = dict:fold(fun (MsgStore, MsgIds, ok) -> + rabbit_msg_store:remove(MsgStore, MsgIds) + end, ok, MsgIdsByStore), IndexState2 = case SeqIds of [] -> IndexState1; @@ -752,12 +755,15 @@ remove_queue_entries1( #msg_status { msg_id = MsgId, seq_id = SeqId, is_delivered = IsDelivered, msg_on_disk = MsgOnDisk, index_on_disk = IndexOnDisk, is_persistent = IsPersistent }, - {CountN, MsgIdsP, MsgIdsT, SeqIdsAcc, IndexStateN}) -> - {MsgIdsP1, MsgIdsT1} = + {PersistentStore, CountN, MsgIdsByStore, SeqIdsAcc, IndexStateN}) -> + MsgIdsByStore1 = case {MsgOnDisk, IsPersistent} of - {true, true} -> {[MsgId | MsgIdsP], MsgIdsT}; - {true, false} -> {MsgIdsP, [MsgId | MsgIdsT]}; - {false, _} -> {MsgIdsP, MsgIdsT} + {true, true} -> + rabbit_misc:dict_cons(PersistentStore, MsgId, MsgIdsByStore); + {true, false} -> + rabbit_misc:dict_cons(?TRANSIENT_MSG_STORE, MsgId, MsgIdsByStore); + {false, _} -> + MsgIdsByStore end, SeqIdsAcc1 = case IndexOnDisk of true -> [SeqId | SeqIdsAcc]; @@ -768,13 +774,14 @@ remove_queue_entries1( SeqId, IndexStateN); false -> IndexStateN end, - {CountN + 1, MsgIdsP1, MsgIdsT1, SeqIdsAcc1, IndexStateN1}. + {PersistentStore, CountN + 1, MsgIdsByStore1, SeqIdsAcc1, IndexStateN1}. fetch_from_q3_or_delta(State = #vqstate { q1 = Q1, q2 = Q2, delta = #delta { count = DeltaCount }, q3 = Q3, q4 = Q4, ram_msg_count = RamMsgCount, ram_index_count = RamIndexCount, - msg_store_clients = MSCState }) -> + msg_store_clients = MSCState, + persistent_store = PersistentStore }) -> case bpqueue:out(Q3) of {empty, _Q3} -> 0 = DeltaCount, %% ASSERTION @@ -786,7 +793,8 @@ fetch_from_q3_or_delta(State = #vqstate { is_persistent = IsPersistent }}, Q3a} -> {{ok, Msg = #basic_message { is_persistent = IsPersistent, guid = MsgId }}, MSCState1} = - read_from_msg_store(MSCState, MsgId, IsPersistent), + read_from_msg_store( + PersistentStore, MSCState, IsPersistent, MsgId), Q4a = queue:in(MsgStatus #msg_status { msg = Msg }, Q4), RamIndexCount1 = case IndexOnDisk of true -> RamIndexCount; @@ -881,11 +889,12 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, guid = MsgId }, State #vqstate { next_seq_id = SeqId + 1, len = Len + 1, in_counter = InCount + 1 })}. -publish(msg, MsgStatus, State = #vqstate { index_state = IndexState, - ram_msg_count = RamMsgCount, - msg_store_clients = MSCState }) -> +publish(msg, MsgStatus, #vqstate { + index_state = IndexState, ram_msg_count = RamMsgCount, + msg_store_clients = MSCState, + persistent_store = PersistentStore } = State) -> {MsgStatus1, MSCState1} = - maybe_write_msg_to_disk(false, MsgStatus, MSCState), + maybe_write_msg_to_disk(PersistentStore, false, MsgStatus, MSCState), {MsgStatus2, IndexState1} = maybe_write_index_to_disk(false, MsgStatus1, IndexState), State1 = State #vqstate { ram_msg_count = RamMsgCount + 1, @@ -893,11 +902,12 @@ publish(msg, MsgStatus, State = #vqstate { index_state = IndexState, msg_store_clients = MSCState1 }, store_alpha_entry(MsgStatus2, State1); -publish(index, MsgStatus, State = #vqstate { index_state = IndexState, q1 = Q1, - ram_index_count = RamIndexCount, - msg_store_clients = MSCState }) -> +publish(index, MsgStatus, #vqstate { + index_state = IndexState, q1 = Q1, + ram_index_count = RamIndexCount, msg_store_clients = MSCState, + persistent_store = PersistentStore } = State) -> {MsgStatus1 = #msg_status { msg_on_disk = true }, MSCState1} = - maybe_write_msg_to_disk(true, MsgStatus, MSCState), + maybe_write_msg_to_disk(PersistentStore, true, MsgStatus, MSCState), ForceIndex = should_force_index_to_disk(State), {MsgStatus2, IndexState1} = maybe_write_index_to_disk(ForceIndex, MsgStatus1, IndexState), @@ -913,9 +923,10 @@ publish(index, MsgStatus, State = #vqstate { index_state = IndexState, q1 = Q1, publish(neither, MsgStatus = #msg_status { seq_id = SeqId }, State = #vqstate { index_state = IndexState, q1 = Q1, q2 = Q2, - delta = Delta, msg_store_clients = MSCState }) -> + delta = Delta, msg_store_clients = MSCState, + persistent_store = PersistentStore }) -> {MsgStatus1 = #msg_status { msg_on_disk = true }, MSCState1} = - maybe_write_msg_to_disk(true, MsgStatus, MSCState), + maybe_write_msg_to_disk(PersistentStore, true, MsgStatus, MSCState), {#msg_status { index_on_disk = true }, IndexState1} = maybe_write_index_to_disk(true, MsgStatus1, IndexState), true = queue:is_empty(Q1) andalso bpqueue:is_empty(Q2), %% ASSERTION @@ -955,41 +966,42 @@ store_beta_entry(MsgStatus = #msg_status { msg_on_disk = true, State #vqstate { q2 = bpqueue:in(IndexOnDisk, MsgStatus1, Q2) } end. -find_msg_store(true) -> ?PERSISTENT_MSG_STORE; -find_msg_store(false) -> ?TRANSIENT_MSG_STORE. - -read_from_msg_store({MSCStateP, MSCStateT}, MsgId, true) -> - {Res, MSCStateP1} = - rabbit_msg_store:read(?PERSISTENT_MSG_STORE, MsgId, MSCStateP), - {Res, {MSCStateP1, MSCStateT}}; -read_from_msg_store({MSCStateP, MSCStateT}, MsgId, false) -> - {Res, MSCStateT1} = - rabbit_msg_store:read(?TRANSIENT_MSG_STORE, MsgId, MSCStateT), - {Res, {MSCStateP, MSCStateT1}}. - -maybe_write_msg_to_disk(_Force, MsgStatus = +find_msg_store(true, PersistentStore) -> PersistentStore; +find_msg_store(false, _PersistentStore) -> ?TRANSIENT_MSG_STORE. + +with_msg_store_state(PersistentStore, {MSCStateP, MSCStateT}, true, + Fun) -> + {Result, MSCStateP1} = Fun(PersistentStore, MSCStateP), + {Result, {MSCStateP1, MSCStateT}}; +with_msg_store_state(_PersistentStore, {MSCStateP, MSCStateT}, false, + Fun) -> + {Result, MSCStateT1} = Fun(?TRANSIENT_MSG_STORE, MSCStateT), + {Result, {MSCStateP, MSCStateT1}}. + +read_from_msg_store(PersistentStore, MSCState, IsPersistent, MsgId) -> + with_msg_store_state( + PersistentStore, MSCState, IsPersistent, + fun (MsgStore, MSCState1) -> + rabbit_msg_store:read(MsgStore, MsgId, MSCState1) + end). + +maybe_write_msg_to_disk(_PersistentStore, _Force, MsgStatus = #msg_status { msg_on_disk = true }, MSCState) -> {MsgStatus, MSCState}; -maybe_write_msg_to_disk(Force, MsgStatus = #msg_status { - msg = Msg, msg_id = MsgId, - is_persistent = IsPersistent }, - {MSCStateP, MSCStateT}) +maybe_write_msg_to_disk(PersistentStore, Force, + MsgStatus = #msg_status { + msg = Msg, msg_id = MsgId, + is_persistent = IsPersistent }, MSCState) when Force orelse IsPersistent -> - MSCState1 = - case IsPersistent of - true -> - {ok, MSCStateP1} = rabbit_msg_store:write( - ?PERSISTENT_MSG_STORE, MsgId, - ensure_binary_properties(Msg), MSCStateP), - {MSCStateP1, MSCStateT}; - false -> - {ok, MSCStateT1} = rabbit_msg_store:write( - ?TRANSIENT_MSG_STORE, MsgId, - ensure_binary_properties(Msg), MSCStateT), - {MSCStateP, MSCStateT1} - end, + {ok, MSCState1} = + with_msg_store_state( + PersistentStore, MSCState, IsPersistent, + fun (MsgStore, MSCState2) -> + rabbit_msg_store:write( + MsgStore, MsgId, ensure_binary_properties(Msg), MSCState2) + end), {MsgStatus #msg_status { msg_on_disk = true }, MSCState1}; -maybe_write_msg_to_disk(_Force, MsgStatus, MSCState) -> +maybe_write_msg_to_disk(_PersistentStore, _Force, MsgStatus, MSCState) -> {MsgStatus, MSCState}. maybe_write_index_to_disk(_Force, MsgStatus = @@ -1137,12 +1149,14 @@ maybe_push_alphas_to_betas(_Generator, _Consumer, _Q, State = maybe_push_alphas_to_betas( Generator, Consumer, Q, State = #vqstate { ram_msg_count = RamMsgCount, ram_index_count = RamIndexCount, - index_state = IndexState, msg_store_clients = MSCState }) -> + index_state = IndexState, msg_store_clients = MSCState, + persistent_store = PersistentStore }) -> case Generator(Q) of {empty, _Q} -> State; {{value, MsgStatus}, Qa} -> - {MsgStatus1, MSCState1} = maybe_write_msg_to_disk(true, MsgStatus, - MSCState), + {MsgStatus1, MSCState1} = + maybe_write_msg_to_disk( + PersistentStore, true, MsgStatus, MSCState), ForceIndex = should_force_index_to_disk(State), {MsgStatus2, IndexState1} = maybe_write_index_to_disk(ForceIndex, MsgStatus1, IndexState), |
