diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2012-10-29 17:36:07 +0000 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2012-10-29 17:36:07 +0000 |
| commit | f389dbe03d78bde9dbb5cfe2b9450d907edd7cf6 (patch) | |
| tree | 5999cf8316692f243f6292cf87a3114a2094fa06 /src | |
| parent | f774e7ba2fd8e478ddacc11729175f64344a71cf (diff) | |
| parent | 0609c5bfa368cae3724feac2da26b9436e6e5d36 (diff) | |
| download | rabbitmq-server-git-f389dbe03d78bde9dbb5cfe2b9450d907edd7cf6.tar.gz | |
Merge default
Diffstat (limited to 'src')
27 files changed, 797 insertions, 352 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index ae73ebc805..c52c296ad3 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -36,7 +36,7 @@ -rabbit_boot_step({codec_correctness_check, [{description, "codec correctness check"}, {mfa, {rabbit_binary_generator, - check_empty_content_body_frame_size, + check_empty_frame_size, []}}, {requires, pre_boot}, {enables, external_infrastructure}]}). @@ -303,7 +303,6 @@ start() -> ok = ensure_working_log_handlers(), rabbit_node_monitor:prepare_cluster_status_files(), rabbit_mnesia:check_cluster_consistency(), - exit(bang), ok = app_utils:start_applications( app_startup_order(), fun handle_app_error/2), ok = print_plugin_info(rabbit_plugins:active()) diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 6ad85b24f5..87e44aa30c 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -18,7 +18,7 @@ -export([start/0, stop/0, declare/5, delete_immediately/1, delete/3, purge/1]). -export([pseudo_queue/2]). --export([lookup/1, with/2, with_or_die/2, assert_equivalence/5, +-export([lookup/1, with/2, with/3, with_or_die/2, assert_equivalence/5, check_exclusive_access/2, with_exclusive_access_or_die/3, stat/1, deliver/2, deliver_flow/2, requeue/3, ack/3, reject/4]). -export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]). @@ -81,6 +81,7 @@ rabbit_types:error('not_found'); ([name()]) -> [rabbit_types:amqqueue()]). -spec(with/2 :: (name(), qfun(A)) -> A | rabbit_types:error('not_found')). +-spec(with/3 :: (name(), qfun(A), fun(() -> B)) -> A | B). -spec(with_or_die/2 :: (name(), qfun(A)) -> A | rabbit_types:channel_exit()). -spec(assert_equivalence/5 :: diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 7ce6958223..ec5ef3f80b 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -513,6 +513,14 @@ send_or_record_confirm(#delivery{sender = SenderPid, rabbit_misc:confirm_to_sender(SenderPid, [MsgSeqNo]), {immediately, State}. +discard(#delivery{sender = SenderPid, message = #basic_message{id = MsgId}}, + State) -> + %% fake an 'eventual' confirm from BQ; noop if not needed + State1 = #q{backing_queue = BQ, backing_queue_state = BQS} = + confirm_messages([MsgId], State), + BQS1 = BQ:discard(MsgId, SenderPid, BQS), + State1#q{backing_queue_state = BQS1}. + run_message_queue(State) -> State1 = #q{backing_queue = BQ, backing_queue_state = BQS} = drop_expired_messages(State), @@ -521,17 +529,20 @@ run_message_queue(State) -> BQ:is_empty(BQS), State1), State2. -attempt_delivery(#delivery{sender = SenderPid, message = Message}, Props, +attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message}, + Props = #message_properties{delivered = Delivered}, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> case BQ:is_duplicate(Message, BQS) of {false, BQS1} -> deliver_msgs_to_consumers( - fun (AckRequired, State1 = #q{backing_queue_state = BQS2}) -> + fun (true, State1 = #q{backing_queue_state = BQS2}) -> {AckTag, BQS3} = BQ:publish_delivered( - AckRequired, Message, Props, - SenderPid, BQS2), - {{Message, Props#message_properties.delivered, AckTag}, - true, State1#q{backing_queue_state = BQS3}} + Message, Props, SenderPid, BQS2), + {{Message, Delivered, AckTag}, + true, State1#q{backing_queue_state = BQS3}}; + (false, State1) -> + {{Message, Delivered, undefined}, + true, discard(Delivery, State1)} end, false, State#q{backing_queue_state = BQS1}); {published, BQS1} -> {true, State#q{backing_queue_state = BQS1}}; @@ -548,11 +559,7 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid}, State2; %% The next one is an optimisation {false, State2 = #q{ttl = 0, dlx = undefined}} -> - %% fake an 'eventual' confirm from BQ; noop if not needed - State3 = #q{backing_queue = BQ, backing_queue_state = BQS} = - confirm_messages([Message#basic_message.id], State2), - BQS1 = BQ:discard(Message, SenderPid, BQS), - State3#q{backing_queue_state = BQS1}; + discard(Delivery, State2); {false, State2 = #q{backing_queue = BQ, backing_queue_state = BQS}} -> BQS1 = BQ:publish(Message, Props, SenderPid, BQS), ensure_ttl_timer(Props#message_properties.expiry, @@ -688,16 +695,15 @@ drop_expired_messages(State = #q{backing_queue_state = BQS, Now = now_micros(), DLXFun = dead_letter_fun(expired, State), ExpirePred = fun (#message_properties{expiry = Exp}) -> Now >= Exp end, - {Props, BQS1} = - case DLXFun of - undefined -> - {Next, undefined, BQS2} = BQ:dropwhile(ExpirePred, false, BQS), - {Next, BQS2}; - _ -> - {Next, Msgs, BQS2} = BQ:dropwhile(ExpirePred, true, BQS), - DLXFun(Msgs), - {Next, BQS2} - end, + {Props, BQS1} = case DLXFun of + undefined -> {Next, undefined, BQS2} = + BQ:dropwhile(ExpirePred, false, BQS), + {Next, BQS2}; + _ -> {Next, Msgs, BQS2} = + BQ:dropwhile(ExpirePred, true, BQS), + DLXFun(Msgs), + {Next, BQS2} + end, ensure_ttl_timer(case Props of undefined -> undefined; #message_properties{expiry = Exp} -> Exp @@ -847,8 +853,8 @@ make_dead_letter_msg(Reason, {<<"time">>, timestamp, TimeSec}, {<<"exchange">>, longstr, Exchange#resource.name}, {<<"routing-keys">>, array, RKs1}], - HeadersFun1(rabbit_basic:append_table_header(<<"x-death">>, - Info, Headers)) + HeadersFun1(rabbit_basic:prepend_table_header(<<"x-death">>, + Info, Headers)) end, Content1 = rabbit_basic:map_headers(HeadersFun2, Content), Msg#basic_message{exchange_name = DLX, id = rabbit_guid:gen(), @@ -1303,11 +1309,11 @@ handle_info(drop_expired, State) -> noreply(drop_expired_messages(State#q{ttl_timer_ref = undefined})); handle_info(emit_stats, State) -> - %% Do not invoke noreply as it would see no timer and create a new one. emit_stats(State), - State1 = rabbit_event:reset_stats_timer(State, #q.stats_timer), - assert_invariant(State1), - {noreply, State1, hibernate}; + {noreply, State1, Timeout} = noreply(State), + %% Need to reset *after* we've been through noreply/1 so we do not + %% just create another timer always and therefore never hibernate + {noreply, rabbit_event:reset_stats_timer(State1, #q.stats_timer), Timeout}; handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State = #q{q = #amqqueue{exclusive_owner = DownPid}}) -> diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index c6d1778532..af660c60a0 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -84,12 +84,16 @@ %% Called for messages which have already been passed straight %% out to a client. The queue will be empty for these calls %% (i.e. saves the round trip through the backing queue). --callback publish_delivered(true, rabbit_types:basic_message(), +-callback publish_delivered(rabbit_types:basic_message(), rabbit_types:message_properties(), pid(), state()) - -> {ack(), state()}; - (false, rabbit_types:basic_message(), - rabbit_types:message_properties(), pid(), state()) - -> {undefined, state()}. + -> {ack(), state()}. + +%% Called to inform the BQ about messages which have reached the +%% queue, but are not going to be further passed to BQ for some +%% reason. Note that this may be invoked for messages for which +%% BQ:is_duplicate/2 has already returned {'published' | 'discarded', +%% BQS}. +-callback discard(rabbit_types:msg_id(), pid(), state()) -> state(). %% Return ids of messages which have been confirmed since the last %% invocation of this function (or initialisation). @@ -200,13 +204,6 @@ -callback is_duplicate(rabbit_types:basic_message(), state()) -> {'false'|'published'|'discarded', state()}. -%% Called to inform the BQ about messages which have reached the -%% queue, but are not going to be further passed to BQ for some -%% reason. Note that this is may be invoked for messages for which -%% BQ:is_duplicate/2 has already returned {'published' | 'discarded', -%% BQS}. --callback discard(rabbit_types:basic_message(), pid(), state()) -> state(). - -else. -export([behaviour_info/1]). @@ -214,12 +211,11 @@ behaviour_info(callbacks) -> [{start, 1}, {stop, 0}, {init, 3}, {terminate, 2}, {delete_and_terminate, 2}, {purge, 1}, {publish, 4}, - {publish_delivered, 5}, {drain_confirmed, 1}, {dropwhile, 3}, + {publish_delivered, 4}, {discard, 3}, {drain_confirmed, 1}, {dropwhile, 3}, {fetch, 2}, {ack, 2}, {fold, 3}, {requeue, 2}, {len, 1}, {is_empty, 1}, {depth, 1}, {set_ram_duration_target, 2}, {ram_duration, 1}, {needs_timeout, 1}, {timeout, 1}, - {handle_pre_hibernate, 1}, {status, 1}, {invoke, 3}, {is_duplicate, 2}, - {discard, 3}]; + {handle_pre_hibernate, 1}, {status, 1}, {invoke, 3}, {is_duplicate, 2}] ; behaviour_info(_Other) -> undefined. diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl index d658f0f0f4..b37fbb29e2 100644 --- a/src/rabbit_backing_queue_qc.erl +++ b/src/rabbit_backing_queue_qc.erl @@ -119,7 +119,7 @@ qc_publish_multiple(#state{}) -> qc_publish_delivered(#state{bqstate = BQ}) -> {call, ?BQMOD, publish_delivered, - [boolean(), qc_message(), #message_properties{}, self(), BQ]}. + [qc_message(), #message_properties{}, self(), BQ]}. qc_fetch(#state{bqstate = BQ}) -> {call, ?BQMOD, fetch, [boolean(), BQ]}. @@ -199,7 +199,7 @@ next_state(S, _BQ, {call, ?MODULE, publish_multiple, [PublishCount]}) -> next_state(S, Res, {call, ?BQMOD, publish_delivered, - [AckReq, Msg, MsgProps, _Pid, _BQ]}) -> + [Msg, MsgProps, _Pid, _BQ]}) -> #state{confirms = Confirms, acks = Acks, next_seq_id = NextSeq} = S, AckTag = {call, erlang, element, [1, Res]}, BQ1 = {call, erlang, element, [2, Res]}, @@ -213,10 +213,7 @@ next_state(S, Res, true -> gb_sets:add(MsgId, Confirms); _ -> Confirms end, - acks = case AckReq of - true -> [{AckTag, {NextSeq, {MsgProps, Msg}}}|Acks]; - false -> Acks - end + acks = [{AckTag, {NextSeq, {MsgProps, Msg}}}|Acks] }; next_state(S, Res, {call, ?BQMOD, fetch, [AckReq, _BQ]}) -> diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index db2b7e9570..9966c0df31 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -19,7 +19,7 @@ -include("rabbit_framing.hrl"). -export([publish/4, publish/5, publish/1, - message/3, message/4, properties/1, append_table_header/3, + message/3, message/4, properties/1, prepend_table_header/3, extract_headers/1, map_headers/2, delivery/3, header_routes/1]). -export([build_content/2, from_content/1]). @@ -58,7 +58,7 @@ -spec(properties/1 :: (properties_input()) -> rabbit_framing:amqp_property_record()). --spec(append_table_header/3 :: +-spec(prepend_table_header/3 :: (binary(), rabbit_framing:amqp_table(), headers()) -> headers()). -spec(extract_headers/1 :: (rabbit_types:content()) -> headers()). @@ -177,15 +177,45 @@ properties(P) when is_list(P) -> end end, #'P_basic'{}, P). -append_table_header(Name, Info, undefined) -> - append_table_header(Name, Info, []); -append_table_header(Name, Info, Headers) -> - Prior = case rabbit_misc:table_lookup(Headers, Name) of - undefined -> []; - {array, Existing} -> Existing - end, +prepend_table_header(Name, Info, undefined) -> + prepend_table_header(Name, Info, []); +prepend_table_header(Name, Info, Headers) -> + case rabbit_misc:table_lookup(Headers, Name) of + {array, Existing} -> + prepend_table(Name, Info, Existing, Headers); + undefined -> + prepend_table(Name, Info, [], Headers); + Other -> + Headers2 = prepend_table(Name, Info, [], Headers), + set_invalid_header(Name, Other, Headers2) + end. + +prepend_table(Name, Info, Prior, Headers) -> rabbit_misc:set_table_value(Headers, Name, array, [{table, Info} | Prior]). +set_invalid_header(Name, {_, _}=Value, Headers) when is_list(Headers) -> + case rabbit_misc:table_lookup(Headers, ?INVALID_HEADERS_KEY) of + undefined -> + set_invalid([{Name, array, [Value]}], Headers); + {table, ExistingHdr} -> + update_invalid(Name, Value, ExistingHdr, Headers); + Other -> + %% somehow the x-invalid-headers header is corrupt + Invalid = [{?INVALID_HEADERS_KEY, array, [Other]}], + set_invalid_header(Name, Value, set_invalid(Invalid, Headers)) + end. + +set_invalid(NewHdr, Headers) -> + rabbit_misc:set_table_value(Headers, ?INVALID_HEADERS_KEY, table, NewHdr). + +update_invalid(Name, Value, ExistingHdr, Header) -> + Values = case rabbit_misc:table_lookup(ExistingHdr, Name) of + undefined -> [Value]; + {array, Prior} -> [Value | Prior] + end, + NewHdr = rabbit_misc:set_table_value(ExistingHdr, Name, array, Values), + set_invalid(NewHdr, Header). + extract_headers(Content) -> #content{properties = #'P_basic'{headers = Headers}} = rabbit_binary_parser:ensure_content_decoded(Content), diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl index d69376fb75..4700fa3170 100644 --- a/src/rabbit_binary_generator.erl +++ b/src/rabbit_binary_generator.erl @@ -18,20 +18,11 @@ -include("rabbit_framing.hrl"). -include("rabbit.hrl"). -%% EMPTY_CONTENT_BODY_FRAME_SIZE, 8 = 1 + 2 + 4 + 1 -%% - 1 byte of frame type -%% - 2 bytes of channel number -%% - 4 bytes of frame payload length -%% - 1 byte of payload trailer FRAME_END byte -%% See definition of check_empty_content_body_frame_size/0, -%% an assertion called at startup. --define(EMPTY_CONTENT_BODY_FRAME_SIZE, 8). - -export([build_simple_method_frame/3, build_simple_content_frames/4, build_heartbeat_frame/0]). -export([generate_table/1, encode_properties/2]). --export([check_empty_content_body_frame_size/0]). +-export([check_empty_frame_size/0]). -export([ensure_content_encoded/2, clear_encoded_content/1]). -export([map_exception/3]). @@ -53,7 +44,7 @@ -spec(generate_table/1 :: (rabbit_framing:amqp_table()) -> binary()). -spec(encode_properties/2 :: ([rabbit_framing:amqp_property_type()], [any()]) -> binary()). --spec(check_empty_content_body_frame_size/0 :: () -> 'ok'). +-spec(check_empty_frame_size/0 :: () -> 'ok'). -spec(ensure_content_encoded/2 :: (rabbit_types:content(), rabbit_types:protocol()) -> rabbit_types:encoded_content()). @@ -88,10 +79,8 @@ build_simple_content_frames(ChannelInt, Content, FrameMax, Protocol) -> [HeaderFrame | ContentFrames]. build_content_frames(FragsRev, FrameMax, ChannelInt) -> - BodyPayloadMax = if FrameMax == 0 -> - iolist_size(FragsRev); - true -> - FrameMax - ?EMPTY_CONTENT_BODY_FRAME_SIZE + BodyPayloadMax = if FrameMax == 0 -> iolist_size(FragsRev); + true -> FrameMax - ?EMPTY_FRAME_SIZE end, build_content_frames(0, [], BodyPayloadMax, [], lists:reverse(FragsRev), BodyPayloadMax, ChannelInt). @@ -257,15 +246,13 @@ encode_property(timestamp, Int) -> encode_property(table, Table) -> table_to_binary(Table). -check_empty_content_body_frame_size() -> - %% Intended to ensure that EMPTY_CONTENT_BODY_FRAME_SIZE is - %% defined correctly. +check_empty_frame_size() -> + %% Intended to ensure that EMPTY_FRAME_SIZE is defined correctly. ComputedSize = iolist_size(create_frame(?FRAME_BODY, 0, <<>>)), - if ComputedSize == ?EMPTY_CONTENT_BODY_FRAME_SIZE -> + if ComputedSize == ?EMPTY_FRAME_SIZE -> ok; true -> - exit({incorrect_empty_content_body_frame_size, - ComputedSize, ?EMPTY_CONTENT_BODY_FRAME_SIZE}) + exit({incorrect_empty_frame_size, ComputedSize, ?EMPTY_FRAME_SIZE}) end. ensure_content_encoded(Content = #content{properties_bin = PropBin, diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 0d13312b0b..54427206c5 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -1153,10 +1153,6 @@ binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin, RoutingKey, Arguments, ReturnMethod, NoWait, State = #ch{virtual_host = VHostPath, conn_pid = ConnPid }) -> - %% FIXME: connection exception (!) on failure?? - %% (see rule named "failure" in spec-XML) - %% FIXME: don't allow binding to internal exchanges - - %% including the one named "" ! {DestinationName, ActualRoutingKey} = expand_binding(DestinationType, DestinationNameBin, RoutingKey, State), check_write_permitted(DestinationName, State), diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl index a3cbf6e53c..25f7d758c6 100644 --- a/src/rabbit_control_main.erl +++ b/src/rabbit_control_main.erl @@ -70,6 +70,10 @@ {clear_parameter, [?VHOST_DEF]}, {list_parameters, [?VHOST_DEF]}, + {set_policy, [?VHOST_DEF]}, + {clear_policy, [?VHOST_DEF]}, + {list_policies, [?VHOST_DEF]}, + {list_queues, [?VHOST_DEF]}, {list_exchanges, [?VHOST_DEF]}, {list_bindings, [?VHOST_DEF]}, @@ -98,7 +102,9 @@ {"Bindings", rabbit_binding, info_all, info_keys}, {"Consumers", rabbit_amqqueue, consumers_all, consumer_info_keys}, {"Permissions", rabbit_auth_backend_internal, list_vhost_permissions, - vhost_perms_info_keys}]). + vhost_perms_info_keys}, + {"Policies", rabbit_policy, list_formatted, info_keys}, + {"Parameters", rabbit_runtime_parameters, list_formatted, info_keys}]). %%---------------------------------------------------------------------------- @@ -458,6 +464,28 @@ action(list_parameters, Node, [], Opts, Inform) -> rpc_call(Node, rabbit_runtime_parameters, list_formatted, [VHostArg]), rabbit_runtime_parameters:info_keys()); +action(set_policy, Node, [Key, Pattern, Defn | Prio], Opts, Inform) + when Prio == [] orelse length(Prio) == 1 -> + Msg = "Setting policy ~p for pattern ~p to ~p", + {InformMsg, Prio1} = case Prio of [] -> {Msg, undefined}; + [P] -> {Msg ++ " with priority ~s", P} + end, + VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), + Inform(InformMsg, [Key, Pattern, Defn] ++ Prio), + rpc_call(Node, rabbit_policy, parse_set, + [VHostArg, list_to_binary(Key), Pattern, Defn, Prio1]); + +action(clear_policy, Node, [Key], Opts, Inform) -> + VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), + Inform("Clearing policy ~p", [Key]), + rpc_call(Node, rabbit_policy, delete, [VHostArg, list_to_binary(Key)]); + +action(list_policies, Node, [], Opts, Inform) -> + VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), + Inform("Listing policies", []), + display_info_list(rpc_call(Node, rabbit_policy, list_formatted, [VHostArg]), + rabbit_policy:info_keys()); + action(report, Node, _Args, _Opts, Inform) -> Inform("Reporting server status on ~p~n~n", [erlang:universaltime()]), [begin ok = action(Action, N, [], [], Inform), io:nl() end || diff --git a/src/rabbit_guid.erl b/src/rabbit_guid.erl index ba0cb04f71..cedbbdb380 100644 --- a/src/rabbit_guid.erl +++ b/src/rabbit_guid.erl @@ -144,11 +144,7 @@ gen_secure() -> %% employs base64url encoding, which is safer in more contexts than %% plain base64. string(G, Prefix) -> - Prefix ++ "-" ++ lists:foldl(fun ($\+, Acc) -> [$\- | Acc]; - ($\/, Acc) -> [$\_ | Acc]; - ($\=, Acc) -> Acc; - (Chr, Acc) -> [Chr | Acc] - end, [], base64:encode_to_string(G)). + Prefix ++ "-" ++ rabbit_misc:base64url(G). binary(G, Prefix) -> list_to_binary(string(G, Prefix)). diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 377d51868d..6a7a28f291 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -17,11 +17,11 @@ -module(rabbit_mirror_queue_master). -export([init/3, terminate/2, delete_and_terminate/2, - purge/1, publish/4, publish_delivered/5, fetch/2, ack/2, + purge/1, publish/4, publish_delivered/4, discard/3, fetch/2, ack/2, requeue/2, len/1, is_empty/1, depth/1, drain_confirmed/1, dropwhile/3, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, - status/1, invoke/3, is_duplicate/2, discard/3, fold/3]). + status/1, invoke/3, is_duplicate/2, fold/3]). -export([start/1, stop/0]). @@ -88,12 +88,10 @@ stop() -> %% Same as start/1. exit({not_valid_for_generic_backing_queue, ?MODULE}). -init(Q = #amqqueue{name = QName}, Recover, AsyncCallback) -> +init(Q, Recover, AsyncCallback) -> {ok, BQ} = application:get_env(backing_queue_module), BQS = BQ:init(Q, Recover, AsyncCallback), State = #state{gm = GM} = init_with_existing_bq(Q, BQ, BQS), - {_MNode, SNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q), - rabbit_mirror_queue_misc:add_mirrors(QName, SNodes), ok = gm:broadcast(GM, {depth, BQ:depth(BQS)}), State. @@ -109,6 +107,8 @@ init_with_existing_bq(Q = #amqqueue{name = QName}, BQ, BQS) -> ok = rabbit_amqqueue:store_queue( Q1#amqqueue{gm_pids = [{GM, Self} | GMPids]}) end), + {_MNode, SNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q), + rabbit_mirror_queue_misc:add_mirrors(QName, SNodes), #state { gm = GM, coordinator = CPid, backing_queue = BQ, @@ -183,25 +183,42 @@ publish(Msg = #basic_message { id = MsgId }, MsgProps, ChPid, backing_queue = BQ, backing_queue_state = BQS }) -> false = dict:is_key(MsgId, SS), %% ASSERTION - ok = gm:broadcast(GM, {publish, false, ChPid, MsgProps, Msg}), + ok = gm:broadcast(GM, {publish, ChPid, MsgProps, Msg}), BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS), ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1 }). -publish_delivered(AckRequired, Msg = #basic_message { id = MsgId }, MsgProps, +publish_delivered(Msg = #basic_message { id = MsgId }, MsgProps, ChPid, State = #state { gm = GM, seen_status = SS, backing_queue = BQ, backing_queue_state = BQS, ack_msg_id = AM }) -> false = dict:is_key(MsgId, SS), %% ASSERTION - ok = gm:broadcast( - GM, {publish, {true, AckRequired}, ChPid, MsgProps, Msg}), - {AckTag, BQS1} = - BQ:publish_delivered(AckRequired, Msg, MsgProps, ChPid, BQS), + ok = gm:broadcast(GM, {publish_delivered, ChPid, MsgProps, Msg}), + {AckTag, BQS1} = BQ:publish_delivered(Msg, MsgProps, ChPid, BQS), AM1 = maybe_store_acktag(AckTag, MsgId, AM), - {AckTag, - ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1, - ack_msg_id = AM1 })}. + State1 = State #state { backing_queue_state = BQS1, ack_msg_id = AM1 }, + {AckTag, ensure_monitoring(ChPid, State1)}. + +discard(MsgId, ChPid, State = #state { gm = GM, + backing_queue = BQ, + backing_queue_state = BQS, + seen_status = SS }) -> + %% It's a massive error if we get told to discard something that's + %% already been published or published-and-confirmed. To do that + %% would require non FIFO access. Hence we should not find + %% 'published' or 'confirmed' in this dict:find. + case dict:find(MsgId, SS) of + error -> + ok = gm:broadcast(GM, {discard, ChPid, MsgId}), + BQS1 = BQ:discard(MsgId, ChPid, BQS), + ensure_monitoring( + ChPid, State #state { + backing_queue_state = BQS1, + seen_status = dict:erase(MsgId, SS) }); + {ok, discarded} -> + State + end. dropwhile(Pred, AckRequired, State = #state{gm = GM, @@ -375,26 +392,6 @@ is_duplicate(Message = #basic_message { id = MsgId }, {discarded, State} end. -discard(Msg = #basic_message { id = MsgId }, ChPid, - State = #state { gm = GM, - backing_queue = BQ, - backing_queue_state = BQS, - seen_status = SS }) -> - %% It's a massive error if we get told to discard something that's - %% already been published or published-and-confirmed. To do that - %% would require non FIFO access. Hence we should not find - %% 'published' or 'confirmed' in this dict:find. - case dict:find(MsgId, SS) of - error -> - ok = gm:broadcast(GM, {discard, ChPid, Msg}), - ensure_monitoring( - ChPid, State #state { - backing_queue_state = BQ:discard(Msg, ChPid, BQS), - seen_status = dict:erase(MsgId, SS) }); - {ok, discarded} -> - State - end. - %% --------------------------------------------------------------------------- %% Other exported functions %% --------------------------------------------------------------------------- diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 4c8406d9cf..ef0a374d2e 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -15,16 +15,26 @@ %% -module(rabbit_mirror_queue_misc). +-behaviour(rabbit_policy_validator). -export([remove_from_queue/3, on_node_up/0, add_mirrors/2, add_mirror/2, report_deaths/4, store_updated_slaves/1, suggested_queue_nodes/1, - is_mirrored/1, update_mirrors/2]). + is_mirrored/1, update_mirrors/2, validate_policy/1]). %% for testing only -export([suggested_queue_nodes/4]). -include("rabbit.hrl"). +-rabbit_boot_step({?MODULE, + [{description, "HA policy validation"}, + {mfa, {rabbit_registry, register, + [policy_validator, <<"ha-mode">>, ?MODULE]}}, + {mfa, {rabbit_registry, register, + [policy_validator, <<"ha-params">>, ?MODULE]}}, + {requires, rabbit_registry}, + {enables, recovery}]}). + %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -127,7 +137,7 @@ on_node_up() -> ok. drop_mirrors(QName, Nodes) -> - [ok = drop_mirror(QName, Node) || Node <- Nodes], + [{ok, _} = drop_mirror(QName, Node) || Node <- Nodes], ok. drop_mirror(QName, MirrorNode) -> @@ -144,7 +154,7 @@ drop_mirror(QName, MirrorNode) -> "Dropping queue mirror on node ~p for ~s~n", [MirrorNode, rabbit_misc:rs(Name)]), exit(Pid, {shutdown, dropped}), - ok + {ok, dropped} end end). @@ -202,7 +212,8 @@ if_mirrored_queue(QName, Fun) -> false -> ok; true -> Fun(Q) end - end). + end, + rabbit_misc:const({ok, not_found})). report_deaths(_MirrorPid, _IsMaster, _QueueName, []) -> ok; @@ -258,7 +269,11 @@ policy(Policy, Q) -> suggested_queue_nodes(<<"all">>, _Params, {MNode, _SNodes}, Possible) -> {MNode, Possible -- [MNode]}; suggested_queue_nodes(<<"nodes">>, Nodes0, {MNode, _SNodes}, Possible) -> - Nodes = [list_to_atom(binary_to_list(Node)) || Node <- Nodes0], + Nodes1 = [list_to_atom(binary_to_list(Node)) || Node <- Nodes0], + %% If the current master is currently not in the nodes specified, + %% act like it is for the purposes below - otherwise we will not + %% return it in the results... + Nodes = lists:usort([MNode | Nodes1]), Unavailable = Nodes -- Possible, Available = Nodes -- Unavailable, case Available of @@ -304,19 +319,12 @@ is_mirrored(Q) -> _ -> false end. - -%% [1] - rabbit_amqqueue:start_mirroring/1 will turn unmirrored to -%% master and start any needed slaves. However, if node(QPid) is not -%% in the nodes for the policy, it won't switch it. So this is for the -%% case where we kill the existing queue and restart elsewhere. TODO: -%% is this TRTTD? All alternatives seem ugly. update_mirrors(OldQ = #amqqueue{pid = QPid}, NewQ = #amqqueue{pid = QPid}) -> case {is_mirrored(OldQ), is_mirrored(NewQ)} of {false, false} -> ok; {true, false} -> rabbit_amqqueue:stop_mirroring(QPid); - {false, true} -> rabbit_amqqueue:start_mirroring(QPid), - update_mirrors0(OldQ, NewQ); %% [1] + {false, true} -> rabbit_amqqueue:start_mirroring(QPid); {true, true} -> update_mirrors0(OldQ, NewQ) end. @@ -328,3 +336,35 @@ update_mirrors0(OldQ = #amqqueue{name = QName}, add_mirrors(QName, NewNodes -- OldNodes), drop_mirrors(QName, OldNodes -- NewNodes), ok. + +%%---------------------------------------------------------------------------- + +validate_policy(KeyList) -> + validate_policy( + proplists:get_value(<<"ha-mode">>, KeyList), + proplists:get_value(<<"ha-params">>, KeyList, none)). + +validate_policy(<<"all">>, none) -> + ok; +validate_policy(<<"all">>, _Params) -> + {error, "ha-mode=\"all\" does not take parameters", []}; + +validate_policy(<<"nodes">>, []) -> + {error, "ha-mode=\"nodes\" list must be non-empty", []}; +validate_policy(<<"nodes">>, Nodes) when is_list(Nodes) -> + case [I || I <- Nodes, not is_binary(I)] of + [] -> ok; + Invalid -> {error, "ha-mode=\"nodes\" takes a list of strings, " + "~p was not a string", [Invalid]} + end; +validate_policy(<<"nodes">>, Params) -> + {error, "ha-mode=\"nodes\" takes a list, ~p given", [Params]}; + +validate_policy(<<"exactly">>, N) when is_integer(N) andalso N > 0 -> + ok; +validate_policy(<<"exactly">>, Params) -> + {error, "ha-mode=\"exactly\" takes an integer, ~p given", [Params]}; + +validate_policy(Mode, _Params) -> + {error, "~p is not a valid ha-mode value", [Mode]}. + diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index d0efe37adc..1ba1420f42 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -697,25 +697,23 @@ publish_or_discard(Status, ChPid, MsgId, State1 #state { sender_queues = SQ1, msg_id_status = MS1 }. -process_instruction({publish, false, ChPid, MsgProps, +process_instruction({publish, ChPid, MsgProps, Msg = #basic_message { id = MsgId }}, State) -> State1 = #state { backing_queue = BQ, backing_queue_state = BQS } = publish_or_discard(published, ChPid, MsgId, State), BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS), {ok, State1 #state { backing_queue_state = BQS1 }}; -process_instruction({publish, {true, AckRequired}, ChPid, MsgProps, +process_instruction({publish_delivered, ChPid, MsgProps, Msg = #basic_message { id = MsgId }}, State) -> State1 = #state { backing_queue = BQ, backing_queue_state = BQS } = publish_or_discard(published, ChPid, MsgId, State), - {AckTag, BQS1} = BQ:publish_delivered(AckRequired, Msg, MsgProps, - ChPid, BQS), - {ok, maybe_store_ack(AckRequired, MsgId, AckTag, + {AckTag, BQS1} = BQ:publish_delivered(Msg, MsgProps, ChPid, BQS), + {ok, maybe_store_ack(true, MsgId, AckTag, State1 #state { backing_queue_state = BQS1 })}; -process_instruction({discard, ChPid, Msg = #basic_message { id = MsgId }}, - State) -> +process_instruction({discard, ChPid, MsgId}, State) -> State1 = #state { backing_queue = BQ, backing_queue_state = BQS } = publish_or_discard(discarded, ChPid, MsgId, State), - BQS1 = BQ:discard(Msg, ChPid, BQS), + BQS1 = BQ:discard(MsgId, ChPid, BQS), {ok, State1 #state { backing_queue_state = BQS1 }}; process_instruction({drop, Length, Dropped, AckRequired}, State = #state { backing_queue = BQ, diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index a0536a50a9..ab9a9cebd4 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -63,6 +63,7 @@ -export([version/0]). -export([sequence_error/1]). -export([json_encode/1, json_decode/1, json_to_term/1, term_to_json/1]). +-export([base64url/1]). %% Horrible macro to use in guards -define(IS_BENIGN_EXIT(R), @@ -227,6 +228,7 @@ -spec(json_decode/1 :: (string()) -> {'ok', any()} | 'error'). -spec(json_to_term/1 :: (any()) -> any()). -spec(term_to_json/1 :: (any()) -> any()). +-spec(base64url/1 :: (binary()) -> string()). -endif. @@ -987,3 +989,10 @@ term_to_json(L) when is_list(L) -> term_to_json(V) when is_binary(V) orelse is_number(V) orelse V =:= null orelse V =:= true orelse V =:= false -> V. + +base64url(In) -> + lists:reverse(lists:foldl(fun ($\+, Acc) -> [$\- | Acc]; + ($\/, Acc) -> [$\_ | Acc]; + ($\=, Acc) -> Acc; + (Chr, Acc) -> [Chr | Acc] + end, [], base64:encode_to_string(In))). diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 870692282f..942048f9b0 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -108,8 +108,24 @@ init() -> ok. init_from_config() -> - {ok, {TryNodes, NodeType}} = - application:get_env(rabbit, cluster_nodes), + {TryNodes, NodeType} = + case application:get_env(rabbit, cluster_nodes) of + {ok, Nodes} when is_list(Nodes) -> + Config = {Nodes -- [node()], case lists:member(node(), Nodes) of + true -> disc; + false -> ram + end}, + error_logger:warning_msg( + "Converting legacy 'cluster_nodes' configuration~n ~w~n" + "to~n ~w.~n~n" + "Please update the configuration to the new format " + "{Nodes, NodeType}, where Nodes contains the nodes that the " + "node will try to cluster with, and NodeType is either " + "'disc' or 'ram'~n", [Nodes, Config]), + Config; + {ok, Config} -> + Config + end, case find_good_node(nodes_excl_me(TryNodes)) of {ok, Node} -> rabbit_log:info("Node '~p' selected for clustering from " @@ -158,7 +174,7 @@ join_cluster(DiscoveryNode, NodeType) -> %% this case - we're joining a new cluster with new nodes which %% are not in synch with the current node. I also lifts the burden %% of reseting the node from the user. - reset(false), + reset_gracefully(), %% Join the cluster rabbit_misc:local_info_msg("Clustering with ~p as ~p node~n", @@ -172,39 +188,35 @@ join_cluster(DiscoveryNode, NodeType) -> %% cluster, has no cluster configuration, no local database, and no %% persisted messages reset() -> + ensure_mnesia_not_running(), rabbit_misc:local_info_msg("Resetting Rabbit~n", []), - reset(false). + reset_gracefully(). force_reset() -> + ensure_mnesia_not_running(), rabbit_misc:local_info_msg("Resetting Rabbit forcefully~n", []), - reset(true). + wipe(). + +reset_gracefully() -> + AllNodes = cluster_nodes(all), + %% Reconnecting so that we will get an up to date nodes. We don't + %% need to check for consistency because we are resetting. + %% Force=true here so that reset still works when clustered with a + %% node which is down. + init_db_with_mnesia(AllNodes, node_type(), false, false), + case is_only_clustered_disc_node() of + true -> e(resetting_only_disc_node); + false -> ok + end, + leave_cluster(), + rabbit_misc:ensure_ok(mnesia:delete_schema([node()]), cannot_delete_schema), + wipe(). -reset(Force) -> - ensure_mnesia_not_running(), - Nodes = case Force of - true -> - nodes(); - false -> - AllNodes = cluster_nodes(all), - %% Reconnecting so that we will get an up to date - %% nodes. We don't need to check for consistency - %% because we are resetting. Force=true here so - %% that reset still works when clustered with a - %% node which is down. - init_db_with_mnesia(AllNodes, node_type(), false, false), - case is_only_clustered_disc_node() of - true -> e(resetting_only_disc_node); - false -> ok - end, - leave_cluster(), - rabbit_misc:ensure_ok(mnesia:delete_schema([node()]), - cannot_delete_schema), - cluster_nodes(all) - end, +wipe() -> %% We need to make sure that we don't end up in a distributed %% Erlang system with nodes while not being in an Mnesia cluster %% with them. We don't handle that well. - [erlang:disconnect_node(N) || N <- Nodes], + [erlang:disconnect_node(N) || N <- cluster_nodes(all)], %% remove persisted messages and any other garbage we find ok = rabbit_file:recursive_delete(filelib:wildcard(dir() ++ "/*")), ok = rabbit_node_monitor:reset_cluster_status(), @@ -221,7 +233,9 @@ change_cluster_node_type(Type) -> {ok, Status} -> Status; {error, _Reason} -> e(cannot_connect_to_cluster) end, - Node = case RunningNodes of + %% We might still be marked as running by a remote node since the + %% information of us going down might not have propagated yet. + Node = case RunningNodes -- [node()] of [] -> e(no_online_cluster_nodes); [Node0|_] -> Node0 end, @@ -276,18 +290,18 @@ forget_cluster_node(Node, RemoveWhenOffline) -> end. remove_node_offline_node(Node) -> - %% We want the running nodes *now*, so we don't call - %% `cluster_nodes(running)' which will just get what's in the cluster status - %% file. - case {running_nodes(cluster_nodes(all)) -- [Node], node_type()} of + %% Here `mnesia:system_info(running_db_nodes)' will RPC, but that's what we + %% want - we need to know the running nodes *now*. If the current node is a + %% RAM node it will return bogus results, but we don't care since we only do + %% this operation from disc nodes. + case {mnesia:system_info(running_db_nodes) -- [Node], node_type()} of {[], disc} -> - %% Note that while we check if the nodes was the last to - %% go down, apart from the node we're removing from, this - %% is still unsafe. Consider the situation in which A and - %% B are clustered. A goes down, and records B as the - %% running node. Then B gets clustered with C, C goes down - %% and B goes down. In this case, C is the second-to-last, - %% but we don't know that and we'll remove B from A + %% Note that while we check if the nodes was the last to go down, + %% apart from the node we're removing from, this is still unsafe. + %% Consider the situation in which A and B are clustered. A goes + %% down, and records B as the running node. Then B gets clustered + %% with C, C goes down and B goes down. In this case, C is the + %% second-to-last, but we don't know that and we'll remove B from A %% anyway, even if that will lead to bad things. case cluster_nodes(running) -- [node(), Node] of [] -> start_mnesia(), @@ -320,10 +334,17 @@ status() -> [{nodes, (IfNonEmpty(disc, cluster_nodes(disc)) ++ IfNonEmpty(ram, cluster_nodes(ram)))}] ++ case mnesia:system_info(is_running) of - yes -> [{running_nodes, cluster_nodes(running)}]; + yes -> RunningNodes = cluster_nodes(running), + [{running_nodes, cluster_nodes(running)}, + {partitions, mnesia_partitions(RunningNodes)}]; no -> [] end. +mnesia_partitions(Nodes) -> + {Replies, _BadNodes} = rpc:multicall( + Nodes, rabbit_node_monitor, partitions, []), + [Reply || Reply = {_, R} <- Replies, R =/= []]. + is_clustered() -> AllNodes = cluster_nodes(all), AllNodes =/= [] andalso AllNodes =/= [node()]. @@ -332,7 +353,7 @@ cluster_nodes(WhichNodes) -> cluster_status(WhichNodes). %% This function is the actual source of information, since it gets %% the data from mnesia. Obviously it'll work only when mnesia is %% running. -mnesia_nodes() -> +cluster_status_from_mnesia() -> case mnesia:system_info(is_running) of no -> {error, mnesia_not_running}; @@ -352,39 +373,33 @@ mnesia_nodes() -> disc -> nodes_incl_me(DiscCopies); ram -> DiscCopies end, - {ok, {AllNodes, DiscNodes}}; + %% `mnesia:system_info(running_db_nodes)' is safe since + %% we know that mnesia is running + RunningNodes = mnesia:system_info(running_db_nodes), + {ok, {AllNodes, DiscNodes, RunningNodes}}; false -> {error, tables_not_present} end end. cluster_status(WhichNodes) -> - %% I don't want to call `running_nodes/1' unless if necessary, since it's - %% pretty expensive. - {AllNodes1, DiscNodes1, RunningNodesThunk} = - case mnesia_nodes() of - {ok, {AllNodes, DiscNodes}} -> - {AllNodes, DiscNodes, fun() -> running_nodes(AllNodes) end}; + {AllNodes, DiscNodes, RunningNodes} = Nodes = + case cluster_status_from_mnesia() of + {ok, Nodes0} -> + Nodes0; {error, _Reason} -> - {AllNodes, DiscNodes, RunningNodes} = + {AllNodes0, DiscNodes0, RunningNodes0} = rabbit_node_monitor:read_cluster_status(), %% The cluster status file records the status when the node is %% online, but we know for sure that the node is offline now, so %% we can remove it from the list of running nodes. - {AllNodes, DiscNodes, fun() -> nodes_excl_me(RunningNodes) end} + {AllNodes0, DiscNodes0, nodes_excl_me(RunningNodes0)} end, case WhichNodes of - status -> {AllNodes1, DiscNodes1, RunningNodesThunk()}; - all -> AllNodes1; - disc -> DiscNodes1; - ram -> AllNodes1 -- DiscNodes1; - running -> RunningNodesThunk() - end. - -cluster_status_from_mnesia() -> - case mnesia_nodes() of - {ok, {AllNodes, DiscNodes}} -> {ok, {AllNodes, DiscNodes, - running_nodes(AllNodes)}}; - {error, _} = Err -> Err + status -> Nodes; + all -> AllNodes; + disc -> DiscNodes; + ram -> AllNodes -- DiscNodes; + running -> RunningNodes end. node_info() -> @@ -719,14 +734,6 @@ change_extra_db_nodes(ClusterNodes0, CheckOtherNodes) -> Nodes end. -%% We're not using `mnesia:system_info(running_db_nodes)' directly -%% because if the node is a RAM node it won't know about other nodes -%% when mnesia is stopped -running_nodes(Nodes) -> - {Replies, _BadNodes} = rpc:multicall(Nodes, - rabbit_mnesia, is_running_remote, []), - [Node || {Running, Node} <- Replies, Running]. - is_running_remote() -> {mnesia:system_info(is_running) =:= yes, node()}. check_consistency(OTP, Rabbit) -> @@ -810,21 +817,23 @@ e(Tag) -> throw({error, {Tag, error_description(Tag)}}). error_description(clustering_only_disc_node) -> "You cannot cluster a node if it is the only disc node in its existing " " cluster. If new nodes joined while this node was offline, use " - "\"update_cluster_nodes\" to add them manually."; + "'update_cluster_nodes' to add them manually."; error_description(resetting_only_disc_node) -> "You cannot reset a node when it is the only disc node in a cluster. " "Please convert another node of the cluster to a disc node first."; error_description(already_clustered) -> - "You are already clustered with the nodes you have selected."; + "You are already clustered with the nodes you have selected. If the " + "node you are trying to cluster with is not present in the current " + "node status, use 'update_cluster_nodes'."; error_description(not_clustered) -> "Non-clustered nodes can only be disc nodes."; error_description(cannot_connect_to_cluster) -> "Could not connect to the cluster nodes present in this node's " "status file. If the cluster has changed, you can use the " - "\"update_cluster_nodes\" command to point to the new cluster nodes."; + "'update_cluster_nodes' command to point to the new cluster nodes."; error_description(no_online_cluster_nodes) -> "Could not find any online cluster nodes. If the cluster has changed, " - "you can use the 'recluster' command."; + "you can use the 'update_cluster_nodes' command."; error_description(cannot_connect_to_node) -> "Could not connect to the cluster node provided."; error_description(inconsistent_cluster) -> @@ -839,11 +848,11 @@ error_description(offline_node_no_offline_flag) -> "but can be done with the --offline flag. Please consult the manual " "for rabbitmqctl for more information."; error_description(not_last_node_to_go_down) -> - "The node you're trying to remove from was not the last to go down " + "The node you are trying to remove from was not the last to go down " "(excluding the node you are removing). Please use the the last node " "to go down to remove nodes when the cluster is offline."; error_description(removing_node_from_offline_node) -> - "To remove a node remotely from an offline node, the node you're removing " + "To remove a node remotely from an offline node, the node you are removing " "from must be a disc node and all the other nodes must be offline."; error_description(no_running_cluster_nodes) -> "You cannot leave a cluster if no online nodes are present.". diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl index 026aa3624e..b11c9d049a 100644 --- a/src/rabbit_node_monitor.erl +++ b/src/rabbit_node_monitor.erl @@ -24,6 +24,7 @@ write_cluster_status/1, read_cluster_status/0, update_cluster_status/0, reset_cluster_status/0]). -export([notify_node_up/0, notify_joined_cluster/0, notify_left_cluster/1]). +-export([partitions/0]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, @@ -32,6 +33,8 @@ -define(SERVER, ?MODULE). -define(RABBIT_UP_RPC_TIMEOUT, 2000). +-record(state, {monitors, partitions}). + %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -50,6 +53,8 @@ -spec(notify_joined_cluster/0 :: () -> 'ok'). -spec(notify_left_cluster/1 :: (node()) -> 'ok'). +-spec(partitions/0 :: () -> {node(), [{atom(), node()}]}). + -endif. %%---------------------------------------------------------------------------- @@ -168,10 +173,23 @@ notify_left_cluster(Node) -> ok. %%---------------------------------------------------------------------------- +%% Server calls +%%---------------------------------------------------------------------------- + +partitions() -> + gen_server:call(?SERVER, partitions, infinity). + +%%---------------------------------------------------------------------------- %% gen_server callbacks %%---------------------------------------------------------------------------- -init([]) -> {ok, pmon:new()}. +init([]) -> + {ok, _} = mnesia:subscribe(system), + {ok, #state{monitors = pmon:new(), + partitions = []}}. + +handle_call(partitions, _From, State = #state{partitions = Partitions}) -> + {reply, {node(), Partitions}, State}; handle_call(_Request, _From, State) -> {noreply, State}. @@ -179,9 +197,10 @@ handle_call(_Request, _From, State) -> %% Note: when updating the status file, we can't simply write the %% mnesia information since the message can (and will) overtake the %% mnesia propagation. -handle_cast({node_up, Node, NodeType}, Monitors) -> +handle_cast({node_up, Node, NodeType}, + State = #state{monitors = Monitors}) -> case pmon:is_monitored({rabbit, Node}, Monitors) of - true -> {noreply, Monitors}; + true -> {noreply, State}; false -> rabbit_log:info("rabbit on node ~p up~n", [Node]), {AllNodes, DiscNodes, RunningNodes} = read_cluster_status(), write_cluster_status({add_node(Node, AllNodes), @@ -191,7 +210,8 @@ handle_cast({node_up, Node, NodeType}, Monitors) -> end, add_node(Node, RunningNodes)}), ok = handle_live_rabbit(Node), - {noreply, pmon:monitor({rabbit, Node}, Monitors)} + {noreply, State#state{ + monitors = pmon:monitor({rabbit, Node}, Monitors)}} end; handle_cast({joined_cluster, Node, NodeType}, State) -> {AllNodes, DiscNodes, RunningNodes} = read_cluster_status(), @@ -210,12 +230,21 @@ handle_cast({left_cluster, Node}, State) -> handle_cast(_Msg, State) -> {noreply, State}. -handle_info({'DOWN', _MRef, process, {rabbit, Node}, _Reason}, Monitors) -> +handle_info({'DOWN', _MRef, process, {rabbit, Node}, _Reason}, + State = #state{monitors = Monitors}) -> rabbit_log:info("rabbit on node ~p down~n", [Node]), {AllNodes, DiscNodes, RunningNodes} = read_cluster_status(), write_cluster_status({AllNodes, DiscNodes, del_node(Node, RunningNodes)}), ok = handle_dead_rabbit(Node), - {noreply, pmon:erase({rabbit, Node}, Monitors)}; + {noreply, State#state{monitors = pmon:erase({rabbit, Node}, Monitors)}}; + +handle_info({mnesia_system_event, + {inconsistent_database, running_partitioned_network, Node}}, + State = #state{partitions = Partitions}) -> + Partitions1 = ordsets:to_list( + ordsets:add_element(Node, ordsets:from_list(Partitions))), + {noreply, State#state{partitions = Partitions1}}; + handle_info(_Info, State) -> {noreply, State}. diff --git a/src/rabbit_plugins.erl b/src/rabbit_plugins.erl index ecb1961126..abe9b08968 100644 --- a/src/rabbit_plugins.erl +++ b/src/rabbit_plugins.erl @@ -100,8 +100,13 @@ dependencies(Reverse, Sources, AllPlugins) -> {ok, G} = rabbit_misc:build_acyclic_graph( fun (App, _Deps) -> [{App, App}] end, fun (App, Deps) -> [{App, Dep} || Dep <- Deps] end, - [{Name, Deps} - || #plugin{name = Name, dependencies = Deps} <- AllPlugins]), + lists:ukeysort( + 1, [{Name, Deps} || + #plugin{name = Name, + dependencies = Deps} <- AllPlugins] ++ + [{Dep, []} || + #plugin{dependencies = Deps} <- AllPlugins, + Dep <- Deps])), Dests = case Reverse of false -> digraph_utils:reachable(Sources, G); true -> digraph_utils:reaching(Sources, G) diff --git a/src/rabbit_plugins_main.erl b/src/rabbit_plugins_main.erl index 572cf15019..2158d1da15 100644 --- a/src/rabbit_plugins_main.erl +++ b/src/rabbit_plugins_main.erl @@ -108,16 +108,19 @@ action(enable, ToEnable0, _Opts, PluginsFile, PluginsDir) -> Enabled, AllPlugins), ToEnable = [list_to_atom(Name) || Name <- ToEnable0], Missing = ToEnable -- plugin_names(AllPlugins), - case Missing of - [] -> ok; - _ -> throw({error_string, - fmt_list("The following plugins could not be found:", - Missing)}) - end, NewEnabled = lists:usort(Enabled ++ ToEnable), - write_enabled_plugins(PluginsFile, NewEnabled), NewImplicitlyEnabled = rabbit_plugins:dependencies(false, NewEnabled, AllPlugins), + MissingDeps = (NewImplicitlyEnabled -- plugin_names(AllPlugins)) -- Missing, + case {Missing, MissingDeps} of + {[], []} -> ok; + {Miss, []} -> throw({error_string, fmt_missing("plugins", Miss)}); + {[], Miss} -> throw({error_string, fmt_missing("dependencies", Miss)}); + {_, _} -> throw({error_string, + fmt_missing("plugins", Missing) ++ + fmt_missing("dependencies", MissingDeps)}) + end, + write_enabled_plugins(PluginsFile, NewEnabled), maybe_warn_mochiweb(NewImplicitlyEnabled), case NewEnabled -- ImplicitlyEnabled of [] -> io:format("Plugin configuration unchanged.~n"); @@ -183,9 +186,12 @@ format_plugins(Pattern, Opts, PluginsFile, PluginsDir) -> EnabledImplicitly = rabbit_plugins:dependencies(false, EnabledExplicitly, AvailablePlugins) -- EnabledExplicitly, + Missing = [#plugin{name = Name, dependencies = []} || + Name <- ((EnabledExplicitly ++ EnabledImplicitly) -- + plugin_names(AvailablePlugins))], {ok, RE} = re:compile(Pattern), Plugins = [ Plugin || - Plugin = #plugin{name = Name} <- AvailablePlugins, + Plugin = #plugin{name = Name} <- AvailablePlugins ++ Missing, re:run(atom_to_list(Name), RE, [{capture, none}]) =:= match, if OnlyEnabled -> lists:member(Name, EnabledExplicitly); OnlyEnabledAll -> (lists:member(Name, @@ -196,30 +202,35 @@ format_plugins(Pattern, Opts, PluginsFile, PluginsDir) -> Plugins1 = usort_plugins(Plugins), MaxWidth = lists:max([length(atom_to_list(Name)) || #plugin{name = Name} <- Plugins1] ++ [0]), - [format_plugin(P, EnabledExplicitly, EnabledImplicitly, Format, - MaxWidth) || P <- Plugins1], + [format_plugin(P, EnabledExplicitly, EnabledImplicitly, + plugin_names(Missing), Format, MaxWidth) || P <- Plugins1], ok. format_plugin(#plugin{name = Name, version = Version, description = Description, dependencies = Deps}, - EnabledExplicitly, EnabledImplicitly, Format, MaxWidth) -> + EnabledExplicitly, EnabledImplicitly, Missing, + Format, MaxWidth) -> Glyph = case {lists:member(Name, EnabledExplicitly), - lists:member(Name, EnabledImplicitly)} of - {true, false} -> "[E]"; - {false, true} -> "[e]"; - _ -> "[ ]" + lists:member(Name, EnabledImplicitly), + lists:member(Name, Missing)} of + {true, false, false} -> "[E]"; + {false, true, false} -> "[e]"; + {_, _, true} -> "[!]"; + _ -> "[ ]" end, + Opt = fun (_F, A, A) -> ok; + ( F, A, _) -> io:format(F, [A]) + end, case Format of minimal -> io:format("~s~n", [Name]); - normal -> io:format("~s ~-" ++ integer_to_list(MaxWidth) ++ - "w ~s~n", [Glyph, Name, Version]); + normal -> io:format("~s ~-" ++ integer_to_list(MaxWidth) ++ "w ", + [Glyph, Name]), + Opt("~s", Version, undefined), + io:format("~n"); verbose -> io:format("~s ~w~n", [Glyph, Name]), - io:format(" Version: \t~s~n", [Version]), - case Deps of - [] -> ok; - _ -> io:format(" Dependencies:\t~p~n", [Deps]) - end, - io:format(" Description:\t~s~n", [Description]), + Opt(" Version: \t~s~n", Version, undefined), + Opt(" Dependencies:\t~p~n", Deps, []), + Opt(" Description: \t~s~n", Description, undefined), io:format("~n") end. @@ -230,6 +241,9 @@ fmt_list(Header, Plugins) -> lists:flatten( [Header, $\n, [io_lib:format(" ~s~n", [P]) || P <- Plugins]]). +fmt_missing(Desc, Missing) -> + fmt_list("The following " ++ Desc ++ " could not be found:", Missing). + usort_plugins(Plugins) -> lists:usort(fun plugins_cmp/2, Plugins). diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl index f4c1f42b21..2717cc9217 100644 --- a/src/rabbit_policy.erl +++ b/src/rabbit_policy.erl @@ -22,11 +22,13 @@ -include("rabbit.hrl"). --import(rabbit_misc, [pget/2, pget/3]). +-import(rabbit_misc, [pget/2]). -export([register/0]). -export([name/1, get/2, set/1]). -export([validate/4, validate_clear/3, notify/4, notify_clear/3]). +-export([parse_set/5, set/5, delete/2, lookup/2, list/0, list/1, + list_formatted/1, info_keys/0]). -rabbit_boot_step({?MODULE, [{description, "policy parameters"}, @@ -41,7 +43,7 @@ name(#amqqueue{policy = Policy}) -> name0(Policy); name(#exchange{policy = Policy}) -> name0(Policy). name0(undefined) -> none; -name0(Policy) -> pget(<<"name">>, Policy). +name0(Policy) -> pget(name, Policy). set(Q = #amqqueue{name = Name}) -> Q#amqqueue{policy = set0(Name)}; set(X = #exchange{name = Name}) -> X#exchange{policy = set0(Name)}. @@ -55,7 +57,7 @@ get(Name, EntityName = #resource{virtual_host = VHost}) -> get0(Name, match(EntityName, list(VHost))). get0(_Name, undefined) -> {error, not_found}; -get0(Name, List) -> case pget(<<"policy">>, List) of +get0(Name, List) -> case pget(definition, List) of undefined -> {error, not_found}; Policy -> case pget(Name, Policy) of undefined -> {error, not_found}; @@ -65,6 +67,81 @@ get0(Name, List) -> case pget(<<"policy">>, List) of %%---------------------------------------------------------------------------- +parse_set(VHost, Name, Pattern, Definition, undefined) -> + parse_set0(VHost, Name, Pattern, Definition, 0); +parse_set(VHost, Name, Pattern, Definition, Priority) -> + try list_to_integer(Priority) of + Num -> parse_set0(VHost, Name, Pattern, Definition, Num) + catch + error:badarg -> {error, "~p priority must be a number", [Priority]} + end. + +parse_set0(VHost, Name, Pattern, Defn, Priority) -> + case rabbit_misc:json_decode(Defn) of + {ok, JSON} -> + set0(VHost, Name, + [{<<"pattern">>, list_to_binary(Pattern)}, + {<<"definition">>, rabbit_misc:json_to_term(JSON)}, + {<<"priority">>, Priority}]); + error -> + {error_string, "JSON decoding error"} + end. + +set(VHost, Name, Pattern, Definition, Priority) -> + PolicyProps = [{<<"pattern">>, Pattern}, + {<<"definition">>, Definition}, + {<<"priority">>, case Priority of + undefined -> 0; + _ -> Priority + end}], + set0(VHost, Name, PolicyProps). + +set0(VHost, Name, Term) -> + rabbit_runtime_parameters:set_any(VHost, <<"policy">>, Name, Term). + +delete(VHost, Name) -> + rabbit_runtime_parameters:clear_any(VHost, <<"policy">>, Name). + +lookup(VHost, Name) -> + case rabbit_runtime_parameters:lookup(VHost, <<"policy">>, Name) of + not_found -> not_found; + P -> p(P, fun ident/1) + end. + +list() -> + list('_'). + +list(VHost) -> + list0(VHost, fun ident/1). + +list_formatted(VHost) -> + order_policies(list0(VHost, fun format/1)). + +list0(VHost, DefnFun) -> + [p(P, DefnFun) || P <- rabbit_runtime_parameters:list(VHost, <<"policy">>)]. + +order_policies(PropList) -> + lists:sort(fun (A, B) -> pget(priority, A) < pget(priority, B) end, + PropList). + +p(Parameter, DefnFun) -> + Value = pget(value, Parameter), + [{vhost, pget(vhost, Parameter)}, + {name, pget(name, Parameter)}, + {pattern, pget(<<"pattern">>, Value)}, + {definition, DefnFun(pget(<<"definition">>, Value))}, + {priority, pget(<<"priority">>, Value)}]. + +format(Term) -> + {ok, JSON} = rabbit_misc:json_encode(rabbit_misc:term_to_json(Term)), + list_to_binary(JSON). + +ident(X) -> X. + +info_keys() -> [vhost, name, pattern, definition, priority]. + +%%---------------------------------------------------------------------------- + validate(_VHost, <<"policy">>, Name, Term) -> rabbit_parameter_validation:proplist( Name, policy_validation(), Term). @@ -80,10 +157,6 @@ notify_clear(VHost, <<"policy">>, _Name) -> %%---------------------------------------------------------------------------- -list(VHost) -> - [[{<<"name">>, pget(key, P)} | pget(value, P)] - || P <- rabbit_runtime_parameters:list(VHost, <<"policy">>)]. - update_policies(VHost) -> Policies = list(VHost), {Xs, Qs} = rabbit_misc:execute_mnesia_transaction( @@ -127,13 +200,52 @@ match(Name, Policies) -> end. matches(#resource{name = Name}, Policy) -> - match =:= re:run(Name, pget(<<"pattern">>, Policy), [{capture, none}]). + match =:= re:run(Name, pget(pattern, Policy), [{capture, none}]). -sort_pred(A, B) -> pget(<<"priority">>, A, 0) >= pget(<<"priority">>, B, 0). +sort_pred(A, B) -> pget(priority, A) >= pget(priority, B). %%---------------------------------------------------------------------------- policy_validation() -> - [{<<"priority">>, fun rabbit_parameter_validation:number/2, optional}, - {<<"pattern">>, fun rabbit_parameter_validation:regex/2, mandatory}, - {<<"policy">>, fun rabbit_parameter_validation:list/2, mandatory}]. + [{<<"priority">>, fun rabbit_parameter_validation:number/2, mandatory}, + {<<"pattern">>, fun rabbit_parameter_validation:regex/2, mandatory}, + {<<"definition">>, fun validation/2, mandatory}]. + +validation(_Name, []) -> + {error, "no policy provided", []}; +validation(_Name, Terms) when is_list(Terms) -> + {Keys, Modules} = lists:unzip( + rabbit_registry:lookup_all(policy_validator)), + [] = dups(Keys), %% ASSERTION + Validators = lists:zipwith(fun (M, K) -> {M, a2b(K)} end, Modules, Keys), + {TermKeys, _} = lists:unzip(Terms), + case dups(TermKeys) of + [] -> validation0(Validators, Terms); + Dup -> {error, "~p duplicate keys not allowed", [Dup]} + end; +validation(_Name, Term) -> + {error, "parse error while reading policy: ~p", [Term]}. + +validation0(Validators, Terms) -> + case lists:foldl( + fun (Mod, {ok, TermsLeft}) -> + ModKeys = proplists:get_all_values(Mod, Validators), + case [T || {Key, _} = T <- TermsLeft, + lists:member(Key, ModKeys)] of + [] -> {ok, TermsLeft}; + Scope -> {Mod:validate_policy(Scope), TermsLeft -- Scope} + end; + (_, Acc) -> + Acc + end, {ok, Terms}, proplists:get_keys(Validators)) of + {ok, []} -> + ok; + {ok, Unvalidated} -> + {error, "~p are not recognised policy settings", [Unvalidated]}; + {Error, _} -> + Error + end. + +a2b(A) -> list_to_binary(atom_to_list(A)). + +dups(L) -> L -- lists:usort(L). diff --git a/src/rabbit_policy_validator.erl b/src/rabbit_policy_validator.erl new file mode 100644 index 0000000000..b59dec2b47 --- /dev/null +++ b/src/rabbit_policy_validator.erl @@ -0,0 +1,37 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% + +-module(rabbit_policy_validator). + +-ifdef(use_specs). + +-type(validate_results() :: + 'ok' | {error, string(), [term()]} | [validate_results()]). + +-callback validate_policy([{binary(), term()}]) -> validate_results(). + +-else. + +-export([behaviour_info/1]). + +behaviour_info(callbacks) -> + [ + {validate_policy, 1} + ]; +behaviour_info(_Other) -> + undefined. + +-endif. diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index fab5af61e6..e2cc4aeba1 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -492,6 +492,14 @@ handle_exception(State, Channel, Reason) -> timer:sleep(?SILENT_CLOSE_DELAY * 1000), throw({handshake_error, State#v1.connection_state, Channel, Reason}). +%% we've "lost sync" with the client and hence must not accept any +%% more input +fatal_frame_error(Error, Type, Channel, Payload, State) -> + frame_error(Error, Type, Channel, Payload, State), + %% grace period to allow transmission of error + timer:sleep(?SILENT_CLOSE_DELAY * 1000), + throw(fatal_frame_error). + frame_error(Error, Type, Channel, Payload, State) -> {Str, Bin} = payload_snippet(Payload), handle_exception(State, Channel, @@ -614,6 +622,17 @@ post_process_frame(_Frame, _ChPid, State) -> %%-------------------------------------------------------------------------- +%% We allow clients to exceed the frame size a little bit since quite +%% a few get it wrong - off-by 1 or 8 (empty frame size) are typical. +-define(FRAME_SIZE_FUDGE, ?EMPTY_FRAME_SIZE). + +handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, + State = #v1{connection = #connection{frame_max = FrameMax}}) + when FrameMax /= 0 andalso + PayloadSize > FrameMax - ?EMPTY_FRAME_SIZE + ?FRAME_SIZE_FUDGE -> + fatal_frame_error( + {frame_too_large, PayloadSize, FrameMax - ?EMPTY_FRAME_SIZE}, + Type, Channel, <<>>, State); handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, State) -> ensure_stats_timer( switch_callback(State, {frame_payload, Type, Channel, PayloadSize}, @@ -624,8 +643,8 @@ handle_input({frame_payload, Type, Channel, PayloadSize}, Data, State) -> case EndMarker of ?FRAME_END -> State1 = handle_frame(Type, Channel, Payload, State), switch_callback(State1, frame_header, 7); - _ -> frame_error({invalid_frame_end_marker, EndMarker}, - Type, Channel, Payload, State) + _ -> fatal_frame_error({invalid_frame_end_marker, EndMarker}, + Type, Channel, Payload, State) end; %% The two rules pertaining to version negotiation: @@ -921,26 +940,27 @@ i(last_blocked_age, #v1{last_blocked_at = T}) -> timer:now_diff(erlang:now(), T) / 1000000; i(channels, #v1{}) -> length(all_channels()); -i(protocol, #v1{connection = #connection{protocol = none}}) -> - none; -i(protocol, #v1{connection = #connection{protocol = Protocol}}) -> - Protocol:version(); i(auth_mechanism, #v1{auth_mechanism = none}) -> none; i(auth_mechanism, #v1{auth_mechanism = Mechanism}) -> proplists:get_value(name, Mechanism:description()); -i(user, #v1{connection = #connection{user = #user{username = Username}}}) -> - Username; -i(user, #v1{connection = #connection{user = none}}) -> +i(protocol, #v1{connection = #connection{protocol = none}}) -> + none; +i(protocol, #v1{connection = #connection{protocol = Protocol}}) -> + Protocol:version(); +i(user, #v1{connection = #connection{user = none}}) -> ''; -i(vhost, #v1{connection = #connection{vhost = VHost}}) -> +i(user, #v1{connection = #connection{user = #user{ + username = Username}}}) -> + Username; +i(vhost, #v1{connection = #connection{vhost = VHost}}) -> VHost; -i(timeout, #v1{connection = #connection{timeout_sec = Timeout}}) -> +i(timeout, #v1{connection = #connection{timeout_sec = Timeout}}) -> Timeout; -i(frame_max, #v1{connection = #connection{frame_max = FrameMax}}) -> +i(frame_max, #v1{connection = #connection{frame_max = FrameMax}}) -> FrameMax; -i(client_properties, #v1{connection = #connection{ - client_properties = ClientProperties}}) -> +i(client_properties, #v1{connection = #connection{client_properties = + ClientProperties}}) -> ClientProperties; i(Item, #v1{}) -> throw({bad_argument, Item}). diff --git a/src/rabbit_registry.erl b/src/rabbit_registry.erl index e14bbba018..32709d2484 100644 --- a/src/rabbit_registry.erl +++ b/src/rabbit_registry.erl @@ -107,7 +107,8 @@ sanity_check_module(ClassModule, Module) -> class_module(exchange) -> rabbit_exchange_type; class_module(auth_mechanism) -> rabbit_auth_mechanism; class_module(runtime_parameter) -> rabbit_runtime_parameter; -class_module(exchange_decorator) -> rabbit_exchange_decorator. +class_module(exchange_decorator) -> rabbit_exchange_decorator; +class_module(policy_validator) -> rabbit_policy_validator. %%--------------------------------------------------------------------------- diff --git a/src/rabbit_runtime_parameters.erl b/src/rabbit_runtime_parameters.erl index b58b459a7f..49060409e1 100644 --- a/src/rabbit_runtime_parameters.erl +++ b/src/rabbit_runtime_parameters.erl @@ -18,9 +18,9 @@ -include("rabbit.hrl"). --export([parse_set/4, set/4, clear/3, - list/0, list/1, list_strict/1, list/2, list_strict/2, list_formatted/1, - lookup/3, value/3, value/4, info_keys/0]). +-export([parse_set/4, set/4, set_any/4, clear/3, clear_any/3, list/0, list/1, + list_strict/1, list/2, list_strict/2, list_formatted/1, lookup/3, + value/3, value/4, info_keys/0]). %%---------------------------------------------------------------------------- @@ -32,17 +32,23 @@ -> ok_or_error_string()). -spec(set/4 :: (rabbit_types:vhost(), binary(), binary(), term()) -> ok_or_error_string()). +-spec(set_any/4 :: (rabbit_types:vhost(), binary(), binary(), term()) + -> ok_or_error_string()). -spec(clear/3 :: (rabbit_types:vhost(), binary(), binary()) -> ok_or_error_string()). +-spec(clear_any/3 :: (rabbit_types:vhost(), binary(), binary()) + -> ok_or_error_string()). -spec(list/0 :: () -> [rabbit_types:infos()]). --spec(list/1 :: (rabbit_types:vhost()) -> [rabbit_types:infos()]). --spec(list_strict/1 :: (binary()) -> [rabbit_types:infos()] | 'not_found'). --spec(list/2 :: (rabbit_types:vhost(), binary()) -> [rabbit_types:infos()]). --spec(list_strict/2 :: (rabbit_types:vhost(), binary()) +-spec(list/1 :: (rabbit_types:vhost() | '_') -> [rabbit_types:infos()]). +-spec(list_strict/1 :: (binary() | '_') + -> [rabbit_types:infos()] | 'not_found'). +-spec(list/2 :: (rabbit_types:vhost() | '_', binary() | '_') + -> [rabbit_types:infos()]). +-spec(list_strict/2 :: (rabbit_types:vhost() | '_', binary() | '_') -> [rabbit_types:infos()] | 'not_found'). -spec(list_formatted/1 :: (rabbit_types:vhost()) -> [rabbit_types:infos()]). -spec(lookup/3 :: (rabbit_types:vhost(), binary(), binary()) - -> rabbit_types:infos()). + -> rabbit_types:infos() | 'not_found'). -spec(value/3 :: (rabbit_types:vhost(), binary(), binary()) -> term()). -spec(value/4 :: (rabbit_types:vhost(), binary(), binary(), term()) -> term()). -spec(info_keys/0 :: () -> rabbit_types:info_keys()). @@ -57,29 +63,37 @@ %%--------------------------------------------------------------------------- -parse_set(VHost, Component, Key, String) -> +parse_set(_, <<"policy">>, _, _) -> + {error_string, "policies may not be set using this method"}; +parse_set(VHost, Component, Name, String) -> case rabbit_misc:json_decode(String) of - {ok, JSON} -> set(VHost, Component, Key, rabbit_misc:json_to_term(JSON)); + {ok, JSON} -> set(VHost, Component, Name, + rabbit_misc:json_to_term(JSON)); error -> {error_string, "JSON decoding error"} end. -set(VHost, Component, Key, Term) -> - case set0(VHost, Component, Key, Term) of - ok -> ok; - {errors, L} -> format_error(L) - end. +set(_, <<"policy">>, _, _) -> + {error_string, "policies may not be set using this method"}; +set(VHost, Component, Name, Term) -> + set_any(VHost, Component, Name, Term). format_error(L) -> {error_string, rabbit_misc:format_many([{"Validation failed~n", []} | L])}. -set0(VHost, Component, Key, Term) -> +set_any(VHost, Component, Name, Term) -> + case set_any0(VHost, Component, Name, Term) of + ok -> ok; + {errors, L} -> format_error(L) + end. + +set_any0(VHost, Component, Name, Term) -> case lookup_component(Component) of {ok, Mod} -> - case flatten_errors(Mod:validate(VHost, Component, Key, Term)) of + case flatten_errors(Mod:validate(VHost, Component, Name, Term)) of ok -> - case mnesia_update(VHost, Component, Key, Term) of + case mnesia_update(VHost, Component, Name, Term) of {old, Term} -> ok; - _ -> Mod:notify(VHost, Component, Key, Term) + _ -> Mod:notify(VHost, Component, Name, Term) end, ok; E -> @@ -89,43 +103,49 @@ set0(VHost, Component, Key, Term) -> E end. -mnesia_update(VHost, Component, Key, Term) -> +mnesia_update(VHost, Component, Name, Term) -> rabbit_misc:execute_mnesia_transaction( fun () -> - Res = case mnesia:read(?TABLE, {VHost, Component, Key}, read) of + Res = case mnesia:read(?TABLE, {VHost, Component, Name}, read) of [] -> new; [Params] -> {old, Params#runtime_parameters.value} end, - ok = mnesia:write(?TABLE, c(VHost, Component, Key, Term), write), + ok = mnesia:write(?TABLE, c(VHost, Component, Name, Term), write), Res end). -clear(VHost, Component, Key) -> - case clear0(VHost, Component, Key) of +clear(_, <<"policy">> , _) -> + {error_string, "policies may not be cleared using this method"}; +clear(VHost, Component, Name) -> + clear_any(VHost, Component, Name). + +clear_any(VHost, Component, Name) -> + case clear_any0(VHost, Component, Name) of ok -> ok; {errors, L} -> format_error(L) end. -clear0(VHost, Component, Key) -> +clear_any0(VHost, Component, Name) -> case lookup_component(Component) of {ok, Mod} -> case flatten_errors( - Mod:validate_clear(VHost, Component, Key)) of - ok -> mnesia_clear(VHost, Component, Key), - Mod:notify_clear(VHost, Component, Key), + Mod:validate_clear(VHost, Component, Name)) of + ok -> mnesia_clear(VHost, Component, Name), + Mod:notify_clear(VHost, Component, Name), ok; E -> E end; E -> E end. -mnesia_clear(VHost, Component, Key) -> +mnesia_clear(VHost, Component, Name) -> ok = rabbit_misc:execute_mnesia_transaction( fun () -> - ok = mnesia:delete(?TABLE, {VHost, Component, Key}, write) + ok = mnesia:delete(?TABLE, {VHost, Component, Name}, write) end). list() -> - [p(P) || P <- rabbit_misc:dirty_read_all(?TABLE)]. + [p(P) || #runtime_parameters{ key = {_VHost, Comp, _Name}} = P <- + rabbit_misc:dirty_read_all(?TABLE), Comp /= <<"policy">>]. list(VHost) -> list(VHost, '_', []). list_strict(Component) -> list('_', Component, not_found). @@ -136,60 +156,63 @@ list(VHost, Component, Default) -> case component_good(Component) of true -> Match = #runtime_parameters{key = {VHost, Component, '_'}, _ = '_'}, - [p(P) || P <- mnesia:dirty_match_object(?TABLE, Match)]; + [p(P) || #runtime_parameters{ key = {_VHost, Comp, _Name}} = P <- + mnesia:dirty_match_object(?TABLE, Match), + Comp =/= <<"policy">> orelse + Component =:= <<"policy">>]; _ -> Default end. list_formatted(VHost) -> [pset(value, format(pget(value, P)), P) || P <- list(VHost)]. -lookup(VHost, Component, Key) -> - case lookup0(VHost, Component, Key, rabbit_misc:const(not_found)) of +lookup(VHost, Component, Name) -> + case lookup0(VHost, Component, Name, rabbit_misc:const(not_found)) of not_found -> not_found; Params -> p(Params) end. -value(VHost, Component, Key) -> - case lookup0(VHost, Component, Key, rabbit_misc:const(not_found)) of +value(VHost, Component, Name) -> + case lookup0(VHost, Component, Name, rabbit_misc:const(not_found)) of not_found -> not_found; Params -> Params#runtime_parameters.value end. -value(VHost, Component, Key, Default) -> - Params = lookup0(VHost, Component, Key, +value(VHost, Component, Name, Default) -> + Params = lookup0(VHost, Component, Name, fun () -> - lookup_missing(VHost, Component, Key, Default) + lookup_missing(VHost, Component, Name, Default) end), Params#runtime_parameters.value. -lookup0(VHost, Component, Key, DefaultFun) -> - case mnesia:dirty_read(?TABLE, {VHost, Component, Key}) of +lookup0(VHost, Component, Name, DefaultFun) -> + case mnesia:dirty_read(?TABLE, {VHost, Component, Name}) of [] -> DefaultFun(); [R] -> R end. -lookup_missing(VHost, Component, Key, Default) -> +lookup_missing(VHost, Component, Name, Default) -> rabbit_misc:execute_mnesia_transaction( fun () -> - case mnesia:read(?TABLE, {VHost, Component, Key}, read) of - [] -> Record = c(VHost, Component, Key, Default), + case mnesia:read(?TABLE, {VHost, Component, Name}, read) of + [] -> Record = c(VHost, Component, Name, Default), mnesia:write(?TABLE, Record, write), Record; [R] -> R end end). -c(VHost, Component, Key, Default) -> - #runtime_parameters{key = {VHost, Component, Key}, +c(VHost, Component, Name, Default) -> + #runtime_parameters{key = {VHost, Component, Name}, value = Default}. -p(#runtime_parameters{key = {VHost, Component, Key}, value = Value}) -> +p(#runtime_parameters{key = {VHost, Component, Name}, value = Value}) -> [{vhost, VHost}, {component, Component}, - {key, Key}, + {name, Name}, {value, Value}]. -info_keys() -> [component, key, value]. +info_keys() -> [component, name, value]. %%--------------------------------------------------------------------------- diff --git a/src/rabbit_runtime_parameters_test.erl b/src/rabbit_runtime_parameters_test.erl index 5224ccaa36..d4d7271e90 100644 --- a/src/rabbit_runtime_parameters_test.erl +++ b/src/rabbit_runtime_parameters_test.erl @@ -16,9 +16,14 @@ -module(rabbit_runtime_parameters_test). -behaviour(rabbit_runtime_parameter). +-behaviour(rabbit_policy_validator). -export([validate/4, validate_clear/3, notify/4, notify_clear/3]). -export([register/0, unregister/0]). +-export([validate_policy/1]). +-export([register_policy_validator/0, unregister_policy_validator/0]). + +%---------------------------------------------------------------------------- register() -> rabbit_registry:register(runtime_parameter, <<"test">>, ?MODULE). @@ -36,3 +41,28 @@ validate_clear(_, <<"test">>, _) -> {error, "meh", []}. notify(_, _, _, _) -> ok. notify_clear(_, _, _) -> ok. + +%---------------------------------------------------------------------------- + +register_policy_validator() -> + rabbit_registry:register(policy_validator, <<"testeven">>, ?MODULE), + rabbit_registry:register(policy_validator, <<"testpos">>, ?MODULE). + +unregister_policy_validator() -> + rabbit_registry:unregister(policy_validator, <<"testeven">>), + rabbit_registry:unregister(policy_validator, <<"testpos">>). + +validate_policy([{<<"testeven">>, Terms}]) when is_list(Terms) -> + case length(Terms) rem 2 =:= 0 of + true -> ok; + false -> {error, "meh", []} + end; + +validate_policy([{<<"testpos">>, Terms}]) when is_list(Terms) -> + case lists:all(fun (N) -> is_integer(N) andalso N > 0 end, Terms) of + true -> ok; + false -> {error, "meh", []} + end; + +validate_policy(_) -> + {error, "meh", []}. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index e605485341..51ca6043b4 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -41,6 +41,7 @@ all_tests() -> passed = test_multi_call(), passed = test_file_handle_cache(), passed = test_backing_queue(), + passed = test_rabbit_basic_header_handling(), passed = test_priority_queue(), passed = test_pg_local(), passed = test_unfold(), @@ -57,6 +58,7 @@ all_tests() -> passed = test_dynamic_mirroring(), passed = test_user_management(), passed = test_runtime_parameters(), + passed = test_policy_validation(), passed = test_server_status(), passed = test_confirms(), passed = @@ -70,6 +72,7 @@ all_tests() -> passed = test_configurable_server_properties(), passed. + do_if_secondary_node(Up, Down) -> SecondaryNode = rabbit_nodes:make("hare"), @@ -158,6 +161,78 @@ test_multi_call() -> exit(Pid3, bang), passed. +test_rabbit_basic_header_handling() -> + passed = write_table_with_invalid_existing_type_test(), + passed = invalid_existing_headers_test(), + passed = disparate_invalid_header_entries_accumulate_separately_test(), + passed = corrupt_or_invalid_headers_are_overwritten_test(), + passed = invalid_same_header_entry_accumulation_test(), + passed. + +-define(XDEATH_TABLE, + [{<<"reason">>, longstr, <<"blah">>}, + {<<"queue">>, longstr, <<"foo.bar.baz">>}, + {<<"exchange">>, longstr, <<"my-exchange">>}, + {<<"routing-keys">>, array, []}]). + +-define(ROUTE_TABLE, [{<<"redelivered">>, bool, <<"true">>}]). + +-define(BAD_HEADER(K), {<<K>>, longstr, <<"bad ", K>>}). +-define(BAD_HEADER(K, Suf), {<<K>>, longstr, <<"bad ", K, Suf>>}). +-define(FOUND_BAD_HEADER(K), {<<K>>, array, [{longstr, <<"bad ", K>>}]}). + +write_table_with_invalid_existing_type_test() -> + prepend_check(<<"header1">>, ?XDEATH_TABLE, [?BAD_HEADER("header1")]), + passed. + +invalid_existing_headers_test() -> + Headers = + prepend_check(<<"header2">>, ?ROUTE_TABLE, [?BAD_HEADER("header2")]), + {array, [{table, ?ROUTE_TABLE}]} = + rabbit_misc:table_lookup(Headers, <<"header2">>), + passed. + +disparate_invalid_header_entries_accumulate_separately_test() -> + BadHeaders = [?BAD_HEADER("header2")], + Headers = prepend_check(<<"header2">>, ?ROUTE_TABLE, BadHeaders), + Headers2 = prepend_check(<<"header1">>, ?XDEATH_TABLE, + [?BAD_HEADER("header1") | Headers]), + {table, [?FOUND_BAD_HEADER("header1"), + ?FOUND_BAD_HEADER("header2")]} = + rabbit_misc:table_lookup(Headers2, ?INVALID_HEADERS_KEY), + passed. + +corrupt_or_invalid_headers_are_overwritten_test() -> + Headers0 = [?BAD_HEADER("header1"), + ?BAD_HEADER("x-invalid-headers")], + Headers1 = prepend_check(<<"header1">>, ?XDEATH_TABLE, Headers0), + {table,[?FOUND_BAD_HEADER("header1"), + ?FOUND_BAD_HEADER("x-invalid-headers")]} = + rabbit_misc:table_lookup(Headers1, ?INVALID_HEADERS_KEY), + passed. + +invalid_same_header_entry_accumulation_test() -> + BadHeader1 = ?BAD_HEADER("header1", "a"), + Headers = prepend_check(<<"header1">>, ?ROUTE_TABLE, [BadHeader1]), + Headers2 = prepend_check(<<"header1">>, ?ROUTE_TABLE, + [?BAD_HEADER("header1", "b") | Headers]), + {table, InvalidHeaders} = + rabbit_misc:table_lookup(Headers2, ?INVALID_HEADERS_KEY), + {array, [{longstr,<<"bad header1b">>}, + {longstr,<<"bad header1a">>}]} = + rabbit_misc:table_lookup(InvalidHeaders, <<"header1">>), + passed. + +prepend_check(HeaderKey, HeaderTable, Headers) -> + Headers1 = rabbit_basic:prepend_table_header( + HeaderKey, HeaderTable, Headers), + {table, Invalid} = + rabbit_misc:table_lookup(Headers1, ?INVALID_HEADERS_KEY), + {Type, Value} = rabbit_misc:table_lookup(Headers, HeaderKey), + {array, [{Type, Value} | _]} = + rabbit_misc:table_lookup(Invalid, HeaderKey), + Headers1. + test_priority_queue() -> false = priority_queue:is_queue(not_a_queue), @@ -913,12 +988,12 @@ test_dynamic_mirroring() -> Test({b,[a,c],0},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{b,[a]},[a,b,c,d]), %% Add two nodes and drop one Test({a,[b,c],0},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{a,[d]},[a,b,c,d]), - %% Promote slave to master by policy - Test({a,[b,c],0},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{d,[a]},[a,b,c,d]), %% Don't try to include nodes that are not running Test({a,[b], 0},<<"nodes">>,[<<"a">>,<<"b">>,<<"f">>],{a,[b]},[a,b,c,d]), %% If we can't find any of the nodes listed then just keep the master Test({a,[], 0},<<"nodes">>,[<<"f">>,<<"g">>,<<"h">>],{a,[b]},[a,b,c,d]), + %% And once that's happened, still keep the master even when not listed + Test({a,[b,c],0},<<"nodes">>,[<<"b">>,<<"c">>], {a,[]}, [a,b,c,d]), Test({a,[], 1},<<"exactly">>,2,{a,[]}, [a,b,c,d]), Test({a,[], 2},<<"exactly">>,3,{a,[]}, [a,b,c,d]), @@ -1046,6 +1121,26 @@ test_runtime_parameters() -> rabbit_runtime_parameters_test:unregister(), passed. +test_policy_validation() -> + rabbit_runtime_parameters_test:register_policy_validator(), + SetPol = + fun (Key, Val) -> + control_action( + set_policy, + ["name", ".*", rabbit_misc:format("{\"~s\":~p}", [Key, Val])]) + end, + + ok = SetPol("testeven", []), + ok = SetPol("testeven", [1, 2]), + ok = SetPol("testeven", [1, 2, 3, 4]), + ok = SetPol("testpos", [2, 5, 5678]), + + {error_string, _} = SetPol("testpos", [-1, 0, 1]), + {error_string, _} = SetPol("testeven", [ 1, 2, 3]), + + rabbit_runtime_parameters_test:unregister_policy_validator(), + passed. + test_server_status() -> %% create a few things so there is some useful information to list Writer = spawn(fun () -> receive shutdown -> ok end end), diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index ddb136a73d..8a3fd9d917 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -17,11 +17,11 @@ -module(rabbit_variable_queue). -export([init/3, terminate/2, delete_and_terminate/2, purge/1, - publish/4, publish_delivered/5, drain_confirmed/1, + publish/4, publish_delivered/4, discard/3, drain_confirmed/1, dropwhile/3, fetch/2, ack/2, requeue/2, len/1, is_empty/1, depth/1, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3, - is_duplicate/2, discard/3, multiple_routing_keys/0, fold/3]). + is_duplicate/2, multiple_routing_keys/0, fold/3]). -export([start/1, stop/0]). @@ -545,17 +545,8 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, ram_msg_count = RamMsgCount + 1, unconfirmed = UC1 })). -publish_delivered(false, #basic_message { id = MsgId }, - #message_properties { needs_confirming = NeedsConfirming }, - _ChPid, State = #vqstate { async_callback = Callback, - len = 0 }) -> - case NeedsConfirming of - true -> blind_confirm(Callback, gb_sets:singleton(MsgId)); - false -> ok - end, - {undefined, a(State)}; -publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent, - id = MsgId }, +publish_delivered(Msg = #basic_message { is_persistent = IsPersistent, + id = MsgId }, MsgProps = #message_properties { needs_confirming = NeedsConfirming }, _ChPid, State = #vqstate { len = 0, @@ -579,6 +570,8 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent, persistent_count = PCount1, unconfirmed = UC1 }))}. +discard(_MsgId, _ChPid, State) -> State. + drain_confirmed(State = #vqstate { confirmed = C }) -> case gb_sets:is_empty(C) of true -> {[], State}; %% common case @@ -821,8 +814,6 @@ invoke(?MODULE, Fun, State) -> Fun(?MODULE, State). is_duplicate(_Msg, State) -> {false, State}. -discard(_Msg, _ChPid, State) -> State. - %%---------------------------------------------------------------------------- %% Minor helpers %%---------------------------------------------------------------------------- @@ -1325,12 +1316,9 @@ must_sync_index(#vqstate { msg_indices_on_disk = MIOD, %% subtraction. not (gb_sets:is_empty(UC) orelse gb_sets:is_subset(UC, MIOD)). -blind_confirm(Callback, MsgIdSet) -> - Callback(?MODULE, - fun (?MODULE, State) -> record_confirms(MsgIdSet, State) end). - msgs_written_to_disk(Callback, MsgIdSet, ignored) -> - blind_confirm(Callback, MsgIdSet); + Callback(?MODULE, + fun (?MODULE, State) -> record_confirms(MsgIdSet, State) end); msgs_written_to_disk(Callback, MsgIdSet, written) -> Callback(?MODULE, fun (?MODULE, State = #vqstate { msgs_on_disk = MOD, diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl index 03dfbe245d..297fa56fe3 100644 --- a/src/rabbit_vhost.erl +++ b/src/rabbit_vhost.erl @@ -97,6 +97,8 @@ internal_delete(VHostPath) -> proplists:get_value(component, Info), proplists:get_value(key, Info)) || Info <- rabbit_runtime_parameters:list(VHostPath)], + [ok = rabbit_policy:delete(VHostPath, proplists:get_value(key, Info)) + || Info <- rabbit_policy:list(VHostPath)], ok = mnesia:delete({rabbit_vhost, VHostPath}), ok. |
