summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2010-04-03 20:56:46 +0100
committerMatthew Sackman <matthew@lshift.net>2010-04-03 20:56:46 +0100
commitca1181e79d77b89ead1bb210d5744e6301cd2000 (patch)
tree41b4b60803f34aad62e25d8a919094ceca6d76d2 /src
parenta133685fb5097a6c5fbef9a1c6afb9a78fdbf069 (diff)
downloadrabbitmq-server-git-ca1181e79d77b89ead1bb210d5744e6301cd2000.tar.gz
Introduced rabbit_misc:dict_cons/3 which ends up being used in 3 places. Also fixed a bug which I'd sleepily introduced in vq:requeue where a msg_store:release had accidentally become a msg_store:remove (no idea how the tests managed to pass after that enough to convince me to commit - certainly had the tests failing today due to that one). Finally, persistent msgs in a non-durable queue should be sent to the transient msg_store, not the persistent msg_store. Thus they will survive a crash of the queue, but not a restart of the server.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl10
-rw-r--r--src/rabbit_channel.erl5
-rw-r--r--src/rabbit_misc.erl6
-rw-r--r--src/rabbit_router.erl4
-rw-r--r--src/rabbit_tests.erl2
-rw-r--r--src/rabbit_variable_queue.erl288
6 files changed, 166 insertions, 149 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 1394f9db75..c9add5b2d5 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -623,10 +623,14 @@ i(Item, _) ->
handle_call(init_variable_queue, From, State =
#q{variable_queue_state = undefined,
- q = #amqqueue{name = QName}}) ->
+ q = #amqqueue{name = QName, durable = IsDurable}}) ->
gen_server2:reply(From, ok),
- noreply(
- State #q { variable_queue_state = rabbit_variable_queue:init(QName) });
+ PersistentStore = case IsDurable of
+ true -> ?PERSISTENT_MSG_STORE;
+ false -> ?TRANSIENT_MSG_STORE
+ end,
+ noreply(State #q { variable_queue_state =
+ rabbit_variable_queue:init(QName, PersistentStore) });
handle_call(init_variable_queue, _From, State) ->
reply(ok, State);
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index be120c2ec8..c8733ed197 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -970,10 +970,7 @@ fold_per_queue(F, Acc0, UAQ) ->
%% lists:reverse in handle_message({recover, true},
%% ...). However, it is significantly slower when
%% going beyond a few thousand elements.
- dict:update(QPid,
- fun (MsgIds) -> [MsgId | MsgIds] end,
- [MsgId],
- D)
+ rabbit_misc:dict_cons(QPid, MsgId, D)
end, dict:new(), UAQ),
dict:fold(fun (QPid, MsgIds, Acc) -> F(QPid, MsgIds, Acc) end,
Acc0, D).
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 3bc35ca2be..cd2e7fbc83 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -59,7 +59,7 @@
-export([sort_field_table/1]).
-export([pid_to_string/1, string_to_pid/1]).
-export([version_compare/2, version_compare/3]).
--export([recursive_delete/1]).
+-export([recursive_delete/1, dict_cons/3]).
-import(mnesia).
-import(lists).
@@ -135,6 +135,7 @@
-spec(pid_to_string/1 :: (pid()) -> string()).
-spec(string_to_pid/1 :: (string()) -> pid()).
-spec(recursive_delete/1 :: (string()) -> 'ok' | {'error', any()}).
+-spec(dict_cons/3 :: (any(), any(), dict()) -> dict()).
-endif.
@@ -625,3 +626,6 @@ recursive_delete(Path) ->
{error, {Path, Error}}
end
end.
+
+dict_cons(Key, Value, Dict) ->
+ dict:update(Key, fun (List) -> [Value | List] end, [Value], Dict).
diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl
index 884ea4ab5c..96337b42e7 100644
--- a/src/rabbit_router.erl
+++ b/src/rabbit_router.erl
@@ -78,9 +78,7 @@ deliver(QPids, Delivery) ->
dict:to_list(
lists:foldl(
fun (QPid, D) ->
- dict:update(node(QPid),
- fun (QPids1) -> [QPid | QPids1] end,
- [QPid], D)
+ rabbit_misc:dict_cons(node(QPid), QPid, D)
end,
dict:new(), QPids)),
Delivery).
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 474afbcafb..75c66693e3 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1363,7 +1363,7 @@ assert_prop(List, Prop, Value) ->
fresh_variable_queue() ->
stop_msg_store(),
ok = empty_test_queue(),
- VQ = rabbit_variable_queue:init(test_queue()),
+ VQ = rabbit_variable_queue:init(test_queue(), ?PERSISTENT_MSG_STORE),
S0 = rabbit_variable_queue:status(VQ),
assert_prop(S0, len, 0),
assert_prop(S0, q1, 0),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index b9714f535b..37c6b22ef8 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -31,7 +31,7 @@
-module(rabbit_variable_queue).
--export([init/1, terminate/1, publish/2, publish_delivered/2,
+-export([init/2, terminate/1, publish/2, publish_delivered/2,
set_queue_ram_duration_target/2, remeasure_rates/1,
ram_duration/1, fetch/1, ack/2, len/1, is_empty/1, purge/1,
delete_and_terminate/1, requeue/2, tx_publish/2, tx_rollback/2,
@@ -154,7 +154,8 @@
rate_timestamp,
len,
on_sync,
- msg_store_clients
+ msg_store_clients,
+ persistent_store
}).
-include("rabbit.hrl").
@@ -186,7 +187,7 @@
-type(bpqueue() :: any()).
-type(msg_id() :: binary()).
-type(seq_id() :: non_neg_integer()).
--type(ack() :: {'ack_index_and_store', msg_id(), seq_id(), boolean()}
+-type(ack() :: {'ack_index_and_store', msg_id(), seq_id(), atom() | pid()}
| 'ack_not_on_disk').
-type(vqstate() :: #vqstate {
q1 :: queue(),
@@ -210,10 +211,11 @@
rate_timestamp :: {integer(), integer(), integer()},
len :: non_neg_integer(),
on_sync :: {[ack()], [msg_id()], [{pid(), any()}]},
- msg_store_clients :: {any(), any()}
+ msg_store_clients :: {any(), any()},
+ persistent_store :: pid() | atom()
}).
--spec(init/1 :: (queue_name()) -> vqstate()).
+-spec(init/2 :: (queue_name(), pid() | atom()) -> vqstate()).
-spec(terminate/1 :: (vqstate()) -> vqstate()).
-spec(publish/2 :: (basic_message(), vqstate()) ->
{seq_id(), vqstate()}).
@@ -253,7 +255,7 @@
%% Public API
%%----------------------------------------------------------------------------
-init(QueueName) ->
+init(QueueName, PersistentStore) ->
{DeltaCount, IndexState} =
rabbit_queue_index:init(QueueName),
{DeltaSeqId, NextSeqId, IndexState1} =
@@ -285,9 +287,10 @@ init(QueueName) ->
rate_timestamp = Now,
len = DeltaCount,
on_sync = {[], [], []},
- msg_store_clients = {rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE),
- rabbit_msg_store:client_init(?TRANSIENT_MSG_STORE)}
- },
+ msg_store_clients = {rabbit_msg_store:client_init(PersistentStore),
+ rabbit_msg_store:client_init(?TRANSIENT_MSG_STORE)},
+ persistent_store = PersistentStore
+ },
maybe_deltas_to_betas(State).
terminate(State = #vqstate { index_state = IndexState,
@@ -306,19 +309,22 @@ publish_delivered(Msg = #basic_message { guid = MsgId,
next_seq_id = SeqId,
out_counter = OutCount,
in_counter = InCount,
- msg_store_clients = MSCState }) ->
+ msg_store_clients = MSCState,
+ persistent_store = PersistentStore }) ->
State1 = State #vqstate { out_counter = OutCount + 1,
in_counter = InCount + 1 },
MsgStatus = #msg_status {
msg = Msg, msg_id = MsgId, seq_id = SeqId, is_persistent = IsPersistent,
is_delivered = true, msg_on_disk = false, index_on_disk = false },
- {MsgStatus1, MSCState1} = maybe_write_msg_to_disk(false, MsgStatus, MSCState),
+ {MsgStatus1, MSCState1} = maybe_write_msg_to_disk(PersistentStore, false,
+ MsgStatus, MSCState),
State2 = State1 #vqstate { msg_store_clients = MSCState1 },
case MsgStatus1 #msg_status.msg_on_disk of
true ->
{#msg_status { index_on_disk = true }, IndexState1} =
maybe_write_index_to_disk(false, MsgStatus1, IndexState),
- {{ack_index_and_store, MsgId, SeqId, IsPersistent},
+ {{ack_index_and_store, MsgId, SeqId,
+ find_msg_store(IsPersistent, PersistentStore)},
State2 #vqstate { index_state = IndexState1,
next_seq_id = SeqId + 1 }};
false ->
@@ -378,7 +384,8 @@ ram_duration(#vqstate { avg_egress_rate = AvgEgressRate,
fetch(State =
#vqstate { q4 = Q4, ram_msg_count = RamMsgCount, out_counter = OutCount,
- index_state = IndexState, len = Len }) ->
+ index_state = IndexState, len = Len,
+ persistent_store = PersistentStore }) ->
case queue:out(Q4) of
{empty, _Q4} ->
fetch_from_q3_or_delta(State);
@@ -387,7 +394,7 @@ fetch(State =
is_persistent = IsPersistent, is_delivered = IsDelivered,
msg_on_disk = MsgOnDisk, index_on_disk = IndexOnDisk }},
Q4a} ->
- {IndexState1, IndexOnDisk1} =
+ {IndexState1, IsPersistent} =
case IndexOnDisk of
true ->
IndexState2 =
@@ -404,13 +411,15 @@ fetch(State =
false ->
{IndexState, false}
end,
+ MsgStore = find_msg_store(IsPersistent, PersistentStore),
AckTag =
- case IndexOnDisk1 of
- true -> true = IsPersistent, %% ASSERTION
- true = MsgOnDisk, %% ASSERTION
- {ack_index_and_store, MsgId, SeqId, IsPersistent};
- false -> ok = case MsgOnDisk andalso not IsPersistent of
- true -> rabbit_msg_store:remove(find_msg_store(IsPersistent), [MsgId]);
+ case IsPersistent of
+ true -> true = MsgOnDisk, %% ASSERTION
+ {ack_index_and_store, MsgId, SeqId, MsgStore};
+ false -> ok = case MsgOnDisk of
+ true ->
+ rabbit_msg_store:remove(
+ MsgStore, [MsgId]);
false -> ok
end,
ack_not_on_disk
@@ -423,26 +432,19 @@ fetch(State =
end.
ack(AckTags, State = #vqstate { index_state = IndexState }) ->
- {MsgIdsPersistent, MsgIdsTransient, SeqIds} =
+ {MsgIdsByStore, SeqIds} =
lists:foldl(
fun (ack_not_on_disk, Acc) -> Acc;
- ({ack_index_and_store, MsgId, SeqId, true}, {MsgIdsP, MsgIdsT, SeqIds}) ->
- {[MsgId | MsgIdsP], MsgIdsT, [SeqId | SeqIds]};
- ({ack_index_and_store, MsgId, SeqId, false}, {MsgIdsP, MsgIdsT, SeqIds}) ->
- {MsgIdsP, [MsgId | MsgIdsT], [SeqId | SeqIds]}
- end, {[], [], []}, AckTags),
+ ({ack_index_and_store, MsgId, SeqId, MsgStore}, {Dict, SeqIds}) ->
+ {rabbit_misc:dict_cons(MsgStore, MsgId, Dict), [SeqId | SeqIds]}
+ end, {dict:new(), []}, AckTags),
IndexState1 = case SeqIds of
[] -> IndexState;
_ -> rabbit_queue_index:write_acks(SeqIds, IndexState)
end,
- ok = case MsgIdsPersistent of
- [] -> ok;
- _ -> rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, MsgIdsPersistent)
- end,
- ok = case MsgIdsTransient of
- [] -> ok;
- _ -> rabbit_msg_store:remove(?TRANSIENT_MSG_STORE, MsgIdsTransient)
- end,
+ ok = dict:fold(fun (MsgStore, MsgIds, ok) ->
+ rabbit_msg_store:remove(MsgStore, MsgIds)
+ end, ok, MsgIdsByStore),
State #vqstate { index_state = IndexState1 }.
len(#vqstate { len = Len }) ->
@@ -451,9 +453,11 @@ len(#vqstate { len = Len }) ->
is_empty(State) ->
0 == len(State).
-purge(State = #vqstate { q4 = Q4, index_state = IndexState, len = Len }) ->
+purge(State = #vqstate { q4 = Q4, index_state = IndexState, len = Len,
+ persistent_store = PersistentStore }) ->
{Q4Count, IndexState1} =
- remove_queue_entries(fun rabbit_misc:queue_fold/3, Q4, IndexState),
+ remove_queue_entries(PersistentStore, fun rabbit_misc:queue_fold/3,
+ Q4, IndexState),
{Len, State1} =
purge1(Q4Count, State #vqstate { index_state = IndexState1,
q4 = queue:new() }),
@@ -463,7 +467,8 @@ purge(State = #vqstate { q4 = Q4, index_state = IndexState, len = Len }) ->
%% needs to delete everything that's been delivered and not ack'd.
delete_and_terminate(State) ->
{_PurgeCount, State1 = #vqstate { index_state = IndexState,
- msg_store_clients = {MSCStateP, MSCStateT} }} =
+ msg_store_clients = {MSCStateP, MSCStateT},
+ persistent_store = PersistentStore }} =
purge(State),
IndexState1 =
case rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(
@@ -472,7 +477,8 @@ delete_and_terminate(State) ->
IndexState2;
{DeltaSeqId, NextSeqId, IndexState2} ->
{_DeleteCount, IndexState3} =
- delete1(NextSeqId, 0, DeltaSeqId, IndexState2),
+ delete1(PersistentStore, NextSeqId, 0, DeltaSeqId,
+ IndexState2),
IndexState3
end,
IndexState4 = rabbit_queue_index:terminate_and_erase(IndexState1),
@@ -490,64 +496,59 @@ delete_and_terminate(State) ->
%% msg_store:release so that the cache isn't held full of msgs which
%% are now at the tail of the queue.
requeue(MsgsWithAckTags, State) ->
- {SeqIds, MsgIdsPersistent, MsgIdsTransient,
+ {SeqIds, MsgIdsByStore,
State1 = #vqstate { index_state = IndexState }} =
lists:foldl(
fun ({Msg = #basic_message { guid = MsgId }, AckTag},
- {SeqIdsAcc, MsgIdsP, MsgIdsT, StateN}) ->
- {SeqIdsAcc1, MsgIdsP1, MsgIdsT1, MsgOnDisk} =
+ {SeqIdsAcc, Dict, StateN}) ->
+ {SeqIdsAcc1, Dict1, MsgOnDisk} =
case AckTag of
ack_not_on_disk ->
- {SeqIdsAcc, MsgIdsP, MsgIdsT, false};
- {ack_index_and_store, MsgId, SeqId, true} ->
- {[SeqId | SeqIdsAcc], [MsgId | MsgIdsP], MsgIdsT, true};
- {ack_index_and_store, MsgId, SeqId, false} ->
- {[SeqId | SeqIdsAcc], MsgIdsP, [MsgId | MsgIdsT], true}
+ {SeqIdsAcc, Dict, false};
+ {ack_index_and_store, MsgId, SeqId, MsgStore} ->
+ {[SeqId | SeqIdsAcc],
+ rabbit_misc:dict_cons(MsgStore, MsgId, Dict),
+ true}
end,
{_SeqId, StateN1} = publish(Msg, true, MsgOnDisk, StateN),
- {SeqIdsAcc1, MsgIdsP1, MsgIdsT1, StateN1}
- end, {[], [], [], State}, MsgsWithAckTags),
+ {SeqIdsAcc1, Dict1, StateN1}
+ end, {[], dict:new(), State}, MsgsWithAckTags),
IndexState1 = case SeqIds of
[] -> IndexState;
_ -> rabbit_queue_index:write_acks(SeqIds, IndexState)
end,
- ok = case MsgIdsPersistent of
- [] -> ok;
- _ -> rabbit_msg_store:release(?PERSISTENT_MSG_STORE, MsgIdsPersistent)
- end,
- ok = case MsgIdsTransient of
- [] -> ok;
- _ -> rabbit_msg_store:release(?TRANSIENT_MSG_STORE, MsgIdsTransient)
- end,
+ ok = dict:fold(fun (MsgStore, MsgIds, ok) ->
+ rabbit_msg_store:release(MsgStore, MsgIds)
+ end, ok, MsgIdsByStore),
State1 #vqstate { index_state = IndexState1 }.
tx_publish(Msg = #basic_message { is_persistent = true, guid = MsgId },
- State = #vqstate { msg_store_clients = MSCState }) ->
+ State = #vqstate { msg_store_clients = MSCState,
+ persistent_store = PersistentStore }) ->
MsgStatus = #msg_status {
msg = Msg, msg_id = MsgId, seq_id = undefined, is_persistent = true,
is_delivered = false, msg_on_disk = false, index_on_disk = false },
{#msg_status { msg_on_disk = true }, MSCState1} =
- maybe_write_msg_to_disk(false, MsgStatus, MSCState),
+ maybe_write_msg_to_disk(PersistentStore, false, MsgStatus, MSCState),
State #vqstate { msg_store_clients = MSCState1 };
tx_publish(_Msg, State) ->
State.
-tx_rollback(Pubs, State) ->
+tx_rollback(Pubs, State = #vqstate { persistent_store = PersistentStore }) ->
ok = case persistent_msg_ids(Pubs) of
[] -> ok;
- PP -> rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, PP)
+ PP -> rabbit_msg_store:remove(PersistentStore, PP)
end,
State.
-tx_commit(Pubs, AckTags, From, State) ->
+tx_commit(Pubs, AckTags, From, State = #vqstate { persistent_store = PersistentStore }) ->
case persistent_msg_ids(Pubs) of
[] ->
{true, tx_commit_from_msg_store(Pubs, AckTags, From, State)};
PersistentMsgIds ->
Self = self(),
ok = rabbit_msg_store:sync(
- ?PERSISTENT_MSG_STORE,
- PersistentMsgIds,
+ PersistentStore, PersistentMsgIds,
fun () -> ok = rabbit_amqqueue:tx_commit_msg_store_callback(
Self, Pubs, AckTags, From)
end),
@@ -696,51 +697,53 @@ should_force_index_to_disk(State =
%% Internal major helpers for Public API
%%----------------------------------------------------------------------------
-delete1(NextSeqId, Count, DeltaSeqId, IndexState)
+delete1(_PersistentStore, NextSeqId, Count, DeltaSeqId, IndexState)
when DeltaSeqId >= NextSeqId ->
{Count, IndexState};
-delete1(NextSeqId, Count, DeltaSeqId, IndexState) ->
+delete1(PersistentStore, NextSeqId, Count, DeltaSeqId, IndexState) ->
Delta1SeqId = DeltaSeqId + rabbit_queue_index:segment_size(),
case rabbit_queue_index:read_segment_entries(DeltaSeqId, IndexState) of
{[], IndexState1} ->
- delete1(NextSeqId, Count, Delta1SeqId, IndexState1);
+ delete1(PersistentStore, NextSeqId, Count, Delta1SeqId,
+ IndexState1);
{List, IndexState1} ->
Q = betas_from_segment_entries(List, Delta1SeqId),
{QCount, IndexState2} =
- remove_queue_entries(fun beta_fold_no_index_on_disk/3,
- Q, IndexState1),
- delete1(NextSeqId, Count + QCount, Delta1SeqId, IndexState2)
+ remove_queue_entries(
+ PersistentStore, fun beta_fold_no_index_on_disk/3,
+ Q, IndexState1),
+ delete1(PersistentStore, NextSeqId, Count + QCount, Delta1SeqId,
+ IndexState2)
end.
-purge1(Count, State = #vqstate { q3 = Q3, index_state = IndexState }) ->
+purge1(Count, State = #vqstate { q3 = Q3, index_state = IndexState,
+ persistent_store = PersistentStore }) ->
case bpqueue:is_empty(Q3) of
true ->
{Q1Count, IndexState1} =
- remove_queue_entries(fun rabbit_misc:queue_fold/3,
- State #vqstate.q1, IndexState),
+ remove_queue_entries(
+ PersistentStore, fun rabbit_misc:queue_fold/3,
+ State #vqstate.q1, IndexState),
{Count + Q1Count, State #vqstate { q1 = queue:new(),
index_state = IndexState1 }};
false ->
{Q3Count, IndexState1} =
- remove_queue_entries(fun beta_fold_no_index_on_disk/3,
- Q3, IndexState),
+ remove_queue_entries(
+ PersistentStore, fun beta_fold_no_index_on_disk/3,
+ Q3, IndexState),
purge1(Count + Q3Count,
maybe_deltas_to_betas(
State #vqstate { index_state = IndexState1,
q3 = bpqueue:new() }))
end.
-remove_queue_entries(Fold, Q, IndexState) ->
- {Count, MsgIdsPersistent, MsgIdsTransient, SeqIds, IndexState1} =
- Fold(fun remove_queue_entries1/2, {0, [], [], [], IndexState}, Q),
- ok = case MsgIdsPersistent of
- [] -> ok;
- _ -> rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, MsgIdsPersistent)
- end,
- ok = case MsgIdsTransient of
- [] -> ok;
- _ -> rabbit_msg_store:remove(?TRANSIENT_MSG_STORE, MsgIdsTransient)
- end,
+remove_queue_entries(PersistentStore, Fold, Q, IndexState) ->
+ {_PersistentStore, Count, MsgIdsByStore, SeqIds, IndexState1} =
+ Fold(fun remove_queue_entries1/2,
+ {PersistentStore, 0, dict:new(), [], IndexState}, Q),
+ ok = dict:fold(fun (MsgStore, MsgIds, ok) ->
+ rabbit_msg_store:remove(MsgStore, MsgIds)
+ end, ok, MsgIdsByStore),
IndexState2 =
case SeqIds of
[] -> IndexState1;
@@ -752,12 +755,15 @@ remove_queue_entries1(
#msg_status { msg_id = MsgId, seq_id = SeqId,
is_delivered = IsDelivered, msg_on_disk = MsgOnDisk,
index_on_disk = IndexOnDisk, is_persistent = IsPersistent },
- {CountN, MsgIdsP, MsgIdsT, SeqIdsAcc, IndexStateN}) ->
- {MsgIdsP1, MsgIdsT1} =
+ {PersistentStore, CountN, MsgIdsByStore, SeqIdsAcc, IndexStateN}) ->
+ MsgIdsByStore1 =
case {MsgOnDisk, IsPersistent} of
- {true, true} -> {[MsgId | MsgIdsP], MsgIdsT};
- {true, false} -> {MsgIdsP, [MsgId | MsgIdsT]};
- {false, _} -> {MsgIdsP, MsgIdsT}
+ {true, true} ->
+ rabbit_misc:dict_cons(PersistentStore, MsgId, MsgIdsByStore);
+ {true, false} ->
+ rabbit_misc:dict_cons(?TRANSIENT_MSG_STORE, MsgId, MsgIdsByStore);
+ {false, _} ->
+ MsgIdsByStore
end,
SeqIdsAcc1 = case IndexOnDisk of
true -> [SeqId | SeqIdsAcc];
@@ -768,13 +774,14 @@ remove_queue_entries1(
SeqId, IndexStateN);
false -> IndexStateN
end,
- {CountN + 1, MsgIdsP1, MsgIdsT1, SeqIdsAcc1, IndexStateN1}.
+ {PersistentStore, CountN + 1, MsgIdsByStore1, SeqIdsAcc1, IndexStateN1}.
fetch_from_q3_or_delta(State = #vqstate {
q1 = Q1, q2 = Q2, delta = #delta { count = DeltaCount },
q3 = Q3, q4 = Q4, ram_msg_count = RamMsgCount,
ram_index_count = RamIndexCount,
- msg_store_clients = MSCState }) ->
+ msg_store_clients = MSCState,
+ persistent_store = PersistentStore }) ->
case bpqueue:out(Q3) of
{empty, _Q3} ->
0 = DeltaCount, %% ASSERTION
@@ -786,7 +793,8 @@ fetch_from_q3_or_delta(State = #vqstate {
is_persistent = IsPersistent }}, Q3a} ->
{{ok, Msg = #basic_message { is_persistent = IsPersistent,
guid = MsgId }}, MSCState1} =
- read_from_msg_store(MSCState, MsgId, IsPersistent),
+ read_from_msg_store(
+ PersistentStore, MSCState, IsPersistent, MsgId),
Q4a = queue:in(MsgStatus #msg_status { msg = Msg }, Q4),
RamIndexCount1 = case IndexOnDisk of
true -> RamIndexCount;
@@ -881,11 +889,12 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, guid = MsgId },
State #vqstate { next_seq_id = SeqId + 1, len = Len + 1,
in_counter = InCount + 1 })}.
-publish(msg, MsgStatus, State = #vqstate { index_state = IndexState,
- ram_msg_count = RamMsgCount,
- msg_store_clients = MSCState }) ->
+publish(msg, MsgStatus, #vqstate {
+ index_state = IndexState, ram_msg_count = RamMsgCount,
+ msg_store_clients = MSCState,
+ persistent_store = PersistentStore } = State) ->
{MsgStatus1, MSCState1} =
- maybe_write_msg_to_disk(false, MsgStatus, MSCState),
+ maybe_write_msg_to_disk(PersistentStore, false, MsgStatus, MSCState),
{MsgStatus2, IndexState1} =
maybe_write_index_to_disk(false, MsgStatus1, IndexState),
State1 = State #vqstate { ram_msg_count = RamMsgCount + 1,
@@ -893,11 +902,12 @@ publish(msg, MsgStatus, State = #vqstate { index_state = IndexState,
msg_store_clients = MSCState1 },
store_alpha_entry(MsgStatus2, State1);
-publish(index, MsgStatus, State = #vqstate { index_state = IndexState, q1 = Q1,
- ram_index_count = RamIndexCount,
- msg_store_clients = MSCState }) ->
+publish(index, MsgStatus, #vqstate {
+ index_state = IndexState, q1 = Q1,
+ ram_index_count = RamIndexCount, msg_store_clients = MSCState,
+ persistent_store = PersistentStore } = State) ->
{MsgStatus1 = #msg_status { msg_on_disk = true }, MSCState1} =
- maybe_write_msg_to_disk(true, MsgStatus, MSCState),
+ maybe_write_msg_to_disk(PersistentStore, true, MsgStatus, MSCState),
ForceIndex = should_force_index_to_disk(State),
{MsgStatus2, IndexState1} =
maybe_write_index_to_disk(ForceIndex, MsgStatus1, IndexState),
@@ -913,9 +923,10 @@ publish(index, MsgStatus, State = #vqstate { index_state = IndexState, q1 = Q1,
publish(neither, MsgStatus = #msg_status { seq_id = SeqId }, State =
#vqstate { index_state = IndexState, q1 = Q1, q2 = Q2,
- delta = Delta, msg_store_clients = MSCState }) ->
+ delta = Delta, msg_store_clients = MSCState,
+ persistent_store = PersistentStore }) ->
{MsgStatus1 = #msg_status { msg_on_disk = true }, MSCState1} =
- maybe_write_msg_to_disk(true, MsgStatus, MSCState),
+ maybe_write_msg_to_disk(PersistentStore, true, MsgStatus, MSCState),
{#msg_status { index_on_disk = true }, IndexState1} =
maybe_write_index_to_disk(true, MsgStatus1, IndexState),
true = queue:is_empty(Q1) andalso bpqueue:is_empty(Q2), %% ASSERTION
@@ -955,41 +966,42 @@ store_beta_entry(MsgStatus = #msg_status { msg_on_disk = true,
State #vqstate { q2 = bpqueue:in(IndexOnDisk, MsgStatus1, Q2) }
end.
-find_msg_store(true) -> ?PERSISTENT_MSG_STORE;
-find_msg_store(false) -> ?TRANSIENT_MSG_STORE.
-
-read_from_msg_store({MSCStateP, MSCStateT}, MsgId, true) ->
- {Res, MSCStateP1} =
- rabbit_msg_store:read(?PERSISTENT_MSG_STORE, MsgId, MSCStateP),
- {Res, {MSCStateP1, MSCStateT}};
-read_from_msg_store({MSCStateP, MSCStateT}, MsgId, false) ->
- {Res, MSCStateT1} =
- rabbit_msg_store:read(?TRANSIENT_MSG_STORE, MsgId, MSCStateT),
- {Res, {MSCStateP, MSCStateT1}}.
-
-maybe_write_msg_to_disk(_Force, MsgStatus =
+find_msg_store(true, PersistentStore) -> PersistentStore;
+find_msg_store(false, _PersistentStore) -> ?TRANSIENT_MSG_STORE.
+
+with_msg_store_state(PersistentStore, {MSCStateP, MSCStateT}, true,
+ Fun) ->
+ {Result, MSCStateP1} = Fun(PersistentStore, MSCStateP),
+ {Result, {MSCStateP1, MSCStateT}};
+with_msg_store_state(_PersistentStore, {MSCStateP, MSCStateT}, false,
+ Fun) ->
+ {Result, MSCStateT1} = Fun(?TRANSIENT_MSG_STORE, MSCStateT),
+ {Result, {MSCStateP, MSCStateT1}}.
+
+read_from_msg_store(PersistentStore, MSCState, IsPersistent, MsgId) ->
+ with_msg_store_state(
+ PersistentStore, MSCState, IsPersistent,
+ fun (MsgStore, MSCState1) ->
+ rabbit_msg_store:read(MsgStore, MsgId, MSCState1)
+ end).
+
+maybe_write_msg_to_disk(_PersistentStore, _Force, MsgStatus =
#msg_status { msg_on_disk = true }, MSCState) ->
{MsgStatus, MSCState};
-maybe_write_msg_to_disk(Force, MsgStatus = #msg_status {
- msg = Msg, msg_id = MsgId,
- is_persistent = IsPersistent },
- {MSCStateP, MSCStateT})
+maybe_write_msg_to_disk(PersistentStore, Force,
+ MsgStatus = #msg_status {
+ msg = Msg, msg_id = MsgId,
+ is_persistent = IsPersistent }, MSCState)
when Force orelse IsPersistent ->
- MSCState1 =
- case IsPersistent of
- true ->
- {ok, MSCStateP1} = rabbit_msg_store:write(
- ?PERSISTENT_MSG_STORE, MsgId,
- ensure_binary_properties(Msg), MSCStateP),
- {MSCStateP1, MSCStateT};
- false ->
- {ok, MSCStateT1} = rabbit_msg_store:write(
- ?TRANSIENT_MSG_STORE, MsgId,
- ensure_binary_properties(Msg), MSCStateT),
- {MSCStateP, MSCStateT1}
- end,
+ {ok, MSCState1} =
+ with_msg_store_state(
+ PersistentStore, MSCState, IsPersistent,
+ fun (MsgStore, MSCState2) ->
+ rabbit_msg_store:write(
+ MsgStore, MsgId, ensure_binary_properties(Msg), MSCState2)
+ end),
{MsgStatus #msg_status { msg_on_disk = true }, MSCState1};
-maybe_write_msg_to_disk(_Force, MsgStatus, MSCState) ->
+maybe_write_msg_to_disk(_PersistentStore, _Force, MsgStatus, MSCState) ->
{MsgStatus, MSCState}.
maybe_write_index_to_disk(_Force, MsgStatus =
@@ -1137,12 +1149,14 @@ maybe_push_alphas_to_betas(_Generator, _Consumer, _Q, State =
maybe_push_alphas_to_betas(
Generator, Consumer, Q, State =
#vqstate { ram_msg_count = RamMsgCount, ram_index_count = RamIndexCount,
- index_state = IndexState, msg_store_clients = MSCState }) ->
+ index_state = IndexState, msg_store_clients = MSCState,
+ persistent_store = PersistentStore }) ->
case Generator(Q) of
{empty, _Q} -> State;
{{value, MsgStatus}, Qa} ->
- {MsgStatus1, MSCState1} = maybe_write_msg_to_disk(true, MsgStatus,
- MSCState),
+ {MsgStatus1, MSCState1} =
+ maybe_write_msg_to_disk(
+ PersistentStore, true, MsgStatus, MSCState),
ForceIndex = should_force_index_to_disk(State),
{MsgStatus2, IndexState1} =
maybe_write_index_to_disk(ForceIndex, MsgStatus1, IndexState),