summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2012-07-16 11:01:51 +0100
committerSimon MacMullen <simon@rabbitmq.com>2012-07-16 11:01:51 +0100
commitd7d72c8b93c1d257f923471b51efa414a3daef2f (patch)
tree554859fafc728a629b278906c72892c155eb532c
parent067776e41e6442f6ab5ef82653f53ce113003b1b (diff)
parenta8aa180d3ee05f2bbf2c75aed3b86fad7d070950 (diff)
downloadrabbitmq-server-git-d7d72c8b93c1d257f923471b51efa414a3daef2f.tar.gz
Merge bug24956 (again).
-rw-r--r--Makefile2
-rw-r--r--docs/rabbitmqctl.1.xml10
-rw-r--r--ebin/rabbit_app.in2
-rw-r--r--include/rabbit.hrl4
-rw-r--r--packaging/RPMS/Fedora/Makefile1
-rw-r--r--packaging/common/rabbitmq-script-wrapper4
-rw-r--r--packaging/debs/Debian/Makefile1
-rw-r--r--packaging/debs/Debian/debian/rabbitmq-server.init5
-rwxr-xr-xpackaging/macports/make-port-diff.sh6
-rwxr-xr-xscripts/rabbitmq-plugins.bat6
-rwxr-xr-xscripts/rabbitmq-server.bat4
-rwxr-xr-xscripts/rabbitmq-service.bat5
-rw-r--r--src/file_handle_cache.erl10
-rw-r--r--src/gm.erl10
-rw-r--r--src/rabbit_amqqueue.erl38
-rw-r--r--src/rabbit_amqqueue_process.erl52
-rw-r--r--src/rabbit_binding.erl11
-rw-r--r--src/rabbit_channel.erl52
-rw-r--r--src/rabbit_control_main.erl5
-rw-r--r--src/rabbit_disk_monitor.erl2
-rw-r--r--src/rabbit_exchange.erl158
-rw-r--r--src/rabbit_exchange_decorator.erl71
-rw-r--r--src/rabbit_exchange_type.erl6
-rw-r--r--src/rabbit_exchange_type_direct.erl3
-rw-r--r--src/rabbit_exchange_type_fanout.erl3
-rw-r--r--src/rabbit_exchange_type_headers.erl3
-rw-r--r--src/rabbit_exchange_type_invalid.erl3
-rw-r--r--src/rabbit_exchange_type_topic.erl4
-rw-r--r--src/rabbit_file.erl17
-rw-r--r--src/rabbit_mirror_queue_coordinator.erl17
-rw-r--r--src/rabbit_mirror_queue_master.erl11
-rw-r--r--src/rabbit_mirror_queue_slave.erl30
-rw-r--r--src/rabbit_misc.erl7
-rw-r--r--src/rabbit_nodes.erl4
-rw-r--r--src/rabbit_parameter_validation.erl61
-rw-r--r--src/rabbit_policy.erl156
-rw-r--r--src/rabbit_prelaunch.erl6
-rw-r--r--src/rabbit_reader.erl135
-rw-r--r--src/rabbit_registry.erl7
-rw-r--r--src/rabbit_runtime_parameters.erl14
-rw-r--r--src/rabbit_tests.erl4
-rw-r--r--src/rabbit_upgrade_functions.erl47
42 files changed, 742 insertions, 255 deletions
diff --git a/Makefile b/Makefile
index 49bf926afc..0e3960dcfa 100644
--- a/Makefile
+++ b/Makefile
@@ -206,7 +206,7 @@ run-qc: all
start-background-node: all
-rm -f $(RABBITMQ_MNESIA_DIR).pid
mkdir -p $(RABBITMQ_MNESIA_DIR)
- setsid sh -c "$(MAKE) run-background-node > $(RABBITMQ_MNESIA_DIR)/startup_log 2> $(RABBITMQ_MNESIA_DIR)/startup_err" &
+ nohup sh -c "$(MAKE) run-background-node > $(RABBITMQ_MNESIA_DIR)/startup_log 2> $(RABBITMQ_MNESIA_DIR)/startup_err" > /dev/null &
./scripts/rabbitmqctl -n $(RABBITMQ_NODENAME) wait $(RABBITMQ_MNESIA_DIR).pid kernel
start-rabbit-on-node: all
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index d6f6d51fab..2d25edee40 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -887,6 +887,10 @@
<listitem><para>Queue arguments.</para></listitem>
</varlistentry>
<varlistentry>
+ <term>policy</term>
+ <listitem><para>Policy name applying to the queue.</para></listitem>
+ </varlistentry>
+ <varlistentry>
<term>pid</term>
<listitem><para>Id of the Erlang process associated with the queue.</para></listitem>
</varlistentry>
@@ -999,6 +1003,10 @@
<term>arguments</term>
<listitem><para>Exchange arguments.</para></listitem>
</varlistentry>
+ <varlistentry>
+ <term>policy</term>
+ <listitem><para>Policy name for applying to the exchange.</para></listitem>
+ </varlistentry>
</variablelist>
<para>
If no <command>exchangeinfoitem</command>s are specified then
@@ -1207,7 +1215,7 @@
</varlistentry>
<varlistentry>
<term>timeout</term>
- <listitem><para>Connection timeout.</para></listitem>
+ <listitem><para>Connection timeout / negotiated heartbeat interval, in seconds.</para></listitem>
</varlistentry>
<varlistentry>
<term>frame_max</term>
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
index ffe112a0e2..523b54ced1 100644
--- a/ebin/rabbit_app.in
+++ b/ebin/rabbit_app.in
@@ -19,7 +19,7 @@
{ssl_listeners, []},
{ssl_options, []},
{vm_memory_high_watermark, 0.4},
- {disk_free_limit, {mem_relative, 1.0}},
+ {disk_free_limit, 1000000000}, %% 1GB
{msg_store_index_module, rabbit_msg_store_ets_index},
{backing_queue_module, rabbit_variable_queue},
%% 0 ("no limit") would make a better default, but that
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 5c73c8b88b..e8b4a6232e 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -43,11 +43,11 @@
-record(resource, {virtual_host, kind, name}).
-record(exchange, {name, type, durable, auto_delete, internal, arguments,
- scratch}).
+ scratches, policy}).
-record(exchange_serial, {name, next}).
-record(amqqueue, {name, durable, auto_delete, exclusive_owner = none,
- arguments, pid, slave_pids, mirror_nodes}).
+ arguments, pid, slave_pids, mirror_nodes, policy}).
%% mnesia doesn't like unary records, so we add a dummy 'value' field
-record(route, {binding, value = const}).
diff --git a/packaging/RPMS/Fedora/Makefile b/packaging/RPMS/Fedora/Makefile
index 180500ed38..03e513f889 100644
--- a/packaging/RPMS/Fedora/Makefile
+++ b/packaging/RPMS/Fedora/Makefile
@@ -42,6 +42,7 @@ ifeq "$(RPM_OS)" "fedora"
SOURCES/rabbitmq-server.init
endif
sed -i -e 's|@SU_RABBITMQ_SH_C@|su rabbitmq -s /bin/sh -c|' \
+ -e 's|@STDOUT_STDERR_REDIRECTION@||' \
SOURCES/rabbitmq-script-wrapper
cp rabbitmq-server.logrotate SOURCES/rabbitmq-server.logrotate
diff --git a/packaging/common/rabbitmq-script-wrapper b/packaging/common/rabbitmq-script-wrapper
index 0e59c21804..e832aed633 100644
--- a/packaging/common/rabbitmq-script-wrapper
+++ b/packaging/common/rabbitmq-script-wrapper
@@ -29,7 +29,9 @@ cd /var/lib/rabbitmq
SCRIPT=`basename $0`
-if [ `id -u` = `id -u rabbitmq` -o "$SCRIPT" = "rabbitmq-plugins" ] ; then
+if [ `id -u` = `id -u rabbitmq` -a "$SCRIPT" = "rabbitmq-server" ] ; then
+ /usr/lib/rabbitmq/bin/rabbitmq-server "$@" @STDOUT_STDERR_REDIRECTION@
+elif [ `id -u` = `id -u rabbitmq` -o "$SCRIPT" = "rabbitmq-plugins" ] ; then
/usr/lib/rabbitmq/bin/${SCRIPT} "$@"
elif [ `id -u` = 0 ] ; then
@SU_RABBITMQ_SH_C@ "/usr/lib/rabbitmq/bin/${SCRIPT} ${CMDLINE}"
diff --git a/packaging/debs/Debian/Makefile b/packaging/debs/Debian/Makefile
index 844388c6f4..1e4bf75527 100644
--- a/packaging/debs/Debian/Makefile
+++ b/packaging/debs/Debian/Makefile
@@ -23,6 +23,7 @@ package: clean
cp -r debian $(UNPACKED_DIR)
cp $(COMMON_DIR)/* $(UNPACKED_DIR)/debian/
sed -i -e 's|@SU_RABBITMQ_SH_C@|su rabbitmq -s /bin/sh -c|' \
+ -e 's|@STDOUT_STDERR_REDIRECTION@| > "/var/log/rabbitmq/startup_log" 2> "/var/log/rabbitmq/startup_err"|' \
$(UNPACKED_DIR)/debian/rabbitmq-script-wrapper
chmod a+x $(UNPACKED_DIR)/debian/rules
echo "This package was debianized by Tony Garnock-Jones <tonyg@rabbitmq.com> on\nWed, 3 Jan 2007 15:43:44 +0000.\n\nIt was downloaded from http://www.rabbitmq.com/\n\n" > $(UNPACKED_DIR)/debian/copyright
diff --git a/packaging/debs/Debian/debian/rabbitmq-server.init b/packaging/debs/Debian/debian/rabbitmq-server.init
index c135207875..b2d3f86ab3 100644
--- a/packaging/debs/Debian/debian/rabbitmq-server.init
+++ b/packaging/debs/Debian/debian/rabbitmq-server.init
@@ -60,10 +60,7 @@ start_rabbitmq () {
set +e
RABBITMQ_PID_FILE=$PID_FILE start-stop-daemon --quiet \
--chuid rabbitmq --start --exec $DAEMON \
- --pidfile "$RABBITMQ_PID_FILE" \
- > "${INIT_LOG_DIR}/startup_log" \
- 2> "${INIT_LOG_DIR}/startup_err" \
- 0<&- &
+ --pidfile "$RABBITMQ_PID_FILE" --background
$CONTROL wait $PID_FILE >/dev/null 2>&1
RETVAL=$?
set -e
diff --git a/packaging/macports/make-port-diff.sh b/packaging/macports/make-port-diff.sh
index 3eb1b9f589..ac3afa4ee5 100755
--- a/packaging/macports/make-port-diff.sh
+++ b/packaging/macports/make-port-diff.sh
@@ -14,8 +14,10 @@ mkdir -p $dir/macports $dir/rabbitmq
cd $dir/macports
svn checkout http://svn.macports.org/repository/macports/trunk/dports/net/rabbitmq-server/ 2>&1 >/dev/null
-# Clear out the svn $id tag
-sed -i -e 's|^# \$.*$|# $Id$|' rabbitmq-server/Portfile
+# Clear out the svn $id tag from the Portfile (and avoid using -i)
+portfile=rabbitmq-server/Portfile
+sed -e 's|^# \$.*$|# $Id$|' ${portfile} > ${portfile}.new
+mv ${portfile}.new ${portfile}
# Get the files from the rabbitmq.com macports repo
cd ../rabbitmq
diff --git a/scripts/rabbitmq-plugins.bat b/scripts/rabbitmq-plugins.bat
index 3c26872671..c67a0263f6 100755
--- a/scripts/rabbitmq-plugins.bat
+++ b/scripts/rabbitmq-plugins.bat
@@ -43,9 +43,9 @@ if "!RABBITMQ_ENABLED_PLUGINS_FILE!"=="" (
set RABBITMQ_ENABLED_PLUGINS_FILE=!RABBITMQ_BASE!\enabled_plugins
)
-if "!RABBITMQ_PLUGINS_DIR!"=="" (
- set RABBITMQ_PLUGINS_DIR=!TDP0!..\plugins
-)
+if "!RABBITMQ_PLUGINS_DIR!"=="" (
+ set RABBITMQ_PLUGINS_DIR=!TDP0!..\plugins
+)
"!ERLANG_HOME!\bin\erl.exe" -pa "!TDP0!..\ebin" -noinput -hidden -sname rabbitmq-plugins!RANDOM! -s rabbit_plugins_main -enabled_plugins_file "!RABBITMQ_ENABLED_PLUGINS_FILE!" -plugins_dist_dir "!RABBITMQ_PLUGINS_DIR:\=/!" -extra !STAR!
diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat
index b8822739f5..167f272e41 100755
--- a/scripts/rabbitmq-server.bat
+++ b/scripts/rabbitmq-server.bat
@@ -86,8 +86,8 @@ if "!RABBITMQ_ENABLED_PLUGINS_FILE!"=="" (
set RABBITMQ_ENABLED_PLUGINS_FILE=!RABBITMQ_BASE!\enabled_plugins
)
-if "!RABBITMQ_PLUGINS_DIR!"=="" (
- set RABBITMQ_PLUGINS_DIR=!TDP0!..\plugins
+if "!RABBITMQ_PLUGINS_DIR!"=="" (
+ set RABBITMQ_PLUGINS_DIR=!TDP0!..\plugins
)
set RABBITMQ_EBIN_ROOT=!TDP0!..\ebin
diff --git a/scripts/rabbitmq-service.bat b/scripts/rabbitmq-service.bat
index 849bedcf6b..4758c861da 100755
--- a/scripts/rabbitmq-service.bat
+++ b/scripts/rabbitmq-service.bat
@@ -154,7 +154,10 @@ if "!RABBITMQ_ENABLED_PLUGINS_FILE!"=="" (
set RABBITMQ_ENABLED_PLUGINS_FILE=!RABBITMQ_BASE!\enabled_plugins
)
-set RABBITMQ_PLUGINS_DIR=!TDP0!..\plugins
+if "!RABBITMQ_PLUGINS_DIR!"=="" (
+ set RABBITMQ_PLUGINS_DIR=!TDP0!..\plugins
+)
+
set RABBITMQ_EBIN_ROOT=!TDP0!..\ebin
if "!RABBITMQ_CONFIG_FILE!"=="" (
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index f3b4dbafa2..13ee42499a 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -374,11 +374,11 @@ sync(Ref) ->
end).
needs_sync(Ref) ->
- with_handles(
- [Ref],
- fun ([#handle { is_dirty = false, write_buffer = [] }]) -> false;
- ([_Handle]) -> true
- end).
+ %% This must *not* use with_handles/2; see bug 25052
+ case get({Ref, fhc_handle}) of
+ #handle { is_dirty = false, write_buffer = [] } -> false;
+ #handle {} -> true
+ end.
position(Ref, NewOffset) ->
with_flushed_handles(
diff --git a/src/gm.erl b/src/gm.erl
index 30fcdc5d86..f88ed18fbf 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -558,7 +558,7 @@ handle_call(group_members, _From,
reply(not_joined, State);
handle_call(group_members, _From, State = #state { view = View }) ->
- reply(alive_view_members(View), State);
+ reply(get_pids(alive_view_members(View)), State);
handle_call({add_on_right, _NewMember}, _From,
State = #state { members_state = undefined }) ->
@@ -647,7 +647,7 @@ handle_info(flush, State) ->
noreply(
flush_broadcast_buffer(State #state { broadcast_timer = undefined }));
-handle_info({'DOWN', MRef, process, _Pid, _Reason},
+handle_info({'DOWN', MRef, process, _Pid, Reason},
State = #state { self = Self,
left = Left,
right = Right,
@@ -661,8 +661,10 @@ handle_info({'DOWN', MRef, process, _Pid, _Reason},
{_, {Member1, MRef}} -> Member1;
_ -> undefined
end,
- case Member of
- undefined ->
+ case {Member, Reason} of
+ {undefined, _} ->
+ noreply(State);
+ {_, {shutdown, ring_shutdown}} ->
noreply(State);
_ ->
View1 =
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index c1673504e7..afbaea651b 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -28,7 +28,7 @@
-export([notify_sent/2, notify_sent_queue_down/1, unblock/2, flush_all/2]).
-export([notify_down_all/2, limit_all/3]).
-export([on_node_down/1]).
--export([store_queue/1]).
+-export([update/2, store_queue/1, policy_changed/2]).
%% internal
@@ -71,6 +71,9 @@
-spec(internal_declare/2 ::
(rabbit_types:amqqueue(), boolean())
-> queue_or_not_found() | rabbit_misc:thunk(queue_or_not_found())).
+-spec(update/2 ::
+ (name(),
+ fun((rabbit_types:amqqueue()) -> rabbit_types:amqqueue())) -> 'ok').
-spec(lookup/1 ::
(name()) -> rabbit_types:ok(rabbit_types:amqqueue()) |
rabbit_types:error('not_found');
@@ -157,6 +160,8 @@
-spec(on_node_down/1 :: (node()) -> 'ok').
-spec(pseudo_queue/2 :: (name(), pid()) -> rabbit_types:amqqueue()).
-spec(store_queue/1 :: (rabbit_types:amqqueue()) -> 'ok').
+-spec(policy_changed/2 ::
+ (rabbit_types:amqqueue(), rabbit_types:amqqueue()) -> 'ok').
-endif.
@@ -166,6 +171,9 @@
[queue_name, channel_pid, consumer_tag, ack_required]).
start() ->
+ %% Clear out remnants of old incarnation, in case we restarted
+ %% faster than other nodes handled DOWN messages from us.
+ on_node_down(node()),
DurableQueues = find_durable_queues(),
{ok, BQ} = application:get_env(rabbit, backing_queue_module),
ok = BQ:start([QName || #amqqueue{name = QName} <- DurableQueues]),
@@ -222,9 +230,10 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) ->
case mnesia:wread({rabbit_queue, QueueName}) of
[] ->
case mnesia:read({rabbit_durable_queue, QueueName}) of
- [] -> ok = store_queue(Q),
- B = add_default_binding(Q),
- fun () -> B(), Q end;
+ [] -> Q1 = rabbit_policy:set(Q),
+ ok = store_queue(Q1),
+ B = add_default_binding(Q1),
+ fun () -> B(), Q1 end;
%% Q exists on stopped node
[_] -> rabbit_misc:const(not_found)
end;
@@ -237,6 +246,19 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) ->
end
end).
+update(Name, Fun) ->
+ case mnesia:wread({rabbit_queue, Name}) of
+ [Q = #amqqueue{durable = Durable}] ->
+ Q1 = Fun(Q),
+ ok = mnesia:write(rabbit_queue, Q1, write),
+ case Durable of
+ true -> ok = mnesia:write(rabbit_durable_queue, Q1, write);
+ _ -> ok
+ end;
+ [] ->
+ ok
+ end.
+
store_queue(Q = #amqqueue{durable = true}) ->
ok = mnesia:write(rabbit_durable_queue, Q#amqqueue{slave_pids = []}, write),
ok = mnesia:write(rabbit_queue, Q, write),
@@ -245,6 +267,9 @@ store_queue(Q = #amqqueue{durable = false}) ->
ok = mnesia:write(rabbit_queue, Q, write),
ok.
+policy_changed(_Q1, _Q2) ->
+ ok.
+
determine_queue_nodes(Args) ->
Policy = rabbit_misc:table_lookup(Args, <<"x-ha-policy">>),
PolicyParams = rabbit_misc:table_lookup(Args, <<"x-ha-policy-params">>),
@@ -508,7 +533,7 @@ basic_consume(#amqqueue{pid = QPid}, NoAck, ChPid, Limiter,
Limiter, ConsumerTag, ExclusiveConsume, OkMsg}).
basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) ->
- ok = delegate_call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}).
+ delegate_call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}).
notify_sent(QPid, ChPid) ->
Key = {consumer_credit_to, QPid},
@@ -573,7 +598,8 @@ on_node_down(Node) ->
#amqqueue{name = QName, pid = Pid,
slave_pids = []}
<- mnesia:table(rabbit_queue),
- node(Pid) == Node])),
+ node(Pid) == Node andalso
+ not is_process_alive(Pid)])),
{Qs, Dels} = lists:unzip(QsDels),
T = rabbit_binding:process_deletions(
lists:foldl(fun rabbit_binding:combine_deletions/2,
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index f2833c2623..8933de8746 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -197,23 +197,41 @@ declare(Recover, From, State = #q{q = Q,
backing_queue = BQ,
backing_queue_state = undefined}) ->
case rabbit_amqqueue:internal_declare(Q, Recover) of
- not_found -> {stop, normal, not_found, State};
- Q -> gen_server2:reply(From, {new, Q}),
- ok = file_handle_cache:register_callback(
- rabbit_amqqueue, set_maximum_since_use,
- [self()]),
- ok = rabbit_memory_monitor:register(
- self(), {rabbit_amqqueue,
- set_ram_duration_target, [self()]}),
- BQS = bq_init(BQ, Q, Recover),
- State1 = process_args(State#q{backing_queue_state = BQS}),
- rabbit_event:notify(queue_created,
- infos(?CREATION_EVENT_KEYS, State1)),
- rabbit_event:if_enabled(State1, #q.stats_timer,
- fun() -> emit_stats(State1) end),
- noreply(State1);
- Q1 -> {stop, normal, {existing, Q1}, State}
- end.
+ not_found ->
+ {stop, normal, not_found, State};
+ Q1 ->
+ case matches(Recover, Q, Q1) of
+ true ->
+ gen_server2:reply(From, {new, Q}),
+ ok = file_handle_cache:register_callback(
+ rabbit_amqqueue, set_maximum_since_use, [self()]),
+ ok = rabbit_memory_monitor:register(
+ self(), {rabbit_amqqueue,
+ set_ram_duration_target, [self()]}),
+ BQS = bq_init(BQ, Q, Recover),
+ State1 = process_args(State#q{backing_queue_state = BQS}),
+ rabbit_event:notify(queue_created,
+ infos(?CREATION_EVENT_KEYS, State1)),
+ rabbit_event:if_enabled(State1, #q.stats_timer,
+ fun() -> emit_stats(State1) end),
+ noreply(State1);
+ false ->
+ {stop, normal, {existing, Q1}, State}
+ end
+ end.
+
+matches(true, Q, Q) -> true;
+matches(true, _Q, _Q1) -> false;
+matches(false, Q1, Q2) ->
+ %% i.e. not policy
+ Q1#amqqueue.name =:= Q2#amqqueue.name andalso
+ Q1#amqqueue.durable =:= Q2#amqqueue.durable andalso
+ Q1#amqqueue.auto_delete =:= Q2#amqqueue.auto_delete andalso
+ Q1#amqqueue.exclusive_owner =:= Q2#amqqueue.exclusive_owner andalso
+ Q1#amqqueue.arguments =:= Q2#amqqueue.arguments andalso
+ Q1#amqqueue.pid =:= Q2#amqqueue.pid andalso
+ Q1#amqqueue.slave_pids =:= Q2#amqqueue.slave_pids andalso
+ Q1#amqqueue.mirror_nodes =:= Q2#amqqueue.mirror_nodes.
bq_init(BQ, Q, Recover) ->
Self = self(),
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index bb44797e4d..f0ea514dcf 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -173,13 +173,11 @@ add(Src, Dst, B) ->
mnesia:read({rabbit_durable_route, B}) =:= []) of
true -> ok = sync_route(#route{binding = B}, SrcDurable, DstDurable,
fun mnesia:write/3),
- ok = rabbit_exchange:callback(
- Src, add_binding, [transaction, Src, B]),
+ x_callback(transaction, Src, add_binding, B),
Serial = rabbit_exchange:serial(Src),
fun () ->
- ok = rabbit_exchange:callback(
- Src, add_binding, [Serial, Src, B]),
- ok = rabbit_event:notify(binding_created, info(B))
+ x_callback(Serial, Src, add_binding, B),
+ ok = rabbit_event:notify(binding_created, info(B))
end;
false -> rabbit_misc:const({error, binding_not_found})
end.
@@ -487,4 +485,5 @@ process_deletions(Deletions) ->
del_notify(Bs) -> [rabbit_event:notify(binding_deleted, info(B)) || B <- Bs].
-x_callback(Arg, X, F, Bs) -> ok = rabbit_exchange:callback(X, F, [Arg, X, Bs]).
+x_callback(Serial, X, F, Bs) ->
+ ok = rabbit_exchange:callback(X, F, Serial, [X, Bs]).
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 22c6a22361..864e100afc 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -267,7 +267,7 @@ handle_cast({method, Method, Content, Flow},
catch
exit:Reason = #amqp_error{} ->
MethodName = rabbit_misc:method_record_type(Method),
- send_exception(Reason#amqp_error{method = MethodName}, State);
+ handle_exception(Reason#amqp_error{method = MethodName}, State);
_:Reason ->
{stop, {Reason, erlang:get_stacktrace()}, State}
end;
@@ -400,11 +400,11 @@ return_ok(State, false, Msg) -> {reply, Msg, State}.
ok_msg(true, _Msg) -> undefined;
ok_msg(false, Msg) -> Msg.
-send_exception(Reason, State = #ch{protocol = Protocol,
- channel = Channel,
- writer_pid = WriterPid,
- reader_pid = ReaderPid,
- conn_pid = ConnPid}) ->
+handle_exception(Reason, State = #ch{protocol = Protocol,
+ channel = Channel,
+ writer_pid = WriterPid,
+ reader_pid = ReaderPid,
+ conn_pid = ConnPid}) ->
{CloseChannel, CloseMethod} =
rabbit_binary_generator:map_exception(Channel, Reason, Protocol),
rabbit_log:error("connection ~p, channel ~p - error:~n~p~n",
@@ -418,6 +418,11 @@ send_exception(Reason, State = #ch{protocol = Protocol,
{stop, normal, State1}
end.
+precondition_failed(Format) -> precondition_failed(Format, []).
+
+precondition_failed(Format, Params) ->
+ rabbit_misc:protocol_error(precondition_failed, Format, Params).
+
return_queue_declare_ok(#resource{name = ActualName},
NoWait, MessageCount, ConsumerCount, State) ->
return_ok(State#ch{most_recently_declared_queue = ActualName}, NoWait,
@@ -461,9 +466,9 @@ check_user_id_header(#'P_basic'{user_id = Username},
ok;
check_user_id_header(#'P_basic'{user_id = Claimed},
#ch{user = #user{username = Actual}}) ->
- rabbit_misc:protocol_error(
- precondition_failed, "user_id property set to '~s' but "
- "authenticated user was '~s'", [Claimed, Actual]).
+ precondition_failed(
+ "user_id property set to '~s' but authenticated user was '~s'",
+ [Claimed, Actual]).
check_internal_exchange(#exchange{name = Name, internal = true}) ->
rabbit_misc:protocol_error(access_refused,
@@ -625,8 +630,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
State1#ch{uncommitted_message_q = NewTMQ}
end};
{error, Reason} ->
- rabbit_misc:protocol_error(precondition_failed,
- "invalid message: ~p", [Reason])
+ precondition_failed("invalid message: ~p", [Reason])
end;
handle_method(#'basic.nack'{delivery_tag = DeliveryTag,
@@ -881,8 +885,7 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin,
{error, not_found} ->
rabbit_misc:not_found(ExchangeName);
{error, in_use} ->
- rabbit_misc:protocol_error(
- precondition_failed, "~s in use", [rabbit_misc:rs(ExchangeName)]);
+ precondition_failed("~s in use", [rabbit_misc:rs(ExchangeName)]);
ok ->
return_ok(State, NoWait, #'exchange.delete_ok'{})
end;
@@ -980,11 +983,9 @@ handle_method(#'queue.delete'{queue = QueueNameBin,
QueueName, ConnPid,
fun (Q) -> rabbit_amqqueue:delete(Q, IfUnused, IfEmpty) end) of
{error, in_use} ->
- rabbit_misc:protocol_error(
- precondition_failed, "~s in use", [rabbit_misc:rs(QueueName)]);
+ precondition_failed("~s in use", [rabbit_misc:rs(QueueName)]);
{error, not_empty} ->
- rabbit_misc:protocol_error(
- precondition_failed, "~s not empty", [rabbit_misc:rs(QueueName)]);
+ precondition_failed("~s not empty", [rabbit_misc:rs(QueueName)]);
{ok, PurgedMessageCount} ->
return_ok(State, NoWait,
#'queue.delete_ok'{message_count = PurgedMessageCount})
@@ -1019,15 +1020,13 @@ handle_method(#'queue.purge'{queue = QueueNameBin,
#'queue.purge_ok'{message_count = PurgedMessageCount});
handle_method(#'tx.select'{}, _, #ch{confirm_enabled = true}) ->
- rabbit_misc:protocol_error(
- precondition_failed, "cannot switch from confirm to tx mode", []);
+ precondition_failed("cannot switch from confirm to tx mode");
handle_method(#'tx.select'{}, _, State) ->
{reply, #'tx.select_ok'{}, State#ch{tx_status = in_progress}};
handle_method(#'tx.commit'{}, _, #ch{tx_status = none}) ->
- rabbit_misc:protocol_error(
- precondition_failed, "channel is not transactional", []);
+ precondition_failed("channel is not transactional");
handle_method(#'tx.commit'{}, _,
State = #ch{uncommitted_message_q = TMQ,
@@ -1041,8 +1040,7 @@ handle_method(#'tx.commit'{}, _,
{noreply, maybe_complete_tx(new_tx(State1#ch{tx_status = committing}))};
handle_method(#'tx.rollback'{}, _, #ch{tx_status = none}) ->
- rabbit_misc:protocol_error(
- precondition_failed, "channel is not transactional", []);
+ precondition_failed("channel is not transactional");
handle_method(#'tx.rollback'{}, _, State = #ch{unacked_message_q = UAMQ,
uncommitted_acks = TAL,
@@ -1052,8 +1050,7 @@ handle_method(#'tx.rollback'{}, _, State = #ch{unacked_message_q = UAMQ,
{reply, #'tx.rollback_ok'{}, new_tx(State#ch{unacked_message_q = UAMQ1})};
handle_method(#'confirm.select'{}, _, #ch{tx_status = in_progress}) ->
- rabbit_misc:protocol_error(
- precondition_failed, "cannot switch from tx to confirm mode", []);
+ precondition_failed("cannot switch from tx to confirm mode");
handle_method(#'confirm.select'{nowait = NoWait}, _, State) ->
return_ok(State#ch{confirm_enabled = true},
@@ -1263,8 +1260,7 @@ collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) ->
QTail, DeliveryTag, Multiple)
end;
{empty, _} ->
- rabbit_misc:protocol_error(
- precondition_failed, "unknown delivery tag ~w", [DeliveryTag])
+ precondition_failed("unknown delivery tag ~w", [DeliveryTag])
end.
ack(Acked, State) ->
@@ -1423,7 +1419,7 @@ complete_tx(State = #ch{tx_status = committing}) ->
ok = rabbit_writer:send_command(State#ch.writer_pid, #'tx.commit_ok'{}),
State#ch{tx_status = in_progress};
complete_tx(State = #ch{tx_status = failed}) ->
- {noreply, State1} = send_exception(
+ {noreply, State1} = handle_exception(
rabbit_misc:amqp_error(
precondition_failed, "partial tx completion", [],
'tx.commit'),
diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl
index 2e163cfbe1..b23088cc7a 100644
--- a/src/rabbit_control_main.erl
+++ b/src/rabbit_control_main.erl
@@ -156,6 +156,11 @@ start() ->
{'EXIT', {badarg, _}} ->
print_error("invalid parameter: ~p", [Args]),
usage();
+ {error, {Problem, Reason}} when is_atom(Problem); is_binary(Reason) ->
+ %% We handle this common case specially to avoid ~p since
+ %% that has i18n issues
+ print_error("~s: ~s", [Problem, Reason]),
+ rabbit_misc:quit(2);
{error, Reason} ->
print_error("~p", [Reason]),
rabbit_misc:quit(2);
diff --git a/src/rabbit_disk_monitor.erl b/src/rabbit_disk_monitor.erl
index ed29bd80b9..58375abb45 100644
--- a/src/rabbit_disk_monitor.erl
+++ b/src/rabbit_disk_monitor.erl
@@ -27,7 +27,7 @@
set_check_interval/1, get_disk_free/0]).
-define(SERVER, ?MODULE).
--define(DEFAULT_DISK_CHECK_INTERVAL, 60000).
+-define(DEFAULT_DISK_CHECK_INTERVAL, 10000).
-record(state, {dir,
limit,
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 910a89b42c..57c571f1ff 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -18,13 +18,13 @@
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
--export([recover/0, callback/3, declare/6,
+-export([recover/0, policy_changed/2, callback/4, declare/6,
assert_equivalence/6, assert_args_equivalence/2, check_type/1,
- lookup/1, lookup_or_die/1, list/1, update_scratch/2,
+ lookup/1, lookup_or_die/1, list/1, lookup_scratch/2, update_scratch/3,
info_keys/0, info/1, info/2, info_all/1, info_all/2,
route/2, delete/2]).
%% these must be run inside a mnesia tx
--export([maybe_auto_delete/1, serial/1, peek_serial/1]).
+-export([maybe_auto_delete/1, serial/1, peek_serial/1, update/2]).
%%----------------------------------------------------------------------------
@@ -37,7 +37,12 @@
-type(fun_name() :: atom()).
-spec(recover/0 :: () -> [name()]).
--spec(callback/3:: (rabbit_types:exchange(), fun_name(), [any()]) -> 'ok').
+-spec(callback/4::
+ (rabbit_types:exchange(), fun_name(),
+ fun((boolean()) -> non_neg_integer()) | atom(),
+ [any()]) -> 'ok').
+-spec(policy_changed/2 ::
+ (rabbit_types:exchange(), rabbit_types:exchange()) -> 'ok').
-spec(declare/6 ::
(name(), type(), boolean(), boolean(), boolean(),
rabbit_framing:amqp_table())
@@ -58,7 +63,13 @@
(name()) -> rabbit_types:exchange() |
rabbit_types:channel_exit()).
-spec(list/1 :: (rabbit_types:vhost()) -> [rabbit_types:exchange()]).
--spec(update_scratch/2 :: (name(), fun((any()) -> any())) -> 'ok').
+-spec(lookup_scratch/2 :: (name(), atom()) ->
+ rabbit_types:ok(term()) |
+ rabbit_types:error('not_found')).
+-spec(update_scratch/3 :: (name(), atom(), fun((any()) -> any())) -> 'ok').
+-spec(update/2 ::
+ (name(),
+ fun((rabbit_types:exchange()) -> rabbit_types:exchange())) -> 'ok').
-spec(info_keys/0 :: () -> rabbit_types:info_keys()).
-spec(info/1 :: (rabbit_types:exchange()) -> rabbit_types:infos()).
-spec(info/2 ::
@@ -76,14 +87,16 @@
-spec(maybe_auto_delete/1::
(rabbit_types:exchange())
-> 'not_deleted' | {'deleted', rabbit_binding:deletions()}).
--spec(serial/1 :: (rabbit_types:exchange()) -> 'none' | pos_integer()).
+-spec(serial/1 :: (rabbit_types:exchange()) ->
+ fun((boolean()) -> 'none' | pos_integer())).
-spec(peek_serial/1 :: (name()) -> pos_integer() | 'undefined').
-endif.
%%----------------------------------------------------------------------------
--define(INFO_KEYS, [name, type, durable, auto_delete, internal, arguments]).
+-define(INFO_KEYS, [name, type, durable, auto_delete, internal, arguments,
+ policy]).
recover() ->
Xs = rabbit_misc:table_filter(
@@ -95,21 +108,52 @@ recover() ->
true -> store(X);
false -> ok
end,
- rabbit_exchange:callback(X, create, [map_create_tx(Tx), X])
+ callback(X, create, map_create_tx(Tx), [X])
end,
rabbit_durable_exchange),
[XName || #exchange{name = XName} <- Xs].
-callback(#exchange{type = XType}, Fun, Args) ->
- apply(type_to_module(XType), Fun, Args).
+callback(X = #exchange{type = XType}, Fun, Serial0, Args) ->
+ Serial = fun (Bool) ->
+ case Serial0 of
+ _ when is_atom(Serial0) -> Serial0;
+ _ -> Serial0(Bool)
+ end
+ end,
+ [ok = apply(M, Fun, [Serial(M:serialise_events(X)) | Args])
+ || M <- decorators()],
+ Module = type_to_module(XType),
+ apply(Module, Fun, [Serial(Module:serialise_events()) | Args]).
+
+policy_changed(X1, X2) -> callback(X1, policy_changed, none, [X1, X2]).
+
+serialise_events(X = #exchange{type = Type}) ->
+ case [Serialise || M <- decorators(),
+ Serialise <- [M:serialise_events(X)],
+ Serialise == true] of
+ [] -> (type_to_module(Type)):serialise_events();
+ _ -> true
+ end.
+
+serial(#exchange{name = XName} = X) ->
+ Serial = case serialise_events(X) of
+ true -> next_serial(XName);
+ false -> none
+ end,
+ fun (true) -> Serial;
+ (false) -> none
+ end.
+
+decorators() ->
+ [M || {_, M} <- rabbit_registry:lookup_all(exchange_decorator)].
declare(XName, Type, Durable, AutoDelete, Internal, Args) ->
- X = #exchange{name = XName,
- type = Type,
- durable = Durable,
- auto_delete = AutoDelete,
- internal = Internal,
- arguments = Args},
+ X = rabbit_policy:set(#exchange{name = XName,
+ type = Type,
+ durable = Durable,
+ auto_delete = AutoDelete,
+ internal = Internal,
+ arguments = Args}),
XT = type_to_module(Type),
%% We want to upset things if it isn't ok
ok = XT:validate(X),
@@ -129,7 +173,7 @@ declare(XName, Type, Durable, AutoDelete, Internal, Args) ->
end
end,
fun ({new, Exchange}, Tx) ->
- ok = XT:create(map_create_tx(Tx), Exchange),
+ ok = callback(X, create, map_create_tx(Tx), [Exchange]),
rabbit_event:notify_if(not Tx, exchange_created, info(Exchange)),
Exchange;
({existing, Exchange}, _Tx) ->
@@ -141,13 +185,7 @@ declare(XName, Type, Durable, AutoDelete, Internal, Args) ->
map_create_tx(true) -> transaction;
map_create_tx(false) -> none.
-store(X = #exchange{name = Name, type = Type}) ->
- ok = mnesia:write(rabbit_exchange, X, write),
- case (type_to_module(Type)):serialise_events() of
- true -> S = #exchange_serial{name = Name, next = 1},
- ok = mnesia:write(rabbit_exchange_serial, S, write);
- false -> ok
- end.
+store(X) -> ok = mnesia:write(rabbit_exchange, X, write).
%% Used with binaries sent over the wire; the type may not exist.
check_type(TypeBin) ->
@@ -200,23 +238,51 @@ list(VHostPath) ->
rabbit_exchange,
#exchange{name = rabbit_misc:r(VHostPath, exchange), _ = '_'}).
-update_scratch(Name, Fun) ->
+lookup_scratch(Name, App) ->
+ case lookup(Name) of
+ {ok, #exchange{scratches = undefined}} ->
+ {error, not_found};
+ {ok, #exchange{scratches = Scratches}} ->
+ case orddict:find(App, Scratches) of
+ {ok, Value} -> {ok, Value};
+ error -> {error, not_found}
+ end;
+ {error, not_found} ->
+ {error, not_found}
+ end.
+
+update_scratch(Name, App, Fun) ->
rabbit_misc:execute_mnesia_transaction(
fun() ->
- case mnesia:wread({rabbit_exchange, Name}) of
- [X = #exchange{durable = Durable, scratch = Scratch}] ->
- X1 = X#exchange{scratch = Fun(Scratch)},
- ok = mnesia:write(rabbit_exchange, X1, write),
- case Durable of
- true -> ok = mnesia:write(rabbit_durable_exchange,
- X1, write);
- _ -> ok
- end;
- [] ->
- ok
- end
+ update(Name,
+ fun(X = #exchange{scratches = Scratches0}) ->
+ Scratches1 = case Scratches0 of
+ undefined -> orddict:new();
+ _ -> Scratches0
+ end,
+ Scratch = case orddict:find(App, Scratches1) of
+ {ok, S} -> S;
+ error -> undefined
+ end,
+ Scratches2 = orddict:store(
+ App, Fun(Scratch), Scratches1),
+ X#exchange{scratches = Scratches2}
+ end)
end).
+update(Name, Fun) ->
+ case mnesia:wread({rabbit_exchange, Name}) of
+ [X = #exchange{durable = Durable}] ->
+ X1 = Fun(X),
+ ok = mnesia:write(rabbit_exchange, X1, write),
+ case Durable of
+ true -> ok = mnesia:write(rabbit_durable_exchange, X1, write);
+ _ -> ok
+ end;
+ [] ->
+ ok
+ end.
+
info_keys() -> ?INFO_KEYS.
map(VHostPath, F) ->
@@ -232,6 +298,7 @@ i(durable, #exchange{durable = Durable}) -> Durable;
i(auto_delete, #exchange{auto_delete = AutoDelete}) -> AutoDelete;
i(internal, #exchange{internal = Internal}) -> Internal;
i(arguments, #exchange{arguments = Arguments}) -> Arguments;
+i(policy, X) -> rabbit_policy:name(X);
i(Item, _) -> throw({bad_argument, Item}).
info(X = #exchange{}) -> infos(?INFO_KEYS, X).
@@ -341,23 +408,18 @@ unconditional_delete(X = #exchange{name = XName}) ->
Bindings = rabbit_binding:remove_for_source(XName),
{deleted, X, Bindings, rabbit_binding:remove_for_destination(XName)}.
-serial(#exchange{name = XName, type = Type}) ->
- case (type_to_module(Type)):serialise_events() of
- true -> next_serial(XName);
- false -> none
- end.
-
next_serial(XName) ->
- [#exchange_serial{next = Serial}] =
- mnesia:read(rabbit_exchange_serial, XName, write),
+ Serial = peek_serial(XName, write),
ok = mnesia:write(rabbit_exchange_serial,
#exchange_serial{name = XName, next = Serial + 1}, write),
Serial.
-peek_serial(XName) ->
- case mnesia:read({rabbit_exchange_serial, XName}) of
+peek_serial(XName) -> peek_serial(XName, read).
+
+peek_serial(XName, LockType) ->
+ case mnesia:read(rabbit_exchange_serial, XName, LockType) of
[#exchange_serial{next = Serial}] -> Serial;
- _ -> undefined
+ _ -> 1
end.
invalid_module(T) ->
diff --git a/src/rabbit_exchange_decorator.erl b/src/rabbit_exchange_decorator.erl
new file mode 100644
index 0000000000..b40ceda918
--- /dev/null
+++ b/src/rabbit_exchange_decorator.erl
@@ -0,0 +1,71 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_exchange_decorator).
+
+%% This is like an exchange type except that:
+%%
+%% 1) It applies to all exchanges as soon as it is installed, therefore
+%% 2) It is not allowed to affect validation, so no validate/1 or
+%% assert_args_equivalence/2
+%% 3) It also can't affect routing
+%%
+%% It's possible in the future we might relax 3), or even make these
+%% able to manipulate messages as they are published.
+
+-ifdef(use_specs).
+
+-type(tx() :: 'transaction' | 'none').
+-type(serial() :: pos_integer() | tx()).
+
+-callback description() -> [proplist:property()].
+
+%% Should Rabbit ensure that all binding events that are
+%% delivered to an individual exchange can be serialised? (they
+%% might still be delivered out of order, but there'll be a
+%% serial number).
+-callback serialise_events(rabbit_types:exchange()) -> boolean().
+
+%% called after declaration and recovery
+-callback create(tx(), rabbit_types:exchange()) -> 'ok'.
+
+%% called after exchange (auto)deletion.
+-callback delete(tx(), rabbit_types:exchange(), [rabbit_types:binding()]) ->
+ 'ok'.
+
+%% called after a binding has been added or recovered
+-callback add_binding(serial(), rabbit_types:exchange(),
+ rabbit_types:binding()) -> 'ok'.
+
+%% called after bindings have been deleted.
+-callback remove_bindings(serial(), rabbit_types:exchange(),
+ [rabbit_types:binding()]) -> 'ok'.
+
+%% called when the policy attached to this exchange changes.
+-callback policy_changed (
+ serial(), rabbit_types:exchange(), rabbit_types:exchange()) -> 'ok'.
+
+-else.
+
+-export([behaviour_info/1]).
+
+behaviour_info(callbacks) ->
+ [{description, 0}, {serialise_events, 1}, {create, 2}, {delete, 3},
+ {add_binding, 3}, {remove_bindings, 3}, {policy_changed, 3}];
+behaviour_info(_Other) ->
+ undefined.
+
+-endif.
diff --git a/src/rabbit_exchange_type.erl b/src/rabbit_exchange_type.erl
index 1027570c8b..e6470b721e 100644
--- a/src/rabbit_exchange_type.erl
+++ b/src/rabbit_exchange_type.erl
@@ -58,6 +58,10 @@
rabbit_framing:amqp_table()) ->
'ok' | rabbit_types:connection_exit().
+%% called when the policy attached to this exchange changes.
+-callback policy_changed (
+ serial(), rabbit_types:exchange(), rabbit_types:exchange()) -> 'ok'.
+
-else.
-export([behaviour_info/1]).
@@ -65,7 +69,7 @@
behaviour_info(callbacks) ->
[{description, 0}, {serialise_events, 0}, {route, 2}, {validate, 1},
{create, 2}, {delete, 3}, {add_binding, 3}, {remove_bindings, 3},
- {assert_args_equivalence, 2}];
+ {assert_args_equivalence, 2}, {policy_changed, 3}];
behaviour_info(_Other) ->
undefined.
diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl
index cdec1cb9f2..9a5665c078 100644
--- a/src/rabbit_exchange_type_direct.erl
+++ b/src/rabbit_exchange_type_direct.erl
@@ -20,7 +20,7 @@
-behaviour(rabbit_exchange_type).
-export([description/0, serialise_events/0, route/2]).
--export([validate/1, create/2, delete/3,
+-export([validate/1, create/2, delete/3, policy_changed/3,
add_binding/3, remove_bindings/3, assert_args_equivalence/2]).
-rabbit_boot_step({?MODULE,
@@ -43,6 +43,7 @@ route(#exchange{name = Name},
validate(_X) -> ok.
create(_Tx, _X) -> ok.
delete(_Tx, _X, _Bs) -> ok.
+policy_changed(_Tx, _X1, _X2) -> ok.
add_binding(_Tx, _X, _B) -> ok.
remove_bindings(_Tx, _X, _Bs) -> ok.
assert_args_equivalence(X, Args) ->
diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl
index a64f2c2924..d9a2f60fdd 100644
--- a/src/rabbit_exchange_type_fanout.erl
+++ b/src/rabbit_exchange_type_fanout.erl
@@ -20,7 +20,7 @@
-behaviour(rabbit_exchange_type).
-export([description/0, serialise_events/0, route/2]).
--export([validate/1, create/2, delete/3, add_binding/3,
+-export([validate/1, create/2, delete/3, policy_changed/3, add_binding/3,
remove_bindings/3, assert_args_equivalence/2]).
-rabbit_boot_step({?MODULE,
@@ -42,6 +42,7 @@ route(#exchange{name = Name}, _Delivery) ->
validate(_X) -> ok.
create(_Tx, _X) -> ok.
delete(_Tx, _X, _Bs) -> ok.
+policy_changed(_Tx, _X1, _X2) -> ok.
add_binding(_Tx, _X, _B) -> ok.
remove_bindings(_Tx, _X, _Bs) -> ok.
assert_args_equivalence(X, Args) ->
diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl
index 61917d8f4a..516b78e59c 100644
--- a/src/rabbit_exchange_type_headers.erl
+++ b/src/rabbit_exchange_type_headers.erl
@@ -21,7 +21,7 @@
-behaviour(rabbit_exchange_type).
-export([description/0, serialise_events/0, route/2]).
--export([validate/1, create/2, delete/3, add_binding/3,
+-export([validate/1, create/2, delete/3, policy_changed/3, add_binding/3,
remove_bindings/3, assert_args_equivalence/2]).
-rabbit_boot_step({?MODULE,
@@ -116,6 +116,7 @@ headers_match([{PK, PT, PV} | PRest], [{DK, DT, DV} | DRest],
validate(_X) -> ok.
create(_Tx, _X) -> ok.
delete(_Tx, _X, _Bs) -> ok.
+policy_changed(_Tx, _X1, _X2) -> ok.
add_binding(_Tx, _X, _B) -> ok.
remove_bindings(_Tx, _X, _Bs) -> ok.
assert_args_equivalence(X, Args) ->
diff --git a/src/rabbit_exchange_type_invalid.erl b/src/rabbit_exchange_type_invalid.erl
index 82d27960bc..101fe434e9 100644
--- a/src/rabbit_exchange_type_invalid.erl
+++ b/src/rabbit_exchange_type_invalid.erl
@@ -20,7 +20,7 @@
-behaviour(rabbit_exchange_type).
-export([description/0, serialise_events/0, route/2]).
--export([validate/1, create/2, delete/3,
+-export([validate/1, create/2, delete/3, policy_changed/3,
add_binding/3, remove_bindings/3, assert_args_equivalence/2]).
description() ->
@@ -40,6 +40,7 @@ route(#exchange{name = Name, type = Type}, _) ->
validate(_X) -> ok.
create(_Tx, _X) -> ok.
delete(_Tx, _X, _Bs) -> ok.
+policy_changed(_Tx, _X1, _X2) -> ok.
add_binding(_Tx, _X, _B) -> ok.
remove_bindings(_Tx, _X, _Bs) -> ok.
assert_args_equivalence(X, Args) ->
diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl
index 3160fdf46e..644d9acf07 100644
--- a/src/rabbit_exchange_type_topic.erl
+++ b/src/rabbit_exchange_type_topic.erl
@@ -21,7 +21,7 @@
-behaviour(rabbit_exchange_type).
-export([description/0, serialise_events/0, route/2]).
--export([validate/1, create/2, delete/3, add_binding/3,
+-export([validate/1, create/2, delete/3, policy_changed/3, add_binding/3,
remove_bindings/3, assert_args_equivalence/2]).
-rabbit_boot_step({?MODULE,
@@ -58,6 +58,8 @@ delete(transaction, #exchange{name = X}, _Bs) ->
delete(none, _Exchange, _Bs) ->
ok.
+policy_changed(_Tx, _X1, _X2) -> ok.
+
add_binding(transaction, _Exchange, Binding) ->
internal_add_binding(Binding);
add_binding(none, _Exchange, _Binding) ->
diff --git a/src/rabbit_file.erl b/src/rabbit_file.erl
index 59df14f318..a95f8f269d 100644
--- a/src/rabbit_file.erl
+++ b/src/rabbit_file.erl
@@ -102,9 +102,12 @@ read_file_info(File) ->
with_fhc_handle(fun () -> prim_file:read_file_info(File) end).
with_fhc_handle(Fun) ->
- ok = file_handle_cache:obtain(),
+ with_fhc_handle(1, Fun).
+
+with_fhc_handle(N, Fun) ->
+ [ ok = file_handle_cache:obtain() || _ <- lists:seq(1, N)],
try Fun()
- after ok = file_handle_cache:release()
+ after [ ok = file_handle_cache:release() || _ <- lists:seq(1, N)]
end.
read_term_file(File) ->
@@ -165,7 +168,7 @@ make_binary(List) ->
{error, Reason}
end.
-
+%% TODO the semantics of this function are rather odd. But see bug 25021.
append_file(File, Suffix) ->
case read_file_info(File) of
{ok, FInfo} -> append_file(File, FInfo#file_info.size, Suffix);
@@ -183,9 +186,11 @@ append_file(File, 0, Suffix) ->
end
end);
append_file(File, _, Suffix) ->
- case with_fhc_handle(fun () -> prim_file:read_file(File) end) of
- {ok, Data} -> write_file([File, Suffix], Data, [append]);
- Error -> Error
+ case with_fhc_handle(2, fun () ->
+ file:copy(File, {[File, Suffix], [append]})
+ end) of
+ {ok, _BytesCopied} -> ok;
+ Error -> Error
end.
ensure_parent_dirs_exist(Filename) ->
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl
index 71e0507a33..10debb0b08 100644
--- a/src/rabbit_mirror_queue_coordinator.erl
+++ b/src/rabbit_mirror_queue_coordinator.erl
@@ -36,8 +36,6 @@
length_fun
}).
--define(ONE_SECOND, 1000).
-
-ifdef(use_specs).
-spec(start_link/4 :: (rabbit_types:amqqueue(), pid() | 'undefined',
@@ -325,7 +323,6 @@ init([#amqqueue { name = QueueName } = Q, GM, DeathFun, LengthFun]) ->
true = link(GM),
GM
end,
- ensure_gm_heartbeat(),
{ok, #state { q = Q,
gm = GM1,
monitors = pmon:new(),
@@ -359,11 +356,6 @@ handle_cast({ensure_monitoring, Pids}, State = #state { monitors = Mons }) ->
handle_cast({delete_and_terminate, Reason}, State) ->
{stop, Reason, State}.
-handle_info(send_gm_heartbeat, State = #state { gm = GM }) ->
- gm:broadcast(GM, heartbeat),
- ensure_gm_heartbeat(),
- noreply(State);
-
handle_info({'DOWN', _MonitorRef, process, Pid, _Reason},
State = #state { monitors = Mons,
death_fun = DeathFun }) ->
@@ -399,15 +391,15 @@ members_changed([_CPid], _Births, []) ->
members_changed([CPid], _Births, Deaths) ->
ok = gen_server2:cast(CPid, {gm_deaths, Deaths}).
-handle_msg([_CPid], _From, heartbeat) ->
+handle_msg([_CPid], _From, master_changed) ->
ok;
handle_msg([CPid], _From, request_length = Msg) ->
ok = gen_server2:cast(CPid, Msg);
handle_msg([CPid], _From, {ensure_monitoring, _Pids} = Msg) ->
ok = gen_server2:cast(CPid, Msg);
-handle_msg([CPid], _From, {delete_and_terminate, Reason} = Msg) ->
+handle_msg([CPid], _From, {delete_and_terminate, _Reason} = Msg) ->
ok = gen_server2:cast(CPid, Msg),
- {stop, Reason};
+ {stop, {shutdown, ring_shutdown}};
handle_msg([_CPid], _From, _Msg) ->
ok.
@@ -420,6 +412,3 @@ noreply(State) ->
reply(Reply, State) ->
{reply, Reply, State, hibernate}.
-
-ensure_gm_heartbeat() ->
- erlang:send_after(?ONE_SECOND, self(), send_gm_heartbeat).
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 4e71cc43db..750bcd56e2 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -127,10 +127,21 @@ terminate(Reason,
delete_and_terminate(Reason, State = #state { gm = GM,
backing_queue = BQ,
backing_queue_state = BQS }) ->
+ Slaves = [Pid || Pid <- gm:group_members(GM), node(Pid) =/= node()],
+ MRefs = [erlang:monitor(process, S) || S <- Slaves],
ok = gm:broadcast(GM, {delete_and_terminate, Reason}),
+ monitor_wait(MRefs),
State #state { backing_queue_state = BQ:delete_and_terminate(Reason, BQS),
set_delivered = 0 }.
+monitor_wait([]) ->
+ ok;
+monitor_wait([MRef | MRefs]) ->
+ receive({'DOWN', MRef, process, _Pid, _Info}) ->
+ ok
+ end,
+ monitor_wait(MRefs).
+
purge(State = #state { gm = GM,
backing_queue = BQ,
backing_queue_state = BQS }) ->
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index e412fbbc5d..60d3e027fb 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -199,7 +199,12 @@ handle_call({gm_deaths, Deaths}, From,
%% master has changed to not us.
gen_server2:reply(From, ok),
erlang:monitor(process, Pid),
- ok = gm:broadcast(GM, heartbeat),
+ %% GM is lazy. So we know of the death of the
+ %% slave since it is a neighbour of ours, but
+ %% until a message is sent, not all members will
+ %% know. That might include the new master. So
+ %% broadcast a no-op message to wake everyone up.
+ ok = gm:broadcast(GM, master_changed),
noreply(State #state { master_pid = Pid })
end
end;
@@ -341,7 +346,7 @@ members_changed([_SPid], _Births, []) ->
members_changed([SPid], _Births, Deaths) ->
inform_deaths(SPid, Deaths).
-handle_msg([_SPid], _From, heartbeat) ->
+handle_msg([_SPid], _From, master_changed) ->
ok;
handle_msg([_SPid], _From, request_length) ->
%% This is only of value to the master
@@ -351,20 +356,17 @@ handle_msg([_SPid], _From, {ensure_monitoring, _Pid}) ->
ok;
handle_msg([SPid], _From, {process_death, Pid}) ->
inform_deaths(SPid, [Pid]);
+handle_msg([CPid], _From, {delete_and_terminate, _Reason} = Msg) ->
+ ok = gen_server2:cast(CPid, {gm, Msg}),
+ {stop, {shutdown, ring_shutdown}};
handle_msg([SPid], _From, Msg) ->
ok = gen_server2:cast(SPid, {gm, Msg}).
inform_deaths(SPid, Deaths) ->
- rabbit_misc:with_exit_handler(
- fun () -> {stop, normal} end,
- fun () ->
- case gen_server2:call(SPid, {gm_deaths, Deaths}, infinity) of
- ok ->
- ok;
- {promote, CPid} ->
- {become, rabbit_mirror_queue_coordinator, [CPid]}
- end
- end).
+ case gen_server2:call(SPid, {gm_deaths, Deaths}, infinity) of
+ ok -> ok;
+ {promote, CPid} -> {become, rabbit_mirror_queue_coordinator, [CPid]}
+ end.
%% ---------------------------------------------------------------------------
%% Others
@@ -455,7 +457,9 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
rabbit_mirror_queue_master:length_fun()),
true = unlink(GM),
gen_server2:reply(From, {promote, CPid}),
- ok = gm:confirmed_broadcast(GM, heartbeat),
+ %% TODO this has been in here since the beginning, but it's not
+ %% obvious if it is needed. Investigate...
+ ok = gm:confirmed_broadcast(GM, master_changed),
%% Everything that we're monitoring, we need to ensure our new
%% coordinator is monitoring.
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index d41aa09b15..1fefa68877 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -36,7 +36,7 @@
-export([execute_mnesia_transaction/2]).
-export([execute_mnesia_tx_with_tail/1]).
-export([ensure_ok/2]).
--export([tcp_name/3]).
+-export([tcp_name/3, format_inet_error/1]).
-export([upmap/2, map_in_order/2]).
-export([table_filter/3]).
-export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]).
@@ -152,6 +152,7 @@
-spec(tcp_name/3 ::
(atom(), inet:ip_address(), rabbit_networking:ip_port())
-> atom()).
+-spec(format_inet_error/1 :: (atom()) -> string()).
-spec(upmap/2 :: (fun ((A) -> B), [A]) -> [B]).
-spec(map_in_order/2 :: (fun ((A) -> B), [A]) -> [B]).
-spec(table_filter/3:: (fun ((A) -> boolean()), fun ((A, boolean()) -> 'ok'),
@@ -510,6 +511,10 @@ tcp_name(Prefix, IPAddress, Port)
list_to_atom(
format("~w_~s:~w", [Prefix, inet_parse:ntoa(IPAddress), Port])).
+format_inet_error(address) -> "cannot connect to host/port";
+format_inet_error(timeout) -> "timed out";
+format_inet_error(Error) -> inet:format_error(Error).
+
%% This is a modified version of Luke Gorrie's pmap -
%% http://lukego.livejournal.com/6753.html - that doesn't care about
%% the order in which results are received.
diff --git a/src/rabbit_nodes.erl b/src/rabbit_nodes.erl
index 1c23632d52..c8d77b0f87 100644
--- a/src/rabbit_nodes.erl
+++ b/src/rabbit_nodes.erl
@@ -70,8 +70,8 @@ diagnostics0() ->
diagnostics_host(Host) ->
case names(Host) of
{error, EpmdReason} ->
- {"- unable to connect to epmd on ~s: ~w",
- [Host, EpmdReason]};
+ {"- unable to connect to epmd on ~s: ~w (~s)",
+ [Host, EpmdReason, rabbit_misc:format_inet_error(EpmdReason)]};
{ok, NamePorts} ->
{"- ~s: ~p",
[Host, [{list_to_atom(Name), Port} ||
diff --git a/src/rabbit_parameter_validation.erl b/src/rabbit_parameter_validation.erl
new file mode 100644
index 0000000000..af940dde97
--- /dev/null
+++ b/src/rabbit_parameter_validation.erl
@@ -0,0 +1,61 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_parameter_validation).
+
+-export([number/2, binary/2, list/2, proplist/3]).
+
+number(_Name, Term) when is_number(Term) ->
+ ok;
+
+number(Name, Term) ->
+ {error, "~s should be number, actually was ~p", [Name, Term]}.
+
+binary(_Name, Term) when is_binary(Term) ->
+ ok;
+
+binary(Name, Term) ->
+ {error, "~s should be binary, actually was ~p", [Name, Term]}.
+
+list(_Name, Term) when is_list(Term) ->
+ ok;
+
+list(Name, Term) ->
+ {error, "~s should be list, actually was ~p", [Name, Term]}.
+
+proplist(Name, Constraints, Term) when is_list(Term) ->
+ {Results, Remainder}
+ = lists:foldl(
+ fun ({Key, Fun, Needed}, {Results0, Term0}) ->
+ case {lists:keytake(Key, 1, Term0), Needed} of
+ {{value, {Key, Value}, Term1}, _} ->
+ {[Fun(Key, Value) | Results0],
+ Term1};
+ {false, mandatory} ->
+ {[{error, "Key \"~s\" not found in ~s",
+ [Key, Name]} | Results0], Term0};
+ {false, optional} ->
+ {Results0, Term0}
+ end
+ end, {[], Term}, Constraints),
+ case Remainder of
+ [] -> Results;
+ _ -> [{error, "Unrecognised terms ~p in ~s", [Remainder, Name]}
+ | Results]
+ end;
+
+proplist(Name, _Constraints, Term) ->
+ {error, "~s not a list ~p", [Name, Term]}.
diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl
new file mode 100644
index 0000000000..1551795f7c
--- /dev/null
+++ b/src/rabbit_policy.erl
@@ -0,0 +1,156 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_policy).
+
+%% TODO specs
+
+-behaviour(rabbit_runtime_parameter).
+
+-include("rabbit.hrl").
+
+-import(rabbit_misc, [pget/2]).
+
+-export([register/0]).
+-export([name/1, get/2, set/1]).
+-export([validate/3, validate_clear/2, notify/3, notify_clear/2]).
+
+-rabbit_boot_step({?MODULE,
+ [{description, "policy parameters"},
+ {mfa, {rabbit_policy, register, []}},
+ {requires, rabbit_registry},
+ {enables, recovery}]}).
+
+register() ->
+ rabbit_registry:register(runtime_parameter, <<"policy">>, ?MODULE).
+
+name(#amqqueue{policy = Policy}) -> name0(Policy);
+name(#exchange{policy = Policy}) -> name0(Policy).
+
+name0(undefined) -> none;
+name0(Policy) -> pget(<<"name">>, Policy).
+
+set(Q = #amqqueue{name = Name}) -> Q#amqqueue{policy = set0(Name)};
+set(X = #exchange{name = Name}) -> X#exchange{policy = set0(Name)}.
+
+set0(Name) -> match(Name, list()).
+
+get(Name, #amqqueue{policy = Policy}) -> get0(Name, Policy);
+get(Name, #exchange{policy = Policy}) -> get0(Name, Policy);
+%% Caution - SLOW.
+get(Name, EntityName = #resource{}) -> get0(Name, match(EntityName, list())).
+
+get0(_Name, undefined) -> {error, not_found};
+get0(Name, List) -> case pget(<<"policy">>, List) of
+ undefined -> {error, not_found};
+ Policy -> case pget(Name, Policy) of
+ undefined -> {error, not_found};
+ Value -> {ok, Value}
+ end
+ end.
+
+%%----------------------------------------------------------------------------
+
+validate(<<"policy">>, Name, Term) ->
+ rabbit_parameter_validation:proplist(
+ Name, policy_validation(), Term).
+
+validate_clear(<<"policy">>, _Name) ->
+ ok.
+
+notify(<<"policy">>, _Name, _Term) ->
+ update_policies().
+
+notify_clear(<<"policy">>, _Name) ->
+ update_policies().
+
+%%----------------------------------------------------------------------------
+
+list() ->
+ [[{<<"name">>, pget(key, P)} | pget(value, P)]
+ || P <- rabbit_runtime_parameters:list(<<"policy">>)].
+
+update_policies() ->
+ Policies = list(),
+ {Xs, Qs} = rabbit_misc:execute_mnesia_transaction(
+ fun() ->
+ {[update_exchange(X, Policies) ||
+ VHost <- rabbit_vhost:list(),
+ X <- rabbit_exchange:list(VHost)],
+ [update_queue(Q, Policies) ||
+ VHost <- rabbit_vhost:list(),
+ Q <- rabbit_amqqueue:list(VHost)]}
+ end),
+ [notify(X) || X <- Xs],
+ [notify(Q) || Q <- Qs],
+ ok.
+
+update_exchange(X = #exchange{name = XName, policy = OldPolicy}, Policies) ->
+ NewPolicy = match(XName, Policies),
+ case NewPolicy of
+ OldPolicy -> no_change;
+ _ -> rabbit_exchange:update(
+ XName, fun(X1) -> X1#exchange{policy = NewPolicy} end),
+ {X, X#exchange{policy = NewPolicy}}
+ end.
+
+update_queue(Q = #amqqueue{name = QName, policy = OldPolicy}, Policies) ->
+ NewPolicy = match(QName, Policies),
+ case NewPolicy of
+ OldPolicy -> no_change;
+ _ -> rabbit_amqqueue:update(
+ QName, fun(Q1) -> Q1#amqqueue{policy = NewPolicy} end),
+ {Q, Q#amqqueue{policy = NewPolicy}}
+ end.
+
+notify(no_change)->
+ ok;
+notify({X1 = #exchange{}, X2 = #exchange{}}) ->
+ rabbit_exchange:policy_changed(X1, X2);
+notify({Q1 = #amqqueue{}, Q2 = #amqqueue{}}) ->
+ rabbit_amqqueue:policy_changed(Q1, Q2).
+
+match(Name, Policies) ->
+ case lists:sort(fun sort_pred/2, [P || P <- Policies, matches(Name, P)]) of
+ [] -> undefined;
+ [Policy | _Rest] -> Policy
+ end.
+
+matches(#resource{name = Name, virtual_host = VHost}, Policy) ->
+ Prefix = pget(<<"prefix">>, Policy),
+ case pget(<<"vhost">>, Policy) of
+ undefined -> prefix(Prefix, Name);
+ VHost -> prefix(Prefix, Name);
+ _ -> false
+ end.
+
+prefix(A, B) -> lists:prefix(binary_to_list(A), binary_to_list(B)).
+
+sort_pred(A, B) ->
+ R = size(pget(<<"prefix">>, A)) >= size(pget(<<"prefix">>, B)),
+ case {pget(<<"vhost">>, A), pget(<<"vhost">>, B)} of
+ {undefined, undefined} -> R;
+ {undefined, _} -> true;
+ {_, undefined} -> false;
+ _ -> R
+ end.
+
+%%----------------------------------------------------------------------------
+
+policy_validation() ->
+ [{<<"vhost">>, fun rabbit_parameter_validation:binary/2, optional},
+ {<<"prefix">>, fun rabbit_parameter_validation:binary/2, mandatory},
+ {<<"policy">>, fun rabbit_parameter_validation:list/2, mandatory}].
diff --git a/src/rabbit_prelaunch.erl b/src/rabbit_prelaunch.erl
index d56211b50e..b045443533 100644
--- a/src/rabbit_prelaunch.erl
+++ b/src/rabbit_prelaunch.erl
@@ -67,9 +67,5 @@ duplicate_node_check(NodeStr) ->
{error, EpmdReason} ->
rabbit_misc:quit("epmd error for host ~p: ~p (~s)~n",
[NodeHost, EpmdReason,
- case EpmdReason of
- address -> "unable to establish tcp connection";
- timeout -> "timed out establishing tcp connection";
- _ -> inet:format_error(EpmdReason)
- end])
+ rabbit_misc:format_inet_error(EpmdReason)])
end.
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 518021a49f..5a1e1815b3 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -173,6 +173,8 @@ server_capabilities(rabbit_framing_amqp_0_9_1) ->
server_capabilities(_) ->
[].
+%%--------------------------------------------------------------------------
+
log(Level, Fmt, Args) -> rabbit_log:log(connection, Level, Fmt, Args).
inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F).
@@ -383,6 +385,9 @@ update_last_blocked_by(State = #v1{conserve_resources = true}) ->
update_last_blocked_by(State = #v1{conserve_resources = false}) ->
State#v1{last_blocked_by = flow}.
+%%--------------------------------------------------------------------------
+%% error handling / termination
+
close_connection(State = #v1{queue_collector = Collector,
connection = #connection{
timeout_sec = TimeoutSec}}) ->
@@ -412,18 +417,6 @@ handle_dependent_exit(ChPid, Reason, State) ->
Channel, Reason))
end.
-channel_cleanup(ChPid) ->
- case get({ch_pid, ChPid}) of
- undefined -> undefined;
- {Channel, MRef} -> credit_flow:peer_down(ChPid),
- erase({channel, Channel}),
- erase({ch_pid, ChPid}),
- erlang:demonitor(MRef, [flush]),
- Channel
- end.
-
-all_channels() -> [ChPid || {{ch_pid, ChPid}, _ChannelMRef} <- get()].
-
terminate_channels() ->
NChannels =
length([rabbit_channel:shutdown(ChPid) || ChPid <- all_channels()]),
@@ -477,6 +470,53 @@ maybe_close(State) ->
termination_kind(normal) -> controlled;
termination_kind(_) -> uncontrolled.
+handle_exception(State = #v1{connection_state = closed}, _Channel, _Reason) ->
+ State;
+handle_exception(State, Channel, Reason) ->
+ send_exception(State, Channel, Reason).
+
+send_exception(State = #v1{connection = #connection{protocol = Protocol}},
+ Channel, Reason) ->
+ {0, CloseMethod} =
+ rabbit_binary_generator:map_exception(Channel, Reason, Protocol),
+ terminate_channels(),
+ State1 = close_connection(State),
+ ok = send_on_channel0(State1#v1.sock, CloseMethod, Protocol),
+ State1.
+
+%%--------------------------------------------------------------------------
+
+create_channel(Channel, State) ->
+ #v1{sock = Sock, queue_collector = Collector,
+ channel_sup_sup_pid = ChanSupSup,
+ connection = #connection{protocol = Protocol,
+ frame_max = FrameMax,
+ user = User,
+ vhost = VHost,
+ capabilities = Capabilities}} = State,
+ {ok, _ChSupPid, {ChPid, AState}} =
+ rabbit_channel_sup_sup:start_channel(
+ ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), name(Sock),
+ Protocol, User, VHost, Capabilities, Collector}),
+ MRef = erlang:monitor(process, ChPid),
+ put({ch_pid, ChPid}, {Channel, MRef}),
+ put({channel, Channel}, {ChPid, AState}),
+ ok.
+
+channel_cleanup(ChPid) ->
+ case get({ch_pid, ChPid}) of
+ undefined -> undefined;
+ {Channel, MRef} -> credit_flow:peer_down(ChPid),
+ erase({channel, Channel}),
+ erase({ch_pid, ChPid}),
+ erlang:demonitor(MRef, [flush]),
+ Channel
+ end.
+
+all_channels() -> [ChPid || {{ch_pid, ChPid}, _ChannelMRef} <- get()].
+
+%%--------------------------------------------------------------------------
+
handle_frame(Type, 0, Payload,
State = #v1{connection_state = CS,
connection = #connection{protocol = Protocol}})
@@ -522,6 +562,17 @@ process_frame(Frame, Channel, State) ->
Channel, State#v1.connection_state, Frame})
end.
+process_channel_frame(Frame, ChPid, AState) ->
+ case rabbit_command_assembler:process(Frame, AState) of
+ {ok, NewAState} -> {ok, NewAState};
+ {ok, Method, NewAState} -> rabbit_channel:do(ChPid, Method),
+ {ok, NewAState};
+ {ok, Method, Content, NewAState} -> rabbit_channel:do_flow(
+ ChPid, Method, Content),
+ {ok, NewAState};
+ {error, Reason} -> {error, Reason}
+ end.
+
post_process_frame({method, 'channel.close_ok', _}, ChPid, State) ->
channel_cleanup(ChPid),
control_throttle(State);
@@ -536,13 +587,15 @@ post_process_frame({method, MethodName, _}, _ChPid,
post_process_frame(_Frame, _ChPid, State) ->
control_throttle(State).
+%%--------------------------------------------------------------------------
+
handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, State) ->
ensure_stats_timer(
switch_callback(State, {frame_payload, Type, Channel, PayloadSize},
PayloadSize + 1));
-handle_input({frame_payload, Type, Channel, PayloadSize},
- PayloadAndMarker, State) ->
+handle_input({frame_payload, Type, Channel, PayloadSize}, PayloadAndMarker,
+ State) ->
case PayloadAndMarker of
<<Payload:PayloadSize/binary, ?FRAME_END>> ->
switch_callback(handle_frame(Type, Channel, Payload, State),
@@ -834,8 +887,8 @@ i(SockStat, #v1{sock = Sock}) when SockStat =:= recv_oct;
SockStat =:= send_oct;
SockStat =:= send_cnt;
SockStat =:= send_pend ->
- socket_info(fun () -> rabbit_net:getstat(Sock, [SockStat]) end,
- fun ([{_, I}]) -> I end);
+ socket_info(fun (S) -> rabbit_net:getstat(S, [SockStat]) end,
+ fun ([{_, I}]) -> I end, Sock);
i(state, #v1{connection_state = S}) ->
S;
i(last_blocked_by, #v1{last_blocked_by = By}) ->
@@ -871,10 +924,7 @@ i(Item, #v1{}) ->
throw({bad_argument, Item}).
socket_info(Get, Select, Sock) ->
- socket_info(fun() -> Get(Sock) end, Select).
-
-socket_info(Get, Select) ->
- case Get() of
+ case Get(Sock) of
{ok, T} -> Select(T);
{error, _} -> ''
end.
@@ -897,51 +947,6 @@ cert_info(F, Sock) ->
{ok, Cert} -> list_to_binary(F(Cert))
end.
-%%--------------------------------------------------------------------------
-
-create_channel(Channel, State) ->
- #v1{sock = Sock, queue_collector = Collector,
- channel_sup_sup_pid = ChanSupSup,
- connection = #connection{protocol = Protocol,
- frame_max = FrameMax,
- user = User,
- vhost = VHost,
- capabilities = Capabilities}} = State,
- {ok, _ChSupPid, {ChPid, AState}} =
- rabbit_channel_sup_sup:start_channel(
- ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), name(Sock),
- Protocol, User, VHost, Capabilities, Collector}),
- MRef = erlang:monitor(process, ChPid),
- put({ch_pid, ChPid}, {Channel, MRef}),
- put({channel, Channel}, {ChPid, AState}),
- ok.
-
-process_channel_frame(Frame, ChPid, AState) ->
- case rabbit_command_assembler:process(Frame, AState) of
- {ok, NewAState} -> {ok, NewAState};
- {ok, Method, NewAState} -> rabbit_channel:do(ChPid, Method),
- {ok, NewAState};
- {ok, Method, Content, NewAState} -> rabbit_channel:do_flow(
- ChPid, Method, Content),
- {ok, NewAState};
- {error, Reason} -> {error, Reason}
- end.
-
-handle_exception(State = #v1{connection_state = closed}, _Channel, _Reason) ->
- State;
-handle_exception(State, Channel, Reason) ->
- send_exception(State, Channel, Reason).
-
-send_exception(State = #v1{connection = #connection{protocol = Protocol}},
- Channel, Reason) ->
- {0, CloseMethod} =
- rabbit_binary_generator:map_exception(Channel, Reason, Protocol),
- terminate_channels(),
- State1 = close_connection(State),
- ok = rabbit_writer:internal_send_command(
- State1#v1.sock, 0, CloseMethod, Protocol),
- State1.
-
emit_stats(State) ->
rabbit_event:notify(connection_stats, infos(?STATISTICS_KEYS, State)),
rabbit_event:reset_stats_timer(State, #v1.stats_timer).
diff --git a/src/rabbit_registry.erl b/src/rabbit_registry.erl
index 637835c327..e14bbba018 100644
--- a/src/rabbit_registry.erl
+++ b/src/rabbit_registry.erl
@@ -104,9 +104,10 @@ sanity_check_module(ClassModule, Module) ->
true -> ok
end.
-class_module(exchange) -> rabbit_exchange_type;
-class_module(auth_mechanism) -> rabbit_auth_mechanism;
-class_module(runtime_parameter) -> rabbit_runtime_parameter.
+class_module(exchange) -> rabbit_exchange_type;
+class_module(auth_mechanism) -> rabbit_auth_mechanism;
+class_module(runtime_parameter) -> rabbit_runtime_parameter;
+class_module(exchange_decorator) -> rabbit_exchange_decorator.
%%---------------------------------------------------------------------------
diff --git a/src/rabbit_runtime_parameters.erl b/src/rabbit_runtime_parameters.erl
index 172cee92db..3a54e8f621 100644
--- a/src/rabbit_runtime_parameters.erl
+++ b/src/rabbit_runtime_parameters.erl
@@ -18,8 +18,8 @@
-include("rabbit.hrl").
--export([parse_set/3, set/3, clear/2, list/0, list/1, list_formatted/0,
- lookup/2, value/2, value/3, info_keys/0]).
+-export([parse_set/3, set/3, clear/2, list/0, list/1, list_strict/1,
+ list_formatted/0, lookup/2, value/2, value/3, info_keys/0]).
%%----------------------------------------------------------------------------
@@ -31,7 +31,8 @@
-spec(set/3 :: (binary(), binary(), term()) -> ok_or_error_string()).
-spec(clear/2 :: (binary(), binary()) -> ok_or_error_string()).
-spec(list/0 :: () -> [rabbit_types:infos()]).
--spec(list/1 :: (binary()) -> [rabbit_types:infos()] | 'not_found').
+-spec(list/1 :: (binary()) -> [rabbit_types:infos()]).
+-spec(list_strict/1 :: (binary()) -> [rabbit_types:infos()] | 'not_found').
-spec(list_formatted/0 :: () -> [rabbit_types:infos()]).
-spec(lookup/2 :: (binary(), binary()) -> rabbit_types:infos()).
-spec(value/2 :: (binary(), binary()) -> term()).
@@ -122,11 +123,14 @@ mnesia_clear(Component, Key) ->
list() ->
[p(P) || P <- rabbit_misc:dirty_read_all(?TABLE)].
-list(Component) ->
+list(Component) -> list(Component, []).
+list_strict(Component) -> list(Component, not_found).
+
+list(Component, Default) ->
case lookup_component(Component) of
{ok, _} -> Match = #runtime_parameters{key = {Component, '_'}, _ = '_'},
[p(P) || P <- mnesia:dirty_match_object(?TABLE, Match)];
- _ -> not_found
+ _ -> Default
end.
list_formatted() ->
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 5545cccff7..bb60bd125e 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -618,8 +618,8 @@ test_topic_matching() ->
exchange_op_callback(X, Fun, Args) ->
rabbit_misc:execute_mnesia_transaction(
- fun () -> rabbit_exchange:callback(X, Fun, [transaction, X] ++ Args) end),
- rabbit_exchange:callback(X, Fun, [none, X] ++ Args).
+ fun () -> rabbit_exchange:callback(X, Fun, transaction, [X] ++ Args) end),
+ rabbit_exchange:callback(X, Fun, none, [X] ++ Args).
test_topic_expect_match(X, List) ->
lists:foreach(
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index 485ccc5f02..18704807ba 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -37,6 +37,9 @@
-rabbit_upgrade({mirrored_supervisor, mnesia, []}).
-rabbit_upgrade({topic_trie_node, mnesia, []}).
-rabbit_upgrade({runtime_parameters, mnesia, []}).
+-rabbit_upgrade({exchange_scratches, mnesia, [exchange_scratch]}).
+-rabbit_upgrade({policy, mnesia,
+ [exchange_scratches, ha_mirrors]}).
%% -------------------------------------------------------------------
@@ -58,6 +61,7 @@
-spec(mirrored_supervisor/0 :: () -> 'ok').
-spec(topic_trie_node/0 :: () -> 'ok').
-spec(runtime_parameters/0 :: () -> 'ok').
+-spec(policy/0 :: () -> 'ok').
-endif.
@@ -193,6 +197,49 @@ runtime_parameters() ->
{attributes, [key, value]},
{disc_copies, [node()]}]).
+exchange_scratches() ->
+ ok = exchange_scratches(rabbit_exchange),
+ ok = exchange_scratches(rabbit_durable_exchange).
+
+exchange_scratches(Table) ->
+ transform(
+ Table,
+ fun ({exchange, Name, Type = <<"x-federation">>, Dur, AutoDel, Int, Args,
+ Scratch}) ->
+ Scratches = orddict:store(federation, Scratch, orddict:new()),
+ {exchange, Name, Type, Dur, AutoDel, Int, Args, Scratches};
+ %% We assert here that nothing else uses the scratch mechanism ATM
+ ({exchange, Name, Type, Dur, AutoDel, Int, Args, undefined}) ->
+ {exchange, Name, Type, Dur, AutoDel, Int, Args, undefined}
+ end,
+ [name, type, durable, auto_delete, internal, arguments, scratches]).
+
+policy() ->
+ ok = exchange_policy(rabbit_exchange),
+ ok = exchange_policy(rabbit_durable_exchange),
+ ok = queue_policy(rabbit_queue),
+ ok = queue_policy(rabbit_durable_queue).
+
+exchange_policy(Table) ->
+ transform(
+ Table,
+ fun ({exchange, Name, Type, Dur, AutoDel, Int, Args, Scratches}) ->
+ {exchange, Name, Type, Dur, AutoDel, Int, Args, Scratches,
+ undefined}
+ end,
+ [name, type, durable, auto_delete, internal, arguments, scratches,
+ policy]).
+
+queue_policy(Table) ->
+ transform(
+ Table,
+ fun ({amqqueue, Name, Dur, AutoDel, Excl, Args, Pid, SPids, MNodes}) ->
+ {amqqueue, Name, Dur, AutoDel, Excl, Args, Pid, SPids, MNodes,
+ undefined}
+ end,
+ [name, durable, auto_delete, exclusive_owner, arguments, pid,
+ slave_pids, mirror_nodes, policy]).
+
%%--------------------------------------------------------------------
transform(TableName, Fun, FieldList) ->