summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2008-10-22 07:51:58 +0100
committerMatthias Radestock <matthias@lshift.net>2008-10-22 07:51:58 +0100
commitb19e4bb8004d35efc9bbc43bdb9c20c111e6a3e2 (patch)
tree7f0aa853e4c1760b6feac01a42cc8083b80e31d6
parentd2f22dbbfa3f592f7f6d1fb706506b3f2de322c7 (diff)
parent25a63c3e348582e1b407f7961288690553bad166 (diff)
downloadrabbitmq-server-git-b19e4bb8004d35efc9bbc43bdb9c20c111e6a3e2.tar.gz
merge bug19468_channels into bug19468
-rw-r--r--packaging/RPMS/Fedora/Makefile2
-rw-r--r--packaging/RPMS/Fedora/rabbitmq-server.spec6
-rw-r--r--packaging/debs/Debian/debian/control2
-rwxr-xr-xscripts/rabbitmq-server1
-rw-r--r--scripts/rabbitmq-server.bat1
-rw-r--r--src/buffering_proxy.erl18
-rw-r--r--src/rabbit_amqqueue.erl61
-rw-r--r--src/rabbit_amqqueue_process.erl78
-rw-r--r--src/rabbit_channel.erl73
-rw-r--r--src/rabbit_reader.erl37
10 files changed, 166 insertions, 113 deletions
diff --git a/packaging/RPMS/Fedora/Makefile b/packaging/RPMS/Fedora/Makefile
index 6cc3579bab..814c79f03c 100644
--- a/packaging/RPMS/Fedora/Makefile
+++ b/packaging/RPMS/Fedora/Makefile
@@ -5,7 +5,7 @@ SOURCE_TARBALL_DIR=../../../dist
TARBALL=$(SOURCE_TARBALL_DIR)/rabbitmq-server-$(VERSION).tar.gz
TOP_DIR=$(shell pwd)
RPM_VERSION=$(shell echo $(VERSION) | tr - _)
-DEFINES=--define '_topdir $(TOP_DIR)' --define '_tmppath $(TOP_DIR)/tmp' --define 'main_version $(VERSION)' --define 'rpm_version $(RPM_VERSION)'
+DEFINES=--define '_topdir $(TOP_DIR)' --define '_tmppath $(TOP_DIR)/tmp' --define 'main_version $(VERSION)' --define 'rpm_version $(RPM_VERSION)' --define 'debian 1'
rpms: clean server
diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec
index 08694c096c..43837ba34b 100644
--- a/packaging/RPMS/Fedora/rabbitmq-server.spec
+++ b/packaging/RPMS/Fedora/rabbitmq-server.spec
@@ -6,6 +6,10 @@ Group: Development/Libraries
Source: http://www.rabbitmq.com/releases/rabbitmq-server/v%{main_version}/%{name}-%{main_version}.tar.gz
URL: http://www.rabbitmq.com/
Vendor: LShift Ltd., Cohesive Financial Technologies LLC., Rabbit Technlogies Ltd.
+%if 0%{?debian}
+%else
+BuildRequires: python, python-json
+%endif
Requires: erlang, logrotate
Packager: Hubert Plociniczak <hubert@lshift.net>
BuildRoot: %{_tmppath}/%{name}-%{main_version}-%{release}-root
@@ -18,13 +22,11 @@ RabbitMQ is an implementation of AMQP, the emerging standard for high
performance enterprise messaging. The RabbitMQ server is a robust and
scalable implementation of an AMQP broker.
-
%define _mandir /usr/share/man
%define _sbindir /usr/sbin
%define _libdir %(erl -noshell -eval "io:format('~s~n', [code:lib_dir()]), halt().")
%define _maindir %{buildroot}%{_libdir}/rabbitmq_server-%{main_version}
-
%pre
if [ $1 -gt 1 ]; then
#Upgrade - stop and remove previous instance of rabbitmq-server init.d script
diff --git a/packaging/debs/Debian/debian/control b/packaging/debs/Debian/debian/control
index bc691bc7b6..675e15f490 100644
--- a/packaging/debs/Debian/debian/control
+++ b/packaging/debs/Debian/debian/control
@@ -2,7 +2,7 @@ Source: rabbitmq-server
Section: net
Priority: extra
Maintainer: Tony Garnock-Jones <tonyg@rabbitmq.com>
-Build-Depends: cdbs, debhelper (>= 5), erlang-base | erlang-base-hipe, erlang-nox, erlang-dev, erlang-src, make, python
+Build-Depends: cdbs, debhelper (>= 5), erlang-base | erlang-base-hipe, erlang-nox, erlang-dev, erlang-src, make, python, python-json
Standards-Version: 3.7.2
Package: rabbitmq-server
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server
index 994736ceb5..c953a75312 100755
--- a/scripts/rabbitmq-server
+++ b/scripts/rabbitmq-server
@@ -69,6 +69,7 @@ erl \
-os_mon start_memsup true \
-os_mon start_os_sup false \
-os_mon memsup_system_only true \
+ -os_mon system_memory_high_watermark 0.95 \
-mnesia dir "\"${MNESIA_DIR}\"" \
${CLUSTER_CONFIG} \
${RABBIT_ARGS} \
diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat
index 3ad093e0fc..38b8cc5307 100644
--- a/scripts/rabbitmq-server.bat
+++ b/scripts/rabbitmq-server.bat
@@ -110,6 +110,7 @@ set MNESIA_DIR=%MNESIA_BASE%/%NODENAME%-mnesia
-os_mon start_memsup true ^
-os_mon start_os_sup false ^
-os_mon memsup_system_only true ^
+-os_mon system_memory_high_watermark 0.95 ^
-mnesia dir \""%MNESIA_DIR%"\" ^
%CLUSTER_CONFIG% ^
%RABBIT_ARGS% ^
diff --git a/src/buffering_proxy.erl b/src/buffering_proxy.erl
index d250570198..7707e63662 100644
--- a/src/buffering_proxy.erl
+++ b/src/buffering_proxy.erl
@@ -32,6 +32,8 @@
-export([mainloop/4, drain/2]).
-export([proxy_loop/3]).
+-define(HIBERNATE_AFTER, 5000).
+
%%----------------------------------------------------------------------------
start_link(M, A) ->
@@ -40,7 +42,8 @@ start_link(M, A) ->
ProxyPid = self(),
Ref = make_ref(),
Pid = spawn_link(
- fun () -> mainloop(ProxyPid, Ref, M,
+ fun () -> ProxyPid ! Ref,
+ mainloop(ProxyPid, Ref, M,
M:init(ProxyPid, A)) end),
proxy_loop(Ref, Pid, empty)
end).
@@ -48,14 +51,19 @@ start_link(M, A) ->
%%----------------------------------------------------------------------------
mainloop(ProxyPid, Ref, M, State) ->
- ProxyPid ! Ref,
NewState =
receive
{Ref, Messages} ->
- lists:foldl(fun (Msg, S) ->
- drain(M, M:handle_message(Msg, S))
- end, State, lists:reverse(Messages));
+ NewSt =
+ lists:foldl(fun (Msg, S) ->
+ drain(M, M:handle_message(Msg, S))
+ end, State, lists:reverse(Messages)),
+ ProxyPid ! Ref,
+ NewSt;
Msg -> M:handle_message(Msg, State)
+ after ?HIBERNATE_AFTER ->
+ erlang:hibernate(?MODULE, mainloop,
+ [ProxyPid, Ref, M, State])
end,
?MODULE:mainloop(ProxyPid, Ref, M, NewState).
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 7ce350d86e..bd64f1e48d 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -28,12 +28,12 @@
-export([start/0, recover/0, declare/4, delete/3, purge/1, internal_delete/1]).
-export([pseudo_queue/2]).
-export([lookup/1, with/2, with_or_die/2, list_vhost_queues/1,
- stat/1, stat_all/0, deliver/5, redeliver/2, requeue/3, ack/4,
- commit/2, rollback/2]).
+ stat/1, stat_all/0, deliver/5, redeliver/2, requeue/3, ack/4]).
-export([add_binding/4, delete_binding/4, binding_forcibly_removed/2]).
-export([claim_queue/2]).
-export([basic_get/3, basic_consume/7, basic_cancel/4]).
--export([notify_sent/2, notify_down/2]).
+-export([notify_sent/2]).
+-export([commit_all/2, rollback_all/2, notify_down_all/2]).
-export([on_node_down/1]).
-import(mnesia).
@@ -44,6 +44,8 @@
-include("rabbit.hrl").
-include_lib("stdlib/include/qlc.hrl").
+-define(CALL_TIMEOUT, 5000).
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -53,6 +55,9 @@
-type(qfun(A) :: fun ((amqqueue()) -> A)).
-type(bind_res() :: {'ok', non_neg_integer()} |
{'error', 'queue_not_found' | 'exchange_not_found'}).
+-type(ok_or_errors() ::
+ 'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}).
+
-spec(start/0 :: () -> 'ok').
-spec(recover/0 :: () -> 'ok').
-spec(declare/4 :: (queue_name(), bool(), bool(), amqp_table()) ->
@@ -81,9 +86,9 @@
-spec(redeliver/2 :: (pid(), [{message(), bool()}]) -> 'ok').
-spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok').
-spec(ack/4 :: (pid(), maybe(txn()), [msg_id()], pid()) -> 'ok').
--spec(commit/2 :: (pid(), txn()) -> 'ok').
--spec(rollback/2 :: (pid(), txn()) -> 'ok').
--spec(notify_down/2 :: (amqqueue(), pid()) -> 'ok').
+-spec(commit_all/2 :: ([pid()], txn()) -> ok_or_errors()).
+-spec(rollback_all/2 :: ([pid()], txn()) -> ok_or_errors()).
+-spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()).
-spec(binding_forcibly_removed/2 :: (binding_spec(), queue_name()) -> 'ok').
-spec(claim_queue/2 :: (amqqueue(), pid()) -> 'ok' | 'locked').
-spec(basic_get/3 :: (amqqueue(), pid(), bool()) ->
@@ -287,14 +292,29 @@ requeue(QPid, MsgIds, ChPid) ->
ack(QPid, Txn, MsgIds, ChPid) ->
gen_server:cast(QPid, {ack, Txn, MsgIds, ChPid}).
-commit(QPid, Txn) ->
- gen_server:call(QPid, {commit, Txn}).
-
-rollback(QPid, Txn) ->
- gen_server:cast(QPid, {rollback, Txn}).
-
-notify_down(#amqqueue{ pid = QPid }, ChPid) ->
- gen_server:call(QPid, {notify_down, ChPid}).
+commit_all(QPids, Txn) ->
+ Timeout = length(QPids) * ?CALL_TIMEOUT,
+ safe_pmap_ok(
+ fun (QPid) -> gen_server:call(QPid, {commit, Txn}, Timeout) end,
+ QPids).
+
+rollback_all(QPids, Txn) ->
+ safe_pmap_ok(
+ fun (QPid) -> gen_server:cast(QPid, {rollback, Txn}) end,
+ QPids).
+
+notify_down_all(QPids, ChPid) ->
+ Timeout = length(QPids) * ?CALL_TIMEOUT,
+ safe_pmap_ok(
+ fun (QPid) ->
+ rabbit_misc:with_exit_handler(
+ %% we don't care if the queue process has terminated
+ %% in the meantime
+ fun () -> ok end,
+ fun () -> gen_server:call(QPid, {notify_down, ChPid},
+ Timeout) end)
+ end,
+ QPids).
binding_forcibly_removed(BindingSpec, QueueName) ->
rabbit_misc:execute_mnesia_transaction(
@@ -367,3 +387,16 @@ pseudo_queue(QueueName, Pid) ->
arguments = [],
binding_specs = [],
pid = Pid}.
+
+safe_pmap_ok(F, L) ->
+ case [R || R <- rabbit_misc:upmap(
+ fun (V) ->
+ try F(V)
+ catch Class:Reason -> {Class, Reason}
+ end
+ end, L),
+ R =/= ok] of
+ [] -> ok;
+ Errors -> {error, Errors}
+ end.
+
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 7716ef1646..e687df846a 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -30,6 +30,7 @@
-behaviour(gen_server).
-define(UNSENT_MESSAGE_LIMIT, 100).
+-define(HIBERNATE_AFTER, 1000).
-export([start_link/1]).
@@ -75,7 +76,7 @@ init(Q) ->
has_had_consumers = false,
next_msg_id = 1,
message_buffer = queue:new(),
- round_robin = queue:new()}}.
+ round_robin = queue:new()}, ?HIBERNATE_AFTER}.
terminate(_Reason, State) ->
%% FIXME: How do we cancel active subscriptions?
@@ -90,6 +91,10 @@ code_change(_OldVsn, State, _Extra) ->
%%----------------------------------------------------------------------------
+reply(Reply, NewState) -> {reply, Reply, NewState, ?HIBERNATE_AFTER}.
+
+noreply(NewState) -> {noreply, NewState, ?HIBERNATE_AFTER}.
+
lookup_ch(ChPid) ->
case get({ch, ChPid}) of
undefined -> not_found;
@@ -254,7 +259,7 @@ check_auto_delete(State = #q{round_robin = RoundRobin}) ->
handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder,
round_robin = ActiveConsumers}) ->
case lookup_ch(DownPid) of
- not_found -> {noreply, State};
+ not_found -> noreply(State);
#cr{monitor_ref = MonitorRef, ch_pid = ChPid, unacked_messages = UAM} ->
NewActive = block_consumers(ChPid, ActiveConsumers),
erlang:demonitor(MonitorRef),
@@ -270,7 +275,7 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder,
end,
round_robin = NewActive})) of
{continue, NewState} ->
- {noreply, NewState};
+ noreply(NewState);
{stop, NewState} ->
{stop, normal, NewState}
end
@@ -470,12 +475,12 @@ handle_call({deliver_immediately, Txn, Message}, _From, State) ->
%% queues discarding the message?
%%
{Delivered, NewState} = attempt_delivery(Txn, Message, State),
- {reply, Delivered, NewState};
+ reply(Delivered, NewState);
handle_call({deliver, Txn, Message}, _From, State) ->
%% Synchronous, "mandatory" delivery mode
{Delivered, NewState} = deliver_or_enqueue(Txn, Message, State),
- {reply, Delivered, NewState};
+ reply(Delivered, NewState);
handle_call({commit, Txn}, From, State) ->
ok = commit_work(Txn, qname(State)),
@@ -483,7 +488,7 @@ handle_call({commit, Txn}, From, State) ->
gen_server:reply(From, ok),
NewState = process_pending(Txn, State),
erase_tx(Txn),
- {noreply, NewState};
+ noreply(NewState);
handle_call({notify_down, ChPid}, From, State) ->
%% optimisation: we reply straight away so the sender can continue
@@ -507,10 +512,11 @@ handle_call({basic_get, ChPid, NoAck}, _From,
persist_auto_ack(QName, Message)
end,
Msg = {QName, self(), NextId, Delivered, Message},
- {reply, {ok, queue:len(BufferTail), Msg},
- State#q{message_buffer = BufferTail, next_msg_id = NextId + 1}};
+ reply({ok, queue:len(BufferTail), Msg},
+ State#q{message_buffer = BufferTail,
+ next_msg_id = NextId + 1});
{empty, _} ->
- {reply, empty, State}
+ reply(empty, State)
end;
handle_call({basic_consume, NoAck, ReaderPid, ChPid, ConsumerTag,
@@ -520,11 +526,11 @@ handle_call({basic_consume, NoAck, ReaderPid, ChPid, ConsumerTag,
round_robin = RoundRobin}) ->
case check_queue_owner(Owner, ReaderPid) of
mismatch ->
- {reply, {error, queue_owned_by_another_connection}, State};
+ reply({error, queue_owned_by_another_connection}, State);
ok ->
case check_exclusive_access(ExistingHolder, ExclusiveConsume) of
in_use ->
- {reply, {error, exclusive_consume_unavailable}, State};
+ reply({error, exclusive_consume_unavailable}, State);
ok ->
C = #cr{consumers = Consumers} = ch_record(ChPid),
Consumer = #consumer{tag = ConsumerTag, ack_required = not(NoAck)},
@@ -538,7 +544,7 @@ handle_call({basic_consume, NoAck, ReaderPid, ChPid, ConsumerTag,
end,
round_robin = queue:in({ChPid, Consumer}, RoundRobin)},
ok = maybe_send_reply(ChPid, OkMsg),
- {reply, ok, run_poke_burst(State1)}
+ reply(ok, run_poke_burst(State1))
end
end;
@@ -548,7 +554,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
case lookup_ch(ChPid) of
not_found ->
ok = maybe_send_reply(ChPid, OkMsg),
- {reply, ok, State};
+ reply(ok, State);
C = #cr{consumers = Consumers} ->
NewConsumers = lists:filter
(fun (#consumer{tag = CT}) -> CT /= ConsumerTag end,
@@ -564,7 +570,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
ConsumerTag,
RoundRobin)}) of
{continue, State1} ->
- {reply, ok, State1};
+ reply(ok, State1);
{stop, State1} ->
{stop, normal, ok, State1}
end
@@ -573,7 +579,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
handle_call(stat, _From, State = #q{q = #amqqueue{name = Name},
message_buffer = MessageBuffer,
round_robin = RoundRobin}) ->
- {reply, {ok, Name, queue:len(MessageBuffer), queue:len(RoundRobin)}, State};
+ reply({ok, Name, queue:len(MessageBuffer), queue:len(RoundRobin)}, State);
handle_call({delete, IfUnused, IfEmpty}, _From,
State = #q{message_buffer = MessageBuffer}) ->
@@ -581,16 +587,17 @@ handle_call({delete, IfUnused, IfEmpty}, _From,
IsUnused = is_unused(),
if
IfEmpty and not(IsEmpty) ->
- {reply, {error, not_empty}, State};
+ reply({error, not_empty}, State);
IfUnused and not(IsUnused) ->
- {reply, {error, in_use}, State};
+ reply({error, in_use}, State);
true ->
{stop, normal, {ok, queue:len(MessageBuffer)}, State}
end;
handle_call(purge, _From, State = #q{message_buffer = MessageBuffer}) ->
ok = purge_message_buffer(qname(State), MessageBuffer),
- {reply, {ok, queue:len(MessageBuffer)}, State#q{message_buffer = queue:new()}};
+ reply({ok, queue:len(MessageBuffer)},
+ State#q{message_buffer = queue:new()});
handle_call({claim_queue, ReaderPid}, _From, State = #q{owner = Owner,
exclusive_consumer = Holder}) ->
@@ -604,25 +611,25 @@ handle_call({claim_queue, ReaderPid}, _From, State = #q{owner = Owner,
%% to check, we'd need to hold not just the ch
%% pid for each consumer, but also its reader
%% pid...
- {reply, locked, State};
+ reply(locked, State);
ok ->
- {reply, ok, State#q{owner = {ReaderPid, erlang:monitor(process, ReaderPid)}}}
+ reply(ok, State#q{owner = {ReaderPid, erlang:monitor(process, ReaderPid)}})
end;
{ReaderPid, _MonitorRef} ->
- {reply, ok, State};
+ reply(ok, State);
_ ->
- {reply, locked, State}
+ reply(locked, State)
end.
handle_cast({deliver, Txn, Message}, State) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
{_Delivered, NewState} = deliver_or_enqueue(Txn, Message, State),
- {noreply, NewState};
+ noreply(NewState);
handle_cast({ack, Txn, MsgIds, ChPid}, State) ->
case lookup_ch(ChPid) of
not_found ->
- {noreply, State};
+ noreply(State);
C = #cr{unacked_messages = UAM} ->
{Acked, Remaining} = collect_messages(MsgIds, UAM),
persist_acks(Txn, qname(State), Acked),
@@ -632,37 +639,37 @@ handle_cast({ack, Txn, MsgIds, ChPid}, State) ->
_ ->
record_pending_acks(Txn, ChPid, MsgIds)
end,
- {noreply, State}
+ noreply(State)
end;
handle_cast({rollback, Txn}, State) ->
ok = rollback_work(Txn, qname(State)),
erase_tx(Txn),
- {noreply, State};
+ noreply(State);
handle_cast({redeliver, Messages}, State) ->
- {noreply, deliver_or_enqueue_n(Messages, State)};
+ noreply(deliver_or_enqueue_n(Messages, State));
handle_cast({requeue, MsgIds, ChPid}, State) ->
case lookup_ch(ChPid) of
not_found ->
rabbit_log:warning("Ignoring requeue from unknown ch: ~p~n",
[ChPid]),
- {noreply, State};
+ noreply(State);
C = #cr{unacked_messages = UAM} ->
{Messages, NewUAM} = collect_messages(MsgIds, UAM),
store_ch_record(C#cr{unacked_messages = NewUAM}),
- {noreply, deliver_or_enqueue_n(
- [{Message, true} || Message <- Messages], State)}
+ noreply(deliver_or_enqueue_n(
+ [{Message, true} || Message <- Messages], State))
end;
handle_cast({notify_sent, ChPid}, State) ->
case lookup_ch(ChPid) of
- not_found -> {noreply, State};
+ not_found -> noreply(State);
T = #cr{unsent_message_count =Count} ->
- {noreply, possibly_unblock(
- T#cr{unsent_message_count = Count - 1},
- State)}
+ noreply(possibly_unblock(
+ T#cr{unsent_message_count = Count - 1},
+ State))
end.
handle_info({'DOWN', MonitorRef, process, DownPid, _Reason},
@@ -681,6 +688,9 @@ handle_info({'DOWN', MonitorRef, process, DownPid, _Reason},
handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) ->
handle_ch_down(DownPid, State);
+handle_info(timeout, State) ->
+ {noreply, State, hibernate};
+
handle_info(Info, State) ->
?LOGDEBUG("Info in queue: ~p~n", [Info]),
{stop, {unhandled_info, Info}, State}.
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 5cc07aed32..33fe047c39 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -28,7 +28,7 @@
-include("rabbit.hrl").
-export([start_link/4, do/2, do/3, shutdown/1]).
--export([send_command/2, deliver/4]).
+-export([send_command/2, deliver/4, conserve_memory/2]).
%% callbacks
-export([init/2, handle_message/2]).
@@ -49,6 +49,7 @@
-spec(shutdown/1 :: (pid()) -> 'ok').
-spec(send_command/2 :: (pid(), amqp_method()) -> 'ok').
-spec(deliver/4 :: (pid(), ctag(), bool(), msg()) -> 'ok').
+-spec(conserve_memory/2 :: (pid(), bool()) -> 'ok').
-endif.
@@ -77,11 +78,18 @@ deliver(Pid, ConsumerTag, AckRequired, Msg) ->
Pid ! {deliver, ConsumerTag, AckRequired, Msg},
ok.
+conserve_memory(Pid, Conserve) ->
+ Pid ! {conserve_memory, Conserve},
+ ok.
+
%%---------------------------------------------------------------------------
init(ProxyPid, [ReaderPid, WriterPid, Username, VHost]) ->
process_flag(trap_exit, true),
link(WriterPid),
+ %% this is bypassing the proxy so alarms can "jump the queue" and
+ %% be handled promptly
+ rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}),
#ch{state = starting,
proxy_pid = ProxyPid,
reader_pid = ReaderPid,
@@ -129,6 +137,11 @@ handle_message({deliver, ConsumerTag, AckRequired, Msg},
true, ConsumerTag, DeliveryTag, Msg),
State1#ch{next_tag = DeliveryTag + 1};
+handle_message({conserve_memory, Conserve}, State) ->
+ ok = rabbit_writer:send_command(
+ State#ch.writer_pid, #'channel.flow'{active = not(Conserve)}),
+ State;
+
handle_message({'EXIT', _Pid, Reason}, State) ->
terminate(Reason, State);
@@ -630,6 +643,12 @@ handle_method(#'channel.flow'{active = _}, _, State) ->
%% FIXME: implement
{reply, #'channel.flow_ok'{active = true}, State};
+handle_method(#'channel.flow_ok'{active = _}, _, State) ->
+ %% TODO: We may want to correlate this to channel.flow messages we
+ %% have sent, and complain if we get an unsolicited
+ %% channel.flow_ok, or the client refuses our flow request.
+ {noreply, State};
+
handle_method(_MethodRecord, _Content, _State) ->
rabbit_misc:protocol_error(
command_invalid, "unimplemented method", []).
@@ -707,21 +726,6 @@ ack(ProxyPid, TxnKey, UAQ) ->
make_tx_id() -> rabbit_misc:guid().
-safe_pmap_set_ok(F, S) ->
- case lists:filter(fun (R) -> R =/= ok end,
- rabbit_misc:upmap(
- fun (V) ->
- try F(V)
- catch Class:Reason -> {Class, Reason}
- end
- end, sets:to_list(S))) of
- [] -> ok;
- Errors -> {error, Errors}
- end.
-
-notify_participants(F, TxnKey, Participants) ->
- safe_pmap_set_ok(fun (QPid) -> F(QPid, TxnKey) end, Participants).
-
new_tx(State) ->
State#ch{transaction_id = make_tx_id(),
tx_participants = sets:new(),
@@ -729,8 +733,8 @@ new_tx(State) ->
internal_commit(State = #ch{transaction_id = TxnKey,
tx_participants = Participants}) ->
- case notify_participants(fun rabbit_amqqueue:commit/2,
- TxnKey, Participants) of
+ case rabbit_amqqueue:commit_all(sets:to_list(Participants),
+ TxnKey) of
ok -> new_tx(State);
{error, Errors} -> exit({commit_failed, Errors})
end.
@@ -743,8 +747,8 @@ internal_rollback(State = #ch{transaction_id = TxnKey,
[self(),
queue:len(UAQ),
queue:len(UAMQ)]),
- case notify_participants(fun rabbit_amqqueue:rollback/2,
- TxnKey, Participants) of
+ case rabbit_amqqueue:rollback_all(sets:to_list(Participants),
+ TxnKey) of
ok -> NewUAMQ = queue:join(UAQ, UAMQ),
new_tx(State#ch{unacked_message_q = NewUAMQ});
{error, Errors} -> exit({rollback_failed, Errors})
@@ -767,23 +771,18 @@ fold_per_queue(F, Acc0, UAQ) ->
Acc0, D).
notify_queues(#ch{proxy_pid = ProxyPid, consumer_mapping = Consumers}) ->
- safe_pmap_set_ok(
- fun (QueueName) ->
- case rabbit_amqqueue:with(
- QueueName,
- fun (Q) ->
- rabbit_amqqueue:notify_down(Q, ProxyPid)
- end) of
- ok ->
- ok;
- {error, not_found} ->
- %% queue has been deleted in the meantime
- ok
- end
- end,
- dict:fold(fun (_ConsumerTag, QueueName, S) ->
- sets:add_element(QueueName, S)
- end, sets:new(), Consumers)).
+ rabbit_amqqueue:notify_down_all(
+ [QPid || QueueName <-
+ sets:to_list(
+ dict:fold(fun (_ConsumerTag, QueueName, S) ->
+ sets:add_element(QueueName, S)
+ end, sets:new(), Consumers)),
+ case rabbit_amqqueue:lookup(QueueName) of
+ {ok, Q} -> QPid = Q#amqqueue.pid, true;
+ %% queue has been deleted in the meantime
+ {error, not_found} -> QPid = none, false
+ end],
+ ProxyPid).
is_message_persistent(#content{properties = #'P_basic'{
delivery_mode = Mode}}) ->
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index bfd1ea72ff..7e68b3eddd 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -94,10 +94,18 @@
%% terminate_channel timeout -> remove 'closing' mark, *closing*
%% handshake_timeout -> ignore, *closing*
%% heartbeat timeout -> *throw*
-%% channel exit ->
-%% if abnormal exit then log error
-%% if last channel to exit then send connection.close_ok, start
-%% terminate_connection timer, *closing*
+%% channel exit with hard error
+%% -> log error, wait for channels to terminate forcefully, start
+%% terminate_connection timer, send close, *closed*
+%% channel exit with soft error
+%% -> log error, start terminate_channel timer, mark channel as
+%% closing
+%% if last channel to exit then send connection.close_ok,
+%% start terminate_connection timer, *closed*
+%% else *closing*
+%% channel exits normally
+%% -> if last channel to exit then send connection.close_ok,
+%% start terminate_connection timer, *closed*
%% closed:
%% socket close -> *terminate*
%% receive connection.close_ok -> self() ! terminate_connection,
@@ -291,24 +299,13 @@ terminate_channel(Channel, Ref, State) ->
end,
State.
-handle_dependent_exit(Pid, Reason,
- State = #v1{connection_state = closing}) ->
- case channel_cleanup(Pid) of
- undefined -> exit({abnormal_dependent_exit, Pid, Reason});
- Channel ->
- case Reason of
- normal -> ok;
- _ -> log_channel_error(closing, Channel, Reason)
- end,
- maybe_close(State)
- end;
handle_dependent_exit(Pid, normal, State) ->
channel_cleanup(Pid),
- State;
+ maybe_close(State);
handle_dependent_exit(Pid, Reason, State) ->
case channel_cleanup(Pid) of
undefined -> exit({abnormal_dependent_exit, Pid, Reason});
- Channel -> handle_exception(State, Channel, Reason)
+ Channel -> maybe_close(handle_exception(State, Channel, Reason))
end.
channel_cleanup(Pid) ->
@@ -365,13 +362,15 @@ wait_for_channel_termination(N, TimerRef) ->
exit(channel_termination_timeout)
end.
-maybe_close(State) ->
+maybe_close(State = #v1{connection_state = closing}) ->
case all_channels() of
[] -> ok = send_on_channel0(
State#v1.sock, #'connection.close_ok'{}),
close_connection(State);
_ -> State
- end.
+ end;
+maybe_close(State) ->
+ State.
handle_frame(Type, 0, Payload, State = #v1{connection_state = CS})
when CS =:= closing; CS =:= closed ->