summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2008-10-21 15:59:41 +0100
committerMatthias Radestock <matthias@lshift.net>2008-10-21 15:59:41 +0100
commit25a63c3e348582e1b407f7961288690553bad166 (patch)
tree7f0aa853e4c1760b6feac01a42cc8083b80e31d6
parentbcd4affd32ff8dfc4351c1574420d6a615984ada (diff)
downloadrabbitmq-server-git-25a63c3e348582e1b407f7961288690553bad166.tar.gz
hibernate some processes to conserve memory
In my experiments I encountered situations where rabbit would not recover from a high memory alert even though all messages had been drained from it. By inspecting the running processes I determined that queue and channel processes sometimes hung on to garbage. Erlang's gc is per-process and triggered by process reduction counts, which means an idle process will never perform a gc. This explains the behaviour - the publisher channel goes idle when channel flow control is activated and the queue process goes idle once all messages have been drained from it. Hibernating idle processes forces a gc, as well as generally reducing memory consumption. Currently only channel and queue processes are hibernating, since these are the only two that seemed to be causing problems in my tests. We may want to extend hibernation to other processes in the future.
-rw-r--r--src/buffering_proxy.erl5
-rw-r--r--src/rabbit_amqqueue_process.erl78
2 files changed, 49 insertions, 34 deletions
diff --git a/src/buffering_proxy.erl b/src/buffering_proxy.erl
index dc1686081f..7707e63662 100644
--- a/src/buffering_proxy.erl
+++ b/src/buffering_proxy.erl
@@ -32,6 +32,8 @@
-export([mainloop/4, drain/2]).
-export([proxy_loop/3]).
+-define(HIBERNATE_AFTER, 5000).
+
%%----------------------------------------------------------------------------
start_link(M, A) ->
@@ -59,6 +61,9 @@ mainloop(ProxyPid, Ref, M, State) ->
ProxyPid ! Ref,
NewSt;
Msg -> M:handle_message(Msg, State)
+ after ?HIBERNATE_AFTER ->
+ erlang:hibernate(?MODULE, mainloop,
+ [ProxyPid, Ref, M, State])
end,
?MODULE:mainloop(ProxyPid, Ref, M, NewState).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 7716ef1646..e687df846a 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -30,6 +30,7 @@
-behaviour(gen_server).
-define(UNSENT_MESSAGE_LIMIT, 100).
+-define(HIBERNATE_AFTER, 1000).
-export([start_link/1]).
@@ -75,7 +76,7 @@ init(Q) ->
has_had_consumers = false,
next_msg_id = 1,
message_buffer = queue:new(),
- round_robin = queue:new()}}.
+ round_robin = queue:new()}, ?HIBERNATE_AFTER}.
terminate(_Reason, State) ->
%% FIXME: How do we cancel active subscriptions?
@@ -90,6 +91,10 @@ code_change(_OldVsn, State, _Extra) ->
%%----------------------------------------------------------------------------
+reply(Reply, NewState) -> {reply, Reply, NewState, ?HIBERNATE_AFTER}.
+
+noreply(NewState) -> {noreply, NewState, ?HIBERNATE_AFTER}.
+
lookup_ch(ChPid) ->
case get({ch, ChPid}) of
undefined -> not_found;
@@ -254,7 +259,7 @@ check_auto_delete(State = #q{round_robin = RoundRobin}) ->
handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder,
round_robin = ActiveConsumers}) ->
case lookup_ch(DownPid) of
- not_found -> {noreply, State};
+ not_found -> noreply(State);
#cr{monitor_ref = MonitorRef, ch_pid = ChPid, unacked_messages = UAM} ->
NewActive = block_consumers(ChPid, ActiveConsumers),
erlang:demonitor(MonitorRef),
@@ -270,7 +275,7 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder,
end,
round_robin = NewActive})) of
{continue, NewState} ->
- {noreply, NewState};
+ noreply(NewState);
{stop, NewState} ->
{stop, normal, NewState}
end
@@ -470,12 +475,12 @@ handle_call({deliver_immediately, Txn, Message}, _From, State) ->
%% queues discarding the message?
%%
{Delivered, NewState} = attempt_delivery(Txn, Message, State),
- {reply, Delivered, NewState};
+ reply(Delivered, NewState);
handle_call({deliver, Txn, Message}, _From, State) ->
%% Synchronous, "mandatory" delivery mode
{Delivered, NewState} = deliver_or_enqueue(Txn, Message, State),
- {reply, Delivered, NewState};
+ reply(Delivered, NewState);
handle_call({commit, Txn}, From, State) ->
ok = commit_work(Txn, qname(State)),
@@ -483,7 +488,7 @@ handle_call({commit, Txn}, From, State) ->
gen_server:reply(From, ok),
NewState = process_pending(Txn, State),
erase_tx(Txn),
- {noreply, NewState};
+ noreply(NewState);
handle_call({notify_down, ChPid}, From, State) ->
%% optimisation: we reply straight away so the sender can continue
@@ -507,10 +512,11 @@ handle_call({basic_get, ChPid, NoAck}, _From,
persist_auto_ack(QName, Message)
end,
Msg = {QName, self(), NextId, Delivered, Message},
- {reply, {ok, queue:len(BufferTail), Msg},
- State#q{message_buffer = BufferTail, next_msg_id = NextId + 1}};
+ reply({ok, queue:len(BufferTail), Msg},
+ State#q{message_buffer = BufferTail,
+ next_msg_id = NextId + 1});
{empty, _} ->
- {reply, empty, State}
+ reply(empty, State)
end;
handle_call({basic_consume, NoAck, ReaderPid, ChPid, ConsumerTag,
@@ -520,11 +526,11 @@ handle_call({basic_consume, NoAck, ReaderPid, ChPid, ConsumerTag,
round_robin = RoundRobin}) ->
case check_queue_owner(Owner, ReaderPid) of
mismatch ->
- {reply, {error, queue_owned_by_another_connection}, State};
+ reply({error, queue_owned_by_another_connection}, State);
ok ->
case check_exclusive_access(ExistingHolder, ExclusiveConsume) of
in_use ->
- {reply, {error, exclusive_consume_unavailable}, State};
+ reply({error, exclusive_consume_unavailable}, State);
ok ->
C = #cr{consumers = Consumers} = ch_record(ChPid),
Consumer = #consumer{tag = ConsumerTag, ack_required = not(NoAck)},
@@ -538,7 +544,7 @@ handle_call({basic_consume, NoAck, ReaderPid, ChPid, ConsumerTag,
end,
round_robin = queue:in({ChPid, Consumer}, RoundRobin)},
ok = maybe_send_reply(ChPid, OkMsg),
- {reply, ok, run_poke_burst(State1)}
+ reply(ok, run_poke_burst(State1))
end
end;
@@ -548,7 +554,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
case lookup_ch(ChPid) of
not_found ->
ok = maybe_send_reply(ChPid, OkMsg),
- {reply, ok, State};
+ reply(ok, State);
C = #cr{consumers = Consumers} ->
NewConsumers = lists:filter
(fun (#consumer{tag = CT}) -> CT /= ConsumerTag end,
@@ -564,7 +570,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
ConsumerTag,
RoundRobin)}) of
{continue, State1} ->
- {reply, ok, State1};
+ reply(ok, State1);
{stop, State1} ->
{stop, normal, ok, State1}
end
@@ -573,7 +579,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
handle_call(stat, _From, State = #q{q = #amqqueue{name = Name},
message_buffer = MessageBuffer,
round_robin = RoundRobin}) ->
- {reply, {ok, Name, queue:len(MessageBuffer), queue:len(RoundRobin)}, State};
+ reply({ok, Name, queue:len(MessageBuffer), queue:len(RoundRobin)}, State);
handle_call({delete, IfUnused, IfEmpty}, _From,
State = #q{message_buffer = MessageBuffer}) ->
@@ -581,16 +587,17 @@ handle_call({delete, IfUnused, IfEmpty}, _From,
IsUnused = is_unused(),
if
IfEmpty and not(IsEmpty) ->
- {reply, {error, not_empty}, State};
+ reply({error, not_empty}, State);
IfUnused and not(IsUnused) ->
- {reply, {error, in_use}, State};
+ reply({error, in_use}, State);
true ->
{stop, normal, {ok, queue:len(MessageBuffer)}, State}
end;
handle_call(purge, _From, State = #q{message_buffer = MessageBuffer}) ->
ok = purge_message_buffer(qname(State), MessageBuffer),
- {reply, {ok, queue:len(MessageBuffer)}, State#q{message_buffer = queue:new()}};
+ reply({ok, queue:len(MessageBuffer)},
+ State#q{message_buffer = queue:new()});
handle_call({claim_queue, ReaderPid}, _From, State = #q{owner = Owner,
exclusive_consumer = Holder}) ->
@@ -604,25 +611,25 @@ handle_call({claim_queue, ReaderPid}, _From, State = #q{owner = Owner,
%% to check, we'd need to hold not just the ch
%% pid for each consumer, but also its reader
%% pid...
- {reply, locked, State};
+ reply(locked, State);
ok ->
- {reply, ok, State#q{owner = {ReaderPid, erlang:monitor(process, ReaderPid)}}}
+ reply(ok, State#q{owner = {ReaderPid, erlang:monitor(process, ReaderPid)}})
end;
{ReaderPid, _MonitorRef} ->
- {reply, ok, State};
+ reply(ok, State);
_ ->
- {reply, locked, State}
+ reply(locked, State)
end.
handle_cast({deliver, Txn, Message}, State) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
{_Delivered, NewState} = deliver_or_enqueue(Txn, Message, State),
- {noreply, NewState};
+ noreply(NewState);
handle_cast({ack, Txn, MsgIds, ChPid}, State) ->
case lookup_ch(ChPid) of
not_found ->
- {noreply, State};
+ noreply(State);
C = #cr{unacked_messages = UAM} ->
{Acked, Remaining} = collect_messages(MsgIds, UAM),
persist_acks(Txn, qname(State), Acked),
@@ -632,37 +639,37 @@ handle_cast({ack, Txn, MsgIds, ChPid}, State) ->
_ ->
record_pending_acks(Txn, ChPid, MsgIds)
end,
- {noreply, State}
+ noreply(State)
end;
handle_cast({rollback, Txn}, State) ->
ok = rollback_work(Txn, qname(State)),
erase_tx(Txn),
- {noreply, State};
+ noreply(State);
handle_cast({redeliver, Messages}, State) ->
- {noreply, deliver_or_enqueue_n(Messages, State)};
+ noreply(deliver_or_enqueue_n(Messages, State));
handle_cast({requeue, MsgIds, ChPid}, State) ->
case lookup_ch(ChPid) of
not_found ->
rabbit_log:warning("Ignoring requeue from unknown ch: ~p~n",
[ChPid]),
- {noreply, State};
+ noreply(State);
C = #cr{unacked_messages = UAM} ->
{Messages, NewUAM} = collect_messages(MsgIds, UAM),
store_ch_record(C#cr{unacked_messages = NewUAM}),
- {noreply, deliver_or_enqueue_n(
- [{Message, true} || Message <- Messages], State)}
+ noreply(deliver_or_enqueue_n(
+ [{Message, true} || Message <- Messages], State))
end;
handle_cast({notify_sent, ChPid}, State) ->
case lookup_ch(ChPid) of
- not_found -> {noreply, State};
+ not_found -> noreply(State);
T = #cr{unsent_message_count =Count} ->
- {noreply, possibly_unblock(
- T#cr{unsent_message_count = Count - 1},
- State)}
+ noreply(possibly_unblock(
+ T#cr{unsent_message_count = Count - 1},
+ State))
end.
handle_info({'DOWN', MonitorRef, process, DownPid, _Reason},
@@ -681,6 +688,9 @@ handle_info({'DOWN', MonitorRef, process, DownPid, _Reason},
handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) ->
handle_ch_down(DownPid, State);
+handle_info(timeout, State) ->
+ {noreply, State, hibernate};
+
handle_info(Info, State) ->
?LOGDEBUG("Info in queue: ~p~n", [Info]),
{stop, {unhandled_info, Info}, State}.