summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2012-10-29 17:36:07 +0000
committerSimon MacMullen <simon@rabbitmq.com>2012-10-29 17:36:07 +0000
commitf389dbe03d78bde9dbb5cfe2b9450d907edd7cf6 (patch)
tree5999cf8316692f243f6292cf87a3114a2094fa06 /src
parentf774e7ba2fd8e478ddacc11729175f64344a71cf (diff)
parent0609c5bfa368cae3724feac2da26b9436e6e5d36 (diff)
downloadrabbitmq-server-git-f389dbe03d78bde9dbb5cfe2b9450d907edd7cf6.tar.gz
Merge default
Diffstat (limited to 'src')
-rw-r--r--src/rabbit.erl3
-rw-r--r--src/rabbit_amqqueue.erl3
-rw-r--r--src/rabbit_amqqueue_process.erl60
-rw-r--r--src/rabbit_backing_queue.erl26
-rw-r--r--src/rabbit_backing_queue_qc.erl9
-rw-r--r--src/rabbit_basic.erl48
-rw-r--r--src/rabbit_binary_generator.erl29
-rw-r--r--src/rabbit_channel.erl4
-rw-r--r--src/rabbit_control_main.erl30
-rw-r--r--src/rabbit_guid.erl6
-rw-r--r--src/rabbit_mirror_queue_master.erl65
-rw-r--r--src/rabbit_mirror_queue_misc.erl66
-rw-r--r--src/rabbit_mirror_queue_slave.erl14
-rw-r--r--src/rabbit_misc.erl9
-rw-r--r--src/rabbit_mnesia.erl163
-rw-r--r--src/rabbit_node_monitor.erl41
-rw-r--r--src/rabbit_plugins.erl9
-rw-r--r--src/rabbit_plugins_main.erl60
-rw-r--r--src/rabbit_policy.erl136
-rw-r--r--src/rabbit_policy_validator.erl37
-rw-r--r--src/rabbit_reader.erl48
-rw-r--r--src/rabbit_registry.erl3
-rw-r--r--src/rabbit_runtime_parameters.erl121
-rw-r--r--src/rabbit_runtime_parameters_test.erl30
-rw-r--r--src/rabbit_tests.erl99
-rw-r--r--src/rabbit_variable_queue.erl28
-rw-r--r--src/rabbit_vhost.erl2
27 files changed, 797 insertions, 352 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index ae73ebc805..c52c296ad3 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -36,7 +36,7 @@
-rabbit_boot_step({codec_correctness_check,
[{description, "codec correctness check"},
{mfa, {rabbit_binary_generator,
- check_empty_content_body_frame_size,
+ check_empty_frame_size,
[]}},
{requires, pre_boot},
{enables, external_infrastructure}]}).
@@ -303,7 +303,6 @@ start() ->
ok = ensure_working_log_handlers(),
rabbit_node_monitor:prepare_cluster_status_files(),
rabbit_mnesia:check_cluster_consistency(),
- exit(bang),
ok = app_utils:start_applications(
app_startup_order(), fun handle_app_error/2),
ok = print_plugin_info(rabbit_plugins:active())
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 6ad85b24f5..87e44aa30c 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -18,7 +18,7 @@
-export([start/0, stop/0, declare/5, delete_immediately/1, delete/3, purge/1]).
-export([pseudo_queue/2]).
--export([lookup/1, with/2, with_or_die/2, assert_equivalence/5,
+-export([lookup/1, with/2, with/3, with_or_die/2, assert_equivalence/5,
check_exclusive_access/2, with_exclusive_access_or_die/3,
stat/1, deliver/2, deliver_flow/2, requeue/3, ack/3, reject/4]).
-export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]).
@@ -81,6 +81,7 @@
rabbit_types:error('not_found');
([name()]) -> [rabbit_types:amqqueue()]).
-spec(with/2 :: (name(), qfun(A)) -> A | rabbit_types:error('not_found')).
+-spec(with/3 :: (name(), qfun(A), fun(() -> B)) -> A | B).
-spec(with_or_die/2 ::
(name(), qfun(A)) -> A | rabbit_types:channel_exit()).
-spec(assert_equivalence/5 ::
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 7ce6958223..ec5ef3f80b 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -513,6 +513,14 @@ send_or_record_confirm(#delivery{sender = SenderPid,
rabbit_misc:confirm_to_sender(SenderPid, [MsgSeqNo]),
{immediately, State}.
+discard(#delivery{sender = SenderPid, message = #basic_message{id = MsgId}},
+ State) ->
+ %% fake an 'eventual' confirm from BQ; noop if not needed
+ State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
+ confirm_messages([MsgId], State),
+ BQS1 = BQ:discard(MsgId, SenderPid, BQS),
+ State1#q{backing_queue_state = BQS1}.
+
run_message_queue(State) ->
State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
drop_expired_messages(State),
@@ -521,17 +529,20 @@ run_message_queue(State) ->
BQ:is_empty(BQS), State1),
State2.
-attempt_delivery(#delivery{sender = SenderPid, message = Message}, Props,
+attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message},
+ Props = #message_properties{delivered = Delivered},
State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
case BQ:is_duplicate(Message, BQS) of
{false, BQS1} ->
deliver_msgs_to_consumers(
- fun (AckRequired, State1 = #q{backing_queue_state = BQS2}) ->
+ fun (true, State1 = #q{backing_queue_state = BQS2}) ->
{AckTag, BQS3} = BQ:publish_delivered(
- AckRequired, Message, Props,
- SenderPid, BQS2),
- {{Message, Props#message_properties.delivered, AckTag},
- true, State1#q{backing_queue_state = BQS3}}
+ Message, Props, SenderPid, BQS2),
+ {{Message, Delivered, AckTag},
+ true, State1#q{backing_queue_state = BQS3}};
+ (false, State1) ->
+ {{Message, Delivered, undefined},
+ true, discard(Delivery, State1)}
end, false, State#q{backing_queue_state = BQS1});
{published, BQS1} ->
{true, State#q{backing_queue_state = BQS1}};
@@ -548,11 +559,7 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid},
State2;
%% The next one is an optimisation
{false, State2 = #q{ttl = 0, dlx = undefined}} ->
- %% fake an 'eventual' confirm from BQ; noop if not needed
- State3 = #q{backing_queue = BQ, backing_queue_state = BQS} =
- confirm_messages([Message#basic_message.id], State2),
- BQS1 = BQ:discard(Message, SenderPid, BQS),
- State3#q{backing_queue_state = BQS1};
+ discard(Delivery, State2);
{false, State2 = #q{backing_queue = BQ, backing_queue_state = BQS}} ->
BQS1 = BQ:publish(Message, Props, SenderPid, BQS),
ensure_ttl_timer(Props#message_properties.expiry,
@@ -688,16 +695,15 @@ drop_expired_messages(State = #q{backing_queue_state = BQS,
Now = now_micros(),
DLXFun = dead_letter_fun(expired, State),
ExpirePred = fun (#message_properties{expiry = Exp}) -> Now >= Exp end,
- {Props, BQS1} =
- case DLXFun of
- undefined ->
- {Next, undefined, BQS2} = BQ:dropwhile(ExpirePred, false, BQS),
- {Next, BQS2};
- _ ->
- {Next, Msgs, BQS2} = BQ:dropwhile(ExpirePred, true, BQS),
- DLXFun(Msgs),
- {Next, BQS2}
- end,
+ {Props, BQS1} = case DLXFun of
+ undefined -> {Next, undefined, BQS2} =
+ BQ:dropwhile(ExpirePred, false, BQS),
+ {Next, BQS2};
+ _ -> {Next, Msgs, BQS2} =
+ BQ:dropwhile(ExpirePred, true, BQS),
+ DLXFun(Msgs),
+ {Next, BQS2}
+ end,
ensure_ttl_timer(case Props of
undefined -> undefined;
#message_properties{expiry = Exp} -> Exp
@@ -847,8 +853,8 @@ make_dead_letter_msg(Reason,
{<<"time">>, timestamp, TimeSec},
{<<"exchange">>, longstr, Exchange#resource.name},
{<<"routing-keys">>, array, RKs1}],
- HeadersFun1(rabbit_basic:append_table_header(<<"x-death">>,
- Info, Headers))
+ HeadersFun1(rabbit_basic:prepend_table_header(<<"x-death">>,
+ Info, Headers))
end,
Content1 = rabbit_basic:map_headers(HeadersFun2, Content),
Msg#basic_message{exchange_name = DLX, id = rabbit_guid:gen(),
@@ -1303,11 +1309,11 @@ handle_info(drop_expired, State) ->
noreply(drop_expired_messages(State#q{ttl_timer_ref = undefined}));
handle_info(emit_stats, State) ->
- %% Do not invoke noreply as it would see no timer and create a new one.
emit_stats(State),
- State1 = rabbit_event:reset_stats_timer(State, #q.stats_timer),
- assert_invariant(State1),
- {noreply, State1, hibernate};
+ {noreply, State1, Timeout} = noreply(State),
+ %% Need to reset *after* we've been through noreply/1 so we do not
+ %% just create another timer always and therefore never hibernate
+ {noreply, rabbit_event:reset_stats_timer(State1, #q.stats_timer), Timeout};
handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason},
State = #q{q = #amqqueue{exclusive_owner = DownPid}}) ->
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index c6d1778532..af660c60a0 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -84,12 +84,16 @@
%% Called for messages which have already been passed straight
%% out to a client. The queue will be empty for these calls
%% (i.e. saves the round trip through the backing queue).
--callback publish_delivered(true, rabbit_types:basic_message(),
+-callback publish_delivered(rabbit_types:basic_message(),
rabbit_types:message_properties(), pid(), state())
- -> {ack(), state()};
- (false, rabbit_types:basic_message(),
- rabbit_types:message_properties(), pid(), state())
- -> {undefined, state()}.
+ -> {ack(), state()}.
+
+%% Called to inform the BQ about messages which have reached the
+%% queue, but are not going to be further passed to BQ for some
+%% reason. Note that this may be invoked for messages for which
+%% BQ:is_duplicate/2 has already returned {'published' | 'discarded',
+%% BQS}.
+-callback discard(rabbit_types:msg_id(), pid(), state()) -> state().
%% Return ids of messages which have been confirmed since the last
%% invocation of this function (or initialisation).
@@ -200,13 +204,6 @@
-callback is_duplicate(rabbit_types:basic_message(), state())
-> {'false'|'published'|'discarded', state()}.
-%% Called to inform the BQ about messages which have reached the
-%% queue, but are not going to be further passed to BQ for some
-%% reason. Note that this is may be invoked for messages for which
-%% BQ:is_duplicate/2 has already returned {'published' | 'discarded',
-%% BQS}.
--callback discard(rabbit_types:basic_message(), pid(), state()) -> state().
-
-else.
-export([behaviour_info/1]).
@@ -214,12 +211,11 @@
behaviour_info(callbacks) ->
[{start, 1}, {stop, 0}, {init, 3}, {terminate, 2},
{delete_and_terminate, 2}, {purge, 1}, {publish, 4},
- {publish_delivered, 5}, {drain_confirmed, 1}, {dropwhile, 3},
+ {publish_delivered, 4}, {discard, 3}, {drain_confirmed, 1}, {dropwhile, 3},
{fetch, 2}, {ack, 2}, {fold, 3}, {requeue, 2}, {len, 1},
{is_empty, 1}, {depth, 1}, {set_ram_duration_target, 2},
{ram_duration, 1}, {needs_timeout, 1}, {timeout, 1},
- {handle_pre_hibernate, 1}, {status, 1}, {invoke, 3}, {is_duplicate, 2},
- {discard, 3}];
+ {handle_pre_hibernate, 1}, {status, 1}, {invoke, 3}, {is_duplicate, 2}] ;
behaviour_info(_Other) ->
undefined.
diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl
index d658f0f0f4..b37fbb29e2 100644
--- a/src/rabbit_backing_queue_qc.erl
+++ b/src/rabbit_backing_queue_qc.erl
@@ -119,7 +119,7 @@ qc_publish_multiple(#state{}) ->
qc_publish_delivered(#state{bqstate = BQ}) ->
{call, ?BQMOD, publish_delivered,
- [boolean(), qc_message(), #message_properties{}, self(), BQ]}.
+ [qc_message(), #message_properties{}, self(), BQ]}.
qc_fetch(#state{bqstate = BQ}) ->
{call, ?BQMOD, fetch, [boolean(), BQ]}.
@@ -199,7 +199,7 @@ next_state(S, _BQ, {call, ?MODULE, publish_multiple, [PublishCount]}) ->
next_state(S, Res,
{call, ?BQMOD, publish_delivered,
- [AckReq, Msg, MsgProps, _Pid, _BQ]}) ->
+ [Msg, MsgProps, _Pid, _BQ]}) ->
#state{confirms = Confirms, acks = Acks, next_seq_id = NextSeq} = S,
AckTag = {call, erlang, element, [1, Res]},
BQ1 = {call, erlang, element, [2, Res]},
@@ -213,10 +213,7 @@ next_state(S, Res,
true -> gb_sets:add(MsgId, Confirms);
_ -> Confirms
end,
- acks = case AckReq of
- true -> [{AckTag, {NextSeq, {MsgProps, Msg}}}|Acks];
- false -> Acks
- end
+ acks = [{AckTag, {NextSeq, {MsgProps, Msg}}}|Acks]
};
next_state(S, Res, {call, ?BQMOD, fetch, [AckReq, _BQ]}) ->
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index db2b7e9570..9966c0df31 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -19,7 +19,7 @@
-include("rabbit_framing.hrl").
-export([publish/4, publish/5, publish/1,
- message/3, message/4, properties/1, append_table_header/3,
+ message/3, message/4, properties/1, prepend_table_header/3,
extract_headers/1, map_headers/2, delivery/3, header_routes/1]).
-export([build_content/2, from_content/1]).
@@ -58,7 +58,7 @@
-spec(properties/1 ::
(properties_input()) -> rabbit_framing:amqp_property_record()).
--spec(append_table_header/3 ::
+-spec(prepend_table_header/3 ::
(binary(), rabbit_framing:amqp_table(), headers()) -> headers()).
-spec(extract_headers/1 :: (rabbit_types:content()) -> headers()).
@@ -177,15 +177,45 @@ properties(P) when is_list(P) ->
end
end, #'P_basic'{}, P).
-append_table_header(Name, Info, undefined) ->
- append_table_header(Name, Info, []);
-append_table_header(Name, Info, Headers) ->
- Prior = case rabbit_misc:table_lookup(Headers, Name) of
- undefined -> [];
- {array, Existing} -> Existing
- end,
+prepend_table_header(Name, Info, undefined) ->
+ prepend_table_header(Name, Info, []);
+prepend_table_header(Name, Info, Headers) ->
+ case rabbit_misc:table_lookup(Headers, Name) of
+ {array, Existing} ->
+ prepend_table(Name, Info, Existing, Headers);
+ undefined ->
+ prepend_table(Name, Info, [], Headers);
+ Other ->
+ Headers2 = prepend_table(Name, Info, [], Headers),
+ set_invalid_header(Name, Other, Headers2)
+ end.
+
+prepend_table(Name, Info, Prior, Headers) ->
rabbit_misc:set_table_value(Headers, Name, array, [{table, Info} | Prior]).
+set_invalid_header(Name, {_, _}=Value, Headers) when is_list(Headers) ->
+ case rabbit_misc:table_lookup(Headers, ?INVALID_HEADERS_KEY) of
+ undefined ->
+ set_invalid([{Name, array, [Value]}], Headers);
+ {table, ExistingHdr} ->
+ update_invalid(Name, Value, ExistingHdr, Headers);
+ Other ->
+ %% somehow the x-invalid-headers header is corrupt
+ Invalid = [{?INVALID_HEADERS_KEY, array, [Other]}],
+ set_invalid_header(Name, Value, set_invalid(Invalid, Headers))
+ end.
+
+set_invalid(NewHdr, Headers) ->
+ rabbit_misc:set_table_value(Headers, ?INVALID_HEADERS_KEY, table, NewHdr).
+
+update_invalid(Name, Value, ExistingHdr, Header) ->
+ Values = case rabbit_misc:table_lookup(ExistingHdr, Name) of
+ undefined -> [Value];
+ {array, Prior} -> [Value | Prior]
+ end,
+ NewHdr = rabbit_misc:set_table_value(ExistingHdr, Name, array, Values),
+ set_invalid(NewHdr, Header).
+
extract_headers(Content) ->
#content{properties = #'P_basic'{headers = Headers}} =
rabbit_binary_parser:ensure_content_decoded(Content),
diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl
index d69376fb75..4700fa3170 100644
--- a/src/rabbit_binary_generator.erl
+++ b/src/rabbit_binary_generator.erl
@@ -18,20 +18,11 @@
-include("rabbit_framing.hrl").
-include("rabbit.hrl").
-%% EMPTY_CONTENT_BODY_FRAME_SIZE, 8 = 1 + 2 + 4 + 1
-%% - 1 byte of frame type
-%% - 2 bytes of channel number
-%% - 4 bytes of frame payload length
-%% - 1 byte of payload trailer FRAME_END byte
-%% See definition of check_empty_content_body_frame_size/0,
-%% an assertion called at startup.
--define(EMPTY_CONTENT_BODY_FRAME_SIZE, 8).
-
-export([build_simple_method_frame/3,
build_simple_content_frames/4,
build_heartbeat_frame/0]).
-export([generate_table/1, encode_properties/2]).
--export([check_empty_content_body_frame_size/0]).
+-export([check_empty_frame_size/0]).
-export([ensure_content_encoded/2, clear_encoded_content/1]).
-export([map_exception/3]).
@@ -53,7 +44,7 @@
-spec(generate_table/1 :: (rabbit_framing:amqp_table()) -> binary()).
-spec(encode_properties/2 ::
([rabbit_framing:amqp_property_type()], [any()]) -> binary()).
--spec(check_empty_content_body_frame_size/0 :: () -> 'ok').
+-spec(check_empty_frame_size/0 :: () -> 'ok').
-spec(ensure_content_encoded/2 ::
(rabbit_types:content(), rabbit_types:protocol()) ->
rabbit_types:encoded_content()).
@@ -88,10 +79,8 @@ build_simple_content_frames(ChannelInt, Content, FrameMax, Protocol) ->
[HeaderFrame | ContentFrames].
build_content_frames(FragsRev, FrameMax, ChannelInt) ->
- BodyPayloadMax = if FrameMax == 0 ->
- iolist_size(FragsRev);
- true ->
- FrameMax - ?EMPTY_CONTENT_BODY_FRAME_SIZE
+ BodyPayloadMax = if FrameMax == 0 -> iolist_size(FragsRev);
+ true -> FrameMax - ?EMPTY_FRAME_SIZE
end,
build_content_frames(0, [], BodyPayloadMax, [],
lists:reverse(FragsRev), BodyPayloadMax, ChannelInt).
@@ -257,15 +246,13 @@ encode_property(timestamp, Int) ->
encode_property(table, Table) ->
table_to_binary(Table).
-check_empty_content_body_frame_size() ->
- %% Intended to ensure that EMPTY_CONTENT_BODY_FRAME_SIZE is
- %% defined correctly.
+check_empty_frame_size() ->
+ %% Intended to ensure that EMPTY_FRAME_SIZE is defined correctly.
ComputedSize = iolist_size(create_frame(?FRAME_BODY, 0, <<>>)),
- if ComputedSize == ?EMPTY_CONTENT_BODY_FRAME_SIZE ->
+ if ComputedSize == ?EMPTY_FRAME_SIZE ->
ok;
true ->
- exit({incorrect_empty_content_body_frame_size,
- ComputedSize, ?EMPTY_CONTENT_BODY_FRAME_SIZE})
+ exit({incorrect_empty_frame_size, ComputedSize, ?EMPTY_FRAME_SIZE})
end.
ensure_content_encoded(Content = #content{properties_bin = PropBin,
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 0d13312b0b..54427206c5 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -1153,10 +1153,6 @@ binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin,
RoutingKey, Arguments, ReturnMethod, NoWait,
State = #ch{virtual_host = VHostPath,
conn_pid = ConnPid }) ->
- %% FIXME: connection exception (!) on failure??
- %% (see rule named "failure" in spec-XML)
- %% FIXME: don't allow binding to internal exchanges -
- %% including the one named "" !
{DestinationName, ActualRoutingKey} =
expand_binding(DestinationType, DestinationNameBin, RoutingKey, State),
check_write_permitted(DestinationName, State),
diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl
index a3cbf6e53c..25f7d758c6 100644
--- a/src/rabbit_control_main.erl
+++ b/src/rabbit_control_main.erl
@@ -70,6 +70,10 @@
{clear_parameter, [?VHOST_DEF]},
{list_parameters, [?VHOST_DEF]},
+ {set_policy, [?VHOST_DEF]},
+ {clear_policy, [?VHOST_DEF]},
+ {list_policies, [?VHOST_DEF]},
+
{list_queues, [?VHOST_DEF]},
{list_exchanges, [?VHOST_DEF]},
{list_bindings, [?VHOST_DEF]},
@@ -98,7 +102,9 @@
{"Bindings", rabbit_binding, info_all, info_keys},
{"Consumers", rabbit_amqqueue, consumers_all, consumer_info_keys},
{"Permissions", rabbit_auth_backend_internal, list_vhost_permissions,
- vhost_perms_info_keys}]).
+ vhost_perms_info_keys},
+ {"Policies", rabbit_policy, list_formatted, info_keys},
+ {"Parameters", rabbit_runtime_parameters, list_formatted, info_keys}]).
%%----------------------------------------------------------------------------
@@ -458,6 +464,28 @@ action(list_parameters, Node, [], Opts, Inform) ->
rpc_call(Node, rabbit_runtime_parameters, list_formatted, [VHostArg]),
rabbit_runtime_parameters:info_keys());
+action(set_policy, Node, [Key, Pattern, Defn | Prio], Opts, Inform)
+ when Prio == [] orelse length(Prio) == 1 ->
+ Msg = "Setting policy ~p for pattern ~p to ~p",
+ {InformMsg, Prio1} = case Prio of [] -> {Msg, undefined};
+ [P] -> {Msg ++ " with priority ~s", P}
+ end,
+ VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
+ Inform(InformMsg, [Key, Pattern, Defn] ++ Prio),
+ rpc_call(Node, rabbit_policy, parse_set,
+ [VHostArg, list_to_binary(Key), Pattern, Defn, Prio1]);
+
+action(clear_policy, Node, [Key], Opts, Inform) ->
+ VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
+ Inform("Clearing policy ~p", [Key]),
+ rpc_call(Node, rabbit_policy, delete, [VHostArg, list_to_binary(Key)]);
+
+action(list_policies, Node, [], Opts, Inform) ->
+ VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
+ Inform("Listing policies", []),
+ display_info_list(rpc_call(Node, rabbit_policy, list_formatted, [VHostArg]),
+ rabbit_policy:info_keys());
+
action(report, Node, _Args, _Opts, Inform) ->
Inform("Reporting server status on ~p~n~n", [erlang:universaltime()]),
[begin ok = action(Action, N, [], [], Inform), io:nl() end ||
diff --git a/src/rabbit_guid.erl b/src/rabbit_guid.erl
index ba0cb04f71..cedbbdb380 100644
--- a/src/rabbit_guid.erl
+++ b/src/rabbit_guid.erl
@@ -144,11 +144,7 @@ gen_secure() ->
%% employs base64url encoding, which is safer in more contexts than
%% plain base64.
string(G, Prefix) ->
- Prefix ++ "-" ++ lists:foldl(fun ($\+, Acc) -> [$\- | Acc];
- ($\/, Acc) -> [$\_ | Acc];
- ($\=, Acc) -> Acc;
- (Chr, Acc) -> [Chr | Acc]
- end, [], base64:encode_to_string(G)).
+ Prefix ++ "-" ++ rabbit_misc:base64url(G).
binary(G, Prefix) ->
list_to_binary(string(G, Prefix)).
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 377d51868d..6a7a28f291 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -17,11 +17,11 @@
-module(rabbit_mirror_queue_master).
-export([init/3, terminate/2, delete_and_terminate/2,
- purge/1, publish/4, publish_delivered/5, fetch/2, ack/2,
+ purge/1, publish/4, publish_delivered/4, discard/3, fetch/2, ack/2,
requeue/2, len/1, is_empty/1, depth/1, drain_confirmed/1,
dropwhile/3, set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1,
- status/1, invoke/3, is_duplicate/2, discard/3, fold/3]).
+ status/1, invoke/3, is_duplicate/2, fold/3]).
-export([start/1, stop/0]).
@@ -88,12 +88,10 @@ stop() ->
%% Same as start/1.
exit({not_valid_for_generic_backing_queue, ?MODULE}).
-init(Q = #amqqueue{name = QName}, Recover, AsyncCallback) ->
+init(Q, Recover, AsyncCallback) ->
{ok, BQ} = application:get_env(backing_queue_module),
BQS = BQ:init(Q, Recover, AsyncCallback),
State = #state{gm = GM} = init_with_existing_bq(Q, BQ, BQS),
- {_MNode, SNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q),
- rabbit_mirror_queue_misc:add_mirrors(QName, SNodes),
ok = gm:broadcast(GM, {depth, BQ:depth(BQS)}),
State.
@@ -109,6 +107,8 @@ init_with_existing_bq(Q = #amqqueue{name = QName}, BQ, BQS) ->
ok = rabbit_amqqueue:store_queue(
Q1#amqqueue{gm_pids = [{GM, Self} | GMPids]})
end),
+ {_MNode, SNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q),
+ rabbit_mirror_queue_misc:add_mirrors(QName, SNodes),
#state { gm = GM,
coordinator = CPid,
backing_queue = BQ,
@@ -183,25 +183,42 @@ publish(Msg = #basic_message { id = MsgId }, MsgProps, ChPid,
backing_queue = BQ,
backing_queue_state = BQS }) ->
false = dict:is_key(MsgId, SS), %% ASSERTION
- ok = gm:broadcast(GM, {publish, false, ChPid, MsgProps, Msg}),
+ ok = gm:broadcast(GM, {publish, ChPid, MsgProps, Msg}),
BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS),
ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1 }).
-publish_delivered(AckRequired, Msg = #basic_message { id = MsgId }, MsgProps,
+publish_delivered(Msg = #basic_message { id = MsgId }, MsgProps,
ChPid, State = #state { gm = GM,
seen_status = SS,
backing_queue = BQ,
backing_queue_state = BQS,
ack_msg_id = AM }) ->
false = dict:is_key(MsgId, SS), %% ASSERTION
- ok = gm:broadcast(
- GM, {publish, {true, AckRequired}, ChPid, MsgProps, Msg}),
- {AckTag, BQS1} =
- BQ:publish_delivered(AckRequired, Msg, MsgProps, ChPid, BQS),
+ ok = gm:broadcast(GM, {publish_delivered, ChPid, MsgProps, Msg}),
+ {AckTag, BQS1} = BQ:publish_delivered(Msg, MsgProps, ChPid, BQS),
AM1 = maybe_store_acktag(AckTag, MsgId, AM),
- {AckTag,
- ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1,
- ack_msg_id = AM1 })}.
+ State1 = State #state { backing_queue_state = BQS1, ack_msg_id = AM1 },
+ {AckTag, ensure_monitoring(ChPid, State1)}.
+
+discard(MsgId, ChPid, State = #state { gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ seen_status = SS }) ->
+ %% It's a massive error if we get told to discard something that's
+ %% already been published or published-and-confirmed. To do that
+ %% would require non FIFO access. Hence we should not find
+ %% 'published' or 'confirmed' in this dict:find.
+ case dict:find(MsgId, SS) of
+ error ->
+ ok = gm:broadcast(GM, {discard, ChPid, MsgId}),
+ BQS1 = BQ:discard(MsgId, ChPid, BQS),
+ ensure_monitoring(
+ ChPid, State #state {
+ backing_queue_state = BQS1,
+ seen_status = dict:erase(MsgId, SS) });
+ {ok, discarded} ->
+ State
+ end.
dropwhile(Pred, AckRequired,
State = #state{gm = GM,
@@ -375,26 +392,6 @@ is_duplicate(Message = #basic_message { id = MsgId },
{discarded, State}
end.
-discard(Msg = #basic_message { id = MsgId }, ChPid,
- State = #state { gm = GM,
- backing_queue = BQ,
- backing_queue_state = BQS,
- seen_status = SS }) ->
- %% It's a massive error if we get told to discard something that's
- %% already been published or published-and-confirmed. To do that
- %% would require non FIFO access. Hence we should not find
- %% 'published' or 'confirmed' in this dict:find.
- case dict:find(MsgId, SS) of
- error ->
- ok = gm:broadcast(GM, {discard, ChPid, Msg}),
- ensure_monitoring(
- ChPid, State #state {
- backing_queue_state = BQ:discard(Msg, ChPid, BQS),
- seen_status = dict:erase(MsgId, SS) });
- {ok, discarded} ->
- State
- end.
-
%% ---------------------------------------------------------------------------
%% Other exported functions
%% ---------------------------------------------------------------------------
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index 4c8406d9cf..ef0a374d2e 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -15,16 +15,26 @@
%%
-module(rabbit_mirror_queue_misc).
+-behaviour(rabbit_policy_validator).
-export([remove_from_queue/3, on_node_up/0, add_mirrors/2, add_mirror/2,
report_deaths/4, store_updated_slaves/1, suggested_queue_nodes/1,
- is_mirrored/1, update_mirrors/2]).
+ is_mirrored/1, update_mirrors/2, validate_policy/1]).
%% for testing only
-export([suggested_queue_nodes/4]).
-include("rabbit.hrl").
+-rabbit_boot_step({?MODULE,
+ [{description, "HA policy validation"},
+ {mfa, {rabbit_registry, register,
+ [policy_validator, <<"ha-mode">>, ?MODULE]}},
+ {mfa, {rabbit_registry, register,
+ [policy_validator, <<"ha-params">>, ?MODULE]}},
+ {requires, rabbit_registry},
+ {enables, recovery}]}).
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -127,7 +137,7 @@ on_node_up() ->
ok.
drop_mirrors(QName, Nodes) ->
- [ok = drop_mirror(QName, Node) || Node <- Nodes],
+ [{ok, _} = drop_mirror(QName, Node) || Node <- Nodes],
ok.
drop_mirror(QName, MirrorNode) ->
@@ -144,7 +154,7 @@ drop_mirror(QName, MirrorNode) ->
"Dropping queue mirror on node ~p for ~s~n",
[MirrorNode, rabbit_misc:rs(Name)]),
exit(Pid, {shutdown, dropped}),
- ok
+ {ok, dropped}
end
end).
@@ -202,7 +212,8 @@ if_mirrored_queue(QName, Fun) ->
false -> ok;
true -> Fun(Q)
end
- end).
+ end,
+ rabbit_misc:const({ok, not_found})).
report_deaths(_MirrorPid, _IsMaster, _QueueName, []) ->
ok;
@@ -258,7 +269,11 @@ policy(Policy, Q) ->
suggested_queue_nodes(<<"all">>, _Params, {MNode, _SNodes}, Possible) ->
{MNode, Possible -- [MNode]};
suggested_queue_nodes(<<"nodes">>, Nodes0, {MNode, _SNodes}, Possible) ->
- Nodes = [list_to_atom(binary_to_list(Node)) || Node <- Nodes0],
+ Nodes1 = [list_to_atom(binary_to_list(Node)) || Node <- Nodes0],
+ %% If the current master is currently not in the nodes specified,
+ %% act like it is for the purposes below - otherwise we will not
+ %% return it in the results...
+ Nodes = lists:usort([MNode | Nodes1]),
Unavailable = Nodes -- Possible,
Available = Nodes -- Unavailable,
case Available of
@@ -304,19 +319,12 @@ is_mirrored(Q) ->
_ -> false
end.
-
-%% [1] - rabbit_amqqueue:start_mirroring/1 will turn unmirrored to
-%% master and start any needed slaves. However, if node(QPid) is not
-%% in the nodes for the policy, it won't switch it. So this is for the
-%% case where we kill the existing queue and restart elsewhere. TODO:
-%% is this TRTTD? All alternatives seem ugly.
update_mirrors(OldQ = #amqqueue{pid = QPid},
NewQ = #amqqueue{pid = QPid}) ->
case {is_mirrored(OldQ), is_mirrored(NewQ)} of
{false, false} -> ok;
{true, false} -> rabbit_amqqueue:stop_mirroring(QPid);
- {false, true} -> rabbit_amqqueue:start_mirroring(QPid),
- update_mirrors0(OldQ, NewQ); %% [1]
+ {false, true} -> rabbit_amqqueue:start_mirroring(QPid);
{true, true} -> update_mirrors0(OldQ, NewQ)
end.
@@ -328,3 +336,35 @@ update_mirrors0(OldQ = #amqqueue{name = QName},
add_mirrors(QName, NewNodes -- OldNodes),
drop_mirrors(QName, OldNodes -- NewNodes),
ok.
+
+%%----------------------------------------------------------------------------
+
+validate_policy(KeyList) ->
+ validate_policy(
+ proplists:get_value(<<"ha-mode">>, KeyList),
+ proplists:get_value(<<"ha-params">>, KeyList, none)).
+
+validate_policy(<<"all">>, none) ->
+ ok;
+validate_policy(<<"all">>, _Params) ->
+ {error, "ha-mode=\"all\" does not take parameters", []};
+
+validate_policy(<<"nodes">>, []) ->
+ {error, "ha-mode=\"nodes\" list must be non-empty", []};
+validate_policy(<<"nodes">>, Nodes) when is_list(Nodes) ->
+ case [I || I <- Nodes, not is_binary(I)] of
+ [] -> ok;
+ Invalid -> {error, "ha-mode=\"nodes\" takes a list of strings, "
+ "~p was not a string", [Invalid]}
+ end;
+validate_policy(<<"nodes">>, Params) ->
+ {error, "ha-mode=\"nodes\" takes a list, ~p given", [Params]};
+
+validate_policy(<<"exactly">>, N) when is_integer(N) andalso N > 0 ->
+ ok;
+validate_policy(<<"exactly">>, Params) ->
+ {error, "ha-mode=\"exactly\" takes an integer, ~p given", [Params]};
+
+validate_policy(Mode, _Params) ->
+ {error, "~p is not a valid ha-mode value", [Mode]}.
+
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index d0efe37adc..1ba1420f42 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -697,25 +697,23 @@ publish_or_discard(Status, ChPid, MsgId,
State1 #state { sender_queues = SQ1, msg_id_status = MS1 }.
-process_instruction({publish, false, ChPid, MsgProps,
+process_instruction({publish, ChPid, MsgProps,
Msg = #basic_message { id = MsgId }}, State) ->
State1 = #state { backing_queue = BQ, backing_queue_state = BQS } =
publish_or_discard(published, ChPid, MsgId, State),
BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS),
{ok, State1 #state { backing_queue_state = BQS1 }};
-process_instruction({publish, {true, AckRequired}, ChPid, MsgProps,
+process_instruction({publish_delivered, ChPid, MsgProps,
Msg = #basic_message { id = MsgId }}, State) ->
State1 = #state { backing_queue = BQ, backing_queue_state = BQS } =
publish_or_discard(published, ChPid, MsgId, State),
- {AckTag, BQS1} = BQ:publish_delivered(AckRequired, Msg, MsgProps,
- ChPid, BQS),
- {ok, maybe_store_ack(AckRequired, MsgId, AckTag,
+ {AckTag, BQS1} = BQ:publish_delivered(Msg, MsgProps, ChPid, BQS),
+ {ok, maybe_store_ack(true, MsgId, AckTag,
State1 #state { backing_queue_state = BQS1 })};
-process_instruction({discard, ChPid, Msg = #basic_message { id = MsgId }},
- State) ->
+process_instruction({discard, ChPid, MsgId}, State) ->
State1 = #state { backing_queue = BQ, backing_queue_state = BQS } =
publish_or_discard(discarded, ChPid, MsgId, State),
- BQS1 = BQ:discard(Msg, ChPid, BQS),
+ BQS1 = BQ:discard(MsgId, ChPid, BQS),
{ok, State1 #state { backing_queue_state = BQS1 }};
process_instruction({drop, Length, Dropped, AckRequired},
State = #state { backing_queue = BQ,
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index a0536a50a9..ab9a9cebd4 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -63,6 +63,7 @@
-export([version/0]).
-export([sequence_error/1]).
-export([json_encode/1, json_decode/1, json_to_term/1, term_to_json/1]).
+-export([base64url/1]).
%% Horrible macro to use in guards
-define(IS_BENIGN_EXIT(R),
@@ -227,6 +228,7 @@
-spec(json_decode/1 :: (string()) -> {'ok', any()} | 'error').
-spec(json_to_term/1 :: (any()) -> any()).
-spec(term_to_json/1 :: (any()) -> any()).
+-spec(base64url/1 :: (binary()) -> string()).
-endif.
@@ -987,3 +989,10 @@ term_to_json(L) when is_list(L) ->
term_to_json(V) when is_binary(V) orelse is_number(V) orelse V =:= null orelse
V =:= true orelse V =:= false ->
V.
+
+base64url(In) ->
+ lists:reverse(lists:foldl(fun ($\+, Acc) -> [$\- | Acc];
+ ($\/, Acc) -> [$\_ | Acc];
+ ($\=, Acc) -> Acc;
+ (Chr, Acc) -> [Chr | Acc]
+ end, [], base64:encode_to_string(In))).
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 870692282f..942048f9b0 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -108,8 +108,24 @@ init() ->
ok.
init_from_config() ->
- {ok, {TryNodes, NodeType}} =
- application:get_env(rabbit, cluster_nodes),
+ {TryNodes, NodeType} =
+ case application:get_env(rabbit, cluster_nodes) of
+ {ok, Nodes} when is_list(Nodes) ->
+ Config = {Nodes -- [node()], case lists:member(node(), Nodes) of
+ true -> disc;
+ false -> ram
+ end},
+ error_logger:warning_msg(
+ "Converting legacy 'cluster_nodes' configuration~n ~w~n"
+ "to~n ~w.~n~n"
+ "Please update the configuration to the new format "
+ "{Nodes, NodeType}, where Nodes contains the nodes that the "
+ "node will try to cluster with, and NodeType is either "
+ "'disc' or 'ram'~n", [Nodes, Config]),
+ Config;
+ {ok, Config} ->
+ Config
+ end,
case find_good_node(nodes_excl_me(TryNodes)) of
{ok, Node} ->
rabbit_log:info("Node '~p' selected for clustering from "
@@ -158,7 +174,7 @@ join_cluster(DiscoveryNode, NodeType) ->
%% this case - we're joining a new cluster with new nodes which
%% are not in synch with the current node. I also lifts the burden
%% of reseting the node from the user.
- reset(false),
+ reset_gracefully(),
%% Join the cluster
rabbit_misc:local_info_msg("Clustering with ~p as ~p node~n",
@@ -172,39 +188,35 @@ join_cluster(DiscoveryNode, NodeType) ->
%% cluster, has no cluster configuration, no local database, and no
%% persisted messages
reset() ->
+ ensure_mnesia_not_running(),
rabbit_misc:local_info_msg("Resetting Rabbit~n", []),
- reset(false).
+ reset_gracefully().
force_reset() ->
+ ensure_mnesia_not_running(),
rabbit_misc:local_info_msg("Resetting Rabbit forcefully~n", []),
- reset(true).
+ wipe().
+
+reset_gracefully() ->
+ AllNodes = cluster_nodes(all),
+ %% Reconnecting so that we will get an up to date nodes. We don't
+ %% need to check for consistency because we are resetting.
+ %% Force=true here so that reset still works when clustered with a
+ %% node which is down.
+ init_db_with_mnesia(AllNodes, node_type(), false, false),
+ case is_only_clustered_disc_node() of
+ true -> e(resetting_only_disc_node);
+ false -> ok
+ end,
+ leave_cluster(),
+ rabbit_misc:ensure_ok(mnesia:delete_schema([node()]), cannot_delete_schema),
+ wipe().
-reset(Force) ->
- ensure_mnesia_not_running(),
- Nodes = case Force of
- true ->
- nodes();
- false ->
- AllNodes = cluster_nodes(all),
- %% Reconnecting so that we will get an up to date
- %% nodes. We don't need to check for consistency
- %% because we are resetting. Force=true here so
- %% that reset still works when clustered with a
- %% node which is down.
- init_db_with_mnesia(AllNodes, node_type(), false, false),
- case is_only_clustered_disc_node() of
- true -> e(resetting_only_disc_node);
- false -> ok
- end,
- leave_cluster(),
- rabbit_misc:ensure_ok(mnesia:delete_schema([node()]),
- cannot_delete_schema),
- cluster_nodes(all)
- end,
+wipe() ->
%% We need to make sure that we don't end up in a distributed
%% Erlang system with nodes while not being in an Mnesia cluster
%% with them. We don't handle that well.
- [erlang:disconnect_node(N) || N <- Nodes],
+ [erlang:disconnect_node(N) || N <- cluster_nodes(all)],
%% remove persisted messages and any other garbage we find
ok = rabbit_file:recursive_delete(filelib:wildcard(dir() ++ "/*")),
ok = rabbit_node_monitor:reset_cluster_status(),
@@ -221,7 +233,9 @@ change_cluster_node_type(Type) ->
{ok, Status} -> Status;
{error, _Reason} -> e(cannot_connect_to_cluster)
end,
- Node = case RunningNodes of
+ %% We might still be marked as running by a remote node since the
+ %% information of us going down might not have propagated yet.
+ Node = case RunningNodes -- [node()] of
[] -> e(no_online_cluster_nodes);
[Node0|_] -> Node0
end,
@@ -276,18 +290,18 @@ forget_cluster_node(Node, RemoveWhenOffline) ->
end.
remove_node_offline_node(Node) ->
- %% We want the running nodes *now*, so we don't call
- %% `cluster_nodes(running)' which will just get what's in the cluster status
- %% file.
- case {running_nodes(cluster_nodes(all)) -- [Node], node_type()} of
+ %% Here `mnesia:system_info(running_db_nodes)' will RPC, but that's what we
+ %% want - we need to know the running nodes *now*. If the current node is a
+ %% RAM node it will return bogus results, but we don't care since we only do
+ %% this operation from disc nodes.
+ case {mnesia:system_info(running_db_nodes) -- [Node], node_type()} of
{[], disc} ->
- %% Note that while we check if the nodes was the last to
- %% go down, apart from the node we're removing from, this
- %% is still unsafe. Consider the situation in which A and
- %% B are clustered. A goes down, and records B as the
- %% running node. Then B gets clustered with C, C goes down
- %% and B goes down. In this case, C is the second-to-last,
- %% but we don't know that and we'll remove B from A
+ %% Note that while we check if the nodes was the last to go down,
+ %% apart from the node we're removing from, this is still unsafe.
+ %% Consider the situation in which A and B are clustered. A goes
+ %% down, and records B as the running node. Then B gets clustered
+ %% with C, C goes down and B goes down. In this case, C is the
+ %% second-to-last, but we don't know that and we'll remove B from A
%% anyway, even if that will lead to bad things.
case cluster_nodes(running) -- [node(), Node] of
[] -> start_mnesia(),
@@ -320,10 +334,17 @@ status() ->
[{nodes, (IfNonEmpty(disc, cluster_nodes(disc)) ++
IfNonEmpty(ram, cluster_nodes(ram)))}] ++
case mnesia:system_info(is_running) of
- yes -> [{running_nodes, cluster_nodes(running)}];
+ yes -> RunningNodes = cluster_nodes(running),
+ [{running_nodes, cluster_nodes(running)},
+ {partitions, mnesia_partitions(RunningNodes)}];
no -> []
end.
+mnesia_partitions(Nodes) ->
+ {Replies, _BadNodes} = rpc:multicall(
+ Nodes, rabbit_node_monitor, partitions, []),
+ [Reply || Reply = {_, R} <- Replies, R =/= []].
+
is_clustered() -> AllNodes = cluster_nodes(all),
AllNodes =/= [] andalso AllNodes =/= [node()].
@@ -332,7 +353,7 @@ cluster_nodes(WhichNodes) -> cluster_status(WhichNodes).
%% This function is the actual source of information, since it gets
%% the data from mnesia. Obviously it'll work only when mnesia is
%% running.
-mnesia_nodes() ->
+cluster_status_from_mnesia() ->
case mnesia:system_info(is_running) of
no ->
{error, mnesia_not_running};
@@ -352,39 +373,33 @@ mnesia_nodes() ->
disc -> nodes_incl_me(DiscCopies);
ram -> DiscCopies
end,
- {ok, {AllNodes, DiscNodes}};
+ %% `mnesia:system_info(running_db_nodes)' is safe since
+ %% we know that mnesia is running
+ RunningNodes = mnesia:system_info(running_db_nodes),
+ {ok, {AllNodes, DiscNodes, RunningNodes}};
false -> {error, tables_not_present}
end
end.
cluster_status(WhichNodes) ->
- %% I don't want to call `running_nodes/1' unless if necessary, since it's
- %% pretty expensive.
- {AllNodes1, DiscNodes1, RunningNodesThunk} =
- case mnesia_nodes() of
- {ok, {AllNodes, DiscNodes}} ->
- {AllNodes, DiscNodes, fun() -> running_nodes(AllNodes) end};
+ {AllNodes, DiscNodes, RunningNodes} = Nodes =
+ case cluster_status_from_mnesia() of
+ {ok, Nodes0} ->
+ Nodes0;
{error, _Reason} ->
- {AllNodes, DiscNodes, RunningNodes} =
+ {AllNodes0, DiscNodes0, RunningNodes0} =
rabbit_node_monitor:read_cluster_status(),
%% The cluster status file records the status when the node is
%% online, but we know for sure that the node is offline now, so
%% we can remove it from the list of running nodes.
- {AllNodes, DiscNodes, fun() -> nodes_excl_me(RunningNodes) end}
+ {AllNodes0, DiscNodes0, nodes_excl_me(RunningNodes0)}
end,
case WhichNodes of
- status -> {AllNodes1, DiscNodes1, RunningNodesThunk()};
- all -> AllNodes1;
- disc -> DiscNodes1;
- ram -> AllNodes1 -- DiscNodes1;
- running -> RunningNodesThunk()
- end.
-
-cluster_status_from_mnesia() ->
- case mnesia_nodes() of
- {ok, {AllNodes, DiscNodes}} -> {ok, {AllNodes, DiscNodes,
- running_nodes(AllNodes)}};
- {error, _} = Err -> Err
+ status -> Nodes;
+ all -> AllNodes;
+ disc -> DiscNodes;
+ ram -> AllNodes -- DiscNodes;
+ running -> RunningNodes
end.
node_info() ->
@@ -719,14 +734,6 @@ change_extra_db_nodes(ClusterNodes0, CheckOtherNodes) ->
Nodes
end.
-%% We're not using `mnesia:system_info(running_db_nodes)' directly
-%% because if the node is a RAM node it won't know about other nodes
-%% when mnesia is stopped
-running_nodes(Nodes) ->
- {Replies, _BadNodes} = rpc:multicall(Nodes,
- rabbit_mnesia, is_running_remote, []),
- [Node || {Running, Node} <- Replies, Running].
-
is_running_remote() -> {mnesia:system_info(is_running) =:= yes, node()}.
check_consistency(OTP, Rabbit) ->
@@ -810,21 +817,23 @@ e(Tag) -> throw({error, {Tag, error_description(Tag)}}).
error_description(clustering_only_disc_node) ->
"You cannot cluster a node if it is the only disc node in its existing "
" cluster. If new nodes joined while this node was offline, use "
- "\"update_cluster_nodes\" to add them manually.";
+ "'update_cluster_nodes' to add them manually.";
error_description(resetting_only_disc_node) ->
"You cannot reset a node when it is the only disc node in a cluster. "
"Please convert another node of the cluster to a disc node first.";
error_description(already_clustered) ->
- "You are already clustered with the nodes you have selected.";
+ "You are already clustered with the nodes you have selected. If the "
+ "node you are trying to cluster with is not present in the current "
+ "node status, use 'update_cluster_nodes'.";
error_description(not_clustered) ->
"Non-clustered nodes can only be disc nodes.";
error_description(cannot_connect_to_cluster) ->
"Could not connect to the cluster nodes present in this node's "
"status file. If the cluster has changed, you can use the "
- "\"update_cluster_nodes\" command to point to the new cluster nodes.";
+ "'update_cluster_nodes' command to point to the new cluster nodes.";
error_description(no_online_cluster_nodes) ->
"Could not find any online cluster nodes. If the cluster has changed, "
- "you can use the 'recluster' command.";
+ "you can use the 'update_cluster_nodes' command.";
error_description(cannot_connect_to_node) ->
"Could not connect to the cluster node provided.";
error_description(inconsistent_cluster) ->
@@ -839,11 +848,11 @@ error_description(offline_node_no_offline_flag) ->
"but can be done with the --offline flag. Please consult the manual "
"for rabbitmqctl for more information.";
error_description(not_last_node_to_go_down) ->
- "The node you're trying to remove from was not the last to go down "
+ "The node you are trying to remove from was not the last to go down "
"(excluding the node you are removing). Please use the the last node "
"to go down to remove nodes when the cluster is offline.";
error_description(removing_node_from_offline_node) ->
- "To remove a node remotely from an offline node, the node you're removing "
+ "To remove a node remotely from an offline node, the node you are removing "
"from must be a disc node and all the other nodes must be offline.";
error_description(no_running_cluster_nodes) ->
"You cannot leave a cluster if no online nodes are present.".
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index 026aa3624e..b11c9d049a 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -24,6 +24,7 @@
write_cluster_status/1, read_cluster_status/0,
update_cluster_status/0, reset_cluster_status/0]).
-export([notify_node_up/0, notify_joined_cluster/0, notify_left_cluster/1]).
+-export([partitions/0]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
@@ -32,6 +33,8 @@
-define(SERVER, ?MODULE).
-define(RABBIT_UP_RPC_TIMEOUT, 2000).
+-record(state, {monitors, partitions}).
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -50,6 +53,8 @@
-spec(notify_joined_cluster/0 :: () -> 'ok').
-spec(notify_left_cluster/1 :: (node()) -> 'ok').
+-spec(partitions/0 :: () -> {node(), [{atom(), node()}]}).
+
-endif.
%%----------------------------------------------------------------------------
@@ -168,10 +173,23 @@ notify_left_cluster(Node) ->
ok.
%%----------------------------------------------------------------------------
+%% Server calls
+%%----------------------------------------------------------------------------
+
+partitions() ->
+ gen_server:call(?SERVER, partitions, infinity).
+
+%%----------------------------------------------------------------------------
%% gen_server callbacks
%%----------------------------------------------------------------------------
-init([]) -> {ok, pmon:new()}.
+init([]) ->
+ {ok, _} = mnesia:subscribe(system),
+ {ok, #state{monitors = pmon:new(),
+ partitions = []}}.
+
+handle_call(partitions, _From, State = #state{partitions = Partitions}) ->
+ {reply, {node(), Partitions}, State};
handle_call(_Request, _From, State) ->
{noreply, State}.
@@ -179,9 +197,10 @@ handle_call(_Request, _From, State) ->
%% Note: when updating the status file, we can't simply write the
%% mnesia information since the message can (and will) overtake the
%% mnesia propagation.
-handle_cast({node_up, Node, NodeType}, Monitors) ->
+handle_cast({node_up, Node, NodeType},
+ State = #state{monitors = Monitors}) ->
case pmon:is_monitored({rabbit, Node}, Monitors) of
- true -> {noreply, Monitors};
+ true -> {noreply, State};
false -> rabbit_log:info("rabbit on node ~p up~n", [Node]),
{AllNodes, DiscNodes, RunningNodes} = read_cluster_status(),
write_cluster_status({add_node(Node, AllNodes),
@@ -191,7 +210,8 @@ handle_cast({node_up, Node, NodeType}, Monitors) ->
end,
add_node(Node, RunningNodes)}),
ok = handle_live_rabbit(Node),
- {noreply, pmon:monitor({rabbit, Node}, Monitors)}
+ {noreply, State#state{
+ monitors = pmon:monitor({rabbit, Node}, Monitors)}}
end;
handle_cast({joined_cluster, Node, NodeType}, State) ->
{AllNodes, DiscNodes, RunningNodes} = read_cluster_status(),
@@ -210,12 +230,21 @@ handle_cast({left_cluster, Node}, State) ->
handle_cast(_Msg, State) ->
{noreply, State}.
-handle_info({'DOWN', _MRef, process, {rabbit, Node}, _Reason}, Monitors) ->
+handle_info({'DOWN', _MRef, process, {rabbit, Node}, _Reason},
+ State = #state{monitors = Monitors}) ->
rabbit_log:info("rabbit on node ~p down~n", [Node]),
{AllNodes, DiscNodes, RunningNodes} = read_cluster_status(),
write_cluster_status({AllNodes, DiscNodes, del_node(Node, RunningNodes)}),
ok = handle_dead_rabbit(Node),
- {noreply, pmon:erase({rabbit, Node}, Monitors)};
+ {noreply, State#state{monitors = pmon:erase({rabbit, Node}, Monitors)}};
+
+handle_info({mnesia_system_event,
+ {inconsistent_database, running_partitioned_network, Node}},
+ State = #state{partitions = Partitions}) ->
+ Partitions1 = ordsets:to_list(
+ ordsets:add_element(Node, ordsets:from_list(Partitions))),
+ {noreply, State#state{partitions = Partitions1}};
+
handle_info(_Info, State) ->
{noreply, State}.
diff --git a/src/rabbit_plugins.erl b/src/rabbit_plugins.erl
index ecb1961126..abe9b08968 100644
--- a/src/rabbit_plugins.erl
+++ b/src/rabbit_plugins.erl
@@ -100,8 +100,13 @@ dependencies(Reverse, Sources, AllPlugins) ->
{ok, G} = rabbit_misc:build_acyclic_graph(
fun (App, _Deps) -> [{App, App}] end,
fun (App, Deps) -> [{App, Dep} || Dep <- Deps] end,
- [{Name, Deps}
- || #plugin{name = Name, dependencies = Deps} <- AllPlugins]),
+ lists:ukeysort(
+ 1, [{Name, Deps} ||
+ #plugin{name = Name,
+ dependencies = Deps} <- AllPlugins] ++
+ [{Dep, []} ||
+ #plugin{dependencies = Deps} <- AllPlugins,
+ Dep <- Deps])),
Dests = case Reverse of
false -> digraph_utils:reachable(Sources, G);
true -> digraph_utils:reaching(Sources, G)
diff --git a/src/rabbit_plugins_main.erl b/src/rabbit_plugins_main.erl
index 572cf15019..2158d1da15 100644
--- a/src/rabbit_plugins_main.erl
+++ b/src/rabbit_plugins_main.erl
@@ -108,16 +108,19 @@ action(enable, ToEnable0, _Opts, PluginsFile, PluginsDir) ->
Enabled, AllPlugins),
ToEnable = [list_to_atom(Name) || Name <- ToEnable0],
Missing = ToEnable -- plugin_names(AllPlugins),
- case Missing of
- [] -> ok;
- _ -> throw({error_string,
- fmt_list("The following plugins could not be found:",
- Missing)})
- end,
NewEnabled = lists:usort(Enabled ++ ToEnable),
- write_enabled_plugins(PluginsFile, NewEnabled),
NewImplicitlyEnabled = rabbit_plugins:dependencies(false,
NewEnabled, AllPlugins),
+ MissingDeps = (NewImplicitlyEnabled -- plugin_names(AllPlugins)) -- Missing,
+ case {Missing, MissingDeps} of
+ {[], []} -> ok;
+ {Miss, []} -> throw({error_string, fmt_missing("plugins", Miss)});
+ {[], Miss} -> throw({error_string, fmt_missing("dependencies", Miss)});
+ {_, _} -> throw({error_string,
+ fmt_missing("plugins", Missing) ++
+ fmt_missing("dependencies", MissingDeps)})
+ end,
+ write_enabled_plugins(PluginsFile, NewEnabled),
maybe_warn_mochiweb(NewImplicitlyEnabled),
case NewEnabled -- ImplicitlyEnabled of
[] -> io:format("Plugin configuration unchanged.~n");
@@ -183,9 +186,12 @@ format_plugins(Pattern, Opts, PluginsFile, PluginsDir) ->
EnabledImplicitly =
rabbit_plugins:dependencies(false, EnabledExplicitly,
AvailablePlugins) -- EnabledExplicitly,
+ Missing = [#plugin{name = Name, dependencies = []} ||
+ Name <- ((EnabledExplicitly ++ EnabledImplicitly) --
+ plugin_names(AvailablePlugins))],
{ok, RE} = re:compile(Pattern),
Plugins = [ Plugin ||
- Plugin = #plugin{name = Name} <- AvailablePlugins,
+ Plugin = #plugin{name = Name} <- AvailablePlugins ++ Missing,
re:run(atom_to_list(Name), RE, [{capture, none}]) =:= match,
if OnlyEnabled -> lists:member(Name, EnabledExplicitly);
OnlyEnabledAll -> (lists:member(Name,
@@ -196,30 +202,35 @@ format_plugins(Pattern, Opts, PluginsFile, PluginsDir) ->
Plugins1 = usort_plugins(Plugins),
MaxWidth = lists:max([length(atom_to_list(Name)) ||
#plugin{name = Name} <- Plugins1] ++ [0]),
- [format_plugin(P, EnabledExplicitly, EnabledImplicitly, Format,
- MaxWidth) || P <- Plugins1],
+ [format_plugin(P, EnabledExplicitly, EnabledImplicitly,
+ plugin_names(Missing), Format, MaxWidth) || P <- Plugins1],
ok.
format_plugin(#plugin{name = Name, version = Version,
description = Description, dependencies = Deps},
- EnabledExplicitly, EnabledImplicitly, Format, MaxWidth) ->
+ EnabledExplicitly, EnabledImplicitly, Missing,
+ Format, MaxWidth) ->
Glyph = case {lists:member(Name, EnabledExplicitly),
- lists:member(Name, EnabledImplicitly)} of
- {true, false} -> "[E]";
- {false, true} -> "[e]";
- _ -> "[ ]"
+ lists:member(Name, EnabledImplicitly),
+ lists:member(Name, Missing)} of
+ {true, false, false} -> "[E]";
+ {false, true, false} -> "[e]";
+ {_, _, true} -> "[!]";
+ _ -> "[ ]"
end,
+ Opt = fun (_F, A, A) -> ok;
+ ( F, A, _) -> io:format(F, [A])
+ end,
case Format of
minimal -> io:format("~s~n", [Name]);
- normal -> io:format("~s ~-" ++ integer_to_list(MaxWidth) ++
- "w ~s~n", [Glyph, Name, Version]);
+ normal -> io:format("~s ~-" ++ integer_to_list(MaxWidth) ++ "w ",
+ [Glyph, Name]),
+ Opt("~s", Version, undefined),
+ io:format("~n");
verbose -> io:format("~s ~w~n", [Glyph, Name]),
- io:format(" Version: \t~s~n", [Version]),
- case Deps of
- [] -> ok;
- _ -> io:format(" Dependencies:\t~p~n", [Deps])
- end,
- io:format(" Description:\t~s~n", [Description]),
+ Opt(" Version: \t~s~n", Version, undefined),
+ Opt(" Dependencies:\t~p~n", Deps, []),
+ Opt(" Description: \t~s~n", Description, undefined),
io:format("~n")
end.
@@ -230,6 +241,9 @@ fmt_list(Header, Plugins) ->
lists:flatten(
[Header, $\n, [io_lib:format(" ~s~n", [P]) || P <- Plugins]]).
+fmt_missing(Desc, Missing) ->
+ fmt_list("The following " ++ Desc ++ " could not be found:", Missing).
+
usort_plugins(Plugins) ->
lists:usort(fun plugins_cmp/2, Plugins).
diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl
index f4c1f42b21..2717cc9217 100644
--- a/src/rabbit_policy.erl
+++ b/src/rabbit_policy.erl
@@ -22,11 +22,13 @@
-include("rabbit.hrl").
--import(rabbit_misc, [pget/2, pget/3]).
+-import(rabbit_misc, [pget/2]).
-export([register/0]).
-export([name/1, get/2, set/1]).
-export([validate/4, validate_clear/3, notify/4, notify_clear/3]).
+-export([parse_set/5, set/5, delete/2, lookup/2, list/0, list/1,
+ list_formatted/1, info_keys/0]).
-rabbit_boot_step({?MODULE,
[{description, "policy parameters"},
@@ -41,7 +43,7 @@ name(#amqqueue{policy = Policy}) -> name0(Policy);
name(#exchange{policy = Policy}) -> name0(Policy).
name0(undefined) -> none;
-name0(Policy) -> pget(<<"name">>, Policy).
+name0(Policy) -> pget(name, Policy).
set(Q = #amqqueue{name = Name}) -> Q#amqqueue{policy = set0(Name)};
set(X = #exchange{name = Name}) -> X#exchange{policy = set0(Name)}.
@@ -55,7 +57,7 @@ get(Name, EntityName = #resource{virtual_host = VHost}) ->
get0(Name, match(EntityName, list(VHost))).
get0(_Name, undefined) -> {error, not_found};
-get0(Name, List) -> case pget(<<"policy">>, List) of
+get0(Name, List) -> case pget(definition, List) of
undefined -> {error, not_found};
Policy -> case pget(Name, Policy) of
undefined -> {error, not_found};
@@ -65,6 +67,81 @@ get0(Name, List) -> case pget(<<"policy">>, List) of
%%----------------------------------------------------------------------------
+parse_set(VHost, Name, Pattern, Definition, undefined) ->
+ parse_set0(VHost, Name, Pattern, Definition, 0);
+parse_set(VHost, Name, Pattern, Definition, Priority) ->
+ try list_to_integer(Priority) of
+ Num -> parse_set0(VHost, Name, Pattern, Definition, Num)
+ catch
+ error:badarg -> {error, "~p priority must be a number", [Priority]}
+ end.
+
+parse_set0(VHost, Name, Pattern, Defn, Priority) ->
+ case rabbit_misc:json_decode(Defn) of
+ {ok, JSON} ->
+ set0(VHost, Name,
+ [{<<"pattern">>, list_to_binary(Pattern)},
+ {<<"definition">>, rabbit_misc:json_to_term(JSON)},
+ {<<"priority">>, Priority}]);
+ error ->
+ {error_string, "JSON decoding error"}
+ end.
+
+set(VHost, Name, Pattern, Definition, Priority) ->
+ PolicyProps = [{<<"pattern">>, Pattern},
+ {<<"definition">>, Definition},
+ {<<"priority">>, case Priority of
+ undefined -> 0;
+ _ -> Priority
+ end}],
+ set0(VHost, Name, PolicyProps).
+
+set0(VHost, Name, Term) ->
+ rabbit_runtime_parameters:set_any(VHost, <<"policy">>, Name, Term).
+
+delete(VHost, Name) ->
+ rabbit_runtime_parameters:clear_any(VHost, <<"policy">>, Name).
+
+lookup(VHost, Name) ->
+ case rabbit_runtime_parameters:lookup(VHost, <<"policy">>, Name) of
+ not_found -> not_found;
+ P -> p(P, fun ident/1)
+ end.
+
+list() ->
+ list('_').
+
+list(VHost) ->
+ list0(VHost, fun ident/1).
+
+list_formatted(VHost) ->
+ order_policies(list0(VHost, fun format/1)).
+
+list0(VHost, DefnFun) ->
+ [p(P, DefnFun) || P <- rabbit_runtime_parameters:list(VHost, <<"policy">>)].
+
+order_policies(PropList) ->
+ lists:sort(fun (A, B) -> pget(priority, A) < pget(priority, B) end,
+ PropList).
+
+p(Parameter, DefnFun) ->
+ Value = pget(value, Parameter),
+ [{vhost, pget(vhost, Parameter)},
+ {name, pget(name, Parameter)},
+ {pattern, pget(<<"pattern">>, Value)},
+ {definition, DefnFun(pget(<<"definition">>, Value))},
+ {priority, pget(<<"priority">>, Value)}].
+
+format(Term) ->
+ {ok, JSON} = rabbit_misc:json_encode(rabbit_misc:term_to_json(Term)),
+ list_to_binary(JSON).
+
+ident(X) -> X.
+
+info_keys() -> [vhost, name, pattern, definition, priority].
+
+%%----------------------------------------------------------------------------
+
validate(_VHost, <<"policy">>, Name, Term) ->
rabbit_parameter_validation:proplist(
Name, policy_validation(), Term).
@@ -80,10 +157,6 @@ notify_clear(VHost, <<"policy">>, _Name) ->
%%----------------------------------------------------------------------------
-list(VHost) ->
- [[{<<"name">>, pget(key, P)} | pget(value, P)]
- || P <- rabbit_runtime_parameters:list(VHost, <<"policy">>)].
-
update_policies(VHost) ->
Policies = list(VHost),
{Xs, Qs} = rabbit_misc:execute_mnesia_transaction(
@@ -127,13 +200,52 @@ match(Name, Policies) ->
end.
matches(#resource{name = Name}, Policy) ->
- match =:= re:run(Name, pget(<<"pattern">>, Policy), [{capture, none}]).
+ match =:= re:run(Name, pget(pattern, Policy), [{capture, none}]).
-sort_pred(A, B) -> pget(<<"priority">>, A, 0) >= pget(<<"priority">>, B, 0).
+sort_pred(A, B) -> pget(priority, A) >= pget(priority, B).
%%----------------------------------------------------------------------------
policy_validation() ->
- [{<<"priority">>, fun rabbit_parameter_validation:number/2, optional},
- {<<"pattern">>, fun rabbit_parameter_validation:regex/2, mandatory},
- {<<"policy">>, fun rabbit_parameter_validation:list/2, mandatory}].
+ [{<<"priority">>, fun rabbit_parameter_validation:number/2, mandatory},
+ {<<"pattern">>, fun rabbit_parameter_validation:regex/2, mandatory},
+ {<<"definition">>, fun validation/2, mandatory}].
+
+validation(_Name, []) ->
+ {error, "no policy provided", []};
+validation(_Name, Terms) when is_list(Terms) ->
+ {Keys, Modules} = lists:unzip(
+ rabbit_registry:lookup_all(policy_validator)),
+ [] = dups(Keys), %% ASSERTION
+ Validators = lists:zipwith(fun (M, K) -> {M, a2b(K)} end, Modules, Keys),
+ {TermKeys, _} = lists:unzip(Terms),
+ case dups(TermKeys) of
+ [] -> validation0(Validators, Terms);
+ Dup -> {error, "~p duplicate keys not allowed", [Dup]}
+ end;
+validation(_Name, Term) ->
+ {error, "parse error while reading policy: ~p", [Term]}.
+
+validation0(Validators, Terms) ->
+ case lists:foldl(
+ fun (Mod, {ok, TermsLeft}) ->
+ ModKeys = proplists:get_all_values(Mod, Validators),
+ case [T || {Key, _} = T <- TermsLeft,
+ lists:member(Key, ModKeys)] of
+ [] -> {ok, TermsLeft};
+ Scope -> {Mod:validate_policy(Scope), TermsLeft -- Scope}
+ end;
+ (_, Acc) ->
+ Acc
+ end, {ok, Terms}, proplists:get_keys(Validators)) of
+ {ok, []} ->
+ ok;
+ {ok, Unvalidated} ->
+ {error, "~p are not recognised policy settings", [Unvalidated]};
+ {Error, _} ->
+ Error
+ end.
+
+a2b(A) -> list_to_binary(atom_to_list(A)).
+
+dups(L) -> L -- lists:usort(L).
diff --git a/src/rabbit_policy_validator.erl b/src/rabbit_policy_validator.erl
new file mode 100644
index 0000000000..b59dec2b47
--- /dev/null
+++ b/src/rabbit_policy_validator.erl
@@ -0,0 +1,37 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_policy_validator).
+
+-ifdef(use_specs).
+
+-type(validate_results() ::
+ 'ok' | {error, string(), [term()]} | [validate_results()]).
+
+-callback validate_policy([{binary(), term()}]) -> validate_results().
+
+-else.
+
+-export([behaviour_info/1]).
+
+behaviour_info(callbacks) ->
+ [
+ {validate_policy, 1}
+ ];
+behaviour_info(_Other) ->
+ undefined.
+
+-endif.
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index fab5af61e6..e2cc4aeba1 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -492,6 +492,14 @@ handle_exception(State, Channel, Reason) ->
timer:sleep(?SILENT_CLOSE_DELAY * 1000),
throw({handshake_error, State#v1.connection_state, Channel, Reason}).
+%% we've "lost sync" with the client and hence must not accept any
+%% more input
+fatal_frame_error(Error, Type, Channel, Payload, State) ->
+ frame_error(Error, Type, Channel, Payload, State),
+ %% grace period to allow transmission of error
+ timer:sleep(?SILENT_CLOSE_DELAY * 1000),
+ throw(fatal_frame_error).
+
frame_error(Error, Type, Channel, Payload, State) ->
{Str, Bin} = payload_snippet(Payload),
handle_exception(State, Channel,
@@ -614,6 +622,17 @@ post_process_frame(_Frame, _ChPid, State) ->
%%--------------------------------------------------------------------------
+%% We allow clients to exceed the frame size a little bit since quite
+%% a few get it wrong - off-by 1 or 8 (empty frame size) are typical.
+-define(FRAME_SIZE_FUDGE, ?EMPTY_FRAME_SIZE).
+
+handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>,
+ State = #v1{connection = #connection{frame_max = FrameMax}})
+ when FrameMax /= 0 andalso
+ PayloadSize > FrameMax - ?EMPTY_FRAME_SIZE + ?FRAME_SIZE_FUDGE ->
+ fatal_frame_error(
+ {frame_too_large, PayloadSize, FrameMax - ?EMPTY_FRAME_SIZE},
+ Type, Channel, <<>>, State);
handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, State) ->
ensure_stats_timer(
switch_callback(State, {frame_payload, Type, Channel, PayloadSize},
@@ -624,8 +643,8 @@ handle_input({frame_payload, Type, Channel, PayloadSize}, Data, State) ->
case EndMarker of
?FRAME_END -> State1 = handle_frame(Type, Channel, Payload, State),
switch_callback(State1, frame_header, 7);
- _ -> frame_error({invalid_frame_end_marker, EndMarker},
- Type, Channel, Payload, State)
+ _ -> fatal_frame_error({invalid_frame_end_marker, EndMarker},
+ Type, Channel, Payload, State)
end;
%% The two rules pertaining to version negotiation:
@@ -921,26 +940,27 @@ i(last_blocked_age, #v1{last_blocked_at = T}) ->
timer:now_diff(erlang:now(), T) / 1000000;
i(channels, #v1{}) ->
length(all_channels());
-i(protocol, #v1{connection = #connection{protocol = none}}) ->
- none;
-i(protocol, #v1{connection = #connection{protocol = Protocol}}) ->
- Protocol:version();
i(auth_mechanism, #v1{auth_mechanism = none}) ->
none;
i(auth_mechanism, #v1{auth_mechanism = Mechanism}) ->
proplists:get_value(name, Mechanism:description());
-i(user, #v1{connection = #connection{user = #user{username = Username}}}) ->
- Username;
-i(user, #v1{connection = #connection{user = none}}) ->
+i(protocol, #v1{connection = #connection{protocol = none}}) ->
+ none;
+i(protocol, #v1{connection = #connection{protocol = Protocol}}) ->
+ Protocol:version();
+i(user, #v1{connection = #connection{user = none}}) ->
'';
-i(vhost, #v1{connection = #connection{vhost = VHost}}) ->
+i(user, #v1{connection = #connection{user = #user{
+ username = Username}}}) ->
+ Username;
+i(vhost, #v1{connection = #connection{vhost = VHost}}) ->
VHost;
-i(timeout, #v1{connection = #connection{timeout_sec = Timeout}}) ->
+i(timeout, #v1{connection = #connection{timeout_sec = Timeout}}) ->
Timeout;
-i(frame_max, #v1{connection = #connection{frame_max = FrameMax}}) ->
+i(frame_max, #v1{connection = #connection{frame_max = FrameMax}}) ->
FrameMax;
-i(client_properties, #v1{connection = #connection{
- client_properties = ClientProperties}}) ->
+i(client_properties, #v1{connection = #connection{client_properties =
+ ClientProperties}}) ->
ClientProperties;
i(Item, #v1{}) ->
throw({bad_argument, Item}).
diff --git a/src/rabbit_registry.erl b/src/rabbit_registry.erl
index e14bbba018..32709d2484 100644
--- a/src/rabbit_registry.erl
+++ b/src/rabbit_registry.erl
@@ -107,7 +107,8 @@ sanity_check_module(ClassModule, Module) ->
class_module(exchange) -> rabbit_exchange_type;
class_module(auth_mechanism) -> rabbit_auth_mechanism;
class_module(runtime_parameter) -> rabbit_runtime_parameter;
-class_module(exchange_decorator) -> rabbit_exchange_decorator.
+class_module(exchange_decorator) -> rabbit_exchange_decorator;
+class_module(policy_validator) -> rabbit_policy_validator.
%%---------------------------------------------------------------------------
diff --git a/src/rabbit_runtime_parameters.erl b/src/rabbit_runtime_parameters.erl
index b58b459a7f..49060409e1 100644
--- a/src/rabbit_runtime_parameters.erl
+++ b/src/rabbit_runtime_parameters.erl
@@ -18,9 +18,9 @@
-include("rabbit.hrl").
--export([parse_set/4, set/4, clear/3,
- list/0, list/1, list_strict/1, list/2, list_strict/2, list_formatted/1,
- lookup/3, value/3, value/4, info_keys/0]).
+-export([parse_set/4, set/4, set_any/4, clear/3, clear_any/3, list/0, list/1,
+ list_strict/1, list/2, list_strict/2, list_formatted/1, lookup/3,
+ value/3, value/4, info_keys/0]).
%%----------------------------------------------------------------------------
@@ -32,17 +32,23 @@
-> ok_or_error_string()).
-spec(set/4 :: (rabbit_types:vhost(), binary(), binary(), term())
-> ok_or_error_string()).
+-spec(set_any/4 :: (rabbit_types:vhost(), binary(), binary(), term())
+ -> ok_or_error_string()).
-spec(clear/3 :: (rabbit_types:vhost(), binary(), binary())
-> ok_or_error_string()).
+-spec(clear_any/3 :: (rabbit_types:vhost(), binary(), binary())
+ -> ok_or_error_string()).
-spec(list/0 :: () -> [rabbit_types:infos()]).
--spec(list/1 :: (rabbit_types:vhost()) -> [rabbit_types:infos()]).
--spec(list_strict/1 :: (binary()) -> [rabbit_types:infos()] | 'not_found').
--spec(list/2 :: (rabbit_types:vhost(), binary()) -> [rabbit_types:infos()]).
--spec(list_strict/2 :: (rabbit_types:vhost(), binary())
+-spec(list/1 :: (rabbit_types:vhost() | '_') -> [rabbit_types:infos()]).
+-spec(list_strict/1 :: (binary() | '_')
+ -> [rabbit_types:infos()] | 'not_found').
+-spec(list/2 :: (rabbit_types:vhost() | '_', binary() | '_')
+ -> [rabbit_types:infos()]).
+-spec(list_strict/2 :: (rabbit_types:vhost() | '_', binary() | '_')
-> [rabbit_types:infos()] | 'not_found').
-spec(list_formatted/1 :: (rabbit_types:vhost()) -> [rabbit_types:infos()]).
-spec(lookup/3 :: (rabbit_types:vhost(), binary(), binary())
- -> rabbit_types:infos()).
+ -> rabbit_types:infos() | 'not_found').
-spec(value/3 :: (rabbit_types:vhost(), binary(), binary()) -> term()).
-spec(value/4 :: (rabbit_types:vhost(), binary(), binary(), term()) -> term()).
-spec(info_keys/0 :: () -> rabbit_types:info_keys()).
@@ -57,29 +63,37 @@
%%---------------------------------------------------------------------------
-parse_set(VHost, Component, Key, String) ->
+parse_set(_, <<"policy">>, _, _) ->
+ {error_string, "policies may not be set using this method"};
+parse_set(VHost, Component, Name, String) ->
case rabbit_misc:json_decode(String) of
- {ok, JSON} -> set(VHost, Component, Key, rabbit_misc:json_to_term(JSON));
+ {ok, JSON} -> set(VHost, Component, Name,
+ rabbit_misc:json_to_term(JSON));
error -> {error_string, "JSON decoding error"}
end.
-set(VHost, Component, Key, Term) ->
- case set0(VHost, Component, Key, Term) of
- ok -> ok;
- {errors, L} -> format_error(L)
- end.
+set(_, <<"policy">>, _, _) ->
+ {error_string, "policies may not be set using this method"};
+set(VHost, Component, Name, Term) ->
+ set_any(VHost, Component, Name, Term).
format_error(L) ->
{error_string, rabbit_misc:format_many([{"Validation failed~n", []} | L])}.
-set0(VHost, Component, Key, Term) ->
+set_any(VHost, Component, Name, Term) ->
+ case set_any0(VHost, Component, Name, Term) of
+ ok -> ok;
+ {errors, L} -> format_error(L)
+ end.
+
+set_any0(VHost, Component, Name, Term) ->
case lookup_component(Component) of
{ok, Mod} ->
- case flatten_errors(Mod:validate(VHost, Component, Key, Term)) of
+ case flatten_errors(Mod:validate(VHost, Component, Name, Term)) of
ok ->
- case mnesia_update(VHost, Component, Key, Term) of
+ case mnesia_update(VHost, Component, Name, Term) of
{old, Term} -> ok;
- _ -> Mod:notify(VHost, Component, Key, Term)
+ _ -> Mod:notify(VHost, Component, Name, Term)
end,
ok;
E ->
@@ -89,43 +103,49 @@ set0(VHost, Component, Key, Term) ->
E
end.
-mnesia_update(VHost, Component, Key, Term) ->
+mnesia_update(VHost, Component, Name, Term) ->
rabbit_misc:execute_mnesia_transaction(
fun () ->
- Res = case mnesia:read(?TABLE, {VHost, Component, Key}, read) of
+ Res = case mnesia:read(?TABLE, {VHost, Component, Name}, read) of
[] -> new;
[Params] -> {old, Params#runtime_parameters.value}
end,
- ok = mnesia:write(?TABLE, c(VHost, Component, Key, Term), write),
+ ok = mnesia:write(?TABLE, c(VHost, Component, Name, Term), write),
Res
end).
-clear(VHost, Component, Key) ->
- case clear0(VHost, Component, Key) of
+clear(_, <<"policy">> , _) ->
+ {error_string, "policies may not be cleared using this method"};
+clear(VHost, Component, Name) ->
+ clear_any(VHost, Component, Name).
+
+clear_any(VHost, Component, Name) ->
+ case clear_any0(VHost, Component, Name) of
ok -> ok;
{errors, L} -> format_error(L)
end.
-clear0(VHost, Component, Key) ->
+clear_any0(VHost, Component, Name) ->
case lookup_component(Component) of
{ok, Mod} -> case flatten_errors(
- Mod:validate_clear(VHost, Component, Key)) of
- ok -> mnesia_clear(VHost, Component, Key),
- Mod:notify_clear(VHost, Component, Key),
+ Mod:validate_clear(VHost, Component, Name)) of
+ ok -> mnesia_clear(VHost, Component, Name),
+ Mod:notify_clear(VHost, Component, Name),
ok;
E -> E
end;
E -> E
end.
-mnesia_clear(VHost, Component, Key) ->
+mnesia_clear(VHost, Component, Name) ->
ok = rabbit_misc:execute_mnesia_transaction(
fun () ->
- ok = mnesia:delete(?TABLE, {VHost, Component, Key}, write)
+ ok = mnesia:delete(?TABLE, {VHost, Component, Name}, write)
end).
list() ->
- [p(P) || P <- rabbit_misc:dirty_read_all(?TABLE)].
+ [p(P) || #runtime_parameters{ key = {_VHost, Comp, _Name}} = P <-
+ rabbit_misc:dirty_read_all(?TABLE), Comp /= <<"policy">>].
list(VHost) -> list(VHost, '_', []).
list_strict(Component) -> list('_', Component, not_found).
@@ -136,60 +156,63 @@ list(VHost, Component, Default) ->
case component_good(Component) of
true -> Match = #runtime_parameters{key = {VHost, Component, '_'},
_ = '_'},
- [p(P) || P <- mnesia:dirty_match_object(?TABLE, Match)];
+ [p(P) || #runtime_parameters{ key = {_VHost, Comp, _Name}} = P <-
+ mnesia:dirty_match_object(?TABLE, Match),
+ Comp =/= <<"policy">> orelse
+ Component =:= <<"policy">>];
_ -> Default
end.
list_formatted(VHost) ->
[pset(value, format(pget(value, P)), P) || P <- list(VHost)].
-lookup(VHost, Component, Key) ->
- case lookup0(VHost, Component, Key, rabbit_misc:const(not_found)) of
+lookup(VHost, Component, Name) ->
+ case lookup0(VHost, Component, Name, rabbit_misc:const(not_found)) of
not_found -> not_found;
Params -> p(Params)
end.
-value(VHost, Component, Key) ->
- case lookup0(VHost, Component, Key, rabbit_misc:const(not_found)) of
+value(VHost, Component, Name) ->
+ case lookup0(VHost, Component, Name, rabbit_misc:const(not_found)) of
not_found -> not_found;
Params -> Params#runtime_parameters.value
end.
-value(VHost, Component, Key, Default) ->
- Params = lookup0(VHost, Component, Key,
+value(VHost, Component, Name, Default) ->
+ Params = lookup0(VHost, Component, Name,
fun () ->
- lookup_missing(VHost, Component, Key, Default)
+ lookup_missing(VHost, Component, Name, Default)
end),
Params#runtime_parameters.value.
-lookup0(VHost, Component, Key, DefaultFun) ->
- case mnesia:dirty_read(?TABLE, {VHost, Component, Key}) of
+lookup0(VHost, Component, Name, DefaultFun) ->
+ case mnesia:dirty_read(?TABLE, {VHost, Component, Name}) of
[] -> DefaultFun();
[R] -> R
end.
-lookup_missing(VHost, Component, Key, Default) ->
+lookup_missing(VHost, Component, Name, Default) ->
rabbit_misc:execute_mnesia_transaction(
fun () ->
- case mnesia:read(?TABLE, {VHost, Component, Key}, read) of
- [] -> Record = c(VHost, Component, Key, Default),
+ case mnesia:read(?TABLE, {VHost, Component, Name}, read) of
+ [] -> Record = c(VHost, Component, Name, Default),
mnesia:write(?TABLE, Record, write),
Record;
[R] -> R
end
end).
-c(VHost, Component, Key, Default) ->
- #runtime_parameters{key = {VHost, Component, Key},
+c(VHost, Component, Name, Default) ->
+ #runtime_parameters{key = {VHost, Component, Name},
value = Default}.
-p(#runtime_parameters{key = {VHost, Component, Key}, value = Value}) ->
+p(#runtime_parameters{key = {VHost, Component, Name}, value = Value}) ->
[{vhost, VHost},
{component, Component},
- {key, Key},
+ {name, Name},
{value, Value}].
-info_keys() -> [component, key, value].
+info_keys() -> [component, name, value].
%%---------------------------------------------------------------------------
diff --git a/src/rabbit_runtime_parameters_test.erl b/src/rabbit_runtime_parameters_test.erl
index 5224ccaa36..d4d7271e90 100644
--- a/src/rabbit_runtime_parameters_test.erl
+++ b/src/rabbit_runtime_parameters_test.erl
@@ -16,9 +16,14 @@
-module(rabbit_runtime_parameters_test).
-behaviour(rabbit_runtime_parameter).
+-behaviour(rabbit_policy_validator).
-export([validate/4, validate_clear/3, notify/4, notify_clear/3]).
-export([register/0, unregister/0]).
+-export([validate_policy/1]).
+-export([register_policy_validator/0, unregister_policy_validator/0]).
+
+%----------------------------------------------------------------------------
register() ->
rabbit_registry:register(runtime_parameter, <<"test">>, ?MODULE).
@@ -36,3 +41,28 @@ validate_clear(_, <<"test">>, _) -> {error, "meh", []}.
notify(_, _, _, _) -> ok.
notify_clear(_, _, _) -> ok.
+
+%----------------------------------------------------------------------------
+
+register_policy_validator() ->
+ rabbit_registry:register(policy_validator, <<"testeven">>, ?MODULE),
+ rabbit_registry:register(policy_validator, <<"testpos">>, ?MODULE).
+
+unregister_policy_validator() ->
+ rabbit_registry:unregister(policy_validator, <<"testeven">>),
+ rabbit_registry:unregister(policy_validator, <<"testpos">>).
+
+validate_policy([{<<"testeven">>, Terms}]) when is_list(Terms) ->
+ case length(Terms) rem 2 =:= 0 of
+ true -> ok;
+ false -> {error, "meh", []}
+ end;
+
+validate_policy([{<<"testpos">>, Terms}]) when is_list(Terms) ->
+ case lists:all(fun (N) -> is_integer(N) andalso N > 0 end, Terms) of
+ true -> ok;
+ false -> {error, "meh", []}
+ end;
+
+validate_policy(_) ->
+ {error, "meh", []}.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index e605485341..51ca6043b4 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -41,6 +41,7 @@ all_tests() ->
passed = test_multi_call(),
passed = test_file_handle_cache(),
passed = test_backing_queue(),
+ passed = test_rabbit_basic_header_handling(),
passed = test_priority_queue(),
passed = test_pg_local(),
passed = test_unfold(),
@@ -57,6 +58,7 @@ all_tests() ->
passed = test_dynamic_mirroring(),
passed = test_user_management(),
passed = test_runtime_parameters(),
+ passed = test_policy_validation(),
passed = test_server_status(),
passed = test_confirms(),
passed =
@@ -70,6 +72,7 @@ all_tests() ->
passed = test_configurable_server_properties(),
passed.
+
do_if_secondary_node(Up, Down) ->
SecondaryNode = rabbit_nodes:make("hare"),
@@ -158,6 +161,78 @@ test_multi_call() ->
exit(Pid3, bang),
passed.
+test_rabbit_basic_header_handling() ->
+ passed = write_table_with_invalid_existing_type_test(),
+ passed = invalid_existing_headers_test(),
+ passed = disparate_invalid_header_entries_accumulate_separately_test(),
+ passed = corrupt_or_invalid_headers_are_overwritten_test(),
+ passed = invalid_same_header_entry_accumulation_test(),
+ passed.
+
+-define(XDEATH_TABLE,
+ [{<<"reason">>, longstr, <<"blah">>},
+ {<<"queue">>, longstr, <<"foo.bar.baz">>},
+ {<<"exchange">>, longstr, <<"my-exchange">>},
+ {<<"routing-keys">>, array, []}]).
+
+-define(ROUTE_TABLE, [{<<"redelivered">>, bool, <<"true">>}]).
+
+-define(BAD_HEADER(K), {<<K>>, longstr, <<"bad ", K>>}).
+-define(BAD_HEADER(K, Suf), {<<K>>, longstr, <<"bad ", K, Suf>>}).
+-define(FOUND_BAD_HEADER(K), {<<K>>, array, [{longstr, <<"bad ", K>>}]}).
+
+write_table_with_invalid_existing_type_test() ->
+ prepend_check(<<"header1">>, ?XDEATH_TABLE, [?BAD_HEADER("header1")]),
+ passed.
+
+invalid_existing_headers_test() ->
+ Headers =
+ prepend_check(<<"header2">>, ?ROUTE_TABLE, [?BAD_HEADER("header2")]),
+ {array, [{table, ?ROUTE_TABLE}]} =
+ rabbit_misc:table_lookup(Headers, <<"header2">>),
+ passed.
+
+disparate_invalid_header_entries_accumulate_separately_test() ->
+ BadHeaders = [?BAD_HEADER("header2")],
+ Headers = prepend_check(<<"header2">>, ?ROUTE_TABLE, BadHeaders),
+ Headers2 = prepend_check(<<"header1">>, ?XDEATH_TABLE,
+ [?BAD_HEADER("header1") | Headers]),
+ {table, [?FOUND_BAD_HEADER("header1"),
+ ?FOUND_BAD_HEADER("header2")]} =
+ rabbit_misc:table_lookup(Headers2, ?INVALID_HEADERS_KEY),
+ passed.
+
+corrupt_or_invalid_headers_are_overwritten_test() ->
+ Headers0 = [?BAD_HEADER("header1"),
+ ?BAD_HEADER("x-invalid-headers")],
+ Headers1 = prepend_check(<<"header1">>, ?XDEATH_TABLE, Headers0),
+ {table,[?FOUND_BAD_HEADER("header1"),
+ ?FOUND_BAD_HEADER("x-invalid-headers")]} =
+ rabbit_misc:table_lookup(Headers1, ?INVALID_HEADERS_KEY),
+ passed.
+
+invalid_same_header_entry_accumulation_test() ->
+ BadHeader1 = ?BAD_HEADER("header1", "a"),
+ Headers = prepend_check(<<"header1">>, ?ROUTE_TABLE, [BadHeader1]),
+ Headers2 = prepend_check(<<"header1">>, ?ROUTE_TABLE,
+ [?BAD_HEADER("header1", "b") | Headers]),
+ {table, InvalidHeaders} =
+ rabbit_misc:table_lookup(Headers2, ?INVALID_HEADERS_KEY),
+ {array, [{longstr,<<"bad header1b">>},
+ {longstr,<<"bad header1a">>}]} =
+ rabbit_misc:table_lookup(InvalidHeaders, <<"header1">>),
+ passed.
+
+prepend_check(HeaderKey, HeaderTable, Headers) ->
+ Headers1 = rabbit_basic:prepend_table_header(
+ HeaderKey, HeaderTable, Headers),
+ {table, Invalid} =
+ rabbit_misc:table_lookup(Headers1, ?INVALID_HEADERS_KEY),
+ {Type, Value} = rabbit_misc:table_lookup(Headers, HeaderKey),
+ {array, [{Type, Value} | _]} =
+ rabbit_misc:table_lookup(Invalid, HeaderKey),
+ Headers1.
+
test_priority_queue() ->
false = priority_queue:is_queue(not_a_queue),
@@ -913,12 +988,12 @@ test_dynamic_mirroring() ->
Test({b,[a,c],0},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{b,[a]},[a,b,c,d]),
%% Add two nodes and drop one
Test({a,[b,c],0},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{a,[d]},[a,b,c,d]),
- %% Promote slave to master by policy
- Test({a,[b,c],0},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{d,[a]},[a,b,c,d]),
%% Don't try to include nodes that are not running
Test({a,[b], 0},<<"nodes">>,[<<"a">>,<<"b">>,<<"f">>],{a,[b]},[a,b,c,d]),
%% If we can't find any of the nodes listed then just keep the master
Test({a,[], 0},<<"nodes">>,[<<"f">>,<<"g">>,<<"h">>],{a,[b]},[a,b,c,d]),
+ %% And once that's happened, still keep the master even when not listed
+ Test({a,[b,c],0},<<"nodes">>,[<<"b">>,<<"c">>], {a,[]}, [a,b,c,d]),
Test({a,[], 1},<<"exactly">>,2,{a,[]}, [a,b,c,d]),
Test({a,[], 2},<<"exactly">>,3,{a,[]}, [a,b,c,d]),
@@ -1046,6 +1121,26 @@ test_runtime_parameters() ->
rabbit_runtime_parameters_test:unregister(),
passed.
+test_policy_validation() ->
+ rabbit_runtime_parameters_test:register_policy_validator(),
+ SetPol =
+ fun (Key, Val) ->
+ control_action(
+ set_policy,
+ ["name", ".*", rabbit_misc:format("{\"~s\":~p}", [Key, Val])])
+ end,
+
+ ok = SetPol("testeven", []),
+ ok = SetPol("testeven", [1, 2]),
+ ok = SetPol("testeven", [1, 2, 3, 4]),
+ ok = SetPol("testpos", [2, 5, 5678]),
+
+ {error_string, _} = SetPol("testpos", [-1, 0, 1]),
+ {error_string, _} = SetPol("testeven", [ 1, 2, 3]),
+
+ rabbit_runtime_parameters_test:unregister_policy_validator(),
+ passed.
+
test_server_status() ->
%% create a few things so there is some useful information to list
Writer = spawn(fun () -> receive shutdown -> ok end end),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index ddb136a73d..8a3fd9d917 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -17,11 +17,11 @@
-module(rabbit_variable_queue).
-export([init/3, terminate/2, delete_and_terminate/2, purge/1,
- publish/4, publish_delivered/5, drain_confirmed/1,
+ publish/4, publish_delivered/4, discard/3, drain_confirmed/1,
dropwhile/3, fetch/2, ack/2, requeue/2, len/1, is_empty/1,
depth/1, set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3,
- is_duplicate/2, discard/3, multiple_routing_keys/0, fold/3]).
+ is_duplicate/2, multiple_routing_keys/0, fold/3]).
-export([start/1, stop/0]).
@@ -545,17 +545,8 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
ram_msg_count = RamMsgCount + 1,
unconfirmed = UC1 })).
-publish_delivered(false, #basic_message { id = MsgId },
- #message_properties { needs_confirming = NeedsConfirming },
- _ChPid, State = #vqstate { async_callback = Callback,
- len = 0 }) ->
- case NeedsConfirming of
- true -> blind_confirm(Callback, gb_sets:singleton(MsgId));
- false -> ok
- end,
- {undefined, a(State)};
-publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent,
- id = MsgId },
+publish_delivered(Msg = #basic_message { is_persistent = IsPersistent,
+ id = MsgId },
MsgProps = #message_properties {
needs_confirming = NeedsConfirming },
_ChPid, State = #vqstate { len = 0,
@@ -579,6 +570,8 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent,
persistent_count = PCount1,
unconfirmed = UC1 }))}.
+discard(_MsgId, _ChPid, State) -> State.
+
drain_confirmed(State = #vqstate { confirmed = C }) ->
case gb_sets:is_empty(C) of
true -> {[], State}; %% common case
@@ -821,8 +814,6 @@ invoke(?MODULE, Fun, State) -> Fun(?MODULE, State).
is_duplicate(_Msg, State) -> {false, State}.
-discard(_Msg, _ChPid, State) -> State.
-
%%----------------------------------------------------------------------------
%% Minor helpers
%%----------------------------------------------------------------------------
@@ -1325,12 +1316,9 @@ must_sync_index(#vqstate { msg_indices_on_disk = MIOD,
%% subtraction.
not (gb_sets:is_empty(UC) orelse gb_sets:is_subset(UC, MIOD)).
-blind_confirm(Callback, MsgIdSet) ->
- Callback(?MODULE,
- fun (?MODULE, State) -> record_confirms(MsgIdSet, State) end).
-
msgs_written_to_disk(Callback, MsgIdSet, ignored) ->
- blind_confirm(Callback, MsgIdSet);
+ Callback(?MODULE,
+ fun (?MODULE, State) -> record_confirms(MsgIdSet, State) end);
msgs_written_to_disk(Callback, MsgIdSet, written) ->
Callback(?MODULE,
fun (?MODULE, State = #vqstate { msgs_on_disk = MOD,
diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl
index 03dfbe245d..297fa56fe3 100644
--- a/src/rabbit_vhost.erl
+++ b/src/rabbit_vhost.erl
@@ -97,6 +97,8 @@ internal_delete(VHostPath) ->
proplists:get_value(component, Info),
proplists:get_value(key, Info))
|| Info <- rabbit_runtime_parameters:list(VHostPath)],
+ [ok = rabbit_policy:delete(VHostPath, proplists:get_value(key, Info))
+ || Info <- rabbit_policy:list(VHostPath)],
ok = mnesia:delete({rabbit_vhost, VHostPath}),
ok.