summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorEmile Joubert <emile@rabbitmq.com>2013-03-19 12:02:58 +0000
committerEmile Joubert <emile@rabbitmq.com>2013-03-19 12:02:58 +0000
commit7d2da0429ec8df75abed511f86a95a4d7b41cd34 (patch)
treec26f860d74e44699173b01dce9bc6e6021f6c7dc /src
parent2c8e2165592d838bb94064699801a60d36fa7b8f (diff)
parentace35ee0dd04aa2f0d3dbba811c2ed8cbb3da19d (diff)
downloadrabbitmq-server-git-7d2da0429ec8df75abed511f86a95a4d7b41cd34.tar.gz
Merged default into bug25384
Diffstat (limited to 'src')
-rw-r--r--src/rabbit.erl2
-rw-r--r--src/rabbit_amqqueue.erl10
-rw-r--r--src/rabbit_amqqueue_process.erl65
-rw-r--r--src/rabbit_binding.erl30
-rw-r--r--src/rabbit_channel.erl2
-rw-r--r--src/rabbit_disk_monitor.erl7
-rw-r--r--src/rabbit_error_logger_file_h.erl3
-rw-r--r--src/rabbit_exchange.erl17
-rw-r--r--src/rabbit_exchange_decorator.erl10
-rw-r--r--src/rabbit_exchange_type.erl17
-rw-r--r--src/rabbit_exchange_type_direct.erl8
-rw-r--r--src/rabbit_exchange_type_fanout.erl6
-rw-r--r--src/rabbit_exchange_type_headers.erl41
-rw-r--r--src/rabbit_exchange_type_invalid.erl8
-rw-r--r--src/rabbit_exchange_type_topic.erl6
-rw-r--r--src/rabbit_node_monitor.erl76
-rw-r--r--src/rabbit_tests.erl1
17 files changed, 233 insertions, 76 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index f3ba022ad3..3cfa21ba9b 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -236,7 +236,7 @@
{memory, any()}]).
-spec(is_running/0 :: () -> boolean()).
-spec(is_running/1 :: (node()) -> boolean()).
--spec(environment/0 :: () -> [{param() | term()}]).
+-spec(environment/0 :: () -> [{param(), term()}]).
-spec(rotate_logs/1 :: (file_suffix()) -> rabbit_types:ok_or_error(any())).
-spec(force_event_refresh/0 :: () -> 'ok').
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index ae7fe5c5e1..82ac74fac5 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -407,7 +407,8 @@ args() ->
[{<<"x-expires">>, fun check_expires_arg/2},
{<<"x-message-ttl">>, fun check_message_ttl_arg/2},
{<<"x-dead-letter-exchange">>, fun check_string_arg/2},
- {<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2}].
+ {<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2},
+ {<<"x-max-length">>, fun check_max_length_arg/2}].
check_string_arg({longstr, _}, _Args) -> ok;
check_string_arg({Type, _}, _Args) -> {error, {unacceptable_type, Type}}.
@@ -418,6 +419,13 @@ check_int_arg({Type, _}, _) ->
false -> {error, {unacceptable_type, Type}}
end.
+check_max_length_arg({Type, Val}, Args) ->
+ case check_int_arg({Type, Val}, Args) of
+ ok when Val >= 0 -> ok;
+ ok -> {error, {value_negative, Val}};
+ Error -> Error
+ end.
+
check_expires_arg({Type, Val}, Args) ->
case check_int_arg({Type, Val}, Args) of
ok when Val == 0 -> {error, {value_zero, Val}};
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index fba58d38d7..18b641d4f7 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -55,6 +55,7 @@
queue_monitors,
dlx,
dlx_routing_key,
+ max_length,
status
}).
@@ -242,7 +243,8 @@ process_args(State = #q{q = #amqqueue{arguments = Arguments}}) ->
[{<<"x-expires">>, fun init_expires/2},
{<<"x-dead-letter-exchange">>, fun init_dlx/2},
{<<"x-dead-letter-routing-key">>, fun init_dlx_routing_key/2},
- {<<"x-message-ttl">>, fun init_ttl/2}]).
+ {<<"x-message-ttl">>, fun init_ttl/2},
+ {<<"x-max-length">>, fun init_max_length/2}]).
init_expires(Expires, State) -> ensure_expiry_timer(State#q{expires = Expires}).
@@ -254,6 +256,8 @@ init_dlx(DLX, State = #q{q = #amqqueue{name = QName}}) ->
init_dlx_routing_key(RoutingKey, State) ->
State#q{dlx_routing_key = RoutingKey}.
+init_max_length(MaxLen, State) -> State#q{max_length = MaxLen}.
+
terminate_shutdown(Fun, State) ->
State1 = #q{backing_queue_state = BQS} =
lists:foldl(fun (F, S) -> F(S) end, State,
@@ -557,27 +561,50 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid},
{false, State2 = #q{ttl = 0, dlx = undefined}} ->
discard(Delivery, State2);
{false, State2 = #q{backing_queue = BQ, backing_queue_state = BQS}} ->
- IsEmpty = BQ:is_empty(BQS),
BQS1 = BQ:publish(Message, Props, Delivered, SenderPid, BQS),
- State3 = State2#q{backing_queue_state = BQS1},
+ {Dropped, State3 = #q{backing_queue_state = BQS2}} =
+ maybe_drop_head(State2#q{backing_queue_state = BQS1}),
+ QLen = BQ:len(BQS2),
%% optimisation: it would be perfectly safe to always
%% invoke drop_expired_msgs here, but that is expensive so
- %% we only do that IFF the new message ends up at the head
- %% of the queue (because the queue was empty) and has an
- %% expiry. Only then may it need expiring straight away,
- %% or, if expiry is not due yet, the expiry timer may need
- %% (re)scheduling.
- case {IsEmpty, Props#message_properties.expiry} of
- {false, _} -> State3;
- {true, undefined} -> State3;
- {true, _} -> drop_expired_msgs(State3)
+ %% we only do that if a new message that might have an
+ %% expiry ends up at the head of the queue. If the head
+ %% remains unchanged, or if the newly published message
+ %% has no expiry and becomes the head of the queue then
+ %% the call is unnecessary.
+ case {Dropped > 0, QLen =:= 1, Props#message_properties.expiry} of
+ {false, false, _} -> State3;
+ {true, true, undefined} -> State3;
+ {_, _, _} -> drop_expired_msgs(State3)
end
end.
+maybe_drop_head(State = #q{max_length = undefined}) ->
+ {0, State};
+maybe_drop_head(State = #q{max_length = MaxLen,
+ backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ case BQ:len(BQS) - MaxLen of
+ Excess when Excess > 0 ->
+ {Excess,
+ with_dlx(
+ State#q.dlx,
+ fun (X) -> dead_letter_maxlen_msgs(X, Excess, State) end,
+ fun () ->
+ {_, BQS1} = lists:foldl(fun (_, {_, BQS0}) ->
+ BQ:drop(false, BQS0)
+ end, {ok, BQS},
+ lists:seq(1, Excess)),
+ State#q{backing_queue_state = BQS1}
+ end)};
+ _ -> {0, State}
+ end.
+
requeue_and_run(AckTags, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
{_MsgIds, BQS1} = BQ:requeue(AckTags, BQS),
- run_message_queue(drop_expired_msgs(State#q{backing_queue_state = BQS1})).
+ {_Dropped, State1} = maybe_drop_head(State#q{backing_queue_state = BQS1}),
+ run_message_queue(drop_expired_msgs(State1)).
fetch(AckRequired, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
@@ -761,6 +788,18 @@ dead_letter_rejected_msgs(AckTags, X, State = #q{backing_queue = BQ}) ->
end, rejected, X, State),
State1.
+dead_letter_maxlen_msgs(X, Excess, State = #q{backing_queue = BQ}) ->
+ {ok, State1} =
+ dead_letter_msgs(
+ fun (DLFun, Acc, BQS) ->
+ lists:foldl(fun (_, {ok, Acc0, BQS0}) ->
+ {{Msg, _, AckTag}, BQS1} =
+ BQ:fetch(true, BQS0),
+ {ok, DLFun(Msg, AckTag, Acc0), BQS1}
+ end, {ok, Acc, BQS}, lists:seq(1, Excess))
+ end, maxlen, X, State),
+ State1.
+
dead_letter_msgs(Fun, Reason, X, State = #q{dlx_routing_key = RK,
publish_seqno = SeqNo0,
unconfirmed = UC0,
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index 6096e07b2d..cb86e5aeac 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -40,8 +40,11 @@
[{'not_found', (rabbit_types:binding_source() |
rabbit_types:binding_destination())} |
{'absent', rabbit_types:amqqueue()}]})).
+
-type(bind_ok_or_error() :: 'ok' | bind_errors() |
- rabbit_types:error('binding_not_found')).
+ rabbit_types:error(
+ 'binding_not_found' |
+ {'binding_invalid', string(), [any()]})).
-type(bind_res() :: bind_ok_or_error() | rabbit_misc:thunk(bind_ok_or_error())).
-type(inner_fun() ::
fun((rabbit_types:exchange(),
@@ -157,15 +160,22 @@ add(Binding, InnerFun) ->
binding_action(
Binding,
fun (Src, Dst, B) ->
- %% this argument is used to check queue exclusivity;
- %% in general, we want to fail on that in preference to
- %% anything else
- case InnerFun(Src, Dst) of
- ok -> case mnesia:read({rabbit_route, B}) of
- [] -> add(Src, Dst, B);
- [_] -> fun rabbit_misc:const_ok/0
- end;
- {error, _} = Err -> rabbit_misc:const(Err)
+ case rabbit_exchange:validate_binding(Src, B) of
+ ok ->
+ %% this argument is used to check queue exclusivity;
+ %% in general, we want to fail on that in preference to
+ %% anything else
+ case InnerFun(Src, Dst) of
+ ok ->
+ case mnesia:read({rabbit_route, B}) of
+ [] -> add(Src, Dst, B);
+ [_] -> fun rabbit_misc:const_ok/0
+ end;
+ {error, _} = Err ->
+ rabbit_misc:const(Err)
+ end;
+ {error, _} = Err ->
+ rabbit_misc:const(Err)
end
end).
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 0510afa9a4..792a06c908 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -1194,6 +1194,8 @@ binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin,
not_found, "no binding ~s between ~s and ~s",
[RoutingKey, rabbit_misc:rs(ExchangeName),
rabbit_misc:rs(DestinationName)]);
+ {error, {binding_invalid, Fmt, Args}} ->
+ rabbit_misc:protocol_error(precondition_failed, Fmt, Args);
{error, #amqp_error{} = Error} ->
rabbit_misc:protocol_error(Error);
ok -> return_ok(State, NoWait, ReturnMethod)
diff --git a/src/rabbit_disk_monitor.erl b/src/rabbit_disk_monitor.erl
index b396b2899d..3bb163a18c 100644
--- a/src/rabbit_disk_monitor.erl
+++ b/src/rabbit_disk_monitor.erl
@@ -31,6 +31,7 @@
-record(state, {dir,
limit,
+ actual,
timeout,
timer,
alarmed
@@ -106,8 +107,8 @@ handle_call({set_check_interval, Timeout}, _From, State) ->
{ok, cancel} = timer:cancel(State#state.timer),
{reply, ok, State#state{timeout = Timeout, timer = start_timer(Timeout)}};
-handle_call(get_disk_free, _From, State = #state { dir = Dir }) ->
- {reply, get_disk_free(Dir), State};
+handle_call(get_disk_free, _From, State = #state { actual = Actual }) ->
+ {reply, Actual, State};
handle_call(_Request, _From, State) ->
{noreply, State}.
@@ -156,7 +157,7 @@ internal_update(State = #state { limit = Limit,
_ ->
ok
end,
- State #state {alarmed = NewAlarmed}.
+ State #state {alarmed = NewAlarmed, actual = CurrentFreeBytes}.
get_disk_free(Dir) ->
get_disk_free(Dir, os:type()).
diff --git a/src/rabbit_error_logger_file_h.erl b/src/rabbit_error_logger_file_h.erl
index 3efc9c0ccb..eb6247e0c2 100644
--- a/src/rabbit_error_logger_file_h.erl
+++ b/src/rabbit_error_logger_file_h.erl
@@ -76,6 +76,9 @@ init_file(File, PrevHandler) ->
Error -> Error
end.
+%% filter out "application: foo; exited: stopped; type: temporary"
+handle_event({info_report, _, {_, std_info, _}}, State) ->
+ {ok, State};
handle_event(Event, State) ->
error_logger_file_h:handle_event(Event, State).
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index c2c7d947e0..d050459138 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -22,7 +22,7 @@
assert_equivalence/6, assert_args_equivalence/2, check_type/1,
lookup/1, lookup_or_die/1, list/1, lookup_scratch/2, update_scratch/3,
info_keys/0, info/1, info/2, info_all/1, info_all/2,
- route/2, delete/2]).
+ route/2, delete/2, validate_binding/2]).
%% these must be run inside a mnesia tx
-export([maybe_auto_delete/1, serial/1, peek_serial/1, update/2]).
@@ -83,6 +83,9 @@
(name(), boolean())-> 'ok' |
rabbit_types:error('not_found') |
rabbit_types:error('in_use')).
+-spec(validate_binding/2 ::
+ (rabbit_types:exchange(), rabbit_types:binding())
+ -> rabbit_types:ok_or_error({'binding_invalid', string(), [any()]})).
-spec(maybe_auto_delete/1::
(rabbit_types:exchange())
-> 'not_deleted' | {'deleted', rabbit_binding:deletions()}).
@@ -121,7 +124,10 @@ callback(X = #exchange{type = XType}, Fun, Serial0, Args) ->
Module = type_to_module(XType),
apply(Module, Fun, [Serial(Module:serialise_events()) | Args]).
-policy_changed(X1, X2) -> callback(X1, policy_changed, none, [X1, X2]).
+policy_changed(X = #exchange{type = XType}, X1) ->
+ [ok = M:policy_changed(X, X1) ||
+ M <- [type_to_module(XType) | registry_lookup(exchange_decorator)]],
+ ok.
serialise_events(X = #exchange{type = Type}) ->
lists:any(fun (M) -> M:serialise_events(X) end,
@@ -399,6 +405,10 @@ delete(XName, IfUnused) ->
end
end).
+validate_binding(X = #exchange{type = XType}, Binding) ->
+ Module = type_to_module(XType),
+ Module:validate_binding(X, Binding).
+
maybe_auto_delete(#exchange{auto_delete = false}) ->
not_deleted;
maybe_auto_delete(#exchange{auto_delete = true} = X) ->
@@ -440,8 +450,7 @@ peek_serial(XName, LockType) ->
end.
invalid_module(T) ->
- rabbit_log:warning(
- "Could not find exchange type ~s.~n", [T]),
+ rabbit_log:warning("Could not find exchange type ~s.~n", [T]),
put({xtype_to_module, T}, rabbit_exchange_type_invalid),
rabbit_exchange_type_invalid.
diff --git a/src/rabbit_exchange_decorator.erl b/src/rabbit_exchange_decorator.erl
index 491c9d276a..bf4add7308 100644
--- a/src/rabbit_exchange_decorator.erl
+++ b/src/rabbit_exchange_decorator.erl
@@ -45,6 +45,10 @@
-callback delete(tx(), rabbit_types:exchange(), [rabbit_types:binding()]) ->
'ok'.
+%% called when the policy attached to this exchange changes.
+-callback policy_changed(rabbit_types:exchange(), rabbit_types:exchange()) ->
+ 'ok'.
+
%% called after a binding has been added or recovered
-callback add_binding(serial(), rabbit_types:exchange(),
rabbit_types:binding()) -> 'ok'.
@@ -53,17 +57,13 @@
-callback remove_bindings(serial(), rabbit_types:exchange(),
[rabbit_types:binding()]) -> 'ok'.
-%% called when the policy attached to this exchange changes.
--callback policy_changed(
- serial(), rabbit_types:exchange(), rabbit_types:exchange()) -> 'ok'.
-
-else.
-export([behaviour_info/1]).
behaviour_info(callbacks) ->
[{description, 0}, {serialise_events, 1}, {create, 2}, {delete, 3},
- {add_binding, 3}, {remove_bindings, 3}, {policy_changed, 3}];
+ {policy_changed, 2}, {add_binding, 3}, {remove_bindings, 3}];
behaviour_info(_Other) ->
undefined.
diff --git a/src/rabbit_exchange_type.erl b/src/rabbit_exchange_type.erl
index 1fbcb2d8e6..ebc59501ef 100644
--- a/src/rabbit_exchange_type.erl
+++ b/src/rabbit_exchange_type.erl
@@ -37,6 +37,10 @@
%% called BEFORE declaration, to check args etc; may exit with #amqp_error{}
-callback validate(rabbit_types:exchange()) -> 'ok'.
+%% called BEFORE declaration, to check args etc
+-callback validate_binding(rabbit_types:exchange(), rabbit_types:binding()) ->
+ rabbit_types:ok_or_error({'binding_invalid', string(), [any()]}).
+
%% called after declaration and recovery
-callback create(tx(), rabbit_types:exchange()) -> 'ok'.
@@ -44,6 +48,10 @@
-callback delete(tx(), rabbit_types:exchange(), [rabbit_types:binding()]) ->
'ok'.
+%% called when the policy attached to this exchange changes.
+-callback policy_changed(rabbit_types:exchange(), rabbit_types:exchange()) ->
+ 'ok'.
+
%% called after a binding has been added or recovered
-callback add_binding(serial(), rabbit_types:exchange(),
rabbit_types:binding()) -> 'ok'.
@@ -58,18 +66,15 @@
rabbit_framing:amqp_table()) ->
'ok' | rabbit_types:connection_exit().
-%% called when the policy attached to this exchange changes.
--callback policy_changed(serial(), rabbit_types:exchange(),
- rabbit_types:exchange()) -> 'ok'.
-
-else.
-export([behaviour_info/1]).
behaviour_info(callbacks) ->
- [{description, 0}, {serialise_events, 0}, {route, 2}, {validate, 1},
+ [{description, 0}, {serialise_events, 0}, {route, 2},
+ {validate, 1}, {validate_binding, 2}, {policy_changed, 2},
{create, 2}, {delete, 3}, {add_binding, 3}, {remove_bindings, 3},
- {assert_args_equivalence, 2}, {policy_changed, 3}];
+ {assert_args_equivalence, 2}];
behaviour_info(_Other) ->
undefined.
diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl
index 213b24c445..10a79c5556 100644
--- a/src/rabbit_exchange_type_direct.erl
+++ b/src/rabbit_exchange_type_direct.erl
@@ -20,8 +20,9 @@
-behaviour(rabbit_exchange_type).
-export([description/0, serialise_events/0, route/2]).
--export([validate/1, create/2, delete/3, policy_changed/3,
- add_binding/3, remove_bindings/3, assert_args_equivalence/2]).
+-export([validate/1, validate_binding/2,
+ create/2, delete/3, policy_changed/2, add_binding/3,
+ remove_bindings/3, assert_args_equivalence/2]).
-rabbit_boot_step({?MODULE,
[{description, "exchange type direct"},
@@ -40,9 +41,10 @@ route(#exchange{name = Name},
rabbit_router:match_routing_key(Name, Routes).
validate(_X) -> ok.
+validate_binding(_X, _B) -> ok.
create(_Tx, _X) -> ok.
delete(_Tx, _X, _Bs) -> ok.
-policy_changed(_Tx, _X1, _X2) -> ok.
+policy_changed(_X1, _X2) -> ok.
add_binding(_Tx, _X, _B) -> ok.
remove_bindings(_Tx, _X, _Bs) -> ok.
assert_args_equivalence(X, Args) ->
diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl
index 5b17ed5671..3ebd85485b 100644
--- a/src/rabbit_exchange_type_fanout.erl
+++ b/src/rabbit_exchange_type_fanout.erl
@@ -20,7 +20,8 @@
-behaviour(rabbit_exchange_type).
-export([description/0, serialise_events/0, route/2]).
--export([validate/1, create/2, delete/3, policy_changed/3, add_binding/3,
+-export([validate/1, validate_binding/2,
+ create/2, delete/3, policy_changed/2, add_binding/3,
remove_bindings/3, assert_args_equivalence/2]).
-rabbit_boot_step({?MODULE,
@@ -39,9 +40,10 @@ route(#exchange{name = Name}, _Delivery) ->
rabbit_router:match_routing_key(Name, ['_']).
validate(_X) -> ok.
+validate_binding(_X, _B) -> ok.
create(_Tx, _X) -> ok.
delete(_Tx, _X, _Bs) -> ok.
-policy_changed(_Tx, _X1, _X2) -> ok.
+policy_changed(_X1, _X2) -> ok.
add_binding(_Tx, _X, _B) -> ok.
remove_bindings(_Tx, _X, _Bs) -> ok.
assert_args_equivalence(X, Args) ->
diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl
index 75899160fe..cf2d314079 100644
--- a/src/rabbit_exchange_type_headers.erl
+++ b/src/rabbit_exchange_type_headers.erl
@@ -21,7 +21,8 @@
-behaviour(rabbit_exchange_type).
-export([description/0, serialise_events/0, route/2]).
--export([validate/1, create/2, delete/3, policy_changed/3, add_binding/3,
+-export([validate/1, validate_binding/2,
+ create/2, delete/3, policy_changed/2, add_binding/3,
remove_bindings/3, assert_args_equivalence/2]).
-rabbit_boot_step({?MODULE,
@@ -50,14 +51,24 @@ route(#exchange{name = Name},
rabbit_router:match_bindings(
Name, fun (#binding{args = Spec}) -> headers_match(Spec, Headers) end).
-default_headers_match_kind() -> all.
+validate_binding(_X, #binding{args = Args}) ->
+ case rabbit_misc:table_lookup(Args, <<"x-match">>) of
+ {longstr, <<"all">>} -> ok;
+ {longstr, <<"any">>} -> ok;
+ {longstr, Other} -> {error,
+ {binding_invalid,
+ "Invalid x-match field value ~p; "
+ "expected all or any", [Other]}};
+ {Type, Other} -> {error,
+ {binding_invalid,
+ "Invalid x-match field type ~p (value ~p); "
+ "expected longstr", [Type, Other]}};
+ undefined -> {error,
+ {binding_invalid, "x-match field missing", []}}
+ end.
parse_x_match(<<"all">>) -> all;
-parse_x_match(<<"any">>) -> any;
-parse_x_match(Other) ->
- rabbit_log:warning("Invalid x-match field value ~p; expected all or any",
- [Other]),
- default_headers_match_kind().
+parse_x_match(<<"any">>) -> any.
%% Horrendous matching algorithm. Depends for its merge-like
%% (linear-time) behaviour on the lists:keysort
@@ -68,17 +79,9 @@ parse_x_match(Other) ->
%% In other words: REQUIRES BOTH PATTERN AND DATA TO BE SORTED ASCENDING BY KEY.
%% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
%%
-headers_match(Pattern, Data) ->
- MatchKind = case lists:keysearch(<<"x-match">>, 1, Pattern) of
- {value, {_, longstr, MK}} -> parse_x_match(MK);
- {value, {_, Type, MK}} ->
- rabbit_log:warning("Invalid x-match field type ~p "
- "(value ~p); expected longstr",
- [Type, MK]),
- default_headers_match_kind();
- _ -> default_headers_match_kind()
- end,
- headers_match(Pattern, Data, true, false, MatchKind).
+headers_match(Args, Data) ->
+ {longstr, MK} = rabbit_misc:table_lookup(Args, <<"x-match">>),
+ headers_match(Args, Data, true, false, parse_x_match(MK)).
headers_match([], _Data, AllMatch, _AnyMatch, all) ->
AllMatch;
@@ -115,7 +118,7 @@ headers_match([{PK, PT, PV} | PRest], [{DK, DT, DV} | DRest],
validate(_X) -> ok.
create(_Tx, _X) -> ok.
delete(_Tx, _X, _Bs) -> ok.
-policy_changed(_Tx, _X1, _X2) -> ok.
+policy_changed(_X1, _X2) -> ok.
add_binding(_Tx, _X, _B) -> ok.
remove_bindings(_Tx, _X, _Bs) -> ok.
assert_args_equivalence(X, Args) ->
diff --git a/src/rabbit_exchange_type_invalid.erl b/src/rabbit_exchange_type_invalid.erl
index 6b07351a47..07a8004aab 100644
--- a/src/rabbit_exchange_type_invalid.erl
+++ b/src/rabbit_exchange_type_invalid.erl
@@ -20,8 +20,9 @@
-behaviour(rabbit_exchange_type).
-export([description/0, serialise_events/0, route/2]).
--export([validate/1, create/2, delete/3, policy_changed/3,
- add_binding/3, remove_bindings/3, assert_args_equivalence/2]).
+-export([validate/1, validate_binding/2,
+ create/2, delete/3, policy_changed/2, add_binding/3,
+ remove_bindings/3, assert_args_equivalence/2]).
description() ->
[{description,
@@ -41,9 +42,10 @@ route(#exchange{name = Name, type = Type}, _) ->
[rabbit_misc:rs(Name), Type]).
validate(_X) -> ok.
+validate_binding(_X, _B) -> ok.
create(_Tx, _X) -> ok.
delete(_Tx, _X, _Bs) -> ok.
-policy_changed(_Tx, _X1, _X2) -> ok.
+policy_changed(_X1, _X2) -> ok.
add_binding(_Tx, _X, _B) -> ok.
remove_bindings(_Tx, _X, _Bs) -> ok.
assert_args_equivalence(X, Args) ->
diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl
index bd8ad1acc7..ce76ccb0dd 100644
--- a/src/rabbit_exchange_type_topic.erl
+++ b/src/rabbit_exchange_type_topic.erl
@@ -21,7 +21,8 @@
-behaviour(rabbit_exchange_type).
-export([description/0, serialise_events/0, route/2]).
--export([validate/1, create/2, delete/3, policy_changed/3, add_binding/3,
+-export([validate/1, validate_binding/2,
+ create/2, delete/3, policy_changed/2, add_binding/3,
remove_bindings/3, assert_args_equivalence/2]).
-rabbit_boot_step({?MODULE,
@@ -47,6 +48,7 @@ route(#exchange{name = X},
end || RKey <- Routes]).
validate(_X) -> ok.
+validate_binding(_X, _B) -> ok.
create(_Tx, _X) -> ok.
delete(transaction, #exchange{name = X}, _Bs) ->
@@ -57,7 +59,7 @@ delete(transaction, #exchange{name = X}, _Bs) ->
delete(none, _Exchange, _Bs) ->
ok.
-policy_changed(_Tx, _X1, _X2) -> ok.
+policy_changed(_X1, _X2) -> ok.
add_binding(transaction, _Exchange, Binding) ->
internal_add_binding(Binding);
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index 7411b3d6dc..de53b7f0b3 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -249,7 +249,8 @@ handle_info({'DOWN', _MRef, process, {rabbit, Node}, _Reason},
write_cluster_status({AllNodes, DiscNodes, del_node(Node, RunningNodes)}),
ok = handle_dead_rabbit(Node),
[P ! {node_down, Node} || P <- pmon:monitored(Subscribers)],
- {noreply, State#state{monitors = pmon:erase({rabbit, Node}, Monitors)}};
+ {noreply, handle_dead_rabbit_state(
+ State#state{monitors = pmon:erase({rabbit, Node}, Monitors)})};
handle_info({'DOWN', _MRef, process, Pid, _Reason},
State = #state{subscribers = Subscribers}) ->
@@ -257,10 +258,19 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason},
handle_info({mnesia_system_event,
{inconsistent_database, running_partitioned_network, Node}},
- State = #state{partitions = Partitions}) ->
+ State = #state{partitions = Partitions,
+ monitors = Monitors}) ->
+ %% We will not get a node_up from this node - yet we should treat it as
+ %% up (mostly).
+ State1 = case pmon:is_monitored({rabbit, Node}, Monitors) of
+ true -> State;
+ false -> State#state{
+ monitors = pmon:monitor({rabbit, Node}, Monitors)}
+ end,
+ ok = handle_live_rabbit(Node),
Partitions1 = ordsets:to_list(
ordsets:add_element(Node, ordsets:from_list(Partitions))),
- {noreply, State#state{partitions = Partitions1}};
+ {noreply, State1#state{partitions = Partitions1}};
handle_info(_Info, State) ->
{noreply, State}.
@@ -282,7 +292,65 @@ handle_dead_rabbit(Node) ->
ok = rabbit_networking:on_node_down(Node),
ok = rabbit_amqqueue:on_node_down(Node),
ok = rabbit_alarm:on_node_down(Node),
- ok = rabbit_mnesia:on_node_down(Node).
+ ok = rabbit_mnesia:on_node_down(Node),
+ case application:get_env(rabbit, cluster_partition_handling) of
+ {ok, pause_minority} ->
+ case majority() of
+ true -> ok;
+ false -> await_cluster_recovery()
+ end;
+ {ok, ignore} ->
+ ok;
+ {ok, Term} ->
+ rabbit_log:warning("cluster_partition_handling ~p unrecognised, "
+ "assuming 'ignore'~n", [Term]),
+ ok
+ end,
+ ok.
+
+majority() ->
+ length(alive_nodes()) / length(rabbit_mnesia:cluster_nodes(all)) > 0.5.
+
+%% mnesia:system_info(db_nodes) (and hence
+%% rabbit_mnesia:cluster_nodes(running)) does not give reliable results
+%% when partitioned.
+alive_nodes() ->
+ Nodes = rabbit_mnesia:cluster_nodes(all),
+ [N || N <- Nodes, pong =:= net_adm:ping(N)].
+
+await_cluster_recovery() ->
+ rabbit_log:warning("Cluster minority status detected - awaiting recovery~n",
+ []),
+ Nodes = rabbit_mnesia:cluster_nodes(all),
+ spawn(fun () ->
+ %% If our group leader is inside an application we are about
+ %% to stop, application:stop/1 does not return.
+ group_leader(whereis(init), self()),
+ %% Ensure only one restarting process at a time, will
+ %% exit(badarg) (harmlessly) if one is already running
+ register(rabbit_restarting_process, self()),
+ rabbit:stop(),
+ wait_for_cluster_recovery(Nodes)
+ end).
+
+wait_for_cluster_recovery(Nodes) ->
+ case majority() of
+ true -> rabbit:start();
+ false -> timer:sleep(1000),
+ wait_for_cluster_recovery(Nodes)
+ end.
+
+handle_dead_rabbit_state(State = #state{partitions = Partitions}) ->
+ %% If we have been partitioned, and we are now in the only remaining
+ %% partition, we no longer care about partitions - forget them. Note
+ %% that we do not attempt to deal with individual (other) partitions
+ %% going away. It's only safe to forget anything about partitions when
+ %% there are no partitions.
+ Partitions1 = case Partitions -- (Partitions -- alive_nodes()) of
+ [] -> [];
+ _ -> Partitions
+ end,
+ State#state{partitions = Partitions1}.
handle_live_rabbit(Node) ->
ok = rabbit_alarm:on_node_up(Node),
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 27807b6236..1188c5549a 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1094,6 +1094,7 @@ test_policy_validation() ->
{error_string, _} = SetPol("testpos", [-1, 0, 1]),
{error_string, _} = SetPol("testeven", [ 1, 2, 3]),
+ ok = control_action(clear_policy, ["name"]),
rabbit_runtime_parameters_test:unregister_policy_validator(),
passed.