diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/delegate.erl | 12 | ||||
| -rw-r--r-- | src/gm.erl | 10 | ||||
| -rw-r--r-- | src/rabbit_amqqueue.erl | 90 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 45 | ||||
| -rw-r--r-- | src/rabbit_auth_backend_internal.erl | 10 | ||||
| -rw-r--r-- | src/rabbit_backing_queue.erl | 32 | ||||
| -rw-r--r-- | src/rabbit_backing_queue_qc.erl | 8 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 12 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 75 | ||||
| -rw-r--r-- | src/rabbit_misc.erl | 12 | ||||
| -rw-r--r-- | src/rabbit_mnesia.erl | 23 | ||||
| -rw-r--r-- | src/rabbit_node_monitor.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 77 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 40 | ||||
| -rw-r--r-- | src/rabbit_vm.erl | 21 |
15 files changed, 271 insertions, 198 deletions
diff --git a/src/delegate.erl b/src/delegate.erl index d595e4819e..9222c34c42 100644 --- a/src/delegate.erl +++ b/src/delegate.erl @@ -18,7 +18,7 @@ -behaviour(gen_server2). --export([start_link/1, invoke_no_result/2, invoke/2]). +-export([start_link/1, invoke_no_result/2, invoke/2, call/2, cast/2]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -35,6 +35,10 @@ [{pid(), term()}]}). -spec(invoke_no_result/2 :: (pid() | [pid()], fun ((pid()) -> any())) -> 'ok'). +-spec(call/2 :: + ( pid(), any()) -> any(); + ([pid()], any()) -> {[{pid(), any()}], [{pid(), term()}]}). +-spec(cast/2 :: (pid() | [pid()], any()) -> 'ok'). -endif. @@ -96,6 +100,12 @@ invoke_no_result(Pids, Fun) when is_list(Pids) -> safe_invoke(LocalPids, Fun), %% must not die ok. +call(PidOrPids, Msg) -> + invoke(PidOrPids, fun (P) -> gen_server2:call(P, Msg, infinity) end). + +cast(PidOrPids, Msg) -> + invoke_no_result(PidOrPids, fun (P) -> gen_server2:cast(P, Msg) end). + %%---------------------------------------------------------------------------- group_pids_by_node(Pids) -> diff --git a/src/gm.erl b/src/gm.erl index 4a95de0dd1..2057b1f577 100644 --- a/src/gm.erl +++ b/src/gm.erl @@ -667,6 +667,9 @@ handle_info(flush, State) -> noreply( flush_broadcast_buffer(State #state { broadcast_timer = undefined })); +handle_info(timeout, State) -> + noreply(flush_broadcast_buffer(State)); + handle_info({'DOWN', MRef, process, _Pid, Reason}, State = #state { self = Self, left = Left, @@ -834,10 +837,13 @@ handle_msg({activity, _NotLeft, _Activity}, State) -> noreply(State) -> - {noreply, ensure_broadcast_timer(State), hibernate}. + {noreply, ensure_broadcast_timer(State), flush_timeout(State)}. reply(Reply, State) -> - {reply, Reply, ensure_broadcast_timer(State), hibernate}. + {reply, Reply, ensure_broadcast_timer(State), flush_timeout(State)}. + +flush_timeout(#state{broadcast_buffer = []}) -> hibernate; +flush_timeout(_) -> 0. ensure_broadcast_timer(State = #state { broadcast_buffer = [], broadcast_timer = undefined }) -> diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 7827b839d5..1b6cc223cf 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -284,7 +284,11 @@ store_queue(Q = #amqqueue{durable = false}) -> ok = mnesia:write(rabbit_queue, Q, write), ok. -policy_changed(Q1, Q2) -> rabbit_mirror_queue_misc:update_mirrors(Q1, Q2). +policy_changed(Q1, Q2) -> + rabbit_mirror_queue_misc:update_mirrors(Q1, Q2), + %% Make sure we emit a stats event even if nothing + %% mirroring-related has changed - the policy may have changed anyway. + wake_up(Q1). start_queue_process(Node, Q) -> {ok, Pid} = rabbit_amqqueue_sup:start_child(Node, [Q]), @@ -438,10 +442,10 @@ info_keys() -> rabbit_amqqueue_process:info_keys(). map(VHostPath, F) -> rabbit_misc:filter_exit_map(F, list(VHostPath)). -info(#amqqueue{ pid = QPid }) -> delegate_call(QPid, info). +info(#amqqueue{ pid = QPid }) -> delegate:call(QPid, info). info(#amqqueue{ pid = QPid }, Items) -> - case delegate_call(QPid, {info, Items}) of + case delegate:call(QPid, {info, Items}) of {ok, Res} -> Res; {error, Error} -> throw(Error) end. @@ -472,7 +476,7 @@ force_event_refresh(QNames) -> wake_up(#amqqueue{pid = QPid}) -> gen_server2:cast(QPid, wake_up). -consumers(#amqqueue{ pid = QPid }) -> delegate_call(QPid, consumers). +consumers(#amqqueue{ pid = QPid }) -> delegate:call(QPid, consumers). consumer_info_keys() -> ?CONSUMER_INFO_KEYS. @@ -486,47 +490,51 @@ consumers_all(VHostPath) -> {ChPid, ConsumerTag, AckRequired} <- consumers(Q)] end)). -stat(#amqqueue{pid = QPid}) -> delegate_call(QPid, stat). +stat(#amqqueue{pid = QPid}) -> delegate:call(QPid, stat). delete_immediately(QPids) -> [gen_server2:cast(QPid, delete_immediately) || QPid <- QPids], ok. delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) -> - delegate_call(QPid, {delete, IfUnused, IfEmpty}). + delegate:call(QPid, {delete, IfUnused, IfEmpty}). -purge(#amqqueue{ pid = QPid }) -> delegate_call(QPid, purge). +purge(#amqqueue{ pid = QPid }) -> delegate:call(QPid, purge). deliver(Qs, Delivery) -> deliver(Qs, Delivery, noflow). deliver_flow(Qs, Delivery) -> deliver(Qs, Delivery, flow). -requeue(QPid, MsgIds, ChPid) -> delegate_call(QPid, {requeue, MsgIds, ChPid}). +requeue(QPid, MsgIds, ChPid) -> delegate:call(QPid, {requeue, MsgIds, ChPid}). -ack(QPid, MsgIds, ChPid) -> delegate_cast(QPid, {ack, MsgIds, ChPid}). +ack(QPid, MsgIds, ChPid) -> delegate:cast(QPid, {ack, MsgIds, ChPid}). reject(QPid, MsgIds, Requeue, ChPid) -> - delegate_cast(QPid, {reject, MsgIds, Requeue, ChPid}). + delegate:cast(QPid, {reject, MsgIds, Requeue, ChPid}). notify_down_all(QPids, ChPid) -> - safe_delegate_call_ok( - fun (QPid) -> gen_server2:call(QPid, {notify_down, ChPid}, infinity) end, - QPids). + {_, Bads} = delegate:call(QPids, {notify_down, ChPid}), + case lists:filter( + fun ({_Pid, {exit, {R, _}, _}}) -> rabbit_misc:is_abnormal_exit(R); + ({_Pid, _}) -> false + end, Bads) of + [] -> ok; + Bads1 -> {error, Bads1} + end. limit_all(QPids, ChPid, Limiter) -> - delegate:invoke_no_result( - QPids, fun (QPid) -> gen_server2:cast(QPid, {limit, ChPid, Limiter}) end). + delegate:cast(QPids, {limit, ChPid, Limiter}). basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) -> - delegate_call(QPid, {basic_get, ChPid, NoAck}). + delegate:call(QPid, {basic_get, ChPid, NoAck}). basic_consume(#amqqueue{pid = QPid}, NoAck, ChPid, Limiter, ConsumerTag, ExclusiveConsume, OkMsg) -> - delegate_call(QPid, {basic_consume, NoAck, ChPid, + delegate:call(QPid, {basic_consume, NoAck, ChPid, Limiter, ConsumerTag, ExclusiveConsume, OkMsg}). basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> - delegate_call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}). + delegate:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}). notify_sent(QPid, ChPid) -> Key = {consumer_credit_to, QPid}, @@ -545,11 +553,9 @@ notify_sent_queue_down(QPid) -> erase({consumer_credit_to, QPid}), ok. -unblock(QPid, ChPid) -> delegate_cast(QPid, {unblock, ChPid}). +unblock(QPid, ChPid) -> delegate:cast(QPid, {unblock, ChPid}). -flush_all(QPids, ChPid) -> - delegate:invoke_no_result( - QPids, fun (QPid) -> gen_server2:cast(QPid, {flush, ChPid}) end). +flush_all(QPids, ChPid) -> delegate:cast(QPids, {flush, ChPid}). internal_delete1(QueueName) -> ok = mnesia:delete({rabbit_queue, QueueName}), @@ -587,8 +593,8 @@ set_ram_duration_target(QPid, Duration) -> set_maximum_since_use(QPid, Age) -> gen_server2:cast(QPid, {set_maximum_since_use, Age}). -start_mirroring(QPid) -> ok = delegate_cast(QPid, start_mirroring). -stop_mirroring(QPid) -> ok = delegate_cast(QPid, stop_mirroring). +start_mirroring(QPid) -> ok = delegate:cast(QPid, start_mirroring). +stop_mirroring(QPid) -> ok = delegate:cast(QPid, stop_mirroring). on_node_down(Node) -> rabbit_misc:execute_mnesia_tx_with_tail( @@ -650,10 +656,8 @@ deliver(Qs, Delivery = #delivery{mandatory = false}, Flow) -> %% done with it. MMsg = {deliver, Delivery, false, Flow}, SMsg = {deliver, Delivery, true, Flow}, - delegate:invoke_no_result(MPids, - fun (QPid) -> gen_server2:cast(QPid, MMsg) end), - delegate:invoke_no_result(SPids, - fun (QPid) -> gen_server2:cast(QPid, SMsg) end), + delegate:cast(MPids, MMsg), + delegate:cast(SPids, SMsg), {routed, QPids}; deliver(Qs, Delivery, _Flow) -> @@ -661,14 +665,8 @@ deliver(Qs, Delivery, _Flow) -> %% see comment above MMsg = {deliver, Delivery, false}, SMsg = {deliver, Delivery, true}, - {MRouted, _} = delegate:invoke( - MPids, fun (QPid) -> - ok = gen_server2:call(QPid, MMsg, infinity) - end), - {SRouted, _} = delegate:invoke( - SPids, fun (QPid) -> - ok = gen_server2:call(QPid, SMsg, infinity) - end), + {MRouted, _} = delegate:call(MPids, MMsg), + {SRouted, _} = delegate:call(SPids, SMsg), case MRouted ++ SRouted of [] -> {unroutable, []}; R -> {routed, [QPid || {QPid, ok} <- R]} @@ -680,23 +678,3 @@ qpids(Qs) -> {[QPid | MPidAcc], [SPids | SPidAcc]} end, {[], []}, Qs), {MPids, lists:append(SPids)}. - -safe_delegate_call_ok(F, Pids) -> - {_, Bads} = delegate:invoke(Pids, fun (Pid) -> - rabbit_misc:with_exit_handler( - fun () -> ok end, - fun () -> F(Pid) end) - end), - case lists:filter( - fun ({_Pid, {exit, {R, _}, _}}) -> rabbit_misc:is_abnormal_exit(R); - ({_Pid, _}) -> false - end, Bads) of - [] -> ok; - Bads1 -> {error, Bads1} - end. - -delegate_call(Pid, Msg) -> - delegate:invoke(Pid, fun (P) -> gen_server2:call(P, Msg, infinity) end). - -delegate_cast(Pid, Msg) -> - delegate:invoke_no_result(Pid, fun (P) -> gen_server2:cast(P, Msg) end). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 74717acee5..03bcdf43f9 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -283,21 +283,17 @@ terminate_shutdown(Fun, State) -> end. reply(Reply, NewState) -> - assert_invariant(NewState), {NewState1, Timeout} = next_state(NewState), - {reply, Reply, NewState1, Timeout}. + {reply, Reply, ensure_stats_timer(ensure_rate_timer(NewState1)), Timeout}. noreply(NewState) -> - assert_invariant(NewState), {NewState1, Timeout} = next_state(NewState), - {noreply, NewState1, Timeout}. + {noreply, ensure_stats_timer(ensure_rate_timer(NewState1)), Timeout}. next_state(State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> + assert_invariant(State), {MsgIds, BQS1} = BQ:drain_confirmed(BQS), - State1 = ensure_stats_timer( - ensure_rate_timer( - confirm_messages(MsgIds, State#q{ - backing_queue_state = BQS1}))), + State1 = confirm_messages(MsgIds, State#q{backing_queue_state = BQS1}), case BQ:needs_timeout(BQS1) of false -> {stop_sync_timer(State1), hibernate }; idle -> {stop_sync_timer(State1), ?SYNC_INTERVAL}; @@ -327,15 +323,11 @@ ensure_rate_timer(State = #q{rate_timer_ref = undefined}) -> TRef = erlang:send_after( ?RAM_DURATION_UPDATE_INTERVAL, self(), update_ram_duration), State#q{rate_timer_ref = TRef}; -ensure_rate_timer(State = #q{rate_timer_ref = just_measured}) -> - State#q{rate_timer_ref = undefined}; ensure_rate_timer(State) -> State. stop_rate_timer(State = #q{rate_timer_ref = undefined}) -> State; -stop_rate_timer(State = #q{rate_timer_ref = just_measured}) -> - State#q{rate_timer_ref = undefined}; stop_rate_timer(State = #q{rate_timer_ref = TRef}) -> erlang:cancel_timer(TRef), State#q{rate_timer_ref = undefined}. @@ -725,14 +717,15 @@ drop_expired_messages(State = #q{dlx = DLX, Now = now_micros(), ExpirePred = fun (#message_properties{expiry = Exp}) -> Now >= Exp end, {Props, BQS1} = case DLX of - undefined -> {Next, undefined, BQS2} = - BQ:dropwhile(ExpirePred, false, BQS), - {Next, BQS2}; - _ -> {Next, Msgs, BQS2} = - BQ:dropwhile(ExpirePred, true, BQS), + undefined -> BQ:dropwhile(ExpirePred, BQS); + _ -> {Next, Msgs, BQS2} = + BQ:fetchwhile(ExpirePred, + fun accumulate_msgs/4, + [], BQS), case Msgs of [] -> ok; - _ -> (dead_letter_fun(expired))(Msgs) + _ -> (dead_letter_fun(expired))( + lists:reverse(Msgs)) end, {Next, BQS2} end, @@ -741,6 +734,8 @@ drop_expired_messages(State = #q{dlx = DLX, #message_properties{expiry = Exp} -> Exp end, State#q{backing_queue_state = BQS1}). +accumulate_msgs(Msg, _IsDelivered, AckTag, Acc) -> [{Msg, AckTag} | Acc]. + ensure_ttl_timer(undefined, State) -> State; ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = undefined}) -> @@ -1324,10 +1319,10 @@ handle_info(drop_expired, State) -> handle_info(emit_stats, State) -> emit_stats(State), - {noreply, State1, Timeout} = noreply(State), - %% Need to reset *after* we've been through noreply/1 so we do not - %% just create another timer always and therefore never hibernate - {noreply, rabbit_event:reset_stats_timer(State1, #q.stats_timer), Timeout}; + %% Don't call noreply/1, we don't want to set timers + {State1, Timeout} = next_state(rabbit_event:reset_stats_timer( + State, #q.stats_timer)), + {noreply, State1, Timeout}; handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State = #q{q = #amqqueue{exclusive_owner = DownPid}}) -> @@ -1351,8 +1346,10 @@ handle_info(update_ram_duration, State = #q{backing_queue = BQ, DesiredDuration = rabbit_memory_monitor:report_ram_duration(self(), RamDuration), BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1), - noreply(State#q{rate_timer_ref = just_measured, - backing_queue_state = BQS2}); + %% Don't call noreply/1, we don't want to set timers + {State1, Timeout} = next_state(State#q{rate_timer_ref = undefined, + backing_queue_state = BQS2}), + {noreply, State1, Timeout}; handle_info(sync_timeout, State) -> noreply(backing_queue_timeout(State#q{sync_timer_ref = undefined})); diff --git a/src/rabbit_auth_backend_internal.erl b/src/rabbit_auth_backend_internal.erl index 7b9df81e78..919be3f3ee 100644 --- a/src/rabbit_auth_backend_internal.erl +++ b/src/rabbit_auth_backend_internal.erl @@ -49,7 +49,7 @@ -spec(hash_password/1 :: (rabbit_types:password()) -> rabbit_types:password_hash()). -spec(set_tags/2 :: (rabbit_types:username(), [atom()]) -> 'ok'). --spec(list_users/0 :: () -> rabbit_types:infos()). +-spec(list_users/0 :: () -> [rabbit_types:infos()]). -spec(user_info_keys/0 :: () -> rabbit_types:info_keys()). -spec(lookup_user/1 :: (rabbit_types:username()) -> rabbit_types:ok(rabbit_types:internal_user()) @@ -58,14 +58,14 @@ regexp(), regexp(), regexp()) -> 'ok'). -spec(clear_permissions/2 :: (rabbit_types:username(), rabbit_types:vhost()) -> 'ok'). --spec(list_permissions/0 :: () -> rabbit_types:infos()). +-spec(list_permissions/0 :: () -> [rabbit_types:infos()]). -spec(list_vhost_permissions/1 :: - (rabbit_types:vhost()) -> rabbit_types:infos()). + (rabbit_types:vhost()) -> [rabbit_types:infos()]). -spec(list_user_permissions/1 :: - (rabbit_types:username()) -> rabbit_types:infos()). + (rabbit_types:username()) -> [rabbit_types:infos()]). -spec(list_user_vhost_permissions/2 :: (rabbit_types:username(), rabbit_types:vhost()) - -> rabbit_types:infos()). + -> [rabbit_types:infos()]). -spec(perms_info_keys/0 :: () -> rabbit_types:info_keys()). -spec(vhost_perms_info_keys/0 :: () -> rabbit_types:info_keys()). -spec(user_perms_info_keys/0 :: () -> rabbit_types:info_keys()). diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 96c58cb9da..272df5c1b7 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -124,16 +124,25 @@ %% be ignored. -callback drain_confirmed(state()) -> {msg_ids(), state()}. -%% Drop messages from the head of the queue while the supplied predicate returns -%% true. Also accepts a boolean parameter that determines whether the messages -%% necessitate an ack or not. If they do, the function returns a list of -%% messages with the respective acktags. --callback dropwhile(msg_pred(), true, state()) - -> {rabbit_types:message_properties() | undefined, - [{rabbit_types:basic_message(), ack()}], state()}; - (msg_pred(), false, state()) - -> {rabbit_types:message_properties() | undefined, - undefined, state()}. +%% Drop messages from the head of the queue while the supplied +%% predicate on message properties returns true. Returns the first +%% message properties for which the predictate returned false, or +%% 'undefined' if the whole backing queue was traversed w/o the +%% predicate ever returning false. +-callback dropwhile(msg_pred(), state()) + -> {rabbit_types:message_properties() | undefined, state()}. + +%% Like dropwhile, except messages are fetched in "require +%% acknowledgement" mode and are passed, together with their Delivered +%% flag and ack tag, to the supplied function. The function is also +%% fed an accumulator. The result of fetchwhile is as for dropwhile +%% plus the accumulator. +-callback fetchwhile(msg_pred(), + fun ((rabbit_types:basic_message(), boolean(), ack(), A) + -> A), + A, state()) + -> {rabbit_types:message_properties() | undefined, + A, state()}. %% Produce the next message. -callback fetch(true, state()) -> {fetch_result(ack()), state()}; @@ -222,7 +231,8 @@ behaviour_info(callbacks) -> [{start, 1}, {stop, 0}, {init, 3}, {terminate, 2}, {delete_and_terminate, 2}, {purge, 1}, {publish, 5}, - {publish_delivered, 4}, {discard, 3}, {drain_confirmed, 1}, {dropwhile, 3}, + {publish_delivered, 4}, {discard, 3}, {drain_confirmed, 1}, + {dropwhile, 2}, {fetchwhile, 4}, {fetch, 2}, {ack, 2}, {foreach_ack, 3}, {requeue, 2}, {fold, 3}, {len, 1}, {is_empty, 1}, {depth, 1}, {set_ram_duration_target, 2}, {ram_duration, 1}, {needs_timeout, 1}, {timeout, 1}, diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl index a5d0a00855..5b3b8aa806 100644 --- a/src/rabbit_backing_queue_qc.erl +++ b/src/rabbit_backing_queue_qc.erl @@ -115,7 +115,7 @@ qc_publish(#state{bqstate = BQ}) -> #message_properties{needs_confirming = frequency([{1, true}, {20, false}]), expiry = oneof([undefined | lists:seq(1, 10)])}, - self(), BQ]}. + false, self(), BQ]}. qc_publish_multiple(#state{}) -> {call, ?MODULE, publish_multiple, [resize(?QUEUE_MAXLEN, pos_integer())]}. @@ -147,7 +147,7 @@ qc_drain_confirmed(#state{bqstate = BQ}) -> {call, ?BQMOD, drain_confirmed, [BQ]}. qc_dropwhile(#state{bqstate = BQ}) -> - {call, ?BQMOD, dropwhile, [fun dropfun/1, false, BQ]}. + {call, ?BQMOD, dropwhile, [fun dropfun/1, BQ]}. qc_is_empty(#state{bqstate = BQ}) -> {call, ?BQMOD, is_empty, [BQ]}. @@ -182,7 +182,7 @@ precondition(#state{len = Len}, {call, ?MODULE, publish_multiple, _Arg}) -> %% Model updates -next_state(S, BQ, {call, ?BQMOD, publish, [Msg, MsgProps, _Pid, _BQ]}) -> +next_state(S, BQ, {call, ?BQMOD, publish, [Msg, MsgProps, _Del, _Pid, _BQ]}) -> #state{len = Len, messages = Messages, confirms = Confirms, @@ -262,7 +262,7 @@ next_state(S, Res, {call, ?BQMOD, drain_confirmed, _Args}) -> S#state{bqstate = BQ1}; next_state(S, Res, {call, ?BQMOD, dropwhile, _Args}) -> - BQ = {call, erlang, element, [3, Res]}, + BQ = {call, erlang, element, [2, Res]}, #state{messages = Messages} = S, Msgs1 = drop_messages(Messages), S#state{bqstate = BQ, len = gb_trees:size(Msgs1), messages = Msgs1}; diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index b1ef3b6bdf..a3c8286594 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -529,16 +529,12 @@ check_not_default_exchange(_) -> %% check that an exchange/queue name does not contain the reserved %% "amq." prefix. %% -%% One, quite reasonable, interpretation of the spec, taken by the -%% QPid M1 Java client, is that the exclusion of "amq." prefixed names +%% As per the AMQP 0-9-1 spec, the exclusion of "amq." prefixed names %% only applies on actual creation, and not in the cases where the -%% entity already exists. This is how we use this function in the code -%% below. However, AMQP JIRA 123 changes that in 0-10, and possibly -%% 0-9SP1, making it illegal to attempt to declare an exchange/queue -%% with an amq.* name when passive=false. So this will need -%% revisiting. +%% entity already exists or passive=true. %% -%% TODO: enforce other constraints on name. See AMQP JIRA 69. +%% NB: We deliberately do not enforce the other constraints on names +%% required by the spec. check_name(Kind, NameBin = <<"amq.", _/binary>>) -> rabbit_misc:protocol_error( access_refused, diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index c8a361b1e0..e3d967bc53 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -20,7 +20,7 @@ purge/1, publish/5, publish_delivered/4, discard/3, fetch/2, drop/2, ack/2, requeue/2, fold/3, len/1, is_empty/1, depth/1, drain_confirmed/1, - dropwhile/3, set_ram_duration_target/2, ram_duration/1, + dropwhile/2, fetchwhile/4, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3, is_duplicate/2, foreach_ack/3]). @@ -40,7 +40,6 @@ backing_queue_state, seen_status, confirmed, - ack_msg_id, known_senders }). @@ -56,7 +55,6 @@ backing_queue_state :: any(), seen_status :: dict(), confirmed :: [rabbit_guid:guid()], - ack_msg_id :: dict(), known_senders :: set() }). @@ -114,7 +112,6 @@ init_with_existing_bq(Q = #amqqueue{name = QName}, BQ, BQS) -> backing_queue_state = BQS, seen_status = dict:new(), confirmed = [], - ack_msg_id = dict:new(), known_senders = sets:new() }. stop_mirroring(State = #state { coordinator = CPid, @@ -187,13 +184,11 @@ publish_delivered(Msg = #basic_message { id = MsgId }, MsgProps, ChPid, State = #state { gm = GM, seen_status = SS, backing_queue = BQ, - backing_queue_state = BQS, - ack_msg_id = AM }) -> + backing_queue_state = BQS }) -> false = dict:is_key(MsgId, SS), %% ASSERTION ok = gm:broadcast(GM, {publish_delivered, ChPid, MsgProps, Msg}), {AckTag, BQS1} = BQ:publish_delivered(Msg, MsgProps, ChPid, BQS), - AM1 = maybe_store_acktag(AckTag, MsgId, AM), - State1 = State #state { backing_queue_state = BQS1, ack_msg_id = AM1 }, + State1 = State #state { backing_queue_state = BQS1 }, {AckTag, ensure_monitoring(ChPid, State1)}. discard(MsgId, ChPid, State = #state { gm = GM, @@ -216,19 +211,17 @@ discard(MsgId, ChPid, State = #state { gm = GM, State end. -dropwhile(Pred, AckRequired, - State = #state{gm = GM, - backing_queue = BQ, - backing_queue_state = BQS }) -> +dropwhile(Pred, State = #state{backing_queue = BQ, + backing_queue_state = BQS }) -> Len = BQ:len(BQS), - {Next, Msgs, BQS1} = BQ:dropwhile(Pred, AckRequired, BQS), - Len1 = BQ:len(BQS1), - Dropped = Len - Len1, - case Dropped of - 0 -> ok; - _ -> ok = gm:broadcast(GM, {drop, Len1, Dropped, AckRequired}) - end, - {Next, Msgs, State #state { backing_queue_state = BQS1 } }. + {Next, BQS1} = BQ:dropwhile(Pred, BQS), + {Next, drop(Len, false, State #state { backing_queue_state = BQS1 })}. + +fetchwhile(Pred, Fun, Acc, State = #state{backing_queue = BQ, + backing_queue_state = BQS }) -> + Len = BQ:len(BQS), + {Next, Acc1, BQS1} = BQ:fetchwhile(Pred, Fun, Acc, BQS), + {Next, Acc1, drop(Len, true, State #state { backing_queue_state = BQS1 })}. drain_confirmed(State = #state { backing_queue = BQ, backing_queue_state = BQS, @@ -264,34 +257,29 @@ fetch(AckRequired, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> {Result, BQS1} = BQ:fetch(AckRequired, BQS), State1 = State #state { backing_queue_state = BQS1 }, - case Result of - empty -> - {Result, State1}; - {#basic_message{id = MsgId}, _IsDelivered, AckTag} -> - {Result, drop(MsgId, AckTag, State1)} - end. + {Result, case Result of + empty -> State1; + {_MsgId, _IsDelivered, AckTag} -> drop_one(AckTag, State1) + end}. drop(AckRequired, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> {Result, BQS1} = BQ:drop(AckRequired, BQS), State1 = State #state { backing_queue_state = BQS1 }, {Result, case Result of - empty -> State1; - {MsgId, AckTag} -> drop(MsgId, AckTag, State1) + empty -> State1; + {_MsgId, AckTag} -> drop_one(AckTag, State1) end}. ack(AckTags, State = #state { gm = GM, backing_queue = BQ, - backing_queue_state = BQS, - ack_msg_id = AM }) -> + backing_queue_state = BQS }) -> {MsgIds, BQS1} = BQ:ack(AckTags, BQS), case MsgIds of [] -> ok; _ -> ok = gm:broadcast(GM, {ack, MsgIds}) end, - AM1 = lists:foldl(fun dict:erase/2, AM, AckTags), - {MsgIds, State #state { backing_queue_state = BQS1, - ack_msg_id = AM1 }}. + {MsgIds, State #state { backing_queue_state = BQS1 }}. foreach_ack(MsgFun, State = #state { backing_queue = BQ, backing_queue_state = BQS }, AckTags) -> @@ -408,7 +396,6 @@ promote_backing_queue_state(CPid, BQ, BQS, GM, AckTags, SeenStatus, KS) -> backing_queue_state = BQS1, seen_status = SeenStatus, confirmed = [], - ack_msg_id = dict:new(), known_senders = sets:from_list(KS) }. sender_death_fun() -> @@ -440,15 +427,21 @@ depth_fun() -> %% Helpers %% --------------------------------------------------------------------------- -drop(MsgId, AckTag, State = #state { ack_msg_id = AM, - gm = GM, - backing_queue = BQ, - backing_queue_state = BQS }) -> +drop_one(AckTag, State = #state { gm = GM, + backing_queue = BQ, + backing_queue_state = BQS }) -> ok = gm:broadcast(GM, {drop, BQ:len(BQS), 1, AckTag =/= undefined}), - State #state { ack_msg_id = maybe_store_acktag(AckTag, MsgId, AM) }. + State. -maybe_store_acktag(undefined, _MsgId, AM) -> AM; -maybe_store_acktag(AckTag, MsgId, AM) -> dict:store(AckTag, MsgId, AM). +drop(PrevLen, AckRequired, State = #state { gm = GM, + backing_queue = BQ, + backing_queue_state = BQS }) -> + Len = BQ:len(BQS), + case PrevLen - Len of + 0 -> State; + Dropped -> ok = gm:broadcast(GM, {drop, Len, Dropped, AckRequired}), + State + end. ensure_monitoring(ChPid, State = #state { coordinator = CPid, known_senders = KS }) -> diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 81bb6769ab..4efde50ec5 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -46,6 +46,7 @@ -export([sort_field_table/1]). -export([pid_to_string/1, string_to_pid/1]). -export([version_compare/2, version_compare/3]). +-export([version_minor_equivalent/2]). -export([dict_cons/3, orddict_cons/3, gb_trees_cons/3]). -export([gb_trees_fold/3, gb_trees_foreach/2]). -export([parse_arguments/3]). @@ -191,6 +192,7 @@ -spec(version_compare/3 :: (string(), string(), ('lt' | 'lte' | 'eq' | 'gte' | 'gt')) -> boolean()). +-spec(version_minor_equivalent/2 :: (string(), string()) -> boolean()). -spec(dict_cons/3 :: (any(), any(), dict()) -> dict()). -spec(orddict_cons/3 :: (any(), any(), orddict:orddict()) -> orddict:orddict()). -spec(gb_trees_cons/3 :: (any(), any(), gb_tree()) -> gb_tree()). @@ -734,6 +736,16 @@ version_compare(A, B) -> ANum > BNum -> gt end. +%% a.b.c and a.b.d match, but a.b.c and a.d.e don't. If +%% versions do not match that pattern, just compare them. +version_minor_equivalent(A, B) -> + {ok, RE} = re:compile("^(\\d+\\.\\d+)(\\.\\d+)\$"), + Opts = [{capture, all_but_first, list}], + case {re:run(A, RE, Opts), re:run(B, RE, Opts)} of + {{match, [A1|_]}, {match, [B1|_]}} -> A1 =:= B1; + _ -> A =:= B + end. + dropdot(A) -> lists:dropwhile(fun (X) -> X =:= $. end, A). dict_cons(Key, Value, Dict) -> diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 942048f9b0..6a442fecf2 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -68,7 +68,8 @@ %% Various queries to get the status of the db -spec(status/0 :: () -> [{'nodes', [{node_type(), [node()]}]} | - {'running_nodes', [node()]}]). + {'running_nodes', [node()]} | + {'partitions', [{node(), [node()]}]}]). -spec(is_clustered/0 :: () -> boolean()). -spec(cluster_nodes/1 :: ('all' | 'disc' | 'ram' | 'running') -> [node()]). -spec(node_type/0 :: () -> node_type()). @@ -757,9 +758,16 @@ check_nodes_consistency(Node, RemoteStatus = {RemoteAllNodes, _, _}) -> [node(), Node, Node])}} end. -check_version_consistency(This, Remote, _) when This =:= Remote -> - ok; check_version_consistency(This, Remote, Name) -> + check_version_consistency(This, Remote, Name, fun (A, B) -> A =:= B end). + +check_version_consistency(This, Remote, Name, Comp) -> + case Comp(This, Remote) of + true -> ok; + false -> version_error(Name, This, Remote) + end. + +version_error(Name, This, Remote) -> {error, {inconsistent_cluster, rabbit_misc:format("~s version mismatch: local node is ~s, " "remote node ~s", [Name, This, Remote])}}. @@ -767,8 +775,15 @@ check_version_consistency(This, Remote, Name) -> check_otp_consistency(Remote) -> check_version_consistency(erlang:system_info(otp_release), Remote, "OTP"). +%% Unlike the rest of 3.0.x, 3.0.0 is not compatible. This can be +%% removed after 3.1.0 is released. +check_rabbit_consistency("3.0.0") -> + version_error("Rabbit", rabbit_misc:version(), "3.0.0"); + check_rabbit_consistency(Remote) -> - check_version_consistency(rabbit_misc:version(), Remote, "Rabbit"). + check_version_consistency( + rabbit_misc:version(), Remote, "Rabbit", + fun rabbit_misc:version_minor_equivalent/2). %% This is fairly tricky. We want to know if the node is in the state %% that a `reset' would leave it in. We cannot simply check if the diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl index 8d0e4456df..258ac0ce4a 100644 --- a/src/rabbit_node_monitor.erl +++ b/src/rabbit_node_monitor.erl @@ -53,7 +53,7 @@ -spec(notify_joined_cluster/0 :: () -> 'ok'). -spec(notify_left_cluster/1 :: (node()) -> 'ok'). --spec(partitions/0 :: () -> {node(), [{atom(), node()}]}). +-spec(partitions/0 :: () -> {node(), [node()]}). -endif. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index df8544a4ad..b499c59b30 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -38,6 +38,7 @@ all_tests() -> passed = mirrored_supervisor_tests:all_tests(), application:set_env(rabbit, file_handles_high_watermark, 10, infinity), ok = file_handle_cache:set_limit(10), + passed = test_version_equivalance(), passed = test_multi_call(), passed = test_file_handle_cache(), passed = test_backing_queue(), @@ -141,6 +142,16 @@ run_cluster_dependent_tests(SecondaryNode) -> passed. +test_version_equivalance() -> + true = rabbit_misc:version_minor_equivalent("3.0.0", "3.0.0"), + true = rabbit_misc:version_minor_equivalent("3.0.0", "3.0.1"), + true = rabbit_misc:version_minor_equivalent("%%VSN%%", "%%VSN%%"), + false = rabbit_misc:version_minor_equivalent("3.0.0", "3.1.0"), + false = rabbit_misc:version_minor_equivalent("3.0.0", "3.0"), + false = rabbit_misc:version_minor_equivalent("3.0.0", "3.0.0.1"), + false = rabbit_misc:version_minor_equivalent("3.0.0", "3.0.foo"), + passed. + test_multi_call() -> Fun = fun() -> receive @@ -2307,8 +2318,9 @@ test_variable_queue() -> fun test_variable_queue_all_the_bits_not_covered_elsewhere2/1, fun test_drop/1, fun test_variable_queue_fold_msg_on_disk/1, - fun test_dropwhile/1, + fun test_dropfetchwhile/1, fun test_dropwhile_varying_ram_duration/1, + fun test_fetchwhile_varying_ram_duration/1, fun test_variable_queue_ack_limiting/1, fun test_variable_queue_requeue/1, fun test_variable_queue_fold/1]], @@ -2409,41 +2421,70 @@ test_drop(VQ0) -> true = rabbit_variable_queue:is_empty(VQ5), VQ5. -test_dropwhile(VQ0) -> +test_dropfetchwhile(VQ0) -> Count = 10, %% add messages with sequential expiry VQ1 = variable_queue_publish( false, Count, - fun (N, Props) -> Props#message_properties{expiry = N} end, VQ0), + fun (N, Props) -> Props#message_properties{expiry = N} end, + fun erlang:term_to_binary/1, VQ0), + + %% fetch the first 5 messages + {#message_properties{expiry = 6}, {Msgs, AckTags}, VQ2} = + rabbit_variable_queue:fetchwhile( + fun (#message_properties{expiry = Expiry}) -> Expiry =< 5 end, + fun (Msg, _Delivered, AckTag, {MsgAcc, AckAcc}) -> + {[Msg | MsgAcc], [AckTag | AckAcc]} + end, {[], []}, VQ1), + true = lists:seq(1, 5) == [msg2int(M) || M <- lists:reverse(Msgs)], + + %% requeue them + {_MsgIds, VQ3} = rabbit_variable_queue:requeue(AckTags, VQ2), %% drop the first 5 messages - {_, undefined, VQ2} = rabbit_variable_queue:dropwhile( - fun(#message_properties { expiry = Expiry }) -> - Expiry =< 5 - end, false, VQ1), - - %% fetch five now - VQ3 = lists:foldl(fun (_N, VQN) -> - {{#basic_message{}, _, _}, VQM} = + {#message_properties{expiry = 6}, VQ4} = + rabbit_variable_queue:dropwhile( + fun (#message_properties {expiry = Expiry}) -> Expiry =< 5 end, VQ3), + + %% fetch 5 + VQ5 = lists:foldl(fun (N, VQN) -> + {{Msg, _, _}, VQM} = rabbit_variable_queue:fetch(false, VQN), + true = msg2int(Msg) == N, VQM - end, VQ2, lists:seq(6, Count)), + end, VQ4, lists:seq(6, Count)), %% should be empty now - {empty, VQ4} = rabbit_variable_queue:fetch(false, VQ3), + true = rabbit_variable_queue:is_empty(VQ5), - VQ4. + VQ5. test_dropwhile_varying_ram_duration(VQ0) -> + test_dropfetchwhile_varying_ram_duration( + fun (VQ1) -> + {_, VQ2} = rabbit_variable_queue:dropwhile( + fun (_) -> false end, VQ1), + VQ2 + end, VQ0). + +test_fetchwhile_varying_ram_duration(VQ0) -> + test_dropfetchwhile_varying_ram_duration( + fun (VQ1) -> + {_, ok, VQ2} = rabbit_variable_queue:fetchwhile( + fun (_) -> false end, + fun (_, _, _, A) -> A end, + ok, VQ1), + VQ2 + end, VQ0). + +test_dropfetchwhile_varying_ram_duration(Fun, VQ0) -> VQ1 = variable_queue_publish(false, 1, VQ0), VQ2 = rabbit_variable_queue:set_ram_duration_target(0, VQ1), - {_, undefined, VQ3} = rabbit_variable_queue:dropwhile( - fun(_) -> false end, false, VQ2), + VQ3 = Fun(VQ2), VQ4 = rabbit_variable_queue:set_ram_duration_target(infinity, VQ3), VQ5 = variable_queue_publish(false, 1, VQ4), - {_, undefined, VQ6} = - rabbit_variable_queue:dropwhile(fun(_) -> false end, false, VQ5), + VQ6 = Fun(VQ5), VQ6. test_variable_queue_dynamic_duration_change(VQ0) -> diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 30ab96f58c..3e4c7c864f 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -18,7 +18,8 @@ -export([init/3, terminate/2, delete_and_terminate/2, purge/1, publish/5, publish_delivered/4, discard/3, drain_confirmed/1, - dropwhile/3, fetch/2, drop/2, ack/2, requeue/2, fold/3, len/1, + dropwhile/2, fetchwhile/4, + fetch/2, drop/2, ack/2, requeue/2, fold/3, len/1, is_empty/1, depth/1, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3, is_duplicate/2, multiple_routing_keys/0, foreach_ack/3]). @@ -577,27 +578,30 @@ drain_confirmed(State = #vqstate { confirmed = C }) -> confirmed = gb_sets:new() }} end. -dropwhile(Pred, AckRequired, State) -> dropwhile(Pred, AckRequired, State, []). +dropwhile(Pred, State) -> + case queue_out(State) of + {empty, State1} -> + {undefined, a(State1)}; + {{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} -> + case Pred(MsgProps) of + true -> {_, State2} = internal_fetch(false, MsgStatus, State1), + dropwhile(Pred, State2); + false -> {MsgProps, a(in_r(MsgStatus, State1))} + end + end. -dropwhile(Pred, AckRequired, State, Msgs) -> - End = fun(Next, S) when AckRequired -> {Next, lists:reverse(Msgs), S}; - (Next, S) -> {Next, undefined, S} - end, +fetchwhile(Pred, Fun, Acc, State) -> case queue_out(State) of {empty, State1} -> - End(undefined, a(State1)); + {undefined, Acc, a(State1)}; {{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} -> - case {Pred(MsgProps), AckRequired} of - {true, true} -> - {MsgStatus1, State2} = read_msg(MsgStatus, State1), - {{Msg, _IsDelivered, AckTag}, State3} = - internal_fetch(true, MsgStatus1, State2), - dropwhile(Pred, AckRequired, State3, [{Msg, AckTag} | Msgs]); - {true, false} -> - {_, State2} = internal_fetch(false, MsgStatus, State1), - dropwhile(Pred, AckRequired, State2, undefined); - {false, _} -> - End(MsgProps, a(in_r(MsgStatus, State1))) + case Pred(MsgProps) of + true -> {MsgStatus1, State2} = read_msg(MsgStatus, State1), + {{Msg, IsDelivered, AckTag}, State3} = + internal_fetch(true, MsgStatus1, State2), + Acc1 = Fun(Msg, IsDelivered, AckTag, Acc), + fetchwhile(Pred, Fun, Acc1, State3); + false -> {MsgProps, Acc, a(in_r(MsgStatus, State1))} end end. diff --git a/src/rabbit_vm.erl b/src/rabbit_vm.erl index 53f3df18b3..db674f91d8 100644 --- a/src/rabbit_vm.erl +++ b/src/rabbit_vm.erl @@ -84,7 +84,15 @@ sup_memory(Sup) -> sup_children(Sup) -> rabbit_misc:with_exit_handler( - rabbit_misc:const([]), fun () -> supervisor:which_children(Sup) end). + rabbit_misc:const([]), + fun () -> + %% Just in case we end up talking to something that is + %% not a supervisor by mistake. + case supervisor:which_children(Sup) of + L when is_list(L) -> L; + _ -> [] + end + end). pid_memory(Pid) when is_pid(Pid) -> case process_info(Pid, memory) of {memory, M} -> M; @@ -119,10 +127,13 @@ plugin_memory() -> is_plugin(atom_to_list(App))]). plugin_memory(App) -> - case catch application_master:get_child( - application_controller:get_master(App)) of - {Pid, _} -> sup_memory(Pid); - _ -> 0 + case application_controller:get_master(App) of + undefined -> 0; + Master -> case application_master:get_child(Master) of + {Pid, _} when is_pid(Pid) -> sup_memory(Pid); + Pid when is_pid(Pid) -> sup_memory(Pid); + _ -> 0 + end end. is_plugin("rabbitmq_" ++ _) -> true; |
