diff options
| -rw-r--r-- | src/rabbit_msg_store.erl | 13 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 8 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 62 |
3 files changed, 46 insertions, 37 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 84853227d8..7249e13ea6 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -34,9 +34,8 @@ -behaviour(gen_server2). -export([start_link/4, write/4, read/3, contains/2, remove/2, release/2, - sync/3, client_init/2, client_terminate/2, - client_delete_and_terminate/3, successfully_recovered_state/1, - register_sync_callback/3]). + sync/3, client_init/3, client_terminate/2, + client_delete_and_terminate/3, successfully_recovered_state/1]). -export([sync/1, gc_done/4, set_maximum_since_use/2, gc/3]). %% internal @@ -124,6 +123,7 @@ -type(startup_fun_state() :: {(fun ((A) -> 'finished' | {rabbit_guid:guid(), non_neg_integer(), A})), A}). +-type(guid_fun() :: fun (([rabbit_guid:guid()]) -> any())). -spec(start_link/4 :: (atom(), file:filename(), [binary()] | 'undefined', @@ -140,7 +140,7 @@ -spec(gc_done/4 :: (server(), non_neg_integer(), file_num(), file_num()) -> 'ok'). -spec(set_maximum_since_use/2 :: (server(), non_neg_integer()) -> 'ok'). --spec(client_init/2 :: (server(), rabbit_guid:guid()) -> client_msstate()). +-spec(client_init/3 :: (server(), rabbit_guid:guid(), guid_fun()) -> client_msstate()). -spec(client_terminate/2 :: (client_msstate(), server()) -> 'ok'). -spec(client_delete_and_terminate/3 :: (client_msstate(), server(), rabbit_guid:guid()) -> 'ok'). @@ -360,10 +360,11 @@ gc_done(Server, Reclaimed, Source, Destination) -> set_maximum_since_use(Server, Age) -> gen_server2:cast(Server, {set_maximum_since_use, Age}). -client_init(Server, Ref) -> +client_init(Server, Ref, MsgOnDiskFun) -> {IState, IModule, Dir, GCPid, FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts} = gen_server2:call(Server, {new_client_state, Ref}, infinity), + register_sync_callback(Server, Ref, MsgOnDiskFun), #client_msstate { file_handle_cache = dict:new(), index_state = IState, index_module = IModule, @@ -629,6 +630,8 @@ handle_call({new_client_state, CRef}, _From, handle_call(successfully_recovered_state, _From, State) -> reply(State #msstate.successfully_recovered, State); +handle_call({register_sync_callback, _ , undefined}, _From, State) -> + reply(ok, State); handle_call({register_sync_callback, ClientRef, Fun}, _From, State = #msstate { client_ondisk_callback = CODC }) -> reply(ok, State #msstate { client_ondisk_callback = diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index b814390048..ddb78aff12 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1469,7 +1469,7 @@ msg_store_remove(Guids) -> foreach_with_msg_store_client(MsgStore, Ref, Fun, L) -> rabbit_msg_store:client_terminate( lists:foldl(fun (Guid, MSCState) -> Fun(Guid, MsgStore, MSCState) end, - rabbit_msg_store:client_init(MsgStore, Ref), L), MsgStore). + rabbit_msg_store:client_init(MsgStore, Ref, undefined), L), MsgStore). test_msg_store() -> restart_msg_store_empty(), @@ -1479,7 +1479,7 @@ test_msg_store() -> %% check we don't contain any of the msgs we're about to publish false = msg_store_contains(false, Guids), Ref = rabbit_guid:guid(), - MSCState = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref), + MSCState = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref, undefined), %% publish the first half {ok, MSCState1} = msg_store_write(Guids1stHalf, MSCState), %% sync on the first half @@ -1553,7 +1553,7 @@ test_msg_store() -> %% check we don't contain any of the msgs false = msg_store_contains(false, Guids), %% publish the first half again - MSCState8 = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref), + MSCState8 = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref, undefined), {ok, MSCState9} = msg_store_write(Guids1stHalf, MSCState8), %% this should force some sort of sync internally otherwise misread ok = rabbit_msg_store:client_terminate( @@ -1645,7 +1645,7 @@ queue_index_publish(SeqIds, Persistent, Qi) -> {ok, MSCStateM} = rabbit_msg_store:write(MsgStore, Guid, Guid, MSCStateN), {QiM, [{SeqId, Guid} | SeqIdsGuidsAcc], MSCStateM} - end, {Qi, [], rabbit_msg_store:client_init(MsgStore, Ref)}, SeqIds), + end, {Qi, [], rabbit_msg_store:client_init(MsgStore, Ref, undefined)}, SeqIds), ok = rabbit_msg_store:client_delete_and_terminate( MSCStateEnd, MsgStore, Ref), {A, B}. diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index e6a9387106..31fb21f6ef 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -377,10 +377,8 @@ stop_msg_store() -> init(QueueName, IsDurable, Recover) -> Self = self(), init(QueueName, IsDurable, Recover, - fun (Guids) -> - msgs_written_to_disk(Self, Guids) - end, - fun msg_indices_written_to_disk/1). + fun (Guids) -> msgs_written_to_disk(Self, Guids) end, + fun (Guids) -> msg_indices_written_to_disk(Self, Guids) end). init(QueueName, IsDurable, Recover, MsgOnDiskFun, MsgIdxOnDiskFun) -> @@ -409,16 +407,16 @@ init(QueueName, IsDurable, Recover, end_seq_id = NextSeqId } end, Now = now(), + PersistentClient = case IsDurable of - true -> rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, PRef); + true -> rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, PRef, + MsgOnDiskFun); false -> undefined end, + TransientClient = + rabbit_msg_store:client_init(?TRANSIENT_MSG_STORE, TRef, undefined), - rabbit_msg_store:register_sync_callback(?PERSISTENT_MSG_STORE, PRef, - MsgOnDiskFun), - - TransientClient = rabbit_msg_store:client_init(?TRANSIENT_MSG_STORE, TRef), State = #vqstate { q1 = queue:new(), q2 = bpqueue:new(), @@ -528,24 +526,25 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent, in_counter = InCount, persistent_count = PCount, pending_ack = PA, - durable = IsDurable }) -> + durable = IsDurable, + need_confirming = NeedConfirming }) -> IsPersistent1 = IsDurable andalso IsPersistent, MsgStatus = (msg_status(IsPersistent1, SeqId, Msg)) #msg_status { is_delivered = true }, {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State), PA1 = record_pending_ack(m(MsgStatus1), PA), PCount1 = PCount + one_if(IsPersistent1), + NeedConfirming1 = case NeedsConfirming of + true -> gb_sets:add(Guid, NeedConfirming); + false -> NeedConfirming + end, {SeqId, a(State1 #vqstate { next_seq_id = SeqId + 1, out_counter = OutCount + 1, in_counter = InCount + 1, persistent_count = PCount1, pending_ack = PA1, - need_confirming = - case NeedsConfirming of - true -> gb_sets:insert(Guid, State1#vqstate.need_confirming); - false -> State1#vqstate.need_confirming - end })}. + need_confirming = NeedConfirming1 })}. fetch(AckRequired, State = #vqstate { q4 = Q4, ram_msg_count = RamMsgCount, @@ -1044,7 +1043,8 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, in_counter = InCount, persistent_count = PCount, durable = IsDurable, - ram_msg_count = RamMsgCount }) -> + ram_msg_count = RamMsgCount, + need_confirming = NeedConfirming}) -> IsPersistent1 = IsDurable andalso IsPersistent, MsgStatus = (msg_status(IsPersistent1, SeqId, Msg)) #msg_status { is_delivered = IsDelivered, msg_on_disk = MsgOnDisk }, @@ -1054,17 +1054,17 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, true -> State1 #vqstate { q4 = queue:in(m(MsgStatus1), Q4) } end, PCount1 = PCount + one_if(IsPersistent1), + NeedConfirming1 = case NeedsConfirming of + true -> gb_sets:add(Guid, NeedConfirming); + false -> NeedConfirming + end, {SeqId, State2 #vqstate { next_seq_id = SeqId + 1, len = Len + 1, in_counter = InCount + 1, persistent_count = PCount1, ram_msg_count = RamMsgCount + 1, - need_confirming = - case NeedsConfirming of - true -> gb_sets:add(Guid, State2#vqstate.need_confirming); - false -> State2#vqstate.need_confirming - end }}. + need_confirming = NeedConfirming1 }}. maybe_write_msg_to_disk(_Force, MsgStatus = #msg_status { msg_on_disk = true }, MSCState) -> @@ -1202,29 +1202,35 @@ msgs_written_to_disk(QPid, Guids) -> need_confirming = NC }) -> GuidSet = gb_sets:from_list(Guids), ToConfirmMsgs = gb_sets:intersection(GuidSet, MIOD), - State1 = State #vqstate { msgs_on_disk = - gb_sets:intersection(gb_sets:union(MOD, GuidSet), NC) }, + State1 = + State #vqstate { + msgs_on_disk = + gb_sets:intersection( + gb_sets:union(MOD, GuidSet), NC) }, { msgs_confirmed(ToConfirmMsgs, State1), {confirm, gb_sets:to_list(ToConfirmMsgs)} } end) end). -msg_indices_written_to_disk(Guids) -> - Self = self(), +msg_indices_written_to_disk(QPid, Guids) -> spawn(fun() -> rabbit_amqqueue:maybe_run_queue_via_backing_queue( - Self, + QPid, fun(State = #vqstate { msgs_on_disk = MOD, msg_indices_on_disk = MIOD, need_confirming = NC }) -> GuidSet = gb_sets:from_list(Guids), ToConfirmMsgs = gb_sets:intersection(GuidSet, MOD), - State1 = State #vqstate { msg_indices_on_disk = - gb_sets:intersection(gb_sets:union(MIOD, GuidSet), NC) }, + State1 = + State #vqstate { + msg_indices_on_disk = + gb_sets:intersection( + gb_sets:union(MIOD, GuidSet), NC) }, { msgs_confirmed(ToConfirmMsgs, State1), {confirm, gb_sets:to_list(ToConfirmMsgs)} } end) end). + %%---------------------------------------------------------------------------- %% Phase changes %%---------------------------------------------------------------------------- |
