diff options
| author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-08-17 09:44:49 +0100 |
|---|---|---|
| committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-08-17 09:44:49 +0100 |
| commit | ad27f6e8847fb7c01a9c68dd7d6faf220e378c3d (patch) | |
| tree | 05db6577b7095f8fd737d1cc7013c796e98906a5 | |
| parent | 7f0bc2cddfcd046fecde2f93a5f1ddd4ab348ae3 (diff) | |
| download | rabbitmq-server-git-ad27f6e8847fb7c01a9c68dd7d6faf220e378c3d.tar.gz | |
first stab at keeping track of when messages are synced to disk
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 1 | ||||
| -rw-r--r-- | src/rabbit_msg_store.erl | 93 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 23 |
3 files changed, 92 insertions, 25 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index cdc8cdd2c5..a25ad48b3b 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -430,7 +430,6 @@ deliver_or_enqueue(Txn, ChPid, Message, State = #q{backing_queue = BQ}) -> {true, NewState}; {false, NewState} -> %% Txn is none and no unblocked channels with consumers - rabbit_log:info("No consumers on unblocked channels; enqueueing...~n"), BQS = BQ:publish(Message, State #q.backing_queue_state), {false, NewState#q{backing_queue_state = BQS}} end. diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 631005711b..09eb7cfda2 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -35,7 +35,8 @@ -export([start_link/4, write/4, read/3, contains/2, remove/2, release/2, sync/3, client_init/2, client_terminate/1, - client_delete_and_terminate/3, successfully_recovered_state/1]). + client_delete_and_terminate/3, successfully_recovered_state/1, + register_callback/2]). -export([sync/1, gc_done/4, set_maximum_since_use/2, gc/3]). %% internal @@ -82,7 +83,9 @@ cur_file_cache_ets, %% tid of current file cache table client_refs, %% set of references of all registered clients successfully_recovered, %% boolean: did we recover state? - file_size_limit %% how big are our files allowed to get? + file_size_limit, %% how big are our files allowed to get? + pid_to_fun, %% pid to callback function mapping + pid_to_guids %% pid to synced messages mapping }). -record(client_msstate, @@ -318,7 +321,7 @@ start_link(Server, Dir, ClientRefs, StartupFunState) -> write(Server, Guid, Msg, CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts }) -> ok = update_msg_cache(CurFileCacheEts, Guid, Msg), - {gen_server2:cast(Server, {write, Guid, Msg}), CState}. + {gen_server2:cast(Server, {write, self(), Guid, Msg}), CState}. read(Server, Guid, CState = #client_msstate { dedup_cache_ets = DedupCacheEts, @@ -385,6 +388,9 @@ client_delete_and_terminate(CState, Server, Ref) -> successfully_recovered_state(Server) -> gen_server2:call(Server, successfully_recovered_state, infinity). +register_callback(Server, Fun) -> + gen_server2:call(Server, {register_callback, Fun}, infinity). + %%---------------------------------------------------------------------------- %% Client-side-only helpers %%---------------------------------------------------------------------------- @@ -539,7 +545,9 @@ init([Server, BaseDir, ClientRefs, {MsgRefDeltaGen, MsgRefDeltaGenInit}]) -> cur_file_cache_ets = CurFileCacheEts, client_refs = ClientRefs1, successfully_recovered = AllCleanShutdown, - file_size_limit = FileSizeLimit + file_size_limit = FileSizeLimit, + pid_to_fun = dict:new(), + pid_to_guids = dict:new() }, ok = case AllCleanShutdown of @@ -601,21 +609,32 @@ handle_call(successfully_recovered_state, _From, State) -> handle_call({delete_client, CRef}, _From, State = #msstate { client_refs = ClientRefs }) -> reply(ok, - State #msstate { client_refs = sets:del_element(CRef, ClientRefs) }). + State #msstate { client_refs = sets:del_element(CRef, ClientRefs) }); + +handle_call({register_callback, Fun}, {Pid, _}, + State = #msstate { pid_to_fun = PTF }) -> + rabbit_log:info("Registering callback ~p~n", [{Pid, Fun}]), + erlang:monitor(process, Pid), + reply(ok, + State #msstate { pid_to_fun = dict:store(Pid, Fun, PTF) }). -handle_cast({write, Guid, Msg}, + +handle_cast({write, Pid, Guid, Msg}, State = #msstate { current_file_handle = CurHdl, current_file = CurFile, sum_valid_data = SumValid, sum_file_size = SumFileSize, file_summary_ets = FileSummaryEts, - cur_file_cache_ets = CurFileCacheEts }) -> + cur_file_cache_ets = CurFileCacheEts, + pid_to_fun = PTF, + pid_to_guids = PTG}) -> true = 0 =< ets:update_counter(CurFileCacheEts, Guid, {3, -1}), case index_lookup(Guid, State) of not_found -> %% New message, lots to do {ok, CurOffset} = file_handle_cache:current_virtual_offset(CurHdl), {ok, TotalSize} = rabbit_msg_file:append(CurHdl, Guid, Msg), + rabbit_log:info("message ~p written to disk~n", [Guid]), ok = index_insert(#msg_location { guid = Guid, ref_count = 1, file = CurFile, offset = CurOffset, total_size = TotalSize }, @@ -641,7 +660,11 @@ handle_cast({write, Guid, Msg}, maybe_roll_to_new_file( NextOffset, State #msstate { sum_valid_data = SumValid + TotalSize, - sum_file_size = SumFileSize + TotalSize })); + sum_file_size = SumFileSize + TotalSize, + pid_to_guids = case dict:find(Pid, PTF) of + {ok, _} -> dict:append(Pid, Guid, PTG); + error -> PTG + end})); #msg_location { ref_count = RefCount } -> %% We already know about it, just update counter. Only %% update field otherwise bad interaction with concurrent GC @@ -720,6 +743,14 @@ handle_cast({set_maximum_since_use, Age}, State) -> handle_info(timeout, State) -> noreply(internal_sync(State)); +handle_info({'DOWN', _MRef, process, Pid, _Reason}, + State = #msstate { pid_to_fun = PTF, + pid_to_guids = PTG }) -> + % A queue with a callback has died, so remove it from dicts. + rabbit_log:info("Queue ~p has gone down~n", [Pid]), + {noreply, State #msstate { pid_to_fun = dict:erase(Pid, PTF), + pid_to_guids = dict:erase(Pid, PTG) }}; + handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}. @@ -767,14 +798,19 @@ reply(Reply, State) -> {State1, Timeout} = next_state(State), {reply, Reply, State1, Timeout}. -next_state(State = #msstate { on_sync = [], sync_timer_ref = undefined }) -> - {State, hibernate}; -next_state(State = #msstate { sync_timer_ref = undefined }) -> - {start_sync_timer(State), 0}; -next_state(State = #msstate { on_sync = [] }) -> - {stop_sync_timer(State), hibernate}; -next_state(State) -> - {State, 0}. +next_state(State = #msstate { sync_timer_ref = undefined, + on_sync = OS, + pid_to_guids = PTG }) -> + case {OS, dict:size(PTG)} of + {[], 0} -> {State, hibernate}; + _ -> {start_sync_timer(State), 0} + end; +next_state(State = #msstate { on_sync = OS, + pid_to_guids = PTG }) -> + case {OS, dict:size(PTG)} of + {[], 0} -> {stop_sync_timer(State), hibernate}; + _ -> {State, 0} + end. start_sync_timer(State = #msstate { sync_timer_ref = undefined }) -> {ok, TRef} = timer:apply_after(?SYNC_INTERVAL, ?MODULE, sync, [self()]), @@ -787,14 +823,25 @@ stop_sync_timer(State = #msstate { sync_timer_ref = TRef }) -> State #msstate { sync_timer_ref = undefined }. internal_sync(State = #msstate { current_file_handle = CurHdl, - on_sync = Syncs }) -> + on_sync = Syncs, + pid_to_fun = PTF, + pid_to_guids = PTG }) -> + rabbit_log:info("msg_store syncing~ncallbacks: ~p~nguids: ~p~n", [dict:to_list(PTF), dict:to_list(PTG)]), State1 = stop_sync_timer(State), - case Syncs of - [] -> State1; - _ -> ok = file_handle_cache:sync(CurHdl), - lists:foreach(fun (K) -> K() end, lists:reverse(Syncs)), - State1 #msstate { on_sync = [] } - end. + State2 = case Syncs of + [] -> State1; + _ -> ok = file_handle_cache:sync(CurHdl), + lists:foreach(fun (K) -> K() end, lists:reverse(Syncs)), + State1 #msstate { on_sync = [] } + end, + dict:map(fun(Pid, Guids) -> + case dict:find(Pid, PTF) of + {ok, Fun} -> Fun(Guids); + error -> ok %% shouldn't happen + end + end, PTG), + State2 #msstate { pid_to_guids = dict:new() }. + read_message(Guid, From, State = #msstate { dedup_cache_ets = DedupCacheEts }) -> diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 714eb206aa..ca459665d4 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -398,6 +398,7 @@ init(QueueName, IsDurable, Recover) -> true -> rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, PRef); false -> undefined end, + register_puback_callback(?PERSISTENT_MSG_STORE), TransientClient = rabbit_msg_store:client_init(?TRANSIENT_MSG_STORE, TRef), State = #vqstate { q1 = queue:new(), @@ -1004,7 +1005,6 @@ publish(Msg = #basic_message { is_persistent = IsPersistent }, persistent_count = PCount, durable = IsDurable, ram_msg_count = RamMsgCount }) -> - rabbit_log:info("message ~p got to variable_queue:publish~n", [Msg#basic_message.guid]), IsPersistent1 = IsDurable andalso IsPersistent, MsgStatus = (msg_status(IsPersistent1, SeqId, Msg)) #msg_status { is_delivered = IsDelivered, msg_on_disk = MsgOnDisk }, @@ -1069,6 +1069,27 @@ maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus, msg_store_clients = MSCState1 }}. %%---------------------------------------------------------------------------- +%% Internal gubbins for pubacks +%%---------------------------------------------------------------------------- + +register_puback_callback(MessageStore) -> + rabbit_msg_store:register_callback( + MessageStore, + fun (Guids) -> + spawn(fun () -> ok = rabbit_misc:with_exit_handler( + fun () -> + rabbit_log:info("something bad happened while sending pubacks back to channel"), + ok + end, + fun () -> + lists:foreach(fun(G) -> + rabbit_log:info("send puback back to channel for ~p~n", [G]) end, Guids), + ok + end) + end) + end). + +%%---------------------------------------------------------------------------- %% Internal gubbins for acks %%---------------------------------------------------------------------------- |
