summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/delegate.erl12
-rw-r--r--src/gm.erl10
-rw-r--r--src/rabbit_amqqueue.erl90
-rw-r--r--src/rabbit_amqqueue_process.erl45
-rw-r--r--src/rabbit_auth_backend_internal.erl10
-rw-r--r--src/rabbit_backing_queue.erl32
-rw-r--r--src/rabbit_backing_queue_qc.erl8
-rw-r--r--src/rabbit_channel.erl12
-rw-r--r--src/rabbit_mirror_queue_master.erl75
-rw-r--r--src/rabbit_misc.erl12
-rw-r--r--src/rabbit_mnesia.erl23
-rw-r--r--src/rabbit_node_monitor.erl2
-rw-r--r--src/rabbit_tests.erl77
-rw-r--r--src/rabbit_variable_queue.erl40
-rw-r--r--src/rabbit_vm.erl21
15 files changed, 271 insertions, 198 deletions
diff --git a/src/delegate.erl b/src/delegate.erl
index d595e4819e..9222c34c42 100644
--- a/src/delegate.erl
+++ b/src/delegate.erl
@@ -18,7 +18,7 @@
-behaviour(gen_server2).
--export([start_link/1, invoke_no_result/2, invoke/2]).
+-export([start_link/1, invoke_no_result/2, invoke/2, call/2, cast/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -35,6 +35,10 @@
[{pid(), term()}]}).
-spec(invoke_no_result/2 ::
(pid() | [pid()], fun ((pid()) -> any())) -> 'ok').
+-spec(call/2 ::
+ ( pid(), any()) -> any();
+ ([pid()], any()) -> {[{pid(), any()}], [{pid(), term()}]}).
+-spec(cast/2 :: (pid() | [pid()], any()) -> 'ok').
-endif.
@@ -96,6 +100,12 @@ invoke_no_result(Pids, Fun) when is_list(Pids) ->
safe_invoke(LocalPids, Fun), %% must not die
ok.
+call(PidOrPids, Msg) ->
+ invoke(PidOrPids, fun (P) -> gen_server2:call(P, Msg, infinity) end).
+
+cast(PidOrPids, Msg) ->
+ invoke_no_result(PidOrPids, fun (P) -> gen_server2:cast(P, Msg) end).
+
%%----------------------------------------------------------------------------
group_pids_by_node(Pids) ->
diff --git a/src/gm.erl b/src/gm.erl
index 4a95de0dd1..2057b1f577 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -667,6 +667,9 @@ handle_info(flush, State) ->
noreply(
flush_broadcast_buffer(State #state { broadcast_timer = undefined }));
+handle_info(timeout, State) ->
+ noreply(flush_broadcast_buffer(State));
+
handle_info({'DOWN', MRef, process, _Pid, Reason},
State = #state { self = Self,
left = Left,
@@ -834,10 +837,13 @@ handle_msg({activity, _NotLeft, _Activity}, State) ->
noreply(State) ->
- {noreply, ensure_broadcast_timer(State), hibernate}.
+ {noreply, ensure_broadcast_timer(State), flush_timeout(State)}.
reply(Reply, State) ->
- {reply, Reply, ensure_broadcast_timer(State), hibernate}.
+ {reply, Reply, ensure_broadcast_timer(State), flush_timeout(State)}.
+
+flush_timeout(#state{broadcast_buffer = []}) -> hibernate;
+flush_timeout(_) -> 0.
ensure_broadcast_timer(State = #state { broadcast_buffer = [],
broadcast_timer = undefined }) ->
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 7827b839d5..1b6cc223cf 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -284,7 +284,11 @@ store_queue(Q = #amqqueue{durable = false}) ->
ok = mnesia:write(rabbit_queue, Q, write),
ok.
-policy_changed(Q1, Q2) -> rabbit_mirror_queue_misc:update_mirrors(Q1, Q2).
+policy_changed(Q1, Q2) ->
+ rabbit_mirror_queue_misc:update_mirrors(Q1, Q2),
+ %% Make sure we emit a stats event even if nothing
+ %% mirroring-related has changed - the policy may have changed anyway.
+ wake_up(Q1).
start_queue_process(Node, Q) ->
{ok, Pid} = rabbit_amqqueue_sup:start_child(Node, [Q]),
@@ -438,10 +442,10 @@ info_keys() -> rabbit_amqqueue_process:info_keys().
map(VHostPath, F) -> rabbit_misc:filter_exit_map(F, list(VHostPath)).
-info(#amqqueue{ pid = QPid }) -> delegate_call(QPid, info).
+info(#amqqueue{ pid = QPid }) -> delegate:call(QPid, info).
info(#amqqueue{ pid = QPid }, Items) ->
- case delegate_call(QPid, {info, Items}) of
+ case delegate:call(QPid, {info, Items}) of
{ok, Res} -> Res;
{error, Error} -> throw(Error)
end.
@@ -472,7 +476,7 @@ force_event_refresh(QNames) ->
wake_up(#amqqueue{pid = QPid}) -> gen_server2:cast(QPid, wake_up).
-consumers(#amqqueue{ pid = QPid }) -> delegate_call(QPid, consumers).
+consumers(#amqqueue{ pid = QPid }) -> delegate:call(QPid, consumers).
consumer_info_keys() -> ?CONSUMER_INFO_KEYS.
@@ -486,47 +490,51 @@ consumers_all(VHostPath) ->
{ChPid, ConsumerTag, AckRequired} <- consumers(Q)]
end)).
-stat(#amqqueue{pid = QPid}) -> delegate_call(QPid, stat).
+stat(#amqqueue{pid = QPid}) -> delegate:call(QPid, stat).
delete_immediately(QPids) ->
[gen_server2:cast(QPid, delete_immediately) || QPid <- QPids],
ok.
delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) ->
- delegate_call(QPid, {delete, IfUnused, IfEmpty}).
+ delegate:call(QPid, {delete, IfUnused, IfEmpty}).
-purge(#amqqueue{ pid = QPid }) -> delegate_call(QPid, purge).
+purge(#amqqueue{ pid = QPid }) -> delegate:call(QPid, purge).
deliver(Qs, Delivery) -> deliver(Qs, Delivery, noflow).
deliver_flow(Qs, Delivery) -> deliver(Qs, Delivery, flow).
-requeue(QPid, MsgIds, ChPid) -> delegate_call(QPid, {requeue, MsgIds, ChPid}).
+requeue(QPid, MsgIds, ChPid) -> delegate:call(QPid, {requeue, MsgIds, ChPid}).
-ack(QPid, MsgIds, ChPid) -> delegate_cast(QPid, {ack, MsgIds, ChPid}).
+ack(QPid, MsgIds, ChPid) -> delegate:cast(QPid, {ack, MsgIds, ChPid}).
reject(QPid, MsgIds, Requeue, ChPid) ->
- delegate_cast(QPid, {reject, MsgIds, Requeue, ChPid}).
+ delegate:cast(QPid, {reject, MsgIds, Requeue, ChPid}).
notify_down_all(QPids, ChPid) ->
- safe_delegate_call_ok(
- fun (QPid) -> gen_server2:call(QPid, {notify_down, ChPid}, infinity) end,
- QPids).
+ {_, Bads} = delegate:call(QPids, {notify_down, ChPid}),
+ case lists:filter(
+ fun ({_Pid, {exit, {R, _}, _}}) -> rabbit_misc:is_abnormal_exit(R);
+ ({_Pid, _}) -> false
+ end, Bads) of
+ [] -> ok;
+ Bads1 -> {error, Bads1}
+ end.
limit_all(QPids, ChPid, Limiter) ->
- delegate:invoke_no_result(
- QPids, fun (QPid) -> gen_server2:cast(QPid, {limit, ChPid, Limiter}) end).
+ delegate:cast(QPids, {limit, ChPid, Limiter}).
basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) ->
- delegate_call(QPid, {basic_get, ChPid, NoAck}).
+ delegate:call(QPid, {basic_get, ChPid, NoAck}).
basic_consume(#amqqueue{pid = QPid}, NoAck, ChPid, Limiter,
ConsumerTag, ExclusiveConsume, OkMsg) ->
- delegate_call(QPid, {basic_consume, NoAck, ChPid,
+ delegate:call(QPid, {basic_consume, NoAck, ChPid,
Limiter, ConsumerTag, ExclusiveConsume, OkMsg}).
basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) ->
- delegate_call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}).
+ delegate:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}).
notify_sent(QPid, ChPid) ->
Key = {consumer_credit_to, QPid},
@@ -545,11 +553,9 @@ notify_sent_queue_down(QPid) ->
erase({consumer_credit_to, QPid}),
ok.
-unblock(QPid, ChPid) -> delegate_cast(QPid, {unblock, ChPid}).
+unblock(QPid, ChPid) -> delegate:cast(QPid, {unblock, ChPid}).
-flush_all(QPids, ChPid) ->
- delegate:invoke_no_result(
- QPids, fun (QPid) -> gen_server2:cast(QPid, {flush, ChPid}) end).
+flush_all(QPids, ChPid) -> delegate:cast(QPids, {flush, ChPid}).
internal_delete1(QueueName) ->
ok = mnesia:delete({rabbit_queue, QueueName}),
@@ -587,8 +593,8 @@ set_ram_duration_target(QPid, Duration) ->
set_maximum_since_use(QPid, Age) ->
gen_server2:cast(QPid, {set_maximum_since_use, Age}).
-start_mirroring(QPid) -> ok = delegate_cast(QPid, start_mirroring).
-stop_mirroring(QPid) -> ok = delegate_cast(QPid, stop_mirroring).
+start_mirroring(QPid) -> ok = delegate:cast(QPid, start_mirroring).
+stop_mirroring(QPid) -> ok = delegate:cast(QPid, stop_mirroring).
on_node_down(Node) ->
rabbit_misc:execute_mnesia_tx_with_tail(
@@ -650,10 +656,8 @@ deliver(Qs, Delivery = #delivery{mandatory = false}, Flow) ->
%% done with it.
MMsg = {deliver, Delivery, false, Flow},
SMsg = {deliver, Delivery, true, Flow},
- delegate:invoke_no_result(MPids,
- fun (QPid) -> gen_server2:cast(QPid, MMsg) end),
- delegate:invoke_no_result(SPids,
- fun (QPid) -> gen_server2:cast(QPid, SMsg) end),
+ delegate:cast(MPids, MMsg),
+ delegate:cast(SPids, SMsg),
{routed, QPids};
deliver(Qs, Delivery, _Flow) ->
@@ -661,14 +665,8 @@ deliver(Qs, Delivery, _Flow) ->
%% see comment above
MMsg = {deliver, Delivery, false},
SMsg = {deliver, Delivery, true},
- {MRouted, _} = delegate:invoke(
- MPids, fun (QPid) ->
- ok = gen_server2:call(QPid, MMsg, infinity)
- end),
- {SRouted, _} = delegate:invoke(
- SPids, fun (QPid) ->
- ok = gen_server2:call(QPid, SMsg, infinity)
- end),
+ {MRouted, _} = delegate:call(MPids, MMsg),
+ {SRouted, _} = delegate:call(SPids, SMsg),
case MRouted ++ SRouted of
[] -> {unroutable, []};
R -> {routed, [QPid || {QPid, ok} <- R]}
@@ -680,23 +678,3 @@ qpids(Qs) ->
{[QPid | MPidAcc], [SPids | SPidAcc]}
end, {[], []}, Qs),
{MPids, lists:append(SPids)}.
-
-safe_delegate_call_ok(F, Pids) ->
- {_, Bads} = delegate:invoke(Pids, fun (Pid) ->
- rabbit_misc:with_exit_handler(
- fun () -> ok end,
- fun () -> F(Pid) end)
- end),
- case lists:filter(
- fun ({_Pid, {exit, {R, _}, _}}) -> rabbit_misc:is_abnormal_exit(R);
- ({_Pid, _}) -> false
- end, Bads) of
- [] -> ok;
- Bads1 -> {error, Bads1}
- end.
-
-delegate_call(Pid, Msg) ->
- delegate:invoke(Pid, fun (P) -> gen_server2:call(P, Msg, infinity) end).
-
-delegate_cast(Pid, Msg) ->
- delegate:invoke_no_result(Pid, fun (P) -> gen_server2:cast(P, Msg) end).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 74717acee5..03bcdf43f9 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -283,21 +283,17 @@ terminate_shutdown(Fun, State) ->
end.
reply(Reply, NewState) ->
- assert_invariant(NewState),
{NewState1, Timeout} = next_state(NewState),
- {reply, Reply, NewState1, Timeout}.
+ {reply, Reply, ensure_stats_timer(ensure_rate_timer(NewState1)), Timeout}.
noreply(NewState) ->
- assert_invariant(NewState),
{NewState1, Timeout} = next_state(NewState),
- {noreply, NewState1, Timeout}.
+ {noreply, ensure_stats_timer(ensure_rate_timer(NewState1)), Timeout}.
next_state(State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
+ assert_invariant(State),
{MsgIds, BQS1} = BQ:drain_confirmed(BQS),
- State1 = ensure_stats_timer(
- ensure_rate_timer(
- confirm_messages(MsgIds, State#q{
- backing_queue_state = BQS1}))),
+ State1 = confirm_messages(MsgIds, State#q{backing_queue_state = BQS1}),
case BQ:needs_timeout(BQS1) of
false -> {stop_sync_timer(State1), hibernate };
idle -> {stop_sync_timer(State1), ?SYNC_INTERVAL};
@@ -327,15 +323,11 @@ ensure_rate_timer(State = #q{rate_timer_ref = undefined}) ->
TRef = erlang:send_after(
?RAM_DURATION_UPDATE_INTERVAL, self(), update_ram_duration),
State#q{rate_timer_ref = TRef};
-ensure_rate_timer(State = #q{rate_timer_ref = just_measured}) ->
- State#q{rate_timer_ref = undefined};
ensure_rate_timer(State) ->
State.
stop_rate_timer(State = #q{rate_timer_ref = undefined}) ->
State;
-stop_rate_timer(State = #q{rate_timer_ref = just_measured}) ->
- State#q{rate_timer_ref = undefined};
stop_rate_timer(State = #q{rate_timer_ref = TRef}) ->
erlang:cancel_timer(TRef),
State#q{rate_timer_ref = undefined}.
@@ -725,14 +717,15 @@ drop_expired_messages(State = #q{dlx = DLX,
Now = now_micros(),
ExpirePred = fun (#message_properties{expiry = Exp}) -> Now >= Exp end,
{Props, BQS1} = case DLX of
- undefined -> {Next, undefined, BQS2} =
- BQ:dropwhile(ExpirePred, false, BQS),
- {Next, BQS2};
- _ -> {Next, Msgs, BQS2} =
- BQ:dropwhile(ExpirePred, true, BQS),
+ undefined -> BQ:dropwhile(ExpirePred, BQS);
+ _ -> {Next, Msgs, BQS2} =
+ BQ:fetchwhile(ExpirePred,
+ fun accumulate_msgs/4,
+ [], BQS),
case Msgs of
[] -> ok;
- _ -> (dead_letter_fun(expired))(Msgs)
+ _ -> (dead_letter_fun(expired))(
+ lists:reverse(Msgs))
end,
{Next, BQS2}
end,
@@ -741,6 +734,8 @@ drop_expired_messages(State = #q{dlx = DLX,
#message_properties{expiry = Exp} -> Exp
end, State#q{backing_queue_state = BQS1}).
+accumulate_msgs(Msg, _IsDelivered, AckTag, Acc) -> [{Msg, AckTag} | Acc].
+
ensure_ttl_timer(undefined, State) ->
State;
ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = undefined}) ->
@@ -1324,10 +1319,10 @@ handle_info(drop_expired, State) ->
handle_info(emit_stats, State) ->
emit_stats(State),
- {noreply, State1, Timeout} = noreply(State),
- %% Need to reset *after* we've been through noreply/1 so we do not
- %% just create another timer always and therefore never hibernate
- {noreply, rabbit_event:reset_stats_timer(State1, #q.stats_timer), Timeout};
+ %% Don't call noreply/1, we don't want to set timers
+ {State1, Timeout} = next_state(rabbit_event:reset_stats_timer(
+ State, #q.stats_timer)),
+ {noreply, State1, Timeout};
handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason},
State = #q{q = #amqqueue{exclusive_owner = DownPid}}) ->
@@ -1351,8 +1346,10 @@ handle_info(update_ram_duration, State = #q{backing_queue = BQ,
DesiredDuration =
rabbit_memory_monitor:report_ram_duration(self(), RamDuration),
BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1),
- noreply(State#q{rate_timer_ref = just_measured,
- backing_queue_state = BQS2});
+ %% Don't call noreply/1, we don't want to set timers
+ {State1, Timeout} = next_state(State#q{rate_timer_ref = undefined,
+ backing_queue_state = BQS2}),
+ {noreply, State1, Timeout};
handle_info(sync_timeout, State) ->
noreply(backing_queue_timeout(State#q{sync_timer_ref = undefined}));
diff --git a/src/rabbit_auth_backend_internal.erl b/src/rabbit_auth_backend_internal.erl
index 7b9df81e78..919be3f3ee 100644
--- a/src/rabbit_auth_backend_internal.erl
+++ b/src/rabbit_auth_backend_internal.erl
@@ -49,7 +49,7 @@
-spec(hash_password/1 :: (rabbit_types:password())
-> rabbit_types:password_hash()).
-spec(set_tags/2 :: (rabbit_types:username(), [atom()]) -> 'ok').
--spec(list_users/0 :: () -> rabbit_types:infos()).
+-spec(list_users/0 :: () -> [rabbit_types:infos()]).
-spec(user_info_keys/0 :: () -> rabbit_types:info_keys()).
-spec(lookup_user/1 :: (rabbit_types:username())
-> rabbit_types:ok(rabbit_types:internal_user())
@@ -58,14 +58,14 @@
regexp(), regexp(), regexp()) -> 'ok').
-spec(clear_permissions/2 :: (rabbit_types:username(), rabbit_types:vhost())
-> 'ok').
--spec(list_permissions/0 :: () -> rabbit_types:infos()).
+-spec(list_permissions/0 :: () -> [rabbit_types:infos()]).
-spec(list_vhost_permissions/1 ::
- (rabbit_types:vhost()) -> rabbit_types:infos()).
+ (rabbit_types:vhost()) -> [rabbit_types:infos()]).
-spec(list_user_permissions/1 ::
- (rabbit_types:username()) -> rabbit_types:infos()).
+ (rabbit_types:username()) -> [rabbit_types:infos()]).
-spec(list_user_vhost_permissions/2 ::
(rabbit_types:username(), rabbit_types:vhost())
- -> rabbit_types:infos()).
+ -> [rabbit_types:infos()]).
-spec(perms_info_keys/0 :: () -> rabbit_types:info_keys()).
-spec(vhost_perms_info_keys/0 :: () -> rabbit_types:info_keys()).
-spec(user_perms_info_keys/0 :: () -> rabbit_types:info_keys()).
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 96c58cb9da..272df5c1b7 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -124,16 +124,25 @@
%% be ignored.
-callback drain_confirmed(state()) -> {msg_ids(), state()}.
-%% Drop messages from the head of the queue while the supplied predicate returns
-%% true. Also accepts a boolean parameter that determines whether the messages
-%% necessitate an ack or not. If they do, the function returns a list of
-%% messages with the respective acktags.
--callback dropwhile(msg_pred(), true, state())
- -> {rabbit_types:message_properties() | undefined,
- [{rabbit_types:basic_message(), ack()}], state()};
- (msg_pred(), false, state())
- -> {rabbit_types:message_properties() | undefined,
- undefined, state()}.
+%% Drop messages from the head of the queue while the supplied
+%% predicate on message properties returns true. Returns the first
+%% message properties for which the predictate returned false, or
+%% 'undefined' if the whole backing queue was traversed w/o the
+%% predicate ever returning false.
+-callback dropwhile(msg_pred(), state())
+ -> {rabbit_types:message_properties() | undefined, state()}.
+
+%% Like dropwhile, except messages are fetched in "require
+%% acknowledgement" mode and are passed, together with their Delivered
+%% flag and ack tag, to the supplied function. The function is also
+%% fed an accumulator. The result of fetchwhile is as for dropwhile
+%% plus the accumulator.
+-callback fetchwhile(msg_pred(),
+ fun ((rabbit_types:basic_message(), boolean(), ack(), A)
+ -> A),
+ A, state())
+ -> {rabbit_types:message_properties() | undefined,
+ A, state()}.
%% Produce the next message.
-callback fetch(true, state()) -> {fetch_result(ack()), state()};
@@ -222,7 +231,8 @@
behaviour_info(callbacks) ->
[{start, 1}, {stop, 0}, {init, 3}, {terminate, 2},
{delete_and_terminate, 2}, {purge, 1}, {publish, 5},
- {publish_delivered, 4}, {discard, 3}, {drain_confirmed, 1}, {dropwhile, 3},
+ {publish_delivered, 4}, {discard, 3}, {drain_confirmed, 1},
+ {dropwhile, 2}, {fetchwhile, 4},
{fetch, 2}, {ack, 2}, {foreach_ack, 3}, {requeue, 2}, {fold, 3}, {len, 1},
{is_empty, 1}, {depth, 1}, {set_ram_duration_target, 2},
{ram_duration, 1}, {needs_timeout, 1}, {timeout, 1},
diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl
index a5d0a00855..5b3b8aa806 100644
--- a/src/rabbit_backing_queue_qc.erl
+++ b/src/rabbit_backing_queue_qc.erl
@@ -115,7 +115,7 @@ qc_publish(#state{bqstate = BQ}) ->
#message_properties{needs_confirming = frequency([{1, true},
{20, false}]),
expiry = oneof([undefined | lists:seq(1, 10)])},
- self(), BQ]}.
+ false, self(), BQ]}.
qc_publish_multiple(#state{}) ->
{call, ?MODULE, publish_multiple, [resize(?QUEUE_MAXLEN, pos_integer())]}.
@@ -147,7 +147,7 @@ qc_drain_confirmed(#state{bqstate = BQ}) ->
{call, ?BQMOD, drain_confirmed, [BQ]}.
qc_dropwhile(#state{bqstate = BQ}) ->
- {call, ?BQMOD, dropwhile, [fun dropfun/1, false, BQ]}.
+ {call, ?BQMOD, dropwhile, [fun dropfun/1, BQ]}.
qc_is_empty(#state{bqstate = BQ}) ->
{call, ?BQMOD, is_empty, [BQ]}.
@@ -182,7 +182,7 @@ precondition(#state{len = Len}, {call, ?MODULE, publish_multiple, _Arg}) ->
%% Model updates
-next_state(S, BQ, {call, ?BQMOD, publish, [Msg, MsgProps, _Pid, _BQ]}) ->
+next_state(S, BQ, {call, ?BQMOD, publish, [Msg, MsgProps, _Del, _Pid, _BQ]}) ->
#state{len = Len,
messages = Messages,
confirms = Confirms,
@@ -262,7 +262,7 @@ next_state(S, Res, {call, ?BQMOD, drain_confirmed, _Args}) ->
S#state{bqstate = BQ1};
next_state(S, Res, {call, ?BQMOD, dropwhile, _Args}) ->
- BQ = {call, erlang, element, [3, Res]},
+ BQ = {call, erlang, element, [2, Res]},
#state{messages = Messages} = S,
Msgs1 = drop_messages(Messages),
S#state{bqstate = BQ, len = gb_trees:size(Msgs1), messages = Msgs1};
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index b1ef3b6bdf..a3c8286594 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -529,16 +529,12 @@ check_not_default_exchange(_) ->
%% check that an exchange/queue name does not contain the reserved
%% "amq." prefix.
%%
-%% One, quite reasonable, interpretation of the spec, taken by the
-%% QPid M1 Java client, is that the exclusion of "amq." prefixed names
+%% As per the AMQP 0-9-1 spec, the exclusion of "amq." prefixed names
%% only applies on actual creation, and not in the cases where the
-%% entity already exists. This is how we use this function in the code
-%% below. However, AMQP JIRA 123 changes that in 0-10, and possibly
-%% 0-9SP1, making it illegal to attempt to declare an exchange/queue
-%% with an amq.* name when passive=false. So this will need
-%% revisiting.
+%% entity already exists or passive=true.
%%
-%% TODO: enforce other constraints on name. See AMQP JIRA 69.
+%% NB: We deliberately do not enforce the other constraints on names
+%% required by the spec.
check_name(Kind, NameBin = <<"amq.", _/binary>>) ->
rabbit_misc:protocol_error(
access_refused,
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index c8a361b1e0..e3d967bc53 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -20,7 +20,7 @@
purge/1, publish/5, publish_delivered/4,
discard/3, fetch/2, drop/2, ack/2,
requeue/2, fold/3, len/1, is_empty/1, depth/1, drain_confirmed/1,
- dropwhile/3, set_ram_duration_target/2, ram_duration/1,
+ dropwhile/2, fetchwhile/4, set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1,
status/1, invoke/3, is_duplicate/2, foreach_ack/3]).
@@ -40,7 +40,6 @@
backing_queue_state,
seen_status,
confirmed,
- ack_msg_id,
known_senders
}).
@@ -56,7 +55,6 @@
backing_queue_state :: any(),
seen_status :: dict(),
confirmed :: [rabbit_guid:guid()],
- ack_msg_id :: dict(),
known_senders :: set()
}).
@@ -114,7 +112,6 @@ init_with_existing_bq(Q = #amqqueue{name = QName}, BQ, BQS) ->
backing_queue_state = BQS,
seen_status = dict:new(),
confirmed = [],
- ack_msg_id = dict:new(),
known_senders = sets:new() }.
stop_mirroring(State = #state { coordinator = CPid,
@@ -187,13 +184,11 @@ publish_delivered(Msg = #basic_message { id = MsgId }, MsgProps,
ChPid, State = #state { gm = GM,
seen_status = SS,
backing_queue = BQ,
- backing_queue_state = BQS,
- ack_msg_id = AM }) ->
+ backing_queue_state = BQS }) ->
false = dict:is_key(MsgId, SS), %% ASSERTION
ok = gm:broadcast(GM, {publish_delivered, ChPid, MsgProps, Msg}),
{AckTag, BQS1} = BQ:publish_delivered(Msg, MsgProps, ChPid, BQS),
- AM1 = maybe_store_acktag(AckTag, MsgId, AM),
- State1 = State #state { backing_queue_state = BQS1, ack_msg_id = AM1 },
+ State1 = State #state { backing_queue_state = BQS1 },
{AckTag, ensure_monitoring(ChPid, State1)}.
discard(MsgId, ChPid, State = #state { gm = GM,
@@ -216,19 +211,17 @@ discard(MsgId, ChPid, State = #state { gm = GM,
State
end.
-dropwhile(Pred, AckRequired,
- State = #state{gm = GM,
- backing_queue = BQ,
- backing_queue_state = BQS }) ->
+dropwhile(Pred, State = #state{backing_queue = BQ,
+ backing_queue_state = BQS }) ->
Len = BQ:len(BQS),
- {Next, Msgs, BQS1} = BQ:dropwhile(Pred, AckRequired, BQS),
- Len1 = BQ:len(BQS1),
- Dropped = Len - Len1,
- case Dropped of
- 0 -> ok;
- _ -> ok = gm:broadcast(GM, {drop, Len1, Dropped, AckRequired})
- end,
- {Next, Msgs, State #state { backing_queue_state = BQS1 } }.
+ {Next, BQS1} = BQ:dropwhile(Pred, BQS),
+ {Next, drop(Len, false, State #state { backing_queue_state = BQS1 })}.
+
+fetchwhile(Pred, Fun, Acc, State = #state{backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ Len = BQ:len(BQS),
+ {Next, Acc1, BQS1} = BQ:fetchwhile(Pred, Fun, Acc, BQS),
+ {Next, Acc1, drop(Len, true, State #state { backing_queue_state = BQS1 })}.
drain_confirmed(State = #state { backing_queue = BQ,
backing_queue_state = BQS,
@@ -264,34 +257,29 @@ fetch(AckRequired, State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
{Result, BQS1} = BQ:fetch(AckRequired, BQS),
State1 = State #state { backing_queue_state = BQS1 },
- case Result of
- empty ->
- {Result, State1};
- {#basic_message{id = MsgId}, _IsDelivered, AckTag} ->
- {Result, drop(MsgId, AckTag, State1)}
- end.
+ {Result, case Result of
+ empty -> State1;
+ {_MsgId, _IsDelivered, AckTag} -> drop_one(AckTag, State1)
+ end}.
drop(AckRequired, State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
{Result, BQS1} = BQ:drop(AckRequired, BQS),
State1 = State #state { backing_queue_state = BQS1 },
{Result, case Result of
- empty -> State1;
- {MsgId, AckTag} -> drop(MsgId, AckTag, State1)
+ empty -> State1;
+ {_MsgId, AckTag} -> drop_one(AckTag, State1)
end}.
ack(AckTags, State = #state { gm = GM,
backing_queue = BQ,
- backing_queue_state = BQS,
- ack_msg_id = AM }) ->
+ backing_queue_state = BQS }) ->
{MsgIds, BQS1} = BQ:ack(AckTags, BQS),
case MsgIds of
[] -> ok;
_ -> ok = gm:broadcast(GM, {ack, MsgIds})
end,
- AM1 = lists:foldl(fun dict:erase/2, AM, AckTags),
- {MsgIds, State #state { backing_queue_state = BQS1,
- ack_msg_id = AM1 }}.
+ {MsgIds, State #state { backing_queue_state = BQS1 }}.
foreach_ack(MsgFun, State = #state { backing_queue = BQ,
backing_queue_state = BQS }, AckTags) ->
@@ -408,7 +396,6 @@ promote_backing_queue_state(CPid, BQ, BQS, GM, AckTags, SeenStatus, KS) ->
backing_queue_state = BQS1,
seen_status = SeenStatus,
confirmed = [],
- ack_msg_id = dict:new(),
known_senders = sets:from_list(KS) }.
sender_death_fun() ->
@@ -440,15 +427,21 @@ depth_fun() ->
%% Helpers
%% ---------------------------------------------------------------------------
-drop(MsgId, AckTag, State = #state { ack_msg_id = AM,
- gm = GM,
- backing_queue = BQ,
- backing_queue_state = BQS }) ->
+drop_one(AckTag, State = #state { gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS }) ->
ok = gm:broadcast(GM, {drop, BQ:len(BQS), 1, AckTag =/= undefined}),
- State #state { ack_msg_id = maybe_store_acktag(AckTag, MsgId, AM) }.
+ State.
-maybe_store_acktag(undefined, _MsgId, AM) -> AM;
-maybe_store_acktag(AckTag, MsgId, AM) -> dict:store(AckTag, MsgId, AM).
+drop(PrevLen, AckRequired, State = #state { gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ Len = BQ:len(BQS),
+ case PrevLen - Len of
+ 0 -> State;
+ Dropped -> ok = gm:broadcast(GM, {drop, Len, Dropped, AckRequired}),
+ State
+ end.
ensure_monitoring(ChPid, State = #state { coordinator = CPid,
known_senders = KS }) ->
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 81bb6769ab..4efde50ec5 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -46,6 +46,7 @@
-export([sort_field_table/1]).
-export([pid_to_string/1, string_to_pid/1]).
-export([version_compare/2, version_compare/3]).
+-export([version_minor_equivalent/2]).
-export([dict_cons/3, orddict_cons/3, gb_trees_cons/3]).
-export([gb_trees_fold/3, gb_trees_foreach/2]).
-export([parse_arguments/3]).
@@ -191,6 +192,7 @@
-spec(version_compare/3 ::
(string(), string(), ('lt' | 'lte' | 'eq' | 'gte' | 'gt'))
-> boolean()).
+-spec(version_minor_equivalent/2 :: (string(), string()) -> boolean()).
-spec(dict_cons/3 :: (any(), any(), dict()) -> dict()).
-spec(orddict_cons/3 :: (any(), any(), orddict:orddict()) -> orddict:orddict()).
-spec(gb_trees_cons/3 :: (any(), any(), gb_tree()) -> gb_tree()).
@@ -734,6 +736,16 @@ version_compare(A, B) ->
ANum > BNum -> gt
end.
+%% a.b.c and a.b.d match, but a.b.c and a.d.e don't. If
+%% versions do not match that pattern, just compare them.
+version_minor_equivalent(A, B) ->
+ {ok, RE} = re:compile("^(\\d+\\.\\d+)(\\.\\d+)\$"),
+ Opts = [{capture, all_but_first, list}],
+ case {re:run(A, RE, Opts), re:run(B, RE, Opts)} of
+ {{match, [A1|_]}, {match, [B1|_]}} -> A1 =:= B1;
+ _ -> A =:= B
+ end.
+
dropdot(A) -> lists:dropwhile(fun (X) -> X =:= $. end, A).
dict_cons(Key, Value, Dict) ->
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 942048f9b0..6a442fecf2 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -68,7 +68,8 @@
%% Various queries to get the status of the db
-spec(status/0 :: () -> [{'nodes', [{node_type(), [node()]}]} |
- {'running_nodes', [node()]}]).
+ {'running_nodes', [node()]} |
+ {'partitions', [{node(), [node()]}]}]).
-spec(is_clustered/0 :: () -> boolean()).
-spec(cluster_nodes/1 :: ('all' | 'disc' | 'ram' | 'running') -> [node()]).
-spec(node_type/0 :: () -> node_type()).
@@ -757,9 +758,16 @@ check_nodes_consistency(Node, RemoteStatus = {RemoteAllNodes, _, _}) ->
[node(), Node, Node])}}
end.
-check_version_consistency(This, Remote, _) when This =:= Remote ->
- ok;
check_version_consistency(This, Remote, Name) ->
+ check_version_consistency(This, Remote, Name, fun (A, B) -> A =:= B end).
+
+check_version_consistency(This, Remote, Name, Comp) ->
+ case Comp(This, Remote) of
+ true -> ok;
+ false -> version_error(Name, This, Remote)
+ end.
+
+version_error(Name, This, Remote) ->
{error, {inconsistent_cluster,
rabbit_misc:format("~s version mismatch: local node is ~s, "
"remote node ~s", [Name, This, Remote])}}.
@@ -767,8 +775,15 @@ check_version_consistency(This, Remote, Name) ->
check_otp_consistency(Remote) ->
check_version_consistency(erlang:system_info(otp_release), Remote, "OTP").
+%% Unlike the rest of 3.0.x, 3.0.0 is not compatible. This can be
+%% removed after 3.1.0 is released.
+check_rabbit_consistency("3.0.0") ->
+ version_error("Rabbit", rabbit_misc:version(), "3.0.0");
+
check_rabbit_consistency(Remote) ->
- check_version_consistency(rabbit_misc:version(), Remote, "Rabbit").
+ check_version_consistency(
+ rabbit_misc:version(), Remote, "Rabbit",
+ fun rabbit_misc:version_minor_equivalent/2).
%% This is fairly tricky. We want to know if the node is in the state
%% that a `reset' would leave it in. We cannot simply check if the
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index 8d0e4456df..258ac0ce4a 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -53,7 +53,7 @@
-spec(notify_joined_cluster/0 :: () -> 'ok').
-spec(notify_left_cluster/1 :: (node()) -> 'ok').
--spec(partitions/0 :: () -> {node(), [{atom(), node()}]}).
+-spec(partitions/0 :: () -> {node(), [node()]}).
-endif.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index df8544a4ad..b499c59b30 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -38,6 +38,7 @@ all_tests() ->
passed = mirrored_supervisor_tests:all_tests(),
application:set_env(rabbit, file_handles_high_watermark, 10, infinity),
ok = file_handle_cache:set_limit(10),
+ passed = test_version_equivalance(),
passed = test_multi_call(),
passed = test_file_handle_cache(),
passed = test_backing_queue(),
@@ -141,6 +142,16 @@ run_cluster_dependent_tests(SecondaryNode) ->
passed.
+test_version_equivalance() ->
+ true = rabbit_misc:version_minor_equivalent("3.0.0", "3.0.0"),
+ true = rabbit_misc:version_minor_equivalent("3.0.0", "3.0.1"),
+ true = rabbit_misc:version_minor_equivalent("%%VSN%%", "%%VSN%%"),
+ false = rabbit_misc:version_minor_equivalent("3.0.0", "3.1.0"),
+ false = rabbit_misc:version_minor_equivalent("3.0.0", "3.0"),
+ false = rabbit_misc:version_minor_equivalent("3.0.0", "3.0.0.1"),
+ false = rabbit_misc:version_minor_equivalent("3.0.0", "3.0.foo"),
+ passed.
+
test_multi_call() ->
Fun = fun() ->
receive
@@ -2307,8 +2318,9 @@ test_variable_queue() ->
fun test_variable_queue_all_the_bits_not_covered_elsewhere2/1,
fun test_drop/1,
fun test_variable_queue_fold_msg_on_disk/1,
- fun test_dropwhile/1,
+ fun test_dropfetchwhile/1,
fun test_dropwhile_varying_ram_duration/1,
+ fun test_fetchwhile_varying_ram_duration/1,
fun test_variable_queue_ack_limiting/1,
fun test_variable_queue_requeue/1,
fun test_variable_queue_fold/1]],
@@ -2409,41 +2421,70 @@ test_drop(VQ0) ->
true = rabbit_variable_queue:is_empty(VQ5),
VQ5.
-test_dropwhile(VQ0) ->
+test_dropfetchwhile(VQ0) ->
Count = 10,
%% add messages with sequential expiry
VQ1 = variable_queue_publish(
false, Count,
- fun (N, Props) -> Props#message_properties{expiry = N} end, VQ0),
+ fun (N, Props) -> Props#message_properties{expiry = N} end,
+ fun erlang:term_to_binary/1, VQ0),
+
+ %% fetch the first 5 messages
+ {#message_properties{expiry = 6}, {Msgs, AckTags}, VQ2} =
+ rabbit_variable_queue:fetchwhile(
+ fun (#message_properties{expiry = Expiry}) -> Expiry =< 5 end,
+ fun (Msg, _Delivered, AckTag, {MsgAcc, AckAcc}) ->
+ {[Msg | MsgAcc], [AckTag | AckAcc]}
+ end, {[], []}, VQ1),
+ true = lists:seq(1, 5) == [msg2int(M) || M <- lists:reverse(Msgs)],
+
+ %% requeue them
+ {_MsgIds, VQ3} = rabbit_variable_queue:requeue(AckTags, VQ2),
%% drop the first 5 messages
- {_, undefined, VQ2} = rabbit_variable_queue:dropwhile(
- fun(#message_properties { expiry = Expiry }) ->
- Expiry =< 5
- end, false, VQ1),
-
- %% fetch five now
- VQ3 = lists:foldl(fun (_N, VQN) ->
- {{#basic_message{}, _, _}, VQM} =
+ {#message_properties{expiry = 6}, VQ4} =
+ rabbit_variable_queue:dropwhile(
+ fun (#message_properties {expiry = Expiry}) -> Expiry =< 5 end, VQ3),
+
+ %% fetch 5
+ VQ5 = lists:foldl(fun (N, VQN) ->
+ {{Msg, _, _}, VQM} =
rabbit_variable_queue:fetch(false, VQN),
+ true = msg2int(Msg) == N,
VQM
- end, VQ2, lists:seq(6, Count)),
+ end, VQ4, lists:seq(6, Count)),
%% should be empty now
- {empty, VQ4} = rabbit_variable_queue:fetch(false, VQ3),
+ true = rabbit_variable_queue:is_empty(VQ5),
- VQ4.
+ VQ5.
test_dropwhile_varying_ram_duration(VQ0) ->
+ test_dropfetchwhile_varying_ram_duration(
+ fun (VQ1) ->
+ {_, VQ2} = rabbit_variable_queue:dropwhile(
+ fun (_) -> false end, VQ1),
+ VQ2
+ end, VQ0).
+
+test_fetchwhile_varying_ram_duration(VQ0) ->
+ test_dropfetchwhile_varying_ram_duration(
+ fun (VQ1) ->
+ {_, ok, VQ2} = rabbit_variable_queue:fetchwhile(
+ fun (_) -> false end,
+ fun (_, _, _, A) -> A end,
+ ok, VQ1),
+ VQ2
+ end, VQ0).
+
+test_dropfetchwhile_varying_ram_duration(Fun, VQ0) ->
VQ1 = variable_queue_publish(false, 1, VQ0),
VQ2 = rabbit_variable_queue:set_ram_duration_target(0, VQ1),
- {_, undefined, VQ3} = rabbit_variable_queue:dropwhile(
- fun(_) -> false end, false, VQ2),
+ VQ3 = Fun(VQ2),
VQ4 = rabbit_variable_queue:set_ram_duration_target(infinity, VQ3),
VQ5 = variable_queue_publish(false, 1, VQ4),
- {_, undefined, VQ6} =
- rabbit_variable_queue:dropwhile(fun(_) -> false end, false, VQ5),
+ VQ6 = Fun(VQ5),
VQ6.
test_variable_queue_dynamic_duration_change(VQ0) ->
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 30ab96f58c..3e4c7c864f 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -18,7 +18,8 @@
-export([init/3, terminate/2, delete_and_terminate/2, purge/1,
publish/5, publish_delivered/4, discard/3, drain_confirmed/1,
- dropwhile/3, fetch/2, drop/2, ack/2, requeue/2, fold/3, len/1,
+ dropwhile/2, fetchwhile/4,
+ fetch/2, drop/2, ack/2, requeue/2, fold/3, len/1,
is_empty/1, depth/1, set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3,
is_duplicate/2, multiple_routing_keys/0, foreach_ack/3]).
@@ -577,27 +578,30 @@ drain_confirmed(State = #vqstate { confirmed = C }) ->
confirmed = gb_sets:new() }}
end.
-dropwhile(Pred, AckRequired, State) -> dropwhile(Pred, AckRequired, State, []).
+dropwhile(Pred, State) ->
+ case queue_out(State) of
+ {empty, State1} ->
+ {undefined, a(State1)};
+ {{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} ->
+ case Pred(MsgProps) of
+ true -> {_, State2} = internal_fetch(false, MsgStatus, State1),
+ dropwhile(Pred, State2);
+ false -> {MsgProps, a(in_r(MsgStatus, State1))}
+ end
+ end.
-dropwhile(Pred, AckRequired, State, Msgs) ->
- End = fun(Next, S) when AckRequired -> {Next, lists:reverse(Msgs), S};
- (Next, S) -> {Next, undefined, S}
- end,
+fetchwhile(Pred, Fun, Acc, State) ->
case queue_out(State) of
{empty, State1} ->
- End(undefined, a(State1));
+ {undefined, Acc, a(State1)};
{{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} ->
- case {Pred(MsgProps), AckRequired} of
- {true, true} ->
- {MsgStatus1, State2} = read_msg(MsgStatus, State1),
- {{Msg, _IsDelivered, AckTag}, State3} =
- internal_fetch(true, MsgStatus1, State2),
- dropwhile(Pred, AckRequired, State3, [{Msg, AckTag} | Msgs]);
- {true, false} ->
- {_, State2} = internal_fetch(false, MsgStatus, State1),
- dropwhile(Pred, AckRequired, State2, undefined);
- {false, _} ->
- End(MsgProps, a(in_r(MsgStatus, State1)))
+ case Pred(MsgProps) of
+ true -> {MsgStatus1, State2} = read_msg(MsgStatus, State1),
+ {{Msg, IsDelivered, AckTag}, State3} =
+ internal_fetch(true, MsgStatus1, State2),
+ Acc1 = Fun(Msg, IsDelivered, AckTag, Acc),
+ fetchwhile(Pred, Fun, Acc1, State3);
+ false -> {MsgProps, Acc, a(in_r(MsgStatus, State1))}
end
end.
diff --git a/src/rabbit_vm.erl b/src/rabbit_vm.erl
index 53f3df18b3..db674f91d8 100644
--- a/src/rabbit_vm.erl
+++ b/src/rabbit_vm.erl
@@ -84,7 +84,15 @@ sup_memory(Sup) ->
sup_children(Sup) ->
rabbit_misc:with_exit_handler(
- rabbit_misc:const([]), fun () -> supervisor:which_children(Sup) end).
+ rabbit_misc:const([]),
+ fun () ->
+ %% Just in case we end up talking to something that is
+ %% not a supervisor by mistake.
+ case supervisor:which_children(Sup) of
+ L when is_list(L) -> L;
+ _ -> []
+ end
+ end).
pid_memory(Pid) when is_pid(Pid) -> case process_info(Pid, memory) of
{memory, M} -> M;
@@ -119,10 +127,13 @@ plugin_memory() ->
is_plugin(atom_to_list(App))]).
plugin_memory(App) ->
- case catch application_master:get_child(
- application_controller:get_master(App)) of
- {Pid, _} -> sup_memory(Pid);
- _ -> 0
+ case application_controller:get_master(App) of
+ undefined -> 0;
+ Master -> case application_master:get_child(Master) of
+ {Pid, _} when is_pid(Pid) -> sup_memory(Pid);
+ Pid when is_pid(Pid) -> sup_memory(Pid);
+ _ -> 0
+ end
end.
is_plugin("rabbitmq_" ++ _) -> true;