summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_msg_store.erl13
-rw-r--r--src/rabbit_tests.erl8
-rw-r--r--src/rabbit_variable_queue.erl62
3 files changed, 46 insertions, 37 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 84853227d8..7249e13ea6 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -34,9 +34,8 @@
-behaviour(gen_server2).
-export([start_link/4, write/4, read/3, contains/2, remove/2, release/2,
- sync/3, client_init/2, client_terminate/2,
- client_delete_and_terminate/3, successfully_recovered_state/1,
- register_sync_callback/3]).
+ sync/3, client_init/3, client_terminate/2,
+ client_delete_and_terminate/3, successfully_recovered_state/1]).
-export([sync/1, gc_done/4, set_maximum_since_use/2, gc/3]). %% internal
@@ -124,6 +123,7 @@
-type(startup_fun_state() ::
{(fun ((A) -> 'finished' | {rabbit_guid:guid(), non_neg_integer(), A})),
A}).
+-type(guid_fun() :: fun (([rabbit_guid:guid()]) -> any())).
-spec(start_link/4 ::
(atom(), file:filename(), [binary()] | 'undefined',
@@ -140,7 +140,7 @@
-spec(gc_done/4 :: (server(), non_neg_integer(), file_num(), file_num()) ->
'ok').
-spec(set_maximum_since_use/2 :: (server(), non_neg_integer()) -> 'ok').
--spec(client_init/2 :: (server(), rabbit_guid:guid()) -> client_msstate()).
+-spec(client_init/3 :: (server(), rabbit_guid:guid(), guid_fun()) -> client_msstate()).
-spec(client_terminate/2 :: (client_msstate(), server()) -> 'ok').
-spec(client_delete_and_terminate/3 ::
(client_msstate(), server(), rabbit_guid:guid()) -> 'ok').
@@ -360,10 +360,11 @@ gc_done(Server, Reclaimed, Source, Destination) ->
set_maximum_since_use(Server, Age) ->
gen_server2:cast(Server, {set_maximum_since_use, Age}).
-client_init(Server, Ref) ->
+client_init(Server, Ref, MsgOnDiskFun) ->
{IState, IModule, Dir, GCPid,
FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts} =
gen_server2:call(Server, {new_client_state, Ref}, infinity),
+ register_sync_callback(Server, Ref, MsgOnDiskFun),
#client_msstate { file_handle_cache = dict:new(),
index_state = IState,
index_module = IModule,
@@ -629,6 +630,8 @@ handle_call({new_client_state, CRef}, _From,
handle_call(successfully_recovered_state, _From, State) ->
reply(State #msstate.successfully_recovered, State);
+handle_call({register_sync_callback, _ , undefined}, _From, State) ->
+ reply(ok, State);
handle_call({register_sync_callback, ClientRef, Fun}, _From,
State = #msstate { client_ondisk_callback = CODC }) ->
reply(ok, State #msstate { client_ondisk_callback =
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index b814390048..ddb78aff12 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1469,7 +1469,7 @@ msg_store_remove(Guids) ->
foreach_with_msg_store_client(MsgStore, Ref, Fun, L) ->
rabbit_msg_store:client_terminate(
lists:foldl(fun (Guid, MSCState) -> Fun(Guid, MsgStore, MSCState) end,
- rabbit_msg_store:client_init(MsgStore, Ref), L), MsgStore).
+ rabbit_msg_store:client_init(MsgStore, Ref, undefined), L), MsgStore).
test_msg_store() ->
restart_msg_store_empty(),
@@ -1479,7 +1479,7 @@ test_msg_store() ->
%% check we don't contain any of the msgs we're about to publish
false = msg_store_contains(false, Guids),
Ref = rabbit_guid:guid(),
- MSCState = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref),
+ MSCState = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref, undefined),
%% publish the first half
{ok, MSCState1} = msg_store_write(Guids1stHalf, MSCState),
%% sync on the first half
@@ -1553,7 +1553,7 @@ test_msg_store() ->
%% check we don't contain any of the msgs
false = msg_store_contains(false, Guids),
%% publish the first half again
- MSCState8 = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref),
+ MSCState8 = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref, undefined),
{ok, MSCState9} = msg_store_write(Guids1stHalf, MSCState8),
%% this should force some sort of sync internally otherwise misread
ok = rabbit_msg_store:client_terminate(
@@ -1645,7 +1645,7 @@ queue_index_publish(SeqIds, Persistent, Qi) ->
{ok, MSCStateM} = rabbit_msg_store:write(MsgStore, Guid,
Guid, MSCStateN),
{QiM, [{SeqId, Guid} | SeqIdsGuidsAcc], MSCStateM}
- end, {Qi, [], rabbit_msg_store:client_init(MsgStore, Ref)}, SeqIds),
+ end, {Qi, [], rabbit_msg_store:client_init(MsgStore, Ref, undefined)}, SeqIds),
ok = rabbit_msg_store:client_delete_and_terminate(
MSCStateEnd, MsgStore, Ref),
{A, B}.
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index e6a9387106..31fb21f6ef 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -377,10 +377,8 @@ stop_msg_store() ->
init(QueueName, IsDurable, Recover) ->
Self = self(),
init(QueueName, IsDurable, Recover,
- fun (Guids) ->
- msgs_written_to_disk(Self, Guids)
- end,
- fun msg_indices_written_to_disk/1).
+ fun (Guids) -> msgs_written_to_disk(Self, Guids) end,
+ fun (Guids) -> msg_indices_written_to_disk(Self, Guids) end).
init(QueueName, IsDurable, Recover,
MsgOnDiskFun, MsgIdxOnDiskFun) ->
@@ -409,16 +407,16 @@ init(QueueName, IsDurable, Recover,
end_seq_id = NextSeqId }
end,
Now = now(),
+
PersistentClient =
case IsDurable of
- true -> rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, PRef);
+ true -> rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, PRef,
+ MsgOnDiskFun);
false -> undefined
end,
+ TransientClient =
+ rabbit_msg_store:client_init(?TRANSIENT_MSG_STORE, TRef, undefined),
- rabbit_msg_store:register_sync_callback(?PERSISTENT_MSG_STORE, PRef,
- MsgOnDiskFun),
-
- TransientClient = rabbit_msg_store:client_init(?TRANSIENT_MSG_STORE, TRef),
State = #vqstate {
q1 = queue:new(),
q2 = bpqueue:new(),
@@ -528,24 +526,25 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent,
in_counter = InCount,
persistent_count = PCount,
pending_ack = PA,
- durable = IsDurable }) ->
+ durable = IsDurable,
+ need_confirming = NeedConfirming }) ->
IsPersistent1 = IsDurable andalso IsPersistent,
MsgStatus = (msg_status(IsPersistent1, SeqId, Msg))
#msg_status { is_delivered = true },
{MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
PA1 = record_pending_ack(m(MsgStatus1), PA),
PCount1 = PCount + one_if(IsPersistent1),
+ NeedConfirming1 = case NeedsConfirming of
+ true -> gb_sets:add(Guid, NeedConfirming);
+ false -> NeedConfirming
+ end,
{SeqId, a(State1 #vqstate {
next_seq_id = SeqId + 1,
out_counter = OutCount + 1,
in_counter = InCount + 1,
persistent_count = PCount1,
pending_ack = PA1,
- need_confirming =
- case NeedsConfirming of
- true -> gb_sets:insert(Guid, State1#vqstate.need_confirming);
- false -> State1#vqstate.need_confirming
- end })}.
+ need_confirming = NeedConfirming1 })}.
fetch(AckRequired, State = #vqstate { q4 = Q4,
ram_msg_count = RamMsgCount,
@@ -1044,7 +1043,8 @@ publish(Msg = #basic_message { is_persistent = IsPersistent,
in_counter = InCount,
persistent_count = PCount,
durable = IsDurable,
- ram_msg_count = RamMsgCount }) ->
+ ram_msg_count = RamMsgCount,
+ need_confirming = NeedConfirming}) ->
IsPersistent1 = IsDurable andalso IsPersistent,
MsgStatus = (msg_status(IsPersistent1, SeqId, Msg))
#msg_status { is_delivered = IsDelivered, msg_on_disk = MsgOnDisk },
@@ -1054,17 +1054,17 @@ publish(Msg = #basic_message { is_persistent = IsPersistent,
true -> State1 #vqstate { q4 = queue:in(m(MsgStatus1), Q4) }
end,
PCount1 = PCount + one_if(IsPersistent1),
+ NeedConfirming1 = case NeedsConfirming of
+ true -> gb_sets:add(Guid, NeedConfirming);
+ false -> NeedConfirming
+ end,
{SeqId, State2 #vqstate {
next_seq_id = SeqId + 1,
len = Len + 1,
in_counter = InCount + 1,
persistent_count = PCount1,
ram_msg_count = RamMsgCount + 1,
- need_confirming =
- case NeedsConfirming of
- true -> gb_sets:add(Guid, State2#vqstate.need_confirming);
- false -> State2#vqstate.need_confirming
- end }}.
+ need_confirming = NeedConfirming1 }}.
maybe_write_msg_to_disk(_Force, MsgStatus = #msg_status {
msg_on_disk = true }, MSCState) ->
@@ -1202,29 +1202,35 @@ msgs_written_to_disk(QPid, Guids) ->
need_confirming = NC }) ->
GuidSet = gb_sets:from_list(Guids),
ToConfirmMsgs = gb_sets:intersection(GuidSet, MIOD),
- State1 = State #vqstate { msgs_on_disk =
- gb_sets:intersection(gb_sets:union(MOD, GuidSet), NC) },
+ State1 =
+ State #vqstate {
+ msgs_on_disk =
+ gb_sets:intersection(
+ gb_sets:union(MOD, GuidSet), NC) },
{ msgs_confirmed(ToConfirmMsgs, State1),
{confirm, gb_sets:to_list(ToConfirmMsgs)} }
end)
end).
-msg_indices_written_to_disk(Guids) ->
- Self = self(),
+msg_indices_written_to_disk(QPid, Guids) ->
spawn(fun() -> rabbit_amqqueue:maybe_run_queue_via_backing_queue(
- Self,
+ QPid,
fun(State = #vqstate { msgs_on_disk = MOD,
msg_indices_on_disk = MIOD,
need_confirming = NC }) ->
GuidSet = gb_sets:from_list(Guids),
ToConfirmMsgs = gb_sets:intersection(GuidSet, MOD),
- State1 = State #vqstate { msg_indices_on_disk =
- gb_sets:intersection(gb_sets:union(MIOD, GuidSet), NC) },
+ State1 =
+ State #vqstate {
+ msg_indices_on_disk =
+ gb_sets:intersection(
+ gb_sets:union(MIOD, GuidSet), NC) },
{ msgs_confirmed(ToConfirmMsgs, State1),
{confirm, gb_sets:to_list(ToConfirmMsgs)} }
end)
end).
+
%%----------------------------------------------------------------------------
%% Phase changes
%%----------------------------------------------------------------------------