summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_amqqueue_process.erl1
-rw-r--r--src/rabbit_msg_store.erl93
-rw-r--r--src/rabbit_variable_queue.erl23
3 files changed, 92 insertions, 25 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index cdc8cdd2c5..a25ad48b3b 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -430,7 +430,6 @@ deliver_or_enqueue(Txn, ChPid, Message, State = #q{backing_queue = BQ}) ->
{true, NewState};
{false, NewState} ->
%% Txn is none and no unblocked channels with consumers
- rabbit_log:info("No consumers on unblocked channels; enqueueing...~n"),
BQS = BQ:publish(Message, State #q.backing_queue_state),
{false, NewState#q{backing_queue_state = BQS}}
end.
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 631005711b..09eb7cfda2 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -35,7 +35,8 @@
-export([start_link/4, write/4, read/3, contains/2, remove/2, release/2,
sync/3, client_init/2, client_terminate/1,
- client_delete_and_terminate/3, successfully_recovered_state/1]).
+ client_delete_and_terminate/3, successfully_recovered_state/1,
+ register_callback/2]).
-export([sync/1, gc_done/4, set_maximum_since_use/2, gc/3]). %% internal
@@ -82,7 +83,9 @@
cur_file_cache_ets, %% tid of current file cache table
client_refs, %% set of references of all registered clients
successfully_recovered, %% boolean: did we recover state?
- file_size_limit %% how big are our files allowed to get?
+ file_size_limit, %% how big are our files allowed to get?
+ pid_to_fun, %% pid to callback function mapping
+ pid_to_guids %% pid to synced messages mapping
}).
-record(client_msstate,
@@ -318,7 +321,7 @@ start_link(Server, Dir, ClientRefs, StartupFunState) ->
write(Server, Guid, Msg,
CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts }) ->
ok = update_msg_cache(CurFileCacheEts, Guid, Msg),
- {gen_server2:cast(Server, {write, Guid, Msg}), CState}.
+ {gen_server2:cast(Server, {write, self(), Guid, Msg}), CState}.
read(Server, Guid,
CState = #client_msstate { dedup_cache_ets = DedupCacheEts,
@@ -385,6 +388,9 @@ client_delete_and_terminate(CState, Server, Ref) ->
successfully_recovered_state(Server) ->
gen_server2:call(Server, successfully_recovered_state, infinity).
+register_callback(Server, Fun) ->
+ gen_server2:call(Server, {register_callback, Fun}, infinity).
+
%%----------------------------------------------------------------------------
%% Client-side-only helpers
%%----------------------------------------------------------------------------
@@ -539,7 +545,9 @@ init([Server, BaseDir, ClientRefs, {MsgRefDeltaGen, MsgRefDeltaGenInit}]) ->
cur_file_cache_ets = CurFileCacheEts,
client_refs = ClientRefs1,
successfully_recovered = AllCleanShutdown,
- file_size_limit = FileSizeLimit
+ file_size_limit = FileSizeLimit,
+ pid_to_fun = dict:new(),
+ pid_to_guids = dict:new()
},
ok = case AllCleanShutdown of
@@ -601,21 +609,32 @@ handle_call(successfully_recovered_state, _From, State) ->
handle_call({delete_client, CRef}, _From,
State = #msstate { client_refs = ClientRefs }) ->
reply(ok,
- State #msstate { client_refs = sets:del_element(CRef, ClientRefs) }).
+ State #msstate { client_refs = sets:del_element(CRef, ClientRefs) });
+
+handle_call({register_callback, Fun}, {Pid, _},
+ State = #msstate { pid_to_fun = PTF }) ->
+ rabbit_log:info("Registering callback ~p~n", [{Pid, Fun}]),
+ erlang:monitor(process, Pid),
+ reply(ok,
+ State #msstate { pid_to_fun = dict:store(Pid, Fun, PTF) }).
-handle_cast({write, Guid, Msg},
+
+handle_cast({write, Pid, Guid, Msg},
State = #msstate { current_file_handle = CurHdl,
current_file = CurFile,
sum_valid_data = SumValid,
sum_file_size = SumFileSize,
file_summary_ets = FileSummaryEts,
- cur_file_cache_ets = CurFileCacheEts }) ->
+ cur_file_cache_ets = CurFileCacheEts,
+ pid_to_fun = PTF,
+ pid_to_guids = PTG}) ->
true = 0 =< ets:update_counter(CurFileCacheEts, Guid, {3, -1}),
case index_lookup(Guid, State) of
not_found ->
%% New message, lots to do
{ok, CurOffset} = file_handle_cache:current_virtual_offset(CurHdl),
{ok, TotalSize} = rabbit_msg_file:append(CurHdl, Guid, Msg),
+ rabbit_log:info("message ~p written to disk~n", [Guid]),
ok = index_insert(#msg_location {
guid = Guid, ref_count = 1, file = CurFile,
offset = CurOffset, total_size = TotalSize },
@@ -641,7 +660,11 @@ handle_cast({write, Guid, Msg},
maybe_roll_to_new_file(
NextOffset, State #msstate {
sum_valid_data = SumValid + TotalSize,
- sum_file_size = SumFileSize + TotalSize }));
+ sum_file_size = SumFileSize + TotalSize,
+ pid_to_guids = case dict:find(Pid, PTF) of
+ {ok, _} -> dict:append(Pid, Guid, PTG);
+ error -> PTG
+ end}));
#msg_location { ref_count = RefCount } ->
%% We already know about it, just update counter. Only
%% update field otherwise bad interaction with concurrent GC
@@ -720,6 +743,14 @@ handle_cast({set_maximum_since_use, Age}, State) ->
handle_info(timeout, State) ->
noreply(internal_sync(State));
+handle_info({'DOWN', _MRef, process, Pid, _Reason},
+ State = #msstate { pid_to_fun = PTF,
+ pid_to_guids = PTG }) ->
+ % A queue with a callback has died, so remove it from dicts.
+ rabbit_log:info("Queue ~p has gone down~n", [Pid]),
+ {noreply, State #msstate { pid_to_fun = dict:erase(Pid, PTF),
+ pid_to_guids = dict:erase(Pid, PTG) }};
+
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State}.
@@ -767,14 +798,19 @@ reply(Reply, State) ->
{State1, Timeout} = next_state(State),
{reply, Reply, State1, Timeout}.
-next_state(State = #msstate { on_sync = [], sync_timer_ref = undefined }) ->
- {State, hibernate};
-next_state(State = #msstate { sync_timer_ref = undefined }) ->
- {start_sync_timer(State), 0};
-next_state(State = #msstate { on_sync = [] }) ->
- {stop_sync_timer(State), hibernate};
-next_state(State) ->
- {State, 0}.
+next_state(State = #msstate { sync_timer_ref = undefined,
+ on_sync = OS,
+ pid_to_guids = PTG }) ->
+ case {OS, dict:size(PTG)} of
+ {[], 0} -> {State, hibernate};
+ _ -> {start_sync_timer(State), 0}
+ end;
+next_state(State = #msstate { on_sync = OS,
+ pid_to_guids = PTG }) ->
+ case {OS, dict:size(PTG)} of
+ {[], 0} -> {stop_sync_timer(State), hibernate};
+ _ -> {State, 0}
+ end.
start_sync_timer(State = #msstate { sync_timer_ref = undefined }) ->
{ok, TRef} = timer:apply_after(?SYNC_INTERVAL, ?MODULE, sync, [self()]),
@@ -787,14 +823,25 @@ stop_sync_timer(State = #msstate { sync_timer_ref = TRef }) ->
State #msstate { sync_timer_ref = undefined }.
internal_sync(State = #msstate { current_file_handle = CurHdl,
- on_sync = Syncs }) ->
+ on_sync = Syncs,
+ pid_to_fun = PTF,
+ pid_to_guids = PTG }) ->
+ rabbit_log:info("msg_store syncing~ncallbacks: ~p~nguids: ~p~n", [dict:to_list(PTF), dict:to_list(PTG)]),
State1 = stop_sync_timer(State),
- case Syncs of
- [] -> State1;
- _ -> ok = file_handle_cache:sync(CurHdl),
- lists:foreach(fun (K) -> K() end, lists:reverse(Syncs)),
- State1 #msstate { on_sync = [] }
- end.
+ State2 = case Syncs of
+ [] -> State1;
+ _ -> ok = file_handle_cache:sync(CurHdl),
+ lists:foreach(fun (K) -> K() end, lists:reverse(Syncs)),
+ State1 #msstate { on_sync = [] }
+ end,
+ dict:map(fun(Pid, Guids) ->
+ case dict:find(Pid, PTF) of
+ {ok, Fun} -> Fun(Guids);
+ error -> ok %% shouldn't happen
+ end
+ end, PTG),
+ State2 #msstate { pid_to_guids = dict:new() }.
+
read_message(Guid, From,
State = #msstate { dedup_cache_ets = DedupCacheEts }) ->
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 714eb206aa..ca459665d4 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -398,6 +398,7 @@ init(QueueName, IsDurable, Recover) ->
true -> rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, PRef);
false -> undefined
end,
+ register_puback_callback(?PERSISTENT_MSG_STORE),
TransientClient = rabbit_msg_store:client_init(?TRANSIENT_MSG_STORE, TRef),
State = #vqstate {
q1 = queue:new(),
@@ -1004,7 +1005,6 @@ publish(Msg = #basic_message { is_persistent = IsPersistent },
persistent_count = PCount,
durable = IsDurable,
ram_msg_count = RamMsgCount }) ->
- rabbit_log:info("message ~p got to variable_queue:publish~n", [Msg#basic_message.guid]),
IsPersistent1 = IsDurable andalso IsPersistent,
MsgStatus = (msg_status(IsPersistent1, SeqId, Msg))
#msg_status { is_delivered = IsDelivered, msg_on_disk = MsgOnDisk },
@@ -1069,6 +1069,27 @@ maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus,
msg_store_clients = MSCState1 }}.
%%----------------------------------------------------------------------------
+%% Internal gubbins for pubacks
+%%----------------------------------------------------------------------------
+
+register_puback_callback(MessageStore) ->
+ rabbit_msg_store:register_callback(
+ MessageStore,
+ fun (Guids) ->
+ spawn(fun () -> ok = rabbit_misc:with_exit_handler(
+ fun () ->
+ rabbit_log:info("something bad happened while sending pubacks back to channel"),
+ ok
+ end,
+ fun () ->
+ lists:foreach(fun(G) ->
+ rabbit_log:info("send puback back to channel for ~p~n", [G]) end, Guids),
+ ok
+ end)
+ end)
+ end).
+
+%%----------------------------------------------------------------------------
%% Internal gubbins for acks
%%----------------------------------------------------------------------------