summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2013-01-08 16:48:46 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2013-01-08 16:48:46 +0000
commit6c04eb980db74f45d1365f76eae6259f2bf17238 (patch)
tree7fbd1e638be66f54a8832e635be5ab1a242f3479
parentf649f7e82c2d08a57d7cf7efe8d04752bdbb4046 (diff)
parent3706818cb60bdb4372fb750d9ffd23ec93f32ecb (diff)
downloadrabbitmq-server-git-6c04eb980db74f45d1365f76eae6259f2bf17238.tar.gz
merge stable into default
-rw-r--r--docs/rabbitmqctl.1.xml30
-rw-r--r--include/rabbit.hrl6
-rw-r--r--src/credit_flow.erl75
-rw-r--r--src/delegate.erl12
-rw-r--r--src/gen_server2.erl92
-rw-r--r--src/pmon.erl6
-rw-r--r--src/rabbit.erl8
-rw-r--r--src/rabbit_amqqueue.erl52
-rw-r--r--src/rabbit_amqqueue_process.erl239
-rw-r--r--src/rabbit_auth_backend_internal.erl10
-rw-r--r--src/rabbit_backing_queue.erl65
-rw-r--r--src/rabbit_backing_queue_qc.erl112
-rw-r--r--src/rabbit_channel.erl477
-rw-r--r--src/rabbit_control_main.erl13
-rw-r--r--src/rabbit_exchange.erl65
-rw-r--r--src/rabbit_guid.erl19
-rw-r--r--src/rabbit_mirror_queue_master.erl199
-rw-r--r--src/rabbit_mirror_queue_slave.erl73
-rw-r--r--src/rabbit_mirror_queue_sync.erl232
-rw-r--r--src/rabbit_misc.erl44
-rw-r--r--src/rabbit_mnesia.erl3
-rw-r--r--src/rabbit_node_monitor.erl2
-rw-r--r--src/rabbit_reader.erl203
-rw-r--r--src/rabbit_tests.erl168
-rw-r--r--src/rabbit_trace.erl37
-rw-r--r--src/rabbit_variable_queue.erl397
-rw-r--r--src/rabbit_vhost.erl2
27 files changed, 1627 insertions, 1014 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index 34947b66e8..a95f7b3d4b 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -446,6 +446,36 @@
</para>
</listitem>
</varlistentry>
+ <varlistentry>
+ <term><cmdsynopsis><command>sync_queue</command> <arg choice="req">queue</arg></cmdsynopsis>
+ </term>
+ <listitem>
+ <variablelist>
+ <varlistentry>
+ <term>queue</term>
+ <listitem>
+ <para>
+ The name of the queue to synchronise.
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+ <para>
+ Instructs a mirrored queue with unsynchronised slaves to
+ synchronise itself. The queue will block while
+ synchronisation takes place (all publishers to and
+ consumers from the queue will block). The queue must be
+ mirrored, and must not have any pending unacknowledged
+ messages for this command to succeed.
+ </para>
+ <para>
+ Note that unsynchronised queues from which messages are
+ being drained will become synchronised eventually. This
+ command is primarily useful for queues which are not
+ being drained.
+ </para>
+ </listitem>
+ </varlistentry>
</variablelist>
</refsect2>
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index b2832b456d..7385b4b3c0 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -27,9 +27,6 @@
-record(vhost, {virtual_host, dummy}).
--record(connection, {protocol, user, timeout_sec, frame_max, vhost,
- client_properties, capabilities}).
-
-record(content,
{class_id,
properties, %% either 'none', or a decoded record/tuple
@@ -78,8 +75,7 @@
-record(event, {type, props, timestamp}).
--record(message_properties, {expiry, needs_confirming = false,
- delivered = false}).
+-record(message_properties, {expiry, needs_confirming = false}).
-record(plugin, {name, %% atom()
version, %% string()
diff --git a/src/credit_flow.erl b/src/credit_flow.erl
index ba99811f70..102c353f9b 100644
--- a/src/credit_flow.erl
+++ b/src/credit_flow.erl
@@ -52,6 +52,22 @@
%%----------------------------------------------------------------------------
+%% process dict update macro - eliminates the performance-hurting
+%% closure creation a HOF would introduce
+-define(UPDATE(Key, Default, Var, Expr),
+ begin
+ %% We deliberately allow Var to escape from the case here
+ %% to be used in Expr. Any temporary var we introduced
+ %% would also escape, and might conflict.
+ case get(Key) of
+ undefined -> Var = Default;
+ Var -> ok
+ end,
+ put(Key, Expr)
+ end).
+
+%%----------------------------------------------------------------------------
+
%% There are two "flows" here; of messages and of credit, going in
%% opposite directions. The variable names "From" and "To" refer to
%% the flow of credit, but the function names refer to the flow of
@@ -66,29 +82,33 @@
send(From) -> send(From, ?DEFAULT_CREDIT).
send(From, {InitialCredit, _MoreCreditAfter}) ->
- update({credit_from, From}, InitialCredit,
- fun (1) -> block(From),
- 0;
- (C) -> C - 1
- end).
+ ?UPDATE({credit_from, From}, InitialCredit, C,
+ if C == 1 -> block(From),
+ 0;
+ true -> C - 1
+ end).
ack(To) -> ack(To, ?DEFAULT_CREDIT).
ack(To, {_InitialCredit, MoreCreditAfter}) ->
- update({credit_to, To}, MoreCreditAfter,
- fun (1) -> grant(To, MoreCreditAfter),
- MoreCreditAfter;
- (C) -> C - 1
- end).
+ ?UPDATE({credit_to, To}, MoreCreditAfter, C,
+ if C == 1 -> grant(To, MoreCreditAfter),
+ MoreCreditAfter;
+ true -> C - 1
+ end).
handle_bump_msg({From, MoreCredit}) ->
- update({credit_from, From}, 0,
- fun (C) when C =< 0 andalso C + MoreCredit > 0 -> unblock(From),
- C + MoreCredit;
- (C) -> C + MoreCredit
- end).
-
-blocked() -> get(credit_blocked, []) =/= [].
+ ?UPDATE({credit_from, From}, 0, C,
+ if C =< 0 andalso C + MoreCredit > 0 -> unblock(From),
+ C + MoreCredit;
+ true -> C + MoreCredit
+ end).
+
+blocked() -> case get(credit_blocked) of
+ undefined -> false;
+ [] -> false;
+ _ -> true
+ end.
peer_down(Peer) ->
%% In theory we could also remove it from credit_deferred here, but it
@@ -105,24 +125,17 @@ grant(To, Quantity) ->
Msg = {bump_credit, {self(), Quantity}},
case blocked() of
false -> To ! Msg;
- true -> update(credit_deferred, [],
- fun (Deferred) -> [{To, Msg} | Deferred] end)
+ true -> ?UPDATE(credit_deferred, [], Deferred, [{To, Msg} | Deferred])
end.
-block(From) -> update(credit_blocked, [], fun (Blocks) -> [From | Blocks] end).
+block(From) -> ?UPDATE(credit_blocked, [], Blocks, [From | Blocks]).
unblock(From) ->
- update(credit_blocked, [], fun (Blocks) -> Blocks -- [From] end),
+ ?UPDATE(credit_blocked, [], Blocks, Blocks -- [From]),
case blocked() of
- false -> [To ! Msg || {To, Msg} <- get(credit_deferred, [])],
- erase(credit_deferred);
+ false -> case erase(credit_deferred) of
+ undefined -> ok;
+ Credits -> [To ! Msg || {To, Msg} <- Credits]
+ end;
true -> ok
end.
-
-get(Key, Default) ->
- case get(Key) of
- undefined -> Default;
- Value -> Value
- end.
-
-update(Key, Default, Fun) -> put(Key, Fun(get(Key, Default))), ok.
diff --git a/src/delegate.erl b/src/delegate.erl
index 9222c34c42..96b8ba314c 100644
--- a/src/delegate.erl
+++ b/src/delegate.erl
@@ -62,6 +62,13 @@ invoke(Pid, Fun) when is_pid(Pid) ->
erlang:raise(Class, Reason, StackTrace)
end;
+invoke([], _Fun) -> %% optimisation
+ {[], []};
+invoke([Pid], Fun) when node(Pid) =:= node() -> %% optimisation
+ case safe_invoke(Pid, Fun) of
+ {ok, _, Result} -> {[{Pid, Result}], []};
+ {error, _, Error} -> {[], [{Pid, Error}]}
+ end;
invoke(Pids, Fun) when is_list(Pids) ->
{LocalPids, Grouped} = group_pids_by_node(Pids),
%% The use of multi_call is only safe because the timeout is
@@ -90,6 +97,11 @@ invoke_no_result(Pid, Fun) when is_pid(Pid) andalso node(Pid) =:= node() ->
invoke_no_result(Pid, Fun) when is_pid(Pid) ->
invoke_no_result([Pid], Fun);
+invoke_no_result([], _Fun) -> %% optimisation
+ ok;
+invoke_no_result([Pid], Fun) when node(Pid) =:= node() -> %% optimisation
+ safe_invoke(Pid, Fun), %% must not die
+ ok;
invoke_no_result(Pids, Fun) when is_list(Pids) ->
{LocalPids, Grouped} = group_pids_by_node(Pids),
case orddict:fetch_keys(Grouped) of
diff --git a/src/gen_server2.erl b/src/gen_server2.erl
index 78bbbe0627..dc55948b36 100644
--- a/src/gen_server2.erl
+++ b/src/gen_server2.erl
@@ -196,8 +196,7 @@
%% State record
-record(gs2_state, {parent, name, state, mod, time,
- timeout_state, queue, debug, prioritise_call,
- prioritise_cast, prioritise_info}).
+ timeout_state, queue, debug, prioritisers}).
-ifdef(use_specs).
@@ -638,17 +637,17 @@ adjust_timeout_state(SleptAt, AwokeAt, {backoff, CurrentTO, MinimumTO,
{backoff, CurrentTO1, MinimumTO, DesiredHibPeriod, RandomState1}.
in({'$gen_cast', Msg} = Input,
- GS2State = #gs2_state { prioritise_cast = PC }) ->
- in(Input, PC(Msg, GS2State), GS2State);
+ GS2State = #gs2_state { prioritisers = {_, F, _} }) ->
+ in(Input, F(Msg, GS2State), GS2State);
in({'$gen_call', From, Msg} = Input,
- GS2State = #gs2_state { prioritise_call = PC }) ->
- in(Input, PC(Msg, From, GS2State), GS2State);
+ GS2State = #gs2_state { prioritisers = {F, _, _} }) ->
+ in(Input, F(Msg, From, GS2State), GS2State);
in({'EXIT', Parent, _R} = Input, GS2State = #gs2_state { parent = Parent }) ->
in(Input, infinity, GS2State);
in({system, _From, _Req} = Input, GS2State) ->
in(Input, infinity, GS2State);
-in(Input, GS2State = #gs2_state { prioritise_info = PI }) ->
- in(Input, PI(Input, GS2State), GS2State).
+in(Input, GS2State = #gs2_state { prioritisers = {_, _, F} }) ->
+ in(Input, F(Input, GS2State), GS2State).
in(Input, Priority, GS2State = #gs2_state { queue = Queue }) ->
GS2State # gs2_state { queue = priority_queue:in(Input, Priority, Queue) }.
@@ -864,13 +863,19 @@ dispatch(Info, Mod, State) ->
common_reply(_Name, From, Reply, _NState, [] = _Debug) ->
reply(From, Reply),
[];
-common_reply(Name, From, Reply, NState, Debug) ->
- reply(Name, From, Reply, NState, Debug).
+common_reply(Name, {To, Tag} = From, Reply, NState, Debug) ->
+ reply(From, Reply),
+ sys:handle_debug(Debug, fun print_event/3, Name, {out, Reply, To, NState}).
+
+common_noreply(_Name, _NState, [] = _Debug) ->
+ [];
+common_noreply(Name, NState, Debug) ->
+ sys:handle_debug(Debug, fun print_event/3, Name, {noreply, NState}).
-common_debug([] = _Debug, _Func, _Info, _Event) ->
+common_become(_Name, _Mod, _NState, [] = _Debug) ->
[];
-common_debug(Debug, Func, Info, Event) ->
- sys:handle_debug(Debug, Func, Info, Event).
+common_become(Name, Mod, NState, Debug) ->
+ sys:handle_debug(Debug, fun print_event/3, Name, {become, Mod, NState}).
handle_msg({'$gen_call', From, Msg}, GS2State = #gs2_state { mod = Mod,
state = State,
@@ -887,23 +892,11 @@ handle_msg({'$gen_call', From, Msg}, GS2State = #gs2_state { mod = Mod,
loop(GS2State #gs2_state { state = NState,
time = Time1,
debug = Debug1});
- {noreply, NState} ->
- Debug1 = common_debug(Debug, fun print_event/3, Name,
- {noreply, NState}),
- loop(GS2State #gs2_state {state = NState,
- time = infinity,
- debug = Debug1});
- {noreply, NState, Time1} ->
- Debug1 = common_debug(Debug, fun print_event/3, Name,
- {noreply, NState}),
- loop(GS2State #gs2_state {state = NState,
- time = Time1,
- debug = Debug1});
{stop, Reason, Reply, NState} ->
{'EXIT', R} =
(catch terminate(Reason, Msg,
GS2State #gs2_state { state = NState })),
- reply(Name, From, Reply, NState, Debug),
+ common_reply(Name, From, Reply, NState, Debug),
exit(R);
Other ->
handle_common_reply(Other, Msg, GS2State)
@@ -916,28 +909,24 @@ handle_common_reply(Reply, Msg, GS2State = #gs2_state { name = Name,
debug = Debug}) ->
case Reply of
{noreply, NState} ->
- Debug1 = common_debug(Debug, fun print_event/3, Name,
- {noreply, NState}),
- loop(GS2State #gs2_state { state = NState,
- time = infinity,
- debug = Debug1 });
+ Debug1 = common_noreply(Name, NState, Debug),
+ loop(GS2State #gs2_state {state = NState,
+ time = infinity,
+ debug = Debug1});
{noreply, NState, Time1} ->
- Debug1 = common_debug(Debug, fun print_event/3, Name,
- {noreply, NState}),
- loop(GS2State #gs2_state { state = NState,
- time = Time1,
- debug = Debug1 });
+ Debug1 = common_noreply(Name, NState, Debug),
+ loop(GS2State #gs2_state {state = NState,
+ time = Time1,
+ debug = Debug1});
{become, Mod, NState} ->
- Debug1 = common_debug(Debug, fun print_event/3, Name,
- {become, Mod, NState}),
+ Debug1 = common_become(Name, Mod, NState, Debug),
loop(find_prioritisers(
GS2State #gs2_state { mod = Mod,
state = NState,
time = infinity,
debug = Debug1 }));
{become, Mod, NState, Time1} ->
- Debug1 = common_debug(Debug, fun print_event/3, Name,
- {become, Mod, NState}),
+ Debug1 = common_become(Name, Mod, NState, Debug),
loop(find_prioritisers(
GS2State #gs2_state { mod = Mod,
state = NState,
@@ -957,12 +946,6 @@ handle_common_termination(Reply, Msg, GS2State) ->
terminate({bad_return_value, Reply}, Msg, GS2State)
end.
-reply(Name, {To, Tag}, Reply, State, Debug) ->
- reply({To, Tag}, Reply),
- sys:handle_debug(
- Debug, fun print_event/3, Name, {out, Reply, To, State}).
-
-
%%-----------------------------------------------------------------
%% Callback functions for system messages handling.
%%-----------------------------------------------------------------
@@ -1165,16 +1148,13 @@ whereis_name(Name) ->
end.
find_prioritisers(GS2State = #gs2_state { mod = Mod }) ->
- PrioriCall = function_exported_or_default(
- Mod, 'prioritise_call', 3,
- fun (_Msg, _From, _State) -> 0 end),
- PrioriCast = function_exported_or_default(Mod, 'prioritise_cast', 2,
- fun (_Msg, _State) -> 0 end),
- PrioriInfo = function_exported_or_default(Mod, 'prioritise_info', 2,
- fun (_Msg, _State) -> 0 end),
- GS2State #gs2_state { prioritise_call = PrioriCall,
- prioritise_cast = PrioriCast,
- prioritise_info = PrioriInfo }.
+ PCall = function_exported_or_default(Mod, 'prioritise_call', 3,
+ fun (_Msg, _From, _State) -> 0 end),
+ PCast = function_exported_or_default(Mod, 'prioritise_cast', 2,
+ fun (_Msg, _State) -> 0 end),
+ PInfo = function_exported_or_default(Mod, 'prioritise_info', 2,
+ fun (_Msg, _State) -> 0 end),
+ GS2State #gs2_state { prioritisers = {PCall, PCast, PInfo} }.
function_exported_or_default(Mod, Fun, Arity, Default) ->
case erlang:function_exported(Mod, Fun, Arity) of
diff --git a/src/pmon.erl b/src/pmon.erl
index 1aeebb72bc..3789811923 100644
--- a/src/pmon.erl
+++ b/src/pmon.erl
@@ -19,6 +19,8 @@
-export([new/0, monitor/2, monitor_all/2, demonitor/2, is_monitored/2, erase/2,
monitored/1, is_empty/1]).
+-compile({no_auto_import, [monitor/2]}).
+
-ifdef(use_specs).
%%----------------------------------------------------------------------------
@@ -48,7 +50,9 @@ monitor(Item, M) ->
false -> dict:store(Item, erlang:monitor(process, Item), M)
end.
-monitor_all(Items, M) -> lists:foldl(fun monitor/2, M, Items).
+monitor_all([], M) -> M; %% optimisation
+monitor_all([Item], M) -> monitor(Item, M); %% optimisation
+monitor_all(Items, M) -> lists:foldl(fun monitor/2, M, Items).
demonitor(Item, M) ->
case dict:find(Item, M) of
diff --git a/src/rabbit.erl b/src/rabbit.erl
index c3a6d283b2..7b8348fc31 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -400,13 +400,11 @@ status() ->
is_running() -> is_running(node()).
-is_running(Node) ->
- rabbit_nodes:is_running(Node, rabbit).
+is_running(Node) -> rabbit_nodes:is_running(Node, rabbit).
environment() ->
- lists:keysort(
- 1, [P || P = {K, _} <- application:get_all_env(rabbit),
- K =/= default_pass]).
+ lists:keysort(1, [P || P = {K, _} <- application:get_all_env(rabbit),
+ K =/= default_pass]).
rotate_logs(BinarySuffix) ->
Suffix = binary_to_list(BinarySuffix),
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 173f76481b..94150f1c69 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -31,10 +31,10 @@
-export([notify_down_all/2, limit_all/3]).
-export([on_node_down/1]).
-export([update/2, store_queue/1, policy_changed/2]).
--export([start_mirroring/1, stop_mirroring/1]).
+-export([start_mirroring/1, stop_mirroring/1, sync_mirrors/1]).
%% internal
--export([internal_declare/2, internal_delete/2, run_backing_queue/3,
+-export([internal_declare/2, internal_delete/1, run_backing_queue/3,
set_ram_duration_target/2, set_maximum_since_use/2]).
-include("rabbit.hrl").
@@ -156,11 +156,11 @@
-spec(notify_sent_queue_down/1 :: (pid()) -> 'ok').
-spec(unblock/2 :: (pid(), pid()) -> 'ok').
-spec(flush_all/2 :: (qpids(), pid()) -> 'ok').
--spec(internal_delete/2 ::
- (name(), pid()) -> rabbit_types:ok_or_error('not_found') |
- rabbit_types:connection_exit() |
- fun (() -> rabbit_types:ok_or_error('not_found') |
- rabbit_types:connection_exit())).
+-spec(internal_delete/1 ::
+ (name()) -> rabbit_types:ok_or_error('not_found') |
+ rabbit_types:connection_exit() |
+ fun (() -> rabbit_types:ok_or_error('not_found') |
+ rabbit_types:connection_exit())).
-spec(run_backing_queue/3 ::
(pid(), atom(),
(fun ((atom(), A) -> {[rabbit_types:msg_id()], A}))) -> 'ok').
@@ -173,6 +173,8 @@
(rabbit_types:amqqueue(), rabbit_types:amqqueue()) -> 'ok').
-spec(start_mirroring/1 :: (pid()) -> 'ok').
-spec(stop_mirroring/1 :: (pid()) -> 'ok').
+-spec(sync_mirrors/1 :: (pid()) ->
+ 'ok' | rabbit_types:error('pending_acks' | 'not_mirrored')).
-endif.
@@ -257,7 +259,7 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) ->
[ExistingQ = #amqqueue{pid = QPid}] ->
case rabbit_misc:is_process_alive(QPid) of
true -> rabbit_misc:const(ExistingQ);
- false -> TailFun = internal_delete(QueueName, QPid),
+ false -> TailFun = internal_delete(QueueName),
fun () -> TailFun(), ExistingQ end
end
end
@@ -302,6 +304,8 @@ add_default_binding(#amqqueue{name = QueueName}) ->
key = RoutingKey,
args = []}).
+lookup([]) -> []; %% optimisation
+lookup([Name]) -> ets:lookup(rabbit_queue, Name); %% optimisation
lookup(Names) when is_list(Names) ->
%% Normally we'd call mnesia:dirty_read/1 here, but that is quite
%% expensive for reasons explained in rabbit_misc:dirty_read/1.
@@ -380,14 +384,10 @@ with_exclusive_access_or_die(Name, ReaderPid, F) ->
assert_args_equivalence(#amqqueue{name = QueueName, arguments = Args},
RequiredArgs) ->
- rabbit_misc:assert_args_equivalence(
- Args, RequiredArgs, QueueName, [<<"x-expires">>, <<"x-message-ttl">>]).
+ rabbit_misc:assert_args_equivalence(Args, RequiredArgs, QueueName,
+ [Key || {Key, _Fun} <- args()]).
check_declare_arguments(QueueName, Args) ->
- Checks = [{<<"x-expires">>, fun check_expires_arg/2},
- {<<"x-message-ttl">>, fun check_message_ttl_arg/2},
- {<<"x-dead-letter-exchange">>, fun check_string_arg/2},
- {<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2}],
[case rabbit_misc:table_lookup(Args, Key) of
undefined -> ok;
TypeVal -> case Fun(TypeVal, Args) of
@@ -398,9 +398,15 @@ check_declare_arguments(QueueName, Args) ->
[Key, rabbit_misc:rs(QueueName),
Error])
end
- end || {Key, Fun} <- Checks],
+ end || {Key, Fun} <- args()],
ok.
+args() ->
+ [{<<"x-expires">>, fun check_expires_arg/2},
+ {<<"x-message-ttl">>, fun check_message_ttl_arg/2},
+ {<<"x-dead-letter-exchange">>, fun check_string_arg/2},
+ {<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2}].
+
check_string_arg({longstr, _}, _Args) -> ok;
check_string_arg({Type, _}, _Args) -> {error, {unacceptable_type, Type}}.
@@ -569,7 +575,7 @@ internal_delete1(QueueName) ->
%% after the transaction.
rabbit_binding:remove_for_destination(QueueName).
-internal_delete(QueueName, QPid) ->
+internal_delete(QueueName) ->
rabbit_misc:execute_mnesia_tx_with_tail(
fun () ->
case mnesia:wread({rabbit_queue, QueueName}) of
@@ -579,8 +585,7 @@ internal_delete(QueueName, QPid) ->
fun() ->
ok = T(),
ok = rabbit_event:notify(queue_deleted,
- [{pid, QPid},
- {name, QueueName}])
+ [{name, QueueName}])
end
end
end).
@@ -597,10 +602,12 @@ set_maximum_since_use(QPid, Age) ->
start_mirroring(QPid) -> ok = delegate:cast(QPid, start_mirroring).
stop_mirroring(QPid) -> ok = delegate:cast(QPid, stop_mirroring).
+sync_mirrors(QPid) -> delegate:call(QPid, sync_mirrors).
+
on_node_down(Node) ->
rabbit_misc:execute_mnesia_tx_with_tail(
fun () -> QsDels =
- qlc:e(qlc:q([{{QName, Pid}, delete_queue(QName)} ||
+ qlc:e(qlc:q([{QName, delete_queue(QName)} ||
#amqqueue{name = QName, pid = Pid,
slave_pids = []}
<- mnesia:table(rabbit_queue),
@@ -613,10 +620,9 @@ on_node_down(Node) ->
fun () ->
T(),
lists:foreach(
- fun({QName, QPid}) ->
+ fun(QName) ->
ok = rabbit_event:notify(queue_deleted,
- [{pid, QPid},
- {name, QName}])
+ [{name, QName}])
end, Qs)
end
end).
@@ -674,6 +680,8 @@ deliver(Qs, Delivery, _Flow) ->
R -> {routed, [QPid || {QPid, ok} <- R]}
end.
+qpids([]) -> {[], []}; %% optimisation
+qpids([#amqqueue{pid = QPid, slave_pids = SPids}]) -> {[QPid], SPids}; %% opt
qpids(Qs) ->
{MPids, SPids} = lists:foldl(fun (#amqqueue{pid = QPid, slave_pids = SPids},
{MPidAcc, SPidAcc}) ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index ab735be6dd..4341c3d6d8 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -85,7 +85,7 @@
%%----------------------------------------------------------------------------
-define(STATISTICS_KEYS,
- [pid,
+ [name,
policy,
exclusive_consumer_pid,
exclusive_consumer_tag,
@@ -101,16 +101,14 @@
]).
-define(CREATION_EVENT_KEYS,
- [pid,
- name,
+ [name,
durable,
auto_delete,
arguments,
owner_pid
]).
--define(INFO_KEYS,
- ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]).
+-define(INFO_KEYS, [pid | ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [name]]).
%%----------------------------------------------------------------------------
@@ -122,26 +120,7 @@ info_keys() -> ?INFO_KEYS.
init(Q) ->
process_flag(trap_exit, true),
- State = #q{q = Q#amqqueue{pid = self()},
- exclusive_consumer = none,
- has_had_consumers = false,
- backing_queue = undefined,
- backing_queue_state = undefined,
- active_consumers = queue:new(),
- expires = undefined,
- sync_timer_ref = undefined,
- rate_timer_ref = undefined,
- expiry_timer_ref = undefined,
- ttl = undefined,
- senders = pmon:new(),
- dlx = undefined,
- dlx_routing_key = undefined,
- publish_seqno = 1,
- unconfirmed = dtree:empty(),
- delayed_stop = undefined,
- queue_monitors = pmon:new(),
- msg_id_to_channel = gb_trees:empty()},
- {ok, rabbit_event:init_stats_timer(State, #q.stats_timer), hibernate,
+ {ok, init_state(Q#amqqueue{pid = self()}), hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
@@ -150,27 +129,28 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
none -> ok;
_ -> erlang:monitor(process, Owner)
end,
+ State = init_state(Q),
+ State1 = State#q{backing_queue = BQ,
+ backing_queue_state = BQS,
+ rate_timer_ref = RateTRef,
+ senders = Senders,
+ msg_id_to_channel = MTC},
+ State2 = process_args(State1),
+ lists:foldl(fun (Delivery, StateN) ->
+ deliver_or_enqueue(Delivery, true, StateN)
+ end, State2, Deliveries).
+
+init_state(Q) ->
State = #q{q = Q,
exclusive_consumer = none,
has_had_consumers = false,
- backing_queue = BQ,
- backing_queue_state = BQS,
active_consumers = queue:new(),
- expires = undefined,
- sync_timer_ref = undefined,
- rate_timer_ref = RateTRef,
- expiry_timer_ref = undefined,
- ttl = undefined,
- senders = Senders,
+ senders = pmon:new(),
publish_seqno = 1,
unconfirmed = dtree:empty(),
- delayed_stop = undefined,
queue_monitors = pmon:new(),
- msg_id_to_channel = MTC},
- State1 = process_args(rabbit_event:init_stats_timer(State, #q.stats_timer)),
- lists:foldl(fun (Delivery, StateN) ->
- deliver_or_enqueue(Delivery, true, StateN)
- end, State1, Deliveries).
+ msg_id_to_channel = gb_trees:empty()},
+ rabbit_event:init_stats_timer(State, #q.stats_timer).
terminate(shutdown = R, State = #q{backing_queue = BQ}) ->
terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State);
@@ -182,7 +162,7 @@ terminate(Reason, State = #q{q = #amqqueue{name = QName},
fun (BQS) ->
BQS1 = BQ:delete_and_terminate(Reason, BQS),
%% don't care if the internal delete doesn't return 'ok'.
- rabbit_amqqueue:internal_delete(QName, self()),
+ rabbit_amqqueue:internal_delete(QName),
BQS1
end, State).
@@ -264,7 +244,7 @@ process_args(State = #q{q = #amqqueue{arguments = Arguments}}) ->
init_expires(Expires, State) -> ensure_expiry_timer(State#q{expires = Expires}).
-init_ttl(TTL, State) -> drop_expired_messages(State#q{ttl = TTL}).
+init_ttl(TTL, State) -> drop_expired_msgs(State#q{ttl = TTL}).
init_dlx(DLX, State = #q{q = #amqqueue{name = QName}}) ->
State#q{dlx = rabbit_misc:r(QName, exchange, DLX)}.
@@ -278,7 +258,8 @@ terminate_shutdown(Fun, State) ->
case BQS of
undefined -> State1;
_ -> ok = rabbit_memory_monitor:deregister(self()),
- [emit_consumer_deleted(Ch, CTag)
+ QName = qname(State),
+ [emit_consumer_deleted(Ch, CTag, QName)
|| {Ch, CTag, _} <- consumers(State1)],
State1#q{backing_queue_state = Fun(BQS)}
end.
@@ -371,7 +352,7 @@ ch_record(ChPid) ->
undefined -> MonitorRef = erlang:monitor(process, ChPid),
C = #cr{ch_pid = ChPid,
monitor_ref = MonitorRef,
- acktags = sets:new(),
+ acktags = queue:new(),
consumer_count = 0,
blocked_consumers = queue:new(),
is_limit_active = false,
@@ -385,9 +366,9 @@ ch_record(ChPid) ->
update_ch_record(C = #cr{consumer_count = ConsumerCount,
acktags = ChAckTags,
unsent_message_count = UnsentMessageCount}) ->
- case {sets:size(ChAckTags), ConsumerCount, UnsentMessageCount} of
- {0, 0, 0} -> ok = erase_ch_record(C);
- _ -> ok = store_ch_record(C)
+ case {queue:is_empty(ChAckTags), ConsumerCount, UnsentMessageCount} of
+ {true, 0, 0} -> ok = erase_ch_record(C);
+ _ -> ok = store_ch_record(C)
end,
C.
@@ -470,7 +451,7 @@ deliver_msg_to_consumer(DeliverFun,
rabbit_channel:deliver(ChPid, ConsumerTag, AckRequired,
{QName, self(), AckTag, IsDelivered, Message}),
ChAckTags1 = case AckRequired of
- true -> sets:add_element(AckTag, ChAckTags);
+ true -> queue:in(AckTag, ChAckTags);
false -> ChAckTags
end,
update_ch_record(C#cr{acktags = ChAckTags1,
@@ -478,11 +459,10 @@ deliver_msg_to_consumer(DeliverFun,
{Stop, State1}.
deliver_from_queue_deliver(AckRequired, State) ->
- {{Message, IsDelivered, AckTag, _Remaining}, State1} =
- fetch(AckRequired, State),
+ {Result, State1} = fetch(AckRequired, State),
State2 = #q{backing_queue = BQ, backing_queue_state = BQS} =
- drop_expired_messages(State1),
- {{Message, IsDelivered, AckTag}, BQ:is_empty(BQS), State2}.
+ drop_expired_msgs(State1),
+ {Result, BQ:is_empty(BQS), State2}.
confirm_messages([], State) ->
State;
@@ -518,25 +498,28 @@ send_or_record_confirm(#delivery{sender = SenderPid,
rabbit_misc:confirm_to_sender(SenderPid, [MsgSeqNo]),
{immediately, State}.
-discard(#delivery{sender = SenderPid, message = #basic_message{id = MsgId}},
- State) ->
- %% fake an 'eventual' confirm from BQ; noop if not needed
+discard(#delivery{sender = SenderPid,
+ msg_seq_no = MsgSeqNo,
+ message = #basic_message{id = MsgId}}, State) ->
State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
- confirm_messages([MsgId], State),
+ case MsgSeqNo of
+ undefined -> State;
+ _ -> confirm_messages([MsgId], State)
+ end,
BQS1 = BQ:discard(MsgId, SenderPid, BQS),
State1#q{backing_queue_state = BQS1}.
run_message_queue(State) ->
State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
- drop_expired_messages(State),
+ drop_expired_msgs(State),
{_IsEmpty1, State2} = deliver_msgs_to_consumers(
fun deliver_from_queue_deliver/2,
BQ:is_empty(BQS), State1),
State2.
attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message},
- Props = #message_properties{delivered = Delivered},
- State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
+ Props, Delivered, State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
case BQ:is_duplicate(Message, BQS) of
{false, BQS1} ->
deliver_msgs_to_consumers(
@@ -558,15 +541,15 @@ attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message},
deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid},
Delivered, State) ->
{Confirm, State1} = send_or_record_confirm(Delivery, State),
- Props = message_properties(Message, Confirm, Delivered, State),
- case attempt_delivery(Delivery, Props, State1) of
+ Props = message_properties(Message, Confirm, State),
+ case attempt_delivery(Delivery, Props, Delivered, State1) of
{true, State2} ->
State2;
%% The next one is an optimisation
{false, State2 = #q{ttl = 0, dlx = undefined}} ->
discard(Delivery, State2);
{false, State2 = #q{backing_queue = BQ, backing_queue_state = BQS}} ->
- BQS1 = BQ:publish(Message, Props, SenderPid, BQS),
+ BQS1 = BQ:publish(Message, Props, Delivered, SenderPid, BQS),
ensure_ttl_timer(Props#message_properties.expiry,
State2#q{backing_queue_state = BQS1})
end.
@@ -598,9 +581,9 @@ remove_consumer(ChPid, ConsumerTag, Queue) ->
(CP /= ChPid) or (CTag /= ConsumerTag)
end, Queue).
-remove_consumers(ChPid, Queue) ->
+remove_consumers(ChPid, Queue, QName) ->
queue:filter(fun ({CP, #consumer{tag = CTag}}) when CP =:= ChPid ->
- emit_consumer_deleted(ChPid, CTag),
+ emit_consumer_deleted(ChPid, CTag, QName),
false;
(_) ->
true
@@ -641,7 +624,8 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder,
C = #cr{ch_pid = ChPid,
acktags = ChAckTags,
blocked_consumers = Blocked} ->
- _ = remove_consumers(ChPid, Blocked), %% for stats emission
+ QName = qname(State),
+ _ = remove_consumers(ChPid, Blocked, QName), %% for stats emission
ok = erase_ch_record(C),
State1 = State#q{
exclusive_consumer = case Holder of
@@ -649,11 +633,12 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder,
Other -> Other
end,
active_consumers = remove_consumers(
- ChPid, State#q.active_consumers),
+ ChPid, State#q.active_consumers,
+ QName),
senders = Senders1},
case should_auto_delete(State1) of
true -> {stop, State1};
- false -> {ok, requeue_and_run(sets:to_list(ChAckTags),
+ false -> {ok, requeue_and_run(queue:to_list(ChAckTags),
ensure_expiry_timer(State1))}
end
end.
@@ -692,15 +677,24 @@ subtract_acks(ChPid, AckTags, State, Fun) ->
not_found ->
State;
C = #cr{acktags = ChAckTags} ->
- update_ch_record(C#cr{acktags = lists:foldl(fun sets:del_element/2,
- ChAckTags, AckTags)}),
+ update_ch_record(
+ C#cr{acktags = subtract_acks(AckTags, [], ChAckTags)}),
Fun(State)
end.
-message_properties(Message, Confirm, Delivered, #q{ttl = TTL}) ->
+subtract_acks([], [], AckQ) ->
+ AckQ;
+subtract_acks([], Prefix, AckQ) ->
+ queue:join(queue:from_list(lists:reverse(Prefix)), AckQ);
+subtract_acks([T | TL] = AckTags, Prefix, AckQ) ->
+ case queue:out(AckQ) of
+ {{value, T}, QTail} -> subtract_acks(TL, Prefix, QTail);
+ {{value, AT}, QTail} -> subtract_acks(AckTags, [AT | Prefix], QTail)
+ end.
+
+message_properties(Message, Confirm, #q{ttl = TTL}) ->
#message_properties{expiry = calculate_msg_expiry(Message, TTL),
- needs_confirming = Confirm == eventually,
- delivered = Delivered}.
+ needs_confirming = Confirm == eventually}.
calculate_msg_expiry(#basic_message{content = Content}, TTL) ->
#content{properties = Props} =
@@ -712,19 +706,22 @@ calculate_msg_expiry(#basic_message{content = Content}, TTL) ->
T -> now_micros() + T * 1000
end.
-drop_expired_messages(State = #q{dlx = DLX,
- backing_queue_state = BQS,
- backing_queue = BQ }) ->
+drop_expired_msgs(State = #q{dlx = DLX,
+ backing_queue_state = BQS,
+ backing_queue = BQ }) ->
Now = now_micros(),
ExpirePred = fun (#message_properties{expiry = Exp}) -> Now >= Exp end,
{Props, BQS1} = case DLX of
- undefined -> {Next, undefined, BQS2} =
- BQ:dropwhile(ExpirePred, false, BQS),
- {Next, BQS2};
- _ -> {Next, Msgs, BQS2} =
- BQ:dropwhile(ExpirePred, true, BQS),
- DLXFun = dead_letter_fun(expired),
- DLXFun(Msgs),
+ undefined -> BQ:dropwhile(ExpirePred, BQS);
+ _ -> {Next, Msgs, BQS2} =
+ BQ:fetchwhile(ExpirePred,
+ fun accumulate_msgs/3,
+ [], BQS),
+ case Msgs of
+ [] -> ok;
+ _ -> (dead_letter_fun(expired))(
+ lists:reverse(Msgs))
+ end,
{Next, BQS2}
end,
ensure_ttl_timer(case Props of
@@ -732,6 +729,8 @@ drop_expired_messages(State = #q{dlx = DLX,
#message_properties{expiry = Exp} -> Exp
end, State#q{backing_queue_state = BQS1}).
+accumulate_msgs(Msg, AckTag, Acc) -> [{Msg, AckTag} | Acc].
+
ensure_ttl_timer(undefined, State) ->
State;
ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = undefined}) ->
@@ -787,12 +786,9 @@ stop(State) -> stop(undefined, noreply, State).
stop(From, Reply, State = #q{unconfirmed = UC}) ->
case {dtree:is_empty(UC), Reply} of
- {true, noreply} ->
- {stop, normal, State};
- {true, _} ->
- {stop, normal, Reply, State};
- {false, _} ->
- noreply(State#q{delayed_stop = {From, Reply}})
+ {true, noreply} -> {stop, normal, State};
+ {true, _} -> {stop, normal, Reply, State};
+ {false, _} -> noreply(State#q{delayed_stop = {From, Reply}})
end.
cleanup_after_confirm(AckTags, State = #q{delayed_stop = DS,
@@ -900,7 +896,7 @@ i(exclusive_consumer_tag, #q{exclusive_consumer = {_ChPid, ConsumerTag}}) ->
i(messages_ready, #q{backing_queue_state = BQS, backing_queue = BQ}) ->
BQ:len(BQS);
i(messages_unacknowledged, _) ->
- lists:sum([sets:size(C#cr.acktags) || C <- all_ch_record()]);
+ lists:sum([queue:len(C#cr.acktags) || C <- all_ch_record()]);
i(messages, State) ->
lists:sum([i(Item, State) || Item <- [messages_ready,
messages_unacknowledged]]);
@@ -946,19 +942,19 @@ emit_stats(State) ->
emit_stats(State, Extra) ->
rabbit_event:notify(queue_stats, Extra ++ infos(?STATISTICS_KEYS, State)).
-emit_consumer_created(ChPid, ConsumerTag, Exclusive, AckRequired) ->
+emit_consumer_created(ChPid, ConsumerTag, Exclusive, AckRequired, QName) ->
rabbit_event:notify(consumer_created,
[{consumer_tag, ConsumerTag},
{exclusive, Exclusive},
{ack_required, AckRequired},
{channel, ChPid},
- {queue, self()}]).
+ {queue, QName}]).
-emit_consumer_deleted(ChPid, ConsumerTag) ->
+emit_consumer_deleted(ChPid, ConsumerTag, QName) ->
rabbit_event:notify(consumer_deleted,
[{consumer_tag, ConsumerTag},
{channel, ChPid},
- {queue, self()}]).
+ {queue, QName}]).
%%----------------------------------------------------------------------------
@@ -1049,27 +1045,26 @@ handle_call({basic_get, ChPid, NoAck}, _From,
State = #q{q = #amqqueue{name = QName}}) ->
AckRequired = not NoAck,
State1 = ensure_expiry_timer(State),
- case fetch(AckRequired, drop_expired_messages(State1)) of
+ case fetch(AckRequired, drop_expired_msgs(State1)) of
{empty, State2} ->
reply(empty, State2);
- {{Message, IsDelivered, AckTag, Remaining}, State2} ->
- State3 =
+ {{Message, IsDelivered, AckTag}, State2} ->
+ State3 = #q{backing_queue = BQ, backing_queue_state = BQS} =
case AckRequired of
true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid),
- ChAckTags1 = sets:add_element(AckTag, ChAckTags),
+ ChAckTags1 = queue:in(AckTag, ChAckTags),
update_ch_record(C#cr{acktags = ChAckTags1}),
State2;
false -> State2
end,
Msg = {QName, self(), AckTag, IsDelivered, Message},
- reply({ok, Remaining, Msg}, State3)
+ reply({ok, BQ:len(BQS), Msg}, State3)
end;
handle_call({basic_consume, NoAck, ChPid, Limiter,
ConsumerTag, ExclusiveConsume, OkMsg},
- _From, State = #q{exclusive_consumer = ExistingHolder}) ->
- case check_exclusive_access(ExistingHolder, ExclusiveConsume,
- State) of
+ _From, State = #q{exclusive_consumer = Holder}) ->
+ case check_exclusive_access(Holder, ExclusiveConsume, State) of
in_use ->
reply({error, exclusive_consume_unavailable}, State);
ok ->
@@ -1078,7 +1073,7 @@ handle_call({basic_consume, NoAck, ChPid, Limiter,
Consumer = #consumer{tag = ConsumerTag,
ack_required = not NoAck},
ExclusiveConsumer = if ExclusiveConsume -> {ChPid, ConsumerTag};
- true -> ExistingHolder
+ true -> Holder
end,
State1 = State#q{has_had_consumers = true,
exclusive_consumer = ExclusiveConsumer},
@@ -1093,7 +1088,7 @@ handle_call({basic_consume, NoAck, ChPid, Limiter,
run_message_queue(State1#q{active_consumers = AC1})
end,
emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
- not NoAck),
+ not NoAck, qname(State2)),
reply(ok, State2)
end;
@@ -1104,7 +1099,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From,
not_found ->
reply(ok, State);
C = #cr{blocked_consumers = Blocked} ->
- emit_consumer_deleted(ChPid, ConsumerTag),
+ emit_consumer_deleted(ChPid, ConsumerTag, qname(State)),
Blocked1 = remove_consumer(ChPid, ConsumerTag, Blocked),
update_consumer_count(C#cr{blocked_consumers = Blocked1}, -1),
State1 = State#q{
@@ -1114,7 +1109,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From,
end,
active_consumers = remove_consumer(
ChPid, ConsumerTag,
- State#q.active_consumers)},
+ State#q.active_consumers)},
case should_auto_delete(State1) of
false -> reply(ok, ensure_expiry_timer(State1));
true -> stop(From, ok, State1)
@@ -1123,16 +1118,16 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From,
handle_call(stat, _From, State) ->
State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
- drop_expired_messages(ensure_expiry_timer(State)),
+ drop_expired_msgs(ensure_expiry_timer(State)),
reply({ok, BQ:len(BQS), active_consumer_count()}, State1);
handle_call({delete, IfUnused, IfEmpty}, From,
State = #q{backing_queue_state = BQS, backing_queue = BQ}) ->
- IsEmpty = BQ:is_empty(BQS),
+ IsEmpty = BQ:is_empty(BQS),
IsUnused = is_unused(State),
if
- IfEmpty and not(IsEmpty) -> reply({error, not_empty}, State);
- IfUnused and not(IsUnused) -> reply({error, in_use}, State);
+ IfEmpty and not(IsEmpty) -> reply({error, not_empty}, State);
+ IfUnused and not(IsUnused) -> reply({error, in_use}, State);
true -> stop(From, {ok, BQ:len(BQS)}, State)
end;
@@ -1145,14 +1140,31 @@ handle_call({requeue, AckTags, ChPid}, From, State) ->
gen_server2:reply(From, ok),
noreply(requeue(AckTags, ChPid, State));
+handle_call(sync_mirrors, _From,
+ State = #q{backing_queue = rabbit_mirror_queue_master = BQ,
+ backing_queue_state = BQS}) ->
+ S = fun(BQSN) -> State#q{backing_queue_state = BQSN} end,
+ case BQ:depth(BQS) - BQ:len(BQS) of
+ 0 -> case rabbit_mirror_queue_master:sync_mirrors(BQS) of
+ {ok, BQS1} -> reply(ok, S(BQS1));
+ {stop, Reason, BQS1} -> {stop, Reason, S(BQS1)}
+ end;
+ _ -> reply({error, pending_acks}, State)
+ end;
+
+handle_call(sync_mirrors, _From, State) ->
+ reply({error, not_mirrored}, State);
+
handle_call(force_event_refresh, _From,
State = #q{exclusive_consumer = Exclusive}) ->
rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State)),
+ QName = qname(State),
case Exclusive of
- none -> [emit_consumer_created(Ch, CTag, false, AckRequired) ||
+ none -> [emit_consumer_created(
+ Ch, CTag, false, AckRequired, QName) ||
{Ch, CTag, AckRequired} <- consumers(State)];
{Ch, CTag} -> [{Ch, CTag, AckRequired}] = consumers(State),
- emit_consumer_created(Ch, CTag, true, AckRequired)
+ emit_consumer_created(Ch, CTag, true, AckRequired, QName)
end,
reply(ok, State).
@@ -1200,8 +1212,9 @@ handle_cast({reject, AckTags, false, ChPid}, State) ->
ChPid, AckTags, State,
fun (State1 = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
- BQS1 = BQ:fold(fun(M, A) -> DLXFun([{M, A}]) end,
- BQS, AckTags),
+ {ok, BQS1} = BQ:ackfold(
+ fun (M, A, ok) -> DLXFun([{M, A}]) end,
+ ok, BQS, AckTags),
State1#q{backing_queue_state = BQS1}
end));
@@ -1310,7 +1323,7 @@ handle_info(maybe_expire, State) ->
end;
handle_info(drop_expired, State) ->
- noreply(drop_expired_messages(State#q{ttl_timer_ref = undefined}));
+ noreply(drop_expired_msgs(State#q{ttl_timer_ref = undefined}));
handle_info(emit_stats, State) ->
emit_stats(State),
diff --git a/src/rabbit_auth_backend_internal.erl b/src/rabbit_auth_backend_internal.erl
index 7b9df81e78..919be3f3ee 100644
--- a/src/rabbit_auth_backend_internal.erl
+++ b/src/rabbit_auth_backend_internal.erl
@@ -49,7 +49,7 @@
-spec(hash_password/1 :: (rabbit_types:password())
-> rabbit_types:password_hash()).
-spec(set_tags/2 :: (rabbit_types:username(), [atom()]) -> 'ok').
--spec(list_users/0 :: () -> rabbit_types:infos()).
+-spec(list_users/0 :: () -> [rabbit_types:infos()]).
-spec(user_info_keys/0 :: () -> rabbit_types:info_keys()).
-spec(lookup_user/1 :: (rabbit_types:username())
-> rabbit_types:ok(rabbit_types:internal_user())
@@ -58,14 +58,14 @@
regexp(), regexp(), regexp()) -> 'ok').
-spec(clear_permissions/2 :: (rabbit_types:username(), rabbit_types:vhost())
-> 'ok').
--spec(list_permissions/0 :: () -> rabbit_types:infos()).
+-spec(list_permissions/0 :: () -> [rabbit_types:infos()]).
-spec(list_vhost_permissions/1 ::
- (rabbit_types:vhost()) -> rabbit_types:infos()).
+ (rabbit_types:vhost()) -> [rabbit_types:infos()]).
-spec(list_user_permissions/1 ::
- (rabbit_types:username()) -> rabbit_types:infos()).
+ (rabbit_types:username()) -> [rabbit_types:infos()]).
-spec(list_user_vhost_permissions/2 ::
(rabbit_types:username(), rabbit_types:vhost())
- -> rabbit_types:infos()).
+ -> [rabbit_types:infos()]).
-spec(perms_info_keys/0 :: () -> rabbit_types:info_keys()).
-spec(vhost_perms_info_keys/0 :: () -> rabbit_types:info_keys()).
-spec(user_perms_info_keys/0 :: () -> rabbit_types:info_keys()).
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index af660c60a0..99b5946e59 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -26,17 +26,16 @@
-type(msg_ids() :: [rabbit_types:msg_id()]).
-type(fetch_result(Ack) ::
- ('empty' |
- %% Message, IsDelivered, AckTag, Remaining_Len
- {rabbit_types:basic_message(), boolean(), Ack, non_neg_integer()})).
+ ('empty' | {rabbit_types:basic_message(), boolean(), Ack})).
+-type(drop_result(Ack) ::
+ ('empty' | {rabbit_types:msg_id(), Ack})).
-type(attempt_recovery() :: boolean()).
-type(purged_msg_count() :: non_neg_integer()).
-type(async_callback() ::
fun ((atom(), fun ((atom(), state()) -> state())) -> 'ok')).
-type(duration() :: ('undefined' | 'infinity' | number())).
--type(msg_fun() :: fun((rabbit_types:basic_message(), ack()) -> 'ok') |
- 'undefined').
+-type(msg_fun(A) :: fun ((rabbit_types:basic_message(), ack(), A) -> A)).
-type(msg_pred() :: fun ((rabbit_types:message_properties()) -> boolean())).
%% Called on startup with a list of durable queue names. The queues
@@ -78,8 +77,8 @@
%% Publish a message.
-callback publish(rabbit_types:basic_message(),
- rabbit_types:message_properties(), pid(), state()) ->
- state().
+ rabbit_types:message_properties(), boolean(), pid(),
+ state()) -> state().
%% Called for messages which have already been passed straight
%% out to a client. The queue will be empty for these calls
@@ -124,33 +123,50 @@
%% be ignored.
-callback drain_confirmed(state()) -> {msg_ids(), state()}.
-%% Drop messages from the head of the queue while the supplied predicate returns
-%% true. Also accepts a boolean parameter that determines whether the messages
-%% necessitate an ack or not. If they do, the function returns a list of
-%% messages with the respective acktags.
--callback dropwhile(msg_pred(), true, state())
- -> {rabbit_types:message_properties() | undefined,
- [{rabbit_types:basic_message(), ack()}], state()};
- (msg_pred(), false, state())
- -> {rabbit_types:message_properties() | undefined,
- undefined, state()}.
+%% Drop messages from the head of the queue while the supplied
+%% predicate on message properties returns true. Returns the first
+%% message properties for which the predictate returned false, or
+%% 'undefined' if the whole backing queue was traversed w/o the
+%% predicate ever returning false.
+-callback dropwhile(msg_pred(), state())
+ -> {rabbit_types:message_properties() | undefined, state()}.
+
+%% Like dropwhile, except messages are fetched in "require
+%% acknowledgement" mode and are passed, together with their ack tag,
+%% to the supplied function. The function is also fed an
+%% accumulator. The result of fetchwhile is as for dropwhile plus the
+%% accumulator.
+-callback fetchwhile(msg_pred(), msg_fun(A), A, state())
+ -> {rabbit_types:message_properties() | undefined,
+ A, state()}.
%% Produce the next message.
-callback fetch(true, state()) -> {fetch_result(ack()), state()};
(false, state()) -> {fetch_result(undefined), state()}.
+%% Remove the next message.
+-callback drop(true, state()) -> {drop_result(ack()), state()};
+ (false, state()) -> {drop_result(undefined), state()}.
+
%% Acktags supplied are for messages which can now be forgotten
%% about. Must return 1 msg_id per Ack, in the same order as Acks.
-callback ack([ack()], state()) -> {msg_ids(), state()}.
-%% Acktags supplied are for messages which should be processed. The
-%% provided callback function is called with each message.
--callback fold(msg_fun(), state(), [ack()]) -> state().
-
%% Reinsert messages into the queue which have already been delivered
%% and were pending acknowledgement.
-callback requeue([ack()], state()) -> {msg_ids(), state()}.
+%% Fold over messages by ack tag. The supplied function is called with
+%% each message, its ack tag, and an accumulator.
+-callback ackfold(msg_fun(A), A, state(), [ack()]) -> {A, state()}.
+
+%% Fold over all the messages in a queue and return the accumulated
+%% results, leaving the queue undisturbed.
+-callback fold(fun((rabbit_types:basic_message(),
+ rabbit_types:message_properties(),
+ A) -> {('stop' | 'cont'), A}),
+ A, state()) -> {A, state()}.
+
%% How long is my queue?
-callback len(state()) -> non_neg_integer().
@@ -210,9 +226,10 @@
behaviour_info(callbacks) ->
[{start, 1}, {stop, 0}, {init, 3}, {terminate, 2},
- {delete_and_terminate, 2}, {purge, 1}, {publish, 4},
- {publish_delivered, 4}, {discard, 3}, {drain_confirmed, 1}, {dropwhile, 3},
- {fetch, 2}, {ack, 2}, {fold, 3}, {requeue, 2}, {len, 1},
+ {delete_and_terminate, 2}, {purge, 1}, {publish, 5},
+ {publish_delivered, 4}, {discard, 3}, {drain_confirmed, 1},
+ {dropwhile, 2}, {fetchwhile, 4},
+ {fetch, 2}, {ack, 2}, {requeue, 2}, {ackfold, 4}, {fold, 3}, {len, 1},
{is_empty, 1}, {depth, 1}, {set_ram_duration_target, 2},
{ram_duration, 1}, {needs_timeout, 1}, {timeout, 1},
{handle_pre_hibernate, 1}, {status, 1}, {invoke, 3}, {is_duplicate, 2}] ;
diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl
index b37fbb29e2..5b3b8aa806 100644
--- a/src/rabbit_backing_queue_qc.erl
+++ b/src/rabbit_backing_queue_qc.erl
@@ -85,17 +85,19 @@ backing_queue_test(Cmds) ->
%% Commands
-%% Command frequencies are tuned so that queues are normally reasonably
-%% short, but they may sometimes exceed ?QUEUE_MAXLEN. Publish-multiple
-%% and purging cause extreme queue lengths, so these have lower probabilities.
-%% Fetches are sufficiently frequent so that commands that need acktags
-%% get decent coverage.
+%% Command frequencies are tuned so that queues are normally
+%% reasonably short, but they may sometimes exceed
+%% ?QUEUE_MAXLEN. Publish-multiple and purging cause extreme queue
+%% lengths, so these have lower probabilities. Fetches/drops are
+%% sufficiently frequent so that commands that need acktags get decent
+%% coverage.
command(S) ->
frequency([{10, qc_publish(S)},
{1, qc_publish_delivered(S)},
{1, qc_publish_multiple(S)}, %% very slow
- {15, qc_fetch(S)}, %% needed for ack and requeue
+ {9, qc_fetch(S)}, %% needed for ack and requeue
+ {6, qc_drop(S)}, %%
{15, qc_ack(S)},
{15, qc_requeue(S)},
{3, qc_set_ram_duration_target(S)},
@@ -104,7 +106,8 @@ command(S) ->
{1, qc_dropwhile(S)},
{1, qc_is_empty(S)},
{1, qc_timeout(S)},
- {1, qc_purge(S)}]).
+ {1, qc_purge(S)},
+ {1, qc_fold(S)}]).
qc_publish(#state{bqstate = BQ}) ->
{call, ?BQMOD, publish,
@@ -112,7 +115,7 @@ qc_publish(#state{bqstate = BQ}) ->
#message_properties{needs_confirming = frequency([{1, true},
{20, false}]),
expiry = oneof([undefined | lists:seq(1, 10)])},
- self(), BQ]}.
+ false, self(), BQ]}.
qc_publish_multiple(#state{}) ->
{call, ?MODULE, publish_multiple, [resize(?QUEUE_MAXLEN, pos_integer())]}.
@@ -124,6 +127,9 @@ qc_publish_delivered(#state{bqstate = BQ}) ->
qc_fetch(#state{bqstate = BQ}) ->
{call, ?BQMOD, fetch, [boolean(), BQ]}.
+qc_drop(#state{bqstate = BQ}) ->
+ {call, ?BQMOD, drop, [boolean(), BQ]}.
+
qc_ack(#state{bqstate = BQ, acks = Acks}) ->
{call, ?BQMOD, ack, [rand_choice(proplists:get_keys(Acks)), BQ]}.
@@ -141,7 +147,7 @@ qc_drain_confirmed(#state{bqstate = BQ}) ->
{call, ?BQMOD, drain_confirmed, [BQ]}.
qc_dropwhile(#state{bqstate = BQ}) ->
- {call, ?BQMOD, dropwhile, [fun dropfun/1, false, BQ]}.
+ {call, ?BQMOD, dropwhile, [fun dropfun/1, BQ]}.
qc_is_empty(#state{bqstate = BQ}) ->
{call, ?BQMOD, is_empty, [BQ]}.
@@ -152,6 +158,9 @@ qc_timeout(#state{bqstate = BQ}) ->
qc_purge(#state{bqstate = BQ}) ->
{call, ?BQMOD, purge, [BQ]}.
+qc_fold(#state{bqstate = BQ}) ->
+ {call, ?BQMOD, fold, [makefoldfun(pos_integer()), foldacc(), BQ]}.
+
%% Preconditions
%% Create long queues by only allowing publishing
@@ -173,7 +182,7 @@ precondition(#state{len = Len}, {call, ?MODULE, publish_multiple, _Arg}) ->
%% Model updates
-next_state(S, BQ, {call, ?BQMOD, publish, [Msg, MsgProps, _Pid, _BQ]}) ->
+next_state(S, BQ, {call, ?BQMOD, publish, [Msg, MsgProps, _Del, _Pid, _BQ]}) ->
#state{len = Len,
messages = Messages,
confirms = Confirms,
@@ -217,22 +226,10 @@ next_state(S, Res,
};
next_state(S, Res, {call, ?BQMOD, fetch, [AckReq, _BQ]}) ->
- #state{len = Len, messages = Messages, acks = Acks} = S,
- ResultInfo = {call, erlang, element, [1, Res]},
- BQ1 = {call, erlang, element, [2, Res]},
- AckTag = {call, erlang, element, [3, ResultInfo]},
- S1 = S#state{bqstate = BQ1},
- case gb_trees:is_empty(Messages) of
- true -> S1;
- false -> {SeqId, MsgProp_Msg, M2} = gb_trees:take_smallest(Messages),
- S2 = S1#state{len = Len - 1, messages = M2},
- case AckReq of
- true ->
- S2#state{acks = [{AckTag, {SeqId, MsgProp_Msg}}|Acks]};
- false ->
- S2
- end
- end;
+ next_state_fetch_and_drop(S, Res, AckReq, 3);
+
+next_state(S, Res, {call, ?BQMOD, drop, [AckReq, _BQ]}) ->
+ next_state_fetch_and_drop(S, Res, AckReq, 2);
next_state(S, Res, {call, ?BQMOD, ack, [AcksArg, _BQ]}) ->
#state{acks = AcksState} = S,
@@ -265,7 +262,7 @@ next_state(S, Res, {call, ?BQMOD, drain_confirmed, _Args}) ->
S#state{bqstate = BQ1};
next_state(S, Res, {call, ?BQMOD, dropwhile, _Args}) ->
- BQ = {call, erlang, element, [3, Res]},
+ BQ = {call, erlang, element, [2, Res]},
#state{messages = Messages} = S,
Msgs1 = drop_messages(Messages),
S#state{bqstate = BQ, len = gb_trees:size(Msgs1), messages = Msgs1};
@@ -278,19 +275,38 @@ next_state(S, BQ, {call, ?MODULE, timeout, _Args}) ->
next_state(S, Res, {call, ?BQMOD, purge, _Args}) ->
BQ1 = {call, erlang, element, [2, Res]},
- S#state{bqstate = BQ1, len = 0, messages = gb_trees:empty()}.
+ S#state{bqstate = BQ1, len = 0, messages = gb_trees:empty()};
+
+next_state(S, Res, {call, ?BQMOD, fold, _Args}) ->
+ BQ1 = {call, erlang, element, [2, Res]},
+ S#state{bqstate = BQ1}.
%% Postconditions
postcondition(S, {call, ?BQMOD, fetch, _Args}, Res) ->
#state{messages = Messages, len = Len, acks = Acks, confirms = Confrms} = S,
case Res of
- {{MsgFetched, _IsDelivered, AckTag, RemainingLen}, _BQ} ->
+ {{MsgFetched, _IsDelivered, AckTag}, _BQ} ->
{_SeqId, {_MsgProps, Msg}} = gb_trees:smallest(Messages),
MsgFetched =:= Msg andalso
not proplists:is_defined(AckTag, Acks) andalso
not gb_sets:is_element(AckTag, Confrms) andalso
- RemainingLen =:= Len - 1;
+ Len =/= 0;
+ {empty, _BQ} ->
+ Len =:= 0
+ end;
+
+postcondition(S, {call, ?BQMOD, drop, _Args}, Res) ->
+ #state{messages = Messages, len = Len, acks = Acks, confirms = Confrms} = S,
+ case Res of
+ {{MsgIdFetched, AckTag}, _BQ} ->
+ {_SeqId, {_MsgProps, Msg}} = gb_trees:smallest(Messages),
+ MsgId = eval({call, erlang, element,
+ [?RECORD_INDEX(id, basic_message), Msg]}),
+ MsgIdFetched =:= MsgId andalso
+ not proplists:is_defined(AckTag, Acks) andalso
+ not gb_sets:is_element(AckTag, Confrms) andalso
+ Len =/= 0;
{empty, _BQ} ->
Len =:= 0
end;
@@ -313,6 +329,15 @@ postcondition(S, {call, ?BQMOD, drain_confirmed, _Args}, Res) ->
lists:all(fun (M) -> gb_sets:is_element(M, Confirms) end,
ReportedConfirmed);
+postcondition(S, {call, ?BQMOD, fold, [FoldFun, Acc0, _BQ0]}, {Res, _BQ1}) ->
+ #state{messages = Messages} = S,
+ {_, Model} = lists:foldl(fun ({_SeqId, {_MsgProps, _Msg}}, {stop, Acc}) ->
+ {stop, Acc};
+ ({_SeqId, {MsgProps, Msg}}, {cont, Acc}) ->
+ FoldFun(Msg, MsgProps, Acc)
+ end, {cont, Acc0}, gb_trees:to_list(Messages)),
+ true = Model =:= Res;
+
postcondition(#state{bqstate = BQ, len = Len}, {call, _M, _F, _A}, _Res) ->
?BQMOD:len(BQ) =:= Len.
@@ -371,6 +396,15 @@ rand_choice(List, Selection, N) ->
rand_choice(List -- [Picked], [Picked | Selection],
N - 1).
+makefoldfun(Size) ->
+ fun (Msg, _MsgProps, Acc) ->
+ case length(Acc) > Size of
+ false -> {cont, [Msg | Acc]};
+ true -> {stop, Acc}
+ end
+ end.
+foldacc() -> [].
+
dropfun(Props) ->
Expiry = eval({call, erlang, element,
[?RECORD_INDEX(expiry, message_properties), Props]}),
@@ -388,6 +422,24 @@ drop_messages(Messages) ->
end
end.
+next_state_fetch_and_drop(S, Res, AckReq, AckTagIdx) ->
+ #state{len = Len, messages = Messages, acks = Acks} = S,
+ ResultInfo = {call, erlang, element, [1, Res]},
+ BQ1 = {call, erlang, element, [2, Res]},
+ AckTag = {call, erlang, element, [AckTagIdx, ResultInfo]},
+ S1 = S#state{bqstate = BQ1},
+ case gb_trees:is_empty(Messages) of
+ true -> S1;
+ false -> {SeqId, MsgProp_Msg, M2} = gb_trees:take_smallest(Messages),
+ S2 = S1#state{len = Len - 1, messages = M2},
+ case AckReq of
+ true ->
+ S2#state{acks = [{AckTag, {SeqId, MsgProp_Msg}}|Acks]};
+ false ->
+ S2
+ end
+ end.
+
-else.
-export([prop_disabled/0]).
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index b97af6d8ca..88e3dfc583 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -33,10 +33,10 @@
-export([list_local/0]).
-record(ch, {state, protocol, channel, reader_pid, writer_pid, conn_pid,
- conn_name, limiter, tx_status, next_tag, unacked_message_q,
- uncommitted_message_q, uncommitted_acks, uncommitted_nacks, user,
- virtual_host, most_recently_declared_queue, queue_monitors,
- consumer_mapping, blocking, queue_consumers, delivering_queues,
+ conn_name, limiter, tx, next_tag, unacked_message_q, user,
+ virtual_host, most_recently_declared_queue,
+ queue_names, queue_monitors, consumer_mapping,
+ blocking, queue_consumers, delivering_queues,
queue_collector_pid, stats_timer, confirm_enabled, publish_seqno,
unconfirmed, confirmed, capabilities, trace_state}).
@@ -64,6 +64,12 @@
-define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]).
+-define(INCR_STATS(Incs, Measure, State),
+ case rabbit_event:stats_level(State, #ch.stats_timer) of
+ fine -> incr_stats(Incs, Measure);
+ _ -> ok
+ end).
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -185,15 +191,13 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
conn_pid = ConnPid,
conn_name = ConnName,
limiter = Limiter,
- tx_status = none,
+ tx = none,
next_tag = 1,
unacked_message_q = queue:new(),
- uncommitted_message_q = queue:new(),
- uncommitted_acks = [],
- uncommitted_nacks = [],
user = User,
virtual_host = VHost,
most_recently_declared_queue = <<>>,
+ queue_names = dict:new(),
queue_monitors = pmon:new(),
consumer_mapping = dict:new(),
blocking = sets:new(),
@@ -312,9 +316,12 @@ handle_cast({deliver, ConsumerTag, AckRequired,
handle_cast(force_event_refresh, State) ->
rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)),
noreply(State);
+
handle_cast({confirm, MsgSeqNos, From}, State) ->
State1 = #ch{confirmed = C} = confirm(MsgSeqNos, From, State),
- noreply([send_confirms], State1, case C of [] -> hibernate; _ -> 0 end).
+ Timeout = case C of [] -> hibernate; _ -> 0 end,
+ %% NB: don't call noreply/1 since we don't want to send confirms.
+ {noreply, ensure_stats_timer(State1), Timeout}.
handle_info({bump_credit, Msg}, State) ->
credit_flow:handle_bump_msg(Msg),
@@ -325,8 +332,10 @@ handle_info(timeout, State) ->
handle_info(emit_stats, State) ->
emit_stats(State),
- noreply([ensure_stats_timer],
- rabbit_event:reset_stats_timer(State, #ch.stats_timer));
+ State1 = rabbit_event:reset_stats_timer(State, #ch.stats_timer),
+ %% NB: don't call noreply/1 since we don't want to kick off the
+ %% stats timer.
+ {noreply, send_confirms(State1), hibernate};
handle_info({'DOWN', _MRef, process, QPid, Reason}, State) ->
State1 = handle_publishing_queue_down(QPid, Reason, State),
@@ -334,9 +343,13 @@ handle_info({'DOWN', _MRef, process, QPid, Reason}, State) ->
State3 = handle_consuming_queue_down(QPid, State2),
State4 = handle_delivering_queue_down(QPid, State3),
credit_flow:peer_down(QPid),
- erase_queue_stats(QPid),
- noreply(State4#ch{queue_monitors = pmon:erase(
- QPid, State4#ch.queue_monitors)});
+ #ch{queue_names = QNames, queue_monitors = QMons} = State4,
+ case dict:find(QPid, QNames) of
+ {ok, QName} -> erase_queue_stats(QName);
+ error -> ok
+ end,
+ noreply(State4#ch{queue_names = dict:erase(QPid, QNames),
+ queue_monitors = pmon:erase(QPid, QMons)});
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State}.
@@ -366,30 +379,11 @@ format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ).
%%---------------------------------------------------------------------------
-reply(Reply, NewState) -> reply(Reply, [], NewState).
-
-reply(Reply, Mask, NewState) -> reply(Reply, Mask, NewState, hibernate).
+reply(Reply, NewState) -> {reply, Reply, next_state(NewState), hibernate}.
-reply(Reply, Mask, NewState, Timeout) ->
- {reply, Reply, next_state(Mask, NewState), Timeout}.
-
-noreply(NewState) -> noreply([], NewState).
-
-noreply(Mask, NewState) -> noreply(Mask, NewState, hibernate).
-
-noreply(Mask, NewState, Timeout) ->
- {noreply, next_state(Mask, NewState), Timeout}.
-
--define(MASKED_CALL(Fun, Mask, State),
- case lists:member(Fun, Mask) of
- true -> State;
- false -> Fun(State)
- end).
+noreply(NewState) -> {noreply, next_state(NewState), hibernate}.
-next_state(Mask, State) ->
- State1 = ?MASKED_CALL(ensure_stats_timer, Mask, State),
- State2 = ?MASKED_CALL(send_confirms, Mask, State1),
- State2.
+next_state(State) -> ensure_stats_timer(send_confirms(State)).
ensure_stats_timer(State) ->
rabbit_event:ensure_stats_timer(State, #ch.stats_timer, emit_stats).
@@ -436,15 +430,13 @@ check_resource_access(User, Resource, Perm) ->
undefined -> [];
Other -> Other
end,
- CacheTail =
- case lists:member(V, Cache) of
- true -> lists:delete(V, Cache);
- false -> ok = rabbit_access_control:check_resource_access(
- User, Resource, Perm),
- lists:sublist(Cache, ?MAX_PERMISSION_CACHE_SIZE - 1)
- end,
- put(permission_cache, [V | CacheTail]),
- ok.
+ case lists:member(V, Cache) of
+ true -> ok;
+ false -> ok = rabbit_access_control:check_resource_access(
+ User, Resource, Perm),
+ CacheTail = lists:sublist(Cache, ?MAX_PERMISSION_CACHE_SIZE-1),
+ put(permission_cache, [V | CacheTail])
+ end.
clear_permission_cache() ->
erase(permission_cache),
@@ -523,16 +515,12 @@ check_not_default_exchange(_) ->
%% check that an exchange/queue name does not contain the reserved
%% "amq." prefix.
%%
-%% One, quite reasonable, interpretation of the spec, taken by the
-%% QPid M1 Java client, is that the exclusion of "amq." prefixed names
+%% As per the AMQP 0-9-1 spec, the exclusion of "amq." prefixed names
%% only applies on actual creation, and not in the cases where the
-%% entity already exists. This is how we use this function in the code
-%% below. However, AMQP JIRA 123 changes that in 0-10, and possibly
-%% 0-9SP1, making it illegal to attempt to declare an exchange/queue
-%% with an amq.* name when passive=false. So this will need
-%% revisiting.
+%% entity already exists or passive=true.
%%
-%% TODO: enforce other constraints on name. See AMQP JIRA 69.
+%% NB: We deliberately do not enforce the other constraints on names
+%% required by the spec.
check_name(Kind, NameBin = <<"amq.", _/binary>>) ->
rabbit_misc:protocol_error(
access_refused,
@@ -553,11 +541,6 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) ->
State#ch{blocking = Blocking1}
end.
-record_confirm(undefined, _, State) ->
- State;
-record_confirm(MsgSeqNo, XName, State) ->
- record_confirms([{MsgSeqNo, XName}], State).
-
record_confirms([], State) ->
State;
record_confirms(MXs, State = #ch{confirmed = C}) ->
@@ -597,8 +580,8 @@ handle_method(#'channel.close'{}, _, State = #ch{reader_pid = ReaderPid}) ->
%% while waiting for the reply to a synchronous command, we generally
%% do allow this...except in the case of a pending tx.commit, where
%% it could wreak havoc.
-handle_method(_Method, _, #ch{tx_status = TxStatus})
- when TxStatus =/= none andalso TxStatus =/= in_progress ->
+handle_method(_Method, _, #ch{tx = Tx})
+ when Tx =:= committing orelse Tx =:= failed ->
rabbit_misc:protocol_error(
channel_error, "unexpected command while processing 'tx.commit'", []);
@@ -612,7 +595,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
routing_key = RoutingKey,
mandatory = Mandatory},
Content, State = #ch{virtual_host = VHostPath,
- tx_status = TxStatus,
+ tx = Tx,
confirm_enabled = ConfirmEnabled,
trace_state = TraceState}) ->
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
@@ -626,23 +609,22 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
check_user_id_header(Props, State),
check_expiration_header(Props),
{MsgSeqNo, State1} =
- case {TxStatus, ConfirmEnabled} of
+ case {Tx, ConfirmEnabled} of
{none, false} -> {undefined, State};
{_, _} -> SeqNo = State#ch.publish_seqno,
{SeqNo, State#ch{publish_seqno = SeqNo + 1}}
end,
case rabbit_basic:message(ExchangeName, RoutingKey, DecodedContent) of
{ok, Message} ->
- rabbit_trace:tap_trace_in(Message, TraceState),
+ rabbit_trace:tap_in(Message, TraceState),
Delivery = rabbit_basic:delivery(Mandatory, Message, MsgSeqNo),
QNames = rabbit_exchange:route(Exchange, Delivery),
- {noreply,
- case TxStatus of
- none -> deliver_to_queues({Delivery, QNames}, State1);
- in_progress -> TMQ = State1#ch.uncommitted_message_q,
- NewTMQ = queue:in({Delivery, QNames}, TMQ),
- State1#ch{uncommitted_message_q = NewTMQ}
- end};
+ DQ = {Delivery, QNames},
+ {noreply, case Tx of
+ none -> deliver_to_queues(DQ, State1);
+ {Msgs, Acks} -> Msgs1 = queue:in(DQ, Msgs),
+ State1#ch{tx = {Msgs1, Acks}}
+ end};
{error, Reason} ->
precondition_failed("invalid message: ~p", [Reason])
end;
@@ -655,16 +637,15 @@ handle_method(#'basic.nack'{delivery_tag = DeliveryTag,
handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
multiple = Multiple},
- _, State = #ch{unacked_message_q = UAMQ, tx_status = TxStatus}) ->
+ _, State = #ch{unacked_message_q = UAMQ, tx = Tx}) ->
{Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple),
State1 = State#ch{unacked_message_q = Remaining},
- {noreply,
- case TxStatus of
- none -> ack(Acked, State1),
- State1;
- in_progress -> State1#ch{uncommitted_acks =
- Acked ++ State1#ch.uncommitted_acks}
- end};
+ {noreply, case Tx of
+ none -> ack(Acked, State1),
+ State1;
+ {Msgs, Acks} -> Acks1 = ack_cons(ack, Acked, Acks),
+ State1#ch{tx = {Msgs, Acks1}}
+ end};
handle_method(#'basic.get'{queue = QueueNameBin,
no_ack = NoAck},
@@ -677,7 +658,7 @@ handle_method(#'basic.get'{queue = QueueNameBin,
QueueName, ConnPid,
fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of
{ok, MessageCount,
- Msg = {_QName, QPid, _MsgId, Redelivered,
+ Msg = {QName, QPid, _MsgId, Redelivered,
#basic_message{exchange_name = ExchangeName,
routing_keys = [RoutingKey | _CcRoutes],
content = Content}}} ->
@@ -689,7 +670,7 @@ handle_method(#'basic.get'{queue = QueueNameBin,
routing_key = RoutingKey,
message_count = MessageCount},
Content),
- State1 = monitor_delivering_queue(NoAck, QPid, State),
+ State1 = monitor_delivering_queue(NoAck, QPid, QName, State),
{noreply, record_sent(none, not(NoAck), Msg, State1)};
empty ->
{reply, #'basic.get_empty'{}, State}
@@ -728,10 +709,11 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
consumer_tag = ActualConsumerTag})),
Q}
end) of
- {ok, Q = #amqqueue{pid = QPid}} ->
+ {ok, Q = #amqqueue{pid = QPid, name = QName}} ->
CM1 = dict:store(ActualConsumerTag, Q, ConsumerMapping),
State1 = monitor_delivering_queue(
- NoAck, QPid, State#ch{consumer_mapping = CM1}),
+ NoAck, QPid, QName,
+ State#ch{consumer_mapping = CM1}),
{noreply,
case NoWait of
true -> consumer_monitor(ActualConsumerTag, State1);
@@ -815,14 +797,12 @@ handle_method(#'basic.recover_async'{requeue = true},
limiter = Limiter}) ->
OkFun = fun () -> ok end,
UAMQL = queue:to_list(UAMQ),
- ok = fold_per_queue(
- fun (QPid, MsgIds, ok) ->
- rabbit_misc:with_exit_handler(
- OkFun, fun () ->
- rabbit_amqqueue:requeue(
- QPid, MsgIds, self())
- end)
- end, ok, UAMQL),
+ foreach_per_queue(
+ fun (QPid, MsgIds) ->
+ rabbit_misc:with_exit_handler(
+ OkFun,
+ fun () -> rabbit_amqqueue:requeue(QPid, MsgIds, self()) end)
+ end, lists:reverse(UAMQL)),
ok = notify_limiter(Limiter, UAMQL),
%% No answer required - basic.recover is the newer, synchronous
%% variant of this method
@@ -1040,34 +1020,34 @@ handle_method(#'queue.purge'{queue = QueueNameBin,
handle_method(#'tx.select'{}, _, #ch{confirm_enabled = true}) ->
precondition_failed("cannot switch from confirm to tx mode");
+handle_method(#'tx.select'{}, _, State = #ch{tx = none}) ->
+ {reply, #'tx.select_ok'{}, State#ch{tx = new_tx()}};
+
handle_method(#'tx.select'{}, _, State) ->
- {reply, #'tx.select_ok'{}, State#ch{tx_status = in_progress}};
+ {reply, #'tx.select_ok'{}, State};
-handle_method(#'tx.commit'{}, _, #ch{tx_status = none}) ->
+handle_method(#'tx.commit'{}, _, #ch{tx = none}) ->
precondition_failed("channel is not transactional");
-handle_method(#'tx.commit'{}, _,
- State = #ch{uncommitted_message_q = TMQ,
- uncommitted_acks = TAL,
- uncommitted_nacks = TNL,
- limiter = Limiter}) ->
- State1 = rabbit_misc:queue_fold(fun deliver_to_queues/2, State, TMQ),
- ack(TAL, State1),
- lists:foreach(
- fun({Requeue, Acked}) -> reject(Requeue, Acked, Limiter) end, TNL),
- {noreply, maybe_complete_tx(new_tx(State1#ch{tx_status = committing}))};
-
-handle_method(#'tx.rollback'{}, _, #ch{tx_status = none}) ->
+handle_method(#'tx.commit'{}, _, State = #ch{tx = {Msgs, Acks},
+ limiter = Limiter}) ->
+ State1 = rabbit_misc:queue_fold(fun deliver_to_queues/2, State, Msgs),
+ lists:foreach(fun ({ack, A}) -> ack(A, State1);
+ ({Requeue, A}) -> reject(Requeue, A, Limiter)
+ end, lists:reverse(Acks)),
+ {noreply, maybe_complete_tx(State1#ch{tx = committing})};
+
+handle_method(#'tx.rollback'{}, _, #ch{tx = none}) ->
precondition_failed("channel is not transactional");
handle_method(#'tx.rollback'{}, _, State = #ch{unacked_message_q = UAMQ,
- uncommitted_acks = TAL,
- uncommitted_nacks = TNL}) ->
- TNL1 = lists:append([L || {_, L} <- TNL]),
- UAMQ1 = queue:from_list(lists:usort(TAL ++ TNL1 ++ queue:to_list(UAMQ))),
- {reply, #'tx.rollback_ok'{}, new_tx(State#ch{unacked_message_q = UAMQ1})};
+ tx = {_Msgs, Acks}}) ->
+ AcksL = lists:append(lists:reverse([lists:reverse(L) || {_, L} <- Acks])),
+ UAMQ1 = queue:from_list(lists:usort(AcksL ++ queue:to_list(UAMQ))),
+ {reply, #'tx.rollback_ok'{}, State#ch{unacked_message_q = UAMQ1,
+ tx = new_tx()}};
-handle_method(#'confirm.select'{}, _, #ch{tx_status = in_progress}) ->
+handle_method(#'confirm.select'{}, _, #ch{tx = {_, _}}) ->
precondition_failed("cannot switch from tx to confirm mode");
handle_method(#'confirm.select'{nowait = NoWait}, _, State) ->
@@ -1126,9 +1106,12 @@ consumer_monitor(ConsumerTag,
State
end.
-monitor_delivering_queue(NoAck, QPid, State = #ch{queue_monitors = QMons,
- delivering_queues = DQ}) ->
- State#ch{queue_monitors = pmon:monitor(QPid, QMons),
+monitor_delivering_queue(NoAck, QPid, QName,
+ State = #ch{queue_names = QNames,
+ queue_monitors = QMons,
+ delivering_queues = DQ}) ->
+ State#ch{queue_names = dict:store(QPid, QName, QNames),
+ queue_monitors = pmon:monitor(QPid, QMons),
delivering_queues = case NoAck of
true -> DQ;
false -> sets:add_element(QPid, DQ)
@@ -1212,42 +1195,40 @@ basic_return(#basic_message{exchange_name = ExchangeName,
Content).
reject(DeliveryTag, Requeue, Multiple,
- State = #ch{unacked_message_q = UAMQ, tx_status = TxStatus}) ->
+ State = #ch{unacked_message_q = UAMQ, tx = Tx}) ->
{Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple),
State1 = State#ch{unacked_message_q = Remaining},
- {noreply,
- case TxStatus of
- none ->
- reject(Requeue, Acked, State1#ch.limiter),
- State1;
- in_progress ->
- State1#ch{uncommitted_nacks =
- [{Requeue, Acked} | State1#ch.uncommitted_nacks]}
- end}.
-
+ {noreply, case Tx of
+ none -> reject(Requeue, Acked, State1#ch.limiter),
+ State1;
+ {Msgs, Acks} -> Acks1 = ack_cons(Requeue, Acked, Acks),
+ State1#ch{tx = {Msgs, Acks1}}
+ end}.
+
+%% NB: Acked is in youngest-first order
reject(Requeue, Acked, Limiter) ->
- ok = fold_per_queue(
- fun (QPid, MsgIds, ok) ->
- rabbit_amqqueue:reject(QPid, MsgIds, Requeue, self())
- end, ok, Acked),
+ foreach_per_queue(
+ fun (QPid, MsgIds) ->
+ rabbit_amqqueue:reject(QPid, MsgIds, Requeue, self())
+ end, Acked),
ok = notify_limiter(Limiter, Acked).
record_sent(ConsumerTag, AckRequired,
- Msg = {_QName, QPid, MsgId, Redelivered, _Message},
+ Msg = {QName, QPid, MsgId, Redelivered, _Message},
State = #ch{unacked_message_q = UAMQ,
next_tag = DeliveryTag,
trace_state = TraceState}) ->
- incr_stats([{queue_stats, QPid, 1}], case {ConsumerTag, AckRequired} of
- {none, true} -> get;
- {none, false} -> get_no_ack;
- {_ , true} -> deliver;
- {_ , false} -> deliver_no_ack
- end, State),
+ ?INCR_STATS([{queue_stats, QName, 1}], case {ConsumerTag, AckRequired} of
+ {none, true} -> get;
+ {none, false} -> get_no_ack;
+ {_ , true} -> deliver;
+ {_ , false} -> deliver_no_ack
+ end, State),
case Redelivered of
- true -> incr_stats([{queue_stats, QPid, 1}], redeliver, State);
+ true -> ?INCR_STATS([{queue_stats, QName, 1}], redeliver, State);
false -> ok
end,
- rabbit_trace:tap_trace_out(Msg, TraceState),
+ rabbit_trace:tap_out(Msg, TraceState),
UAMQ1 = case AckRequired of
true -> queue:in({DeliveryTag, ConsumerTag, {QPid, MsgId}},
UAMQ);
@@ -1255,40 +1236,61 @@ record_sent(ConsumerTag, AckRequired,
end,
State#ch{unacked_message_q = UAMQ1, next_tag = DeliveryTag + 1}.
+%% NB: returns acks in youngest-first order
collect_acks(Q, 0, true) ->
- {queue:to_list(Q), queue:new()};
+ {lists:reverse(queue:to_list(Q)), queue:new()};
collect_acks(Q, DeliveryTag, Multiple) ->
- collect_acks([], queue:new(), Q, DeliveryTag, Multiple).
+ collect_acks([], [], Q, DeliveryTag, Multiple).
collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) ->
case queue:out(Q) of
{{value, UnackedMsg = {CurrentDeliveryTag, _ConsumerTag, _Msg}},
QTail} ->
if CurrentDeliveryTag == DeliveryTag ->
- {[UnackedMsg | ToAcc], queue:join(PrefixAcc, QTail)};
+ {[UnackedMsg | ToAcc],
+ case PrefixAcc of
+ [] -> QTail;
+ _ -> queue:join(
+ queue:from_list(lists:reverse(PrefixAcc)),
+ QTail)
+ end};
Multiple ->
collect_acks([UnackedMsg | ToAcc], PrefixAcc,
QTail, DeliveryTag, Multiple);
true ->
- collect_acks(ToAcc, queue:in(UnackedMsg, PrefixAcc),
+ collect_acks(ToAcc, [UnackedMsg | PrefixAcc],
QTail, DeliveryTag, Multiple)
end;
{empty, _} ->
precondition_failed("unknown delivery tag ~w", [DeliveryTag])
end.
-ack(Acked, State) ->
- Incs = fold_per_queue(
- fun (QPid, MsgIds, L) ->
- ok = rabbit_amqqueue:ack(QPid, MsgIds, self()),
- [{queue_stats, QPid, length(MsgIds)} | L]
- end, [], Acked),
- ok = notify_limiter(State#ch.limiter, Acked),
- incr_stats(Incs, ack, State).
-
-new_tx(State) -> State#ch{uncommitted_message_q = queue:new(),
- uncommitted_acks = [],
- uncommitted_nacks = []}.
+%% NB: Acked is in youngest-first order
+ack(Acked, State = #ch{queue_names = QNames}) ->
+ foreach_per_queue(
+ fun (QPid, MsgIds) ->
+ ok = rabbit_amqqueue:ack(QPid, MsgIds, self()),
+ ?INCR_STATS(case dict:find(QPid, QNames) of
+ {ok, QName} -> Count = length(MsgIds),
+ [{queue_stats, QName, Count}];
+ error -> []
+ end, ack, State)
+ end, Acked),
+ ok = notify_limiter(State#ch.limiter, Acked).
+
+%% {Msgs, Acks}
+%%
+%% Msgs is a queue.
+%%
+%% Acks looks s.t. like this:
+%% [{false,[5,4]},{true,[3]},{ack,[2,1]}, ...]
+%%
+%% Each element is a pair consisting of a tag and a list of
+%% ack'ed/reject'ed msg ids. The tag is one of 'ack' (to ack), 'true'
+%% (reject w requeue), 'false' (reject w/o requeue). The msg ids, as
+%% well as the list overall, are in "most-recent (generally youngest)
+%% ack first" order.
+new_tx() -> {queue:new(), []}.
notify_queues(State = #ch{state = closing}) ->
{ok, State};
@@ -1298,15 +1300,17 @@ notify_queues(State = #ch{consumer_mapping = Consumers,
sets:union(sets:from_list(consumer_queues(Consumers)), DQ)),
{rabbit_amqqueue:notify_down_all(QPids, self()), State#ch{state = closing}}.
-fold_per_queue(_F, Acc, []) ->
- Acc;
-fold_per_queue(F, Acc, [{_DTag, _CTag, {QPid, MsgId}}]) -> %% common case
- F(QPid, [MsgId], Acc);
-fold_per_queue(F, Acc, UAL) ->
+foreach_per_queue(_F, []) ->
+ ok;
+foreach_per_queue(F, [{_DTag, _CTag, {QPid, MsgId}}]) -> %% common case
+ F(QPid, [MsgId]);
+%% NB: UAL should be in youngest-first order; the tree values will
+%% then be in oldest-first order
+foreach_per_queue(F, UAL) ->
T = lists:foldl(fun ({_DTag, _CTag, {QPid, MsgId}}, T) ->
rabbit_misc:gb_trees_cons(QPid, MsgId, T)
end, gb_trees:empty(), UAL),
- rabbit_misc:gb_trees_fold(F, Acc, T).
+ rabbit_misc:gb_trees_foreach(F, T).
enable_limiter(State = #ch{unacked_message_q = UAMQ,
limiter = Limiter}) ->
@@ -1329,60 +1333,88 @@ notify_limiter(Limiter, Acked) ->
case rabbit_limiter:is_enabled(Limiter) of
false -> ok;
true -> case lists:foldl(fun ({_, none, _}, Acc) -> Acc;
- ({_, _, _}, Acc) -> Acc + 1
+ ({_, _, _}, Acc) -> Acc + 1
end, 0, Acked) of
0 -> ok;
Count -> rabbit_limiter:ack(Limiter, Count)
end
end.
+deliver_to_queues({#delivery{message = #basic_message{exchange_name = XName},
+ msg_seq_no = undefined,
+ mandatory = false},
+ []}, State) -> %% optimisation
+ ?INCR_STATS([{exchange_stats, XName, 1}], publish, State),
+ State;
deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
exchange_name = XName},
msg_seq_no = MsgSeqNo},
- QNames}, State) ->
- {RoutingRes, DeliveredQPids} =
- rabbit_amqqueue:deliver_flow(rabbit_amqqueue:lookup(QNames), Delivery),
- State1 = State#ch{queue_monitors =
- pmon:monitor_all(DeliveredQPids,
- State#ch.queue_monitors)},
- State2 = process_routing_result(RoutingRes, DeliveredQPids,
- XName, MsgSeqNo, Message, State1),
- incr_stats([{exchange_stats, XName, 1} |
- [{queue_exchange_stats, {QPid, XName}, 1} ||
- QPid <- DeliveredQPids]], publish, State2),
- State2.
+ DelQNames}, State = #ch{queue_names = QNames,
+ queue_monitors = QMons}) ->
+ Qs = rabbit_amqqueue:lookup(DelQNames),
+ {RoutingRes, DeliveredQPids} = rabbit_amqqueue:deliver_flow(Qs, Delivery),
+ %% The pmon:monitor_all/2 monitors all queues to which we
+ %% delivered. But we want to monitor even queues we didn't deliver
+ %% to, since we need their 'DOWN' messages to clean
+ %% queue_names. So we also need to monitor each QPid from
+ %% queues. But that only gets the masters (which is fine for
+ %% cleaning queue_names), so we need the union of both.
+ %%
+ %% ...and we need to add even non-delivered queues to queue_names
+ %% since alternative algorithms to update queue_names less
+ %% frequently would in fact be more expensive in the common case.
+ {QNames1, QMons1} =
+ lists:foldl(fun (#amqqueue{pid = QPid, name = QName},
+ {QNames0, QMons0}) ->
+ {case dict:is_key(QPid, QNames0) of
+ true -> QNames0;
+ false -> dict:store(QPid, QName, QNames0)
+ end, pmon:monitor(QPid, QMons0)}
+ end, {QNames, pmon:monitor_all(DeliveredQPids, QMons)}, Qs),
+ State1 = process_routing_result(RoutingRes, DeliveredQPids,
+ XName, MsgSeqNo, Message,
+ State#ch{queue_names = QNames1,
+ queue_monitors = QMons1}),
+ ?INCR_STATS([{exchange_stats, XName, 1} |
+ [{queue_exchange_stats, {QName, XName}, 1} ||
+ QPid <- DeliveredQPids,
+ {ok, QName} <- [dict:find(QPid, QNames1)]]],
+ publish, State1),
+ State1.
-process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) ->
- ok = basic_return(Msg, State, no_route),
- incr_stats([{exchange_stats, Msg#basic_message.exchange_name, 1}],
- return_unroutable, State),
- record_confirm(MsgSeqNo, XName, State);
-process_routing_result(routed, [], XName, MsgSeqNo, _, State) ->
- record_confirm(MsgSeqNo, XName, State);
process_routing_result(routed, _, _, undefined, _, State) ->
State;
+process_routing_result(routed, [], XName, MsgSeqNo, _, State) ->
+ record_confirms([{MsgSeqNo, XName}], State);
process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) ->
State#ch{unconfirmed = dtree:insert(MsgSeqNo, QPids, XName,
- State#ch.unconfirmed)}.
+ State#ch.unconfirmed)};
+process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) ->
+ ok = basic_return(Msg, State, no_route),
+ ?INCR_STATS([{exchange_stats, XName, 1}], return_unroutable, State),
+ case MsgSeqNo of
+ undefined -> State;
+ _ -> record_confirms([{MsgSeqNo, XName}], State)
+ end.
send_nacks([], State) ->
State;
-send_nacks(MXs, State = #ch{tx_status = none}) ->
+send_nacks(MXs, State = #ch{tx = none}) ->
coalesce_and_send([MsgSeqNo || {MsgSeqNo, _} <- MXs],
fun(MsgSeqNo, Multiple) ->
#'basic.nack'{delivery_tag = MsgSeqNo,
multiple = Multiple}
end, State);
send_nacks(_, State) ->
- maybe_complete_tx(State#ch{tx_status = failed}).
+ maybe_complete_tx(State#ch{tx = failed}).
-send_confirms(State = #ch{tx_status = none, confirmed = []}) ->
+send_confirms(State = #ch{tx = none, confirmed = []}) ->
State;
-send_confirms(State = #ch{tx_status = none, confirmed = C}) ->
+send_confirms(State = #ch{tx = none, confirmed = C}) ->
MsgSeqNos =
lists:foldl(
fun ({MsgSeqNo, XName}, MSNs) ->
- incr_stats([{exchange_stats, XName, 1}], confirm, State),
+ ?INCR_STATS([{exchange_stats, XName, 1}], confirm, State),
[MsgSeqNo | MSNs]
end, [], lists:append(C)),
send_confirms(MsgSeqNos, State#ch{confirmed = []});
@@ -1418,7 +1450,12 @@ coalesce_and_send(MsgSeqNos, MkMsgFun,
WriterPid, MkMsgFun(SeqNo, false)) || SeqNo <- Ss],
State.
-maybe_complete_tx(State = #ch{tx_status = in_progress}) ->
+ack_cons(Tag, Acked, [{Tag, Acks} | L]) -> [{Tag, Acked ++ Acks} | L];
+ack_cons(Tag, Acked, Acks) -> [{Tag, Acked} | Acks].
+
+ack_len(Acks) -> lists:sum([length(L) || {ack, L} <- Acks]).
+
+maybe_complete_tx(State = #ch{tx = {_, _}}) ->
State;
maybe_complete_tx(State = #ch{unconfirmed = UC}) ->
case dtree:is_empty(UC) of
@@ -1426,16 +1463,16 @@ maybe_complete_tx(State = #ch{unconfirmed = UC}) ->
true -> complete_tx(State#ch{confirmed = []})
end.
-complete_tx(State = #ch{tx_status = committing}) ->
+complete_tx(State = #ch{tx = committing}) ->
ok = rabbit_writer:send_command(State#ch.writer_pid, #'tx.commit_ok'{}),
- State#ch{tx_status = in_progress};
-complete_tx(State = #ch{tx_status = failed}) ->
+ State#ch{tx = new_tx()};
+complete_tx(State = #ch{tx = failed}) ->
{noreply, State1} = handle_exception(
rabbit_misc:amqp_error(
precondition_failed, "partial tx completion", [],
'tx.commit'),
State),
- State1#ch{tx_status = in_progress}.
+ State1#ch{tx = new_tx()}.
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
@@ -1444,19 +1481,16 @@ i(connection, #ch{conn_pid = ConnPid}) -> ConnPid;
i(number, #ch{channel = Channel}) -> Channel;
i(user, #ch{user = User}) -> User#user.username;
i(vhost, #ch{virtual_host = VHost}) -> VHost;
-i(transactional, #ch{tx_status = TE}) -> TE =/= none;
+i(transactional, #ch{tx = Tx}) -> Tx =/= none;
i(confirm, #ch{confirm_enabled = CE}) -> CE;
i(name, State) -> name(State);
-i(consumer_count, #ch{consumer_mapping = ConsumerMapping}) ->
- dict:size(ConsumerMapping);
-i(messages_unconfirmed, #ch{unconfirmed = UC}) ->
- dtree:size(UC);
-i(messages_unacknowledged, #ch{unacked_message_q = UAMQ}) ->
- queue:len(UAMQ);
-i(messages_uncommitted, #ch{uncommitted_message_q = TMQ}) ->
- queue:len(TMQ);
-i(acks_uncommitted, #ch{uncommitted_acks = TAL}) ->
- length(TAL);
+i(consumer_count, #ch{consumer_mapping = CM}) -> dict:size(CM);
+i(messages_unconfirmed, #ch{unconfirmed = UC}) -> dtree:size(UC);
+i(messages_unacknowledged, #ch{unacked_message_q = UAMQ}) -> queue:len(UAMQ);
+i(messages_uncommitted, #ch{tx = {Msgs, _Acks}}) -> queue:len(Msgs);
+i(messages_uncommitted, #ch{}) -> 0;
+i(acks_uncommitted, #ch{tx = {_Msgs, Acks}}) -> ack_len(Acks);
+i(acks_uncommitted, #ch{}) -> 0;
i(prefetch_count, #ch{limiter = Limiter}) ->
rabbit_limiter:get_limit(Limiter);
i(client_flow_blocked, #ch{limiter = Limiter}) ->
@@ -1467,12 +1501,8 @@ i(Item, _) ->
name(#ch{conn_name = ConnName, channel = Channel}) ->
list_to_binary(rabbit_misc:format("~s (~p)", [ConnName, Channel])).
-incr_stats(Incs, Measure, State) ->
- case rabbit_event:stats_level(State, #ch.stats_timer) of
- fine -> [update_measures(Type, Key, Inc, Measure) ||
- {Type, Key, Inc} <- Incs];
- _ -> ok
- end.
+incr_stats(Incs, Measure) ->
+ [update_measures(Type, Key, Inc, Measure) || {Type, Key, Inc} <- Incs].
update_measures(Type, Key, Inc, Measure) ->
Measures = case get({Type, Key}) of
@@ -1489,24 +1519,23 @@ emit_stats(State) ->
emit_stats(State, []).
emit_stats(State, Extra) ->
- CoarseStats = infos(?STATISTICS_KEYS, State),
+ Coarse = infos(?STATISTICS_KEYS, State),
case rabbit_event:stats_level(State, #ch.stats_timer) of
- coarse ->
- rabbit_event:notify(channel_stats, Extra ++ CoarseStats);
- fine ->
- FineStats =
- [{channel_queue_stats,
- [{QPid, Stats} || {{queue_stats, QPid}, Stats} <- get()]},
- {channel_exchange_stats,
- [{X, Stats} || {{exchange_stats, X}, Stats} <- get()]},
- {channel_queue_exchange_stats,
- [{QX, Stats} ||
- {{queue_exchange_stats, QX}, Stats} <- get()]}],
- rabbit_event:notify(channel_stats,
- Extra ++ CoarseStats ++ FineStats)
+ coarse -> rabbit_event:notify(channel_stats, Extra ++ Coarse);
+ fine -> Fine = [{channel_queue_stats,
+ [{QName, Stats} ||
+ {{queue_stats, QName}, Stats} <- get()]},
+ {channel_exchange_stats,
+ [{XName, Stats} ||
+ {{exchange_stats, XName}, Stats} <- get()]},
+ {channel_queue_exchange_stats,
+ [{QX, Stats} ||
+ {{queue_exchange_stats, QX}, Stats} <- get()]}],
+ rabbit_event:notify(channel_stats, Extra ++ Coarse ++ Fine)
end.
-erase_queue_stats(QPid) ->
- erase({queue_stats, QPid}),
+erase_queue_stats(QName) ->
+ erase({queue_stats, QName}),
[erase({queue_exchange_stats, QX}) ||
- {{queue_exchange_stats, QX = {QPid0, _}}, _} <- get(), QPid =:= QPid0].
+ {{queue_exchange_stats, QX = {QName0, _}}, _} <- get(),
+ QName0 =:= QName].
diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl
index 2b7061e8d5..6ccd501159 100644
--- a/src/rabbit_control_main.erl
+++ b/src/rabbit_control_main.erl
@@ -17,7 +17,7 @@
-module(rabbit_control_main).
-include("rabbit.hrl").
--export([start/0, stop/0, action/5]).
+-export([start/0, stop/0, action/5, sync_queue/1]).
-define(RPC_TIMEOUT, infinity).
-define(EXTERNAL_CHECK_INTERVAL, 1000).
@@ -50,6 +50,7 @@
update_cluster_nodes,
{forget_cluster_node, [?OFFLINE_DEF]},
cluster_status,
+ {sync_queue, [?VHOST_DEF]},
add_user,
delete_user,
@@ -280,6 +281,12 @@ action(forget_cluster_node, Node, [ClusterNodeS], Opts, Inform) ->
rpc_call(Node, rabbit_mnesia, forget_cluster_node,
[ClusterNode, RemoveWhenOffline]);
+action(sync_queue, Node, [Q], Opts, Inform) ->
+ VHost = proplists:get_value(?VHOST_OPT, Opts),
+ QName = rabbit_misc:r(list_to_binary(VHost), queue, list_to_binary(Q)),
+ Inform("Synchronising ~s", [rabbit_misc:rs(QName)]),
+ rpc_call(Node, rabbit_control_main, sync_queue, [QName]);
+
action(wait, Node, [PidFile], _Opts, Inform) ->
Inform("Waiting for ~p", [Node]),
wait_for_application(Node, PidFile, rabbit_and_plugins, Inform);
@@ -513,6 +520,10 @@ action(eval, Node, [Expr], _Opts, _Inform) ->
format_parse_error({_Line, Mod, Err}) -> lists:flatten(Mod:format_error(Err)).
+sync_queue(Q) ->
+ rabbit_amqqueue:with(
+ Q, fun(#amqqueue{pid = QPid}) -> rabbit_amqqueue:sync_mirrors(QPid) end).
+
%%----------------------------------------------------------------------------
wait_for_application(Node, PidFile, Application, Inform) ->
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index a205b23d0b..9339161f29 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -39,8 +39,7 @@
-spec(recover/0 :: () -> [name()]).
-spec(callback/4::
(rabbit_types:exchange(), fun_name(),
- fun((boolean()) -> non_neg_integer()) | atom(),
- [any()]) -> 'ok').
+ fun((boolean()) -> non_neg_integer()) | atom(), [any()]) -> 'ok').
-spec(policy_changed/2 ::
(rabbit_types:exchange(), rabbit_types:exchange()) -> 'ok').
-spec(declare/6 ::
@@ -114,26 +113,19 @@ recover() ->
[XName || #exchange{name = XName} <- Xs].
callback(X = #exchange{type = XType}, Fun, Serial0, Args) ->
- Serial = fun (Bool) ->
- case Serial0 of
- _ when is_atom(Serial0) -> Serial0;
- _ -> Serial0(Bool)
- end
+ Serial = if is_function(Serial0) -> Serial0;
+ is_atom(Serial0) -> fun (_Bool) -> Serial0 end
end,
- [ok = apply(M, Fun, [Serial(M:serialise_events(X)) | Args])
- || M <- decorators()],
+ [ok = apply(M, Fun, [Serial(M:serialise_events(X)) | Args]) ||
+ M <- decorators()],
Module = type_to_module(XType),
apply(Module, Fun, [Serial(Module:serialise_events()) | Args]).
policy_changed(X1, X2) -> callback(X1, policy_changed, none, [X1, X2]).
serialise_events(X = #exchange{type = Type}) ->
- case [Serialise || M <- decorators(),
- Serialise <- [M:serialise_events(X)],
- Serialise == true] of
- [] -> (type_to_module(Type)):serialise_events();
- _ -> true
- end.
+ lists:any(fun (M) -> M:serialise_events(X) end, decorators())
+ orelse (type_to_module(Type)):serialise_events().
serial(#exchange{name = XName} = X) ->
Serial = case serialise_events(X) of
@@ -318,22 +310,19 @@ route(#exchange{name = #resource{name = <<"">>, virtual_host = VHost}},
[rabbit_misc:r(VHost, queue, RK) || RK <- lists:usort(RKs)];
route(X = #exchange{name = XName}, Delivery) ->
- route1(Delivery, {queue:from_list([X]), XName, []}).
-
-route1(Delivery, {WorkList, SeenXs, QNames}) ->
- case queue:out(WorkList) of
- {empty, _WorkList} ->
- lists:usort(QNames);
- {{value, X = #exchange{type = Type}}, WorkList1} ->
- DstNames = process_alternate(
- X, ((type_to_module(Type)):route(X, Delivery))),
- route1(Delivery,
- lists:foldl(fun process_route/2, {WorkList1, SeenXs, QNames},
- DstNames))
- end.
+ route1(Delivery, {[X], XName, []}).
+
+route1(_, {[], _, QNames}) ->
+ lists:usort(QNames);
+route1(Delivery, {[X = #exchange{type = Type} | WorkList], SeenXs, QNames}) ->
+ DstNames = process_alternate(
+ X, ((type_to_module(Type)):route(X, Delivery))),
+ route1(Delivery,
+ lists:foldl(fun process_route/2, {WorkList, SeenXs, QNames},
+ DstNames)).
process_alternate(#exchange{arguments = []}, Results) -> %% optimisation
- Results;
+ Results;
process_alternate(#exchange{name = XName, arguments = Args}, []) ->
case rabbit_misc:r_arg(XName, exchange, Args, <<"alternate-exchange">>) of
undefined -> [];
@@ -347,23 +336,25 @@ process_route(#resource{kind = exchange} = XName,
Acc;
process_route(#resource{kind = exchange} = XName,
{WorkList, #resource{kind = exchange} = SeenX, QNames}) ->
- {case lookup(XName) of
- {ok, X} -> queue:in(X, WorkList);
- {error, not_found} -> WorkList
- end, gb_sets:from_list([SeenX, XName]), QNames};
+ {cons_if_present(XName, WorkList),
+ gb_sets:from_list([SeenX, XName]), QNames};
process_route(#resource{kind = exchange} = XName,
{WorkList, SeenXs, QNames} = Acc) ->
case gb_sets:is_element(XName, SeenXs) of
true -> Acc;
- false -> {case lookup(XName) of
- {ok, X} -> queue:in(X, WorkList);
- {error, not_found} -> WorkList
- end, gb_sets:add_element(XName, SeenXs), QNames}
+ false -> {cons_if_present(XName, WorkList),
+ gb_sets:add_element(XName, SeenXs), QNames}
end;
process_route(#resource{kind = queue} = QName,
{WorkList, SeenXs, QNames}) ->
{WorkList, SeenXs, [QName | QNames]}.
+cons_if_present(XName, L) ->
+ case lookup(XName) of
+ {ok, X} -> [X | L];
+ {error, not_found} -> L
+ end.
+
call_with_exchange(XName, Fun) ->
rabbit_misc:execute_mnesia_tx_with_tail(
fun () -> case mnesia:read({rabbit_exchange, XName}) of
diff --git a/src/rabbit_guid.erl b/src/rabbit_guid.erl
index cedbbdb380..8ee9ad5bc0 100644
--- a/src/rabbit_guid.erl
+++ b/src/rabbit_guid.erl
@@ -104,8 +104,6 @@ advance_blocks({B1, B2, B3, B4}, I) ->
B5 = erlang:phash2({B1, I}, 4294967296),
{{(B2 bxor B5), (B3 bxor B5), (B4 bxor B5), B5}, I+1}.
-blocks_to_binary({B1, B2, B3, B4}) -> <<B1:32, B2:32, B3:32, B4:32>>.
-
%% generate a GUID. This function should be used when performance is a
%% priority and predictability is not an issue. Otherwise use
%% gen_secure/0.
@@ -114,14 +112,15 @@ gen() ->
%% time we need a new guid we rotate them producing a new hash
%% with the aid of the counter. Look at the comments in
%% advance_blocks/2 for details.
- {BS, I} = case get(guid) of
- undefined -> <<B1:32, B2:32, B3:32, B4:32>> =
- erlang:md5(term_to_binary(fresh())),
- {{B1,B2,B3,B4}, 0};
- {BS0, I0} -> advance_blocks(BS0, I0)
- end,
- put(guid, {BS, I}),
- blocks_to_binary(BS).
+ case get(guid) of
+ undefined -> <<B1:32, B2:32, B3:32, B4:32>> = Res =
+ erlang:md5(term_to_binary(fresh())),
+ put(guid, {{B1, B2, B3, B4}, 0}),
+ Res;
+ {BS, I} -> {{B1, B2, B3, B4}, _} = S = advance_blocks(BS, I),
+ put(guid, S),
+ <<B1:32, B2:32, B3:32, B4:32>>
+ end.
%% generate a non-predictable GUID.
%%
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index df733546b4..0df7ea1ced 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -17,30 +17,30 @@
-module(rabbit_mirror_queue_master).
-export([init/3, terminate/2, delete_and_terminate/2,
- purge/1, publish/4, publish_delivered/4, discard/3, fetch/2, ack/2,
- requeue/2, len/1, is_empty/1, depth/1, drain_confirmed/1,
- dropwhile/3, set_ram_duration_target/2, ram_duration/1,
+ purge/1, publish/5, publish_delivered/4,
+ discard/3, fetch/2, drop/2, ack/2, requeue/2, ackfold/4, fold/3,
+ len/1, is_empty/1, depth/1, drain_confirmed/1,
+ dropwhile/2, fetchwhile/4, set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1,
- status/1, invoke/3, is_duplicate/2, fold/3]).
+ status/1, invoke/3, is_duplicate/2]).
-export([start/1, stop/0]).
--export([promote_backing_queue_state/7, sender_death_fun/0, depth_fun/0]).
+-export([promote_backing_queue_state/8, sender_death_fun/0, depth_fun/0]).
--export([init_with_existing_bq/3, stop_mirroring/1]).
+-export([init_with_existing_bq/3, stop_mirroring/1, sync_mirrors/1]).
-behaviour(rabbit_backing_queue).
-include("rabbit.hrl").
--record(state, { gm,
+-record(state, { name,
+ gm,
coordinator,
backing_queue,
backing_queue_state,
- set_delivered,
seen_status,
confirmed,
- ack_msg_id,
known_senders
}).
@@ -50,25 +50,26 @@
-type(death_fun() :: fun ((pid()) -> 'ok')).
-type(depth_fun() :: fun (() -> 'ok')).
--type(master_state() :: #state { gm :: pid(),
+-type(master_state() :: #state { name :: rabbit_amqqueue:name(),
+ gm :: pid(),
coordinator :: pid(),
backing_queue :: atom(),
backing_queue_state :: any(),
- set_delivered :: non_neg_integer(),
seen_status :: dict(),
confirmed :: [rabbit_guid:guid()],
- ack_msg_id :: dict(),
known_senders :: set()
}).
--spec(promote_backing_queue_state/7 ::
- (pid(), atom(), any(), pid(), [any()], dict(), [pid()]) ->
- master_state()).
+-spec(promote_backing_queue_state/8 ::
+ (rabbit_amqqueue:name(), pid(), atom(), any(), pid(), [any()], dict(),
+ [pid()]) -> master_state()).
-spec(sender_death_fun/0 :: () -> death_fun()).
-spec(depth_fun/0 :: () -> depth_fun()).
-spec(init_with_existing_bq/3 :: (rabbit_types:amqqueue(), atom(), any()) ->
master_state()).
-spec(stop_mirroring/1 :: (master_state()) -> {atom(), any()}).
+-spec(sync_mirrors/1 :: (master_state()) ->
+ {'ok', master_state()} | {stop, any(), master_state()}).
-endif.
@@ -109,14 +110,13 @@ init_with_existing_bq(Q = #amqqueue{name = QName}, BQ, BQS) ->
end),
{_MNode, SNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q),
rabbit_mirror_queue_misc:add_mirrors(QName, SNodes),
- #state { gm = GM,
+ #state { name = QName,
+ gm = GM,
coordinator = CPid,
backing_queue = BQ,
backing_queue_state = BQS,
- set_delivered = 0,
seen_status = dict:new(),
confirmed = [],
- ack_msg_id = dict:new(),
known_senders = sets:new() }.
stop_mirroring(State = #state { coordinator = CPid,
@@ -126,6 +126,29 @@ stop_mirroring(State = #state { coordinator = CPid,
stop_all_slaves(shutdown, State),
{BQ, BQS}.
+sync_mirrors(State = #state { name = QName,
+ gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ Log = fun (Fmt, Params) ->
+ rabbit_log:info("Synchronising ~s: " ++ Fmt ++ "~n",
+ [rabbit_misc:rs(QName) | Params])
+ end,
+ Log("~p messages to synchronise", [BQ:len(BQS)]),
+ {ok, #amqqueue{slave_pids = SPids}} = rabbit_amqqueue:lookup(QName),
+ Ref = make_ref(),
+ Syncer = rabbit_mirror_queue_sync:master_prepare(Ref, Log, SPids),
+ gm:broadcast(GM, {sync_start, Ref, Syncer, SPids}),
+ S = fun(BQSN) -> State#state{backing_queue_state = BQSN} end,
+ case rabbit_mirror_queue_sync:master_go(Syncer, Ref, Log, BQ, BQS) of
+ {shutdown, R, BQS1} -> {stop, R, S(BQS1)};
+ {sync_died, R, BQS1} -> Log("~p", [R]),
+ {ok, S(BQS1)};
+ {already_synced, BQS1} -> {ok, S(BQS1)};
+ {ok, BQS1} -> Log("complete", []),
+ {ok, S(BQS1)}
+ end.
+
terminate({shutdown, dropped} = Reason,
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
@@ -135,8 +158,8 @@ terminate({shutdown, dropped} = Reason,
%% in without this node being restarted. Thus we must do the full
%% blown delete_and_terminate now, but only locally: we do not
%% broadcast delete_and_terminate.
- State #state { backing_queue_state = BQ:delete_and_terminate(Reason, BQS),
- set_delivered = 0 };
+ State#state{backing_queue_state = BQ:delete_and_terminate(Reason, BQS)};
+
terminate(Reason,
State = #state { backing_queue = BQ, backing_queue_state = BQS }) ->
%% Backing queue termination. The queue is going down but
@@ -147,20 +170,16 @@ terminate(Reason,
delete_and_terminate(Reason, State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
stop_all_slaves(Reason, State),
- State #state { backing_queue_state = BQ:delete_and_terminate(Reason, BQS),
- set_delivered = 0 }.
-
-stop_all_slaves(Reason, #state{gm = GM}) ->
- Info = gm:info(GM),
- Slaves = [Pid || Pid <- proplists:get_value(group_members, Info),
- node(Pid) =/= node()],
- MRefs = [erlang:monitor(process, S) || S <- Slaves],
+ State#state{backing_queue_state = BQ:delete_and_terminate(Reason, BQS)}.
+
+stop_all_slaves(Reason, #state{name = QName, gm = GM}) ->
+ {ok, #amqqueue{slave_pids = SPids}} = rabbit_amqqueue:lookup(QName),
+ MRefs = [erlang:monitor(process, SPid) || SPid <- SPids],
ok = gm:broadcast(GM, {delete_and_terminate, Reason}),
[receive {'DOWN', MRef, process, _Pid, _Info} -> ok end || MRef <- MRefs],
%% Normally when we remove a slave another slave or master will
%% notice and update Mnesia. But we just removed them all, and
%% have stopped listening ourselves. So manually clean up.
- QName = proplists:get_value(group_name, Info),
rabbit_misc:execute_mnesia_transaction(
fun () ->
[Q] = mnesia:read({rabbit_queue, QName}),
@@ -174,30 +193,27 @@ purge(State = #state { gm = GM,
backing_queue_state = BQS }) ->
ok = gm:broadcast(GM, {drop, 0, BQ:len(BQS), false}),
{Count, BQS1} = BQ:purge(BQS),
- {Count, State #state { backing_queue_state = BQS1,
- set_delivered = 0 }}.
+ {Count, State #state { backing_queue_state = BQS1 }}.
-publish(Msg = #basic_message { id = MsgId }, MsgProps, ChPid,
+publish(Msg = #basic_message { id = MsgId }, MsgProps, IsDelivered, ChPid,
State = #state { gm = GM,
seen_status = SS,
backing_queue = BQ,
backing_queue_state = BQS }) ->
false = dict:is_key(MsgId, SS), %% ASSERTION
ok = gm:broadcast(GM, {publish, ChPid, MsgProps, Msg}),
- BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS),
+ BQS1 = BQ:publish(Msg, MsgProps, IsDelivered, ChPid, BQS),
ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1 }).
publish_delivered(Msg = #basic_message { id = MsgId }, MsgProps,
ChPid, State = #state { gm = GM,
seen_status = SS,
backing_queue = BQ,
- backing_queue_state = BQS,
- ack_msg_id = AM }) ->
+ backing_queue_state = BQS }) ->
false = dict:is_key(MsgId, SS), %% ASSERTION
ok = gm:broadcast(GM, {publish_delivered, ChPid, MsgProps, Msg}),
{AckTag, BQS1} = BQ:publish_delivered(Msg, MsgProps, ChPid, BQS),
- AM1 = maybe_store_acktag(AckTag, MsgId, AM),
- State1 = State #state { backing_queue_state = BQS1, ack_msg_id = AM1 },
+ State1 = State #state { backing_queue_state = BQS1 },
{AckTag, ensure_monitoring(ChPid, State1)}.
discard(MsgId, ChPid, State = #state { gm = GM,
@@ -220,22 +236,17 @@ discard(MsgId, ChPid, State = #state { gm = GM,
State
end.
-dropwhile(Pred, AckRequired,
- State = #state{gm = GM,
- backing_queue = BQ,
- set_delivered = SetDelivered,
- backing_queue_state = BQS }) ->
+dropwhile(Pred, State = #state{backing_queue = BQ,
+ backing_queue_state = BQS }) ->
Len = BQ:len(BQS),
- {Next, Msgs, BQS1} = BQ:dropwhile(Pred, AckRequired, BQS),
- Len1 = BQ:len(BQS1),
- Dropped = Len - Len1,
- case Dropped of
- 0 -> ok;
- _ -> ok = gm:broadcast(GM, {drop, Len1, Dropped, AckRequired})
- end,
- SetDelivered1 = lists:max([0, SetDelivered - Dropped]),
- {Next, Msgs, State #state { backing_queue_state = BQS1,
- set_delivered = SetDelivered1 } }.
+ {Next, BQS1} = BQ:dropwhile(Pred, BQS),
+ {Next, drop(Len, false, State #state { backing_queue_state = BQS1 })}.
+
+fetchwhile(Pred, Fun, Acc, State = #state{backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ Len = BQ:len(BQS),
+ {Next, Acc1, BQS1} = BQ:fetchwhile(Pred, Fun, Acc, BQS),
+ {Next, Acc1, drop(Len, true, State #state { backing_queue_state = BQS1 })}.
drain_confirmed(State = #state { backing_queue = BQ,
backing_queue_state = BQS,
@@ -267,43 +278,33 @@ drain_confirmed(State = #state { backing_queue = BQ,
seen_status = SS1,
confirmed = [] }}.
-fetch(AckRequired, State = #state { gm = GM,
- backing_queue = BQ,
- backing_queue_state = BQS,
- set_delivered = SetDelivered,
- ack_msg_id = AM }) ->
+fetch(AckRequired, State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }) ->
{Result, BQS1} = BQ:fetch(AckRequired, BQS),
State1 = State #state { backing_queue_state = BQS1 },
- case Result of
- empty ->
- {Result, State1};
- {#basic_message { id = MsgId } = Message, IsDelivered, AckTag,
- Remaining} ->
- ok = gm:broadcast(GM, {fetch, AckRequired, MsgId, Remaining}),
- IsDelivered1 = IsDelivered orelse SetDelivered > 0,
- SetDelivered1 = lists:max([0, SetDelivered - 1]),
- AM1 = maybe_store_acktag(AckTag, MsgId, AM),
- {{Message, IsDelivered1, AckTag, Remaining},
- State1 #state { set_delivered = SetDelivered1,
- ack_msg_id = AM1 }}
- end.
+ {Result, case Result of
+ empty -> State1;
+ {_MsgId, _IsDelivered, AckTag} -> drop_one(AckTag, State1)
+ end}.
+
+drop(AckRequired, State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ {Result, BQS1} = BQ:drop(AckRequired, BQS),
+ State1 = State #state { backing_queue_state = BQS1 },
+ {Result, case Result of
+ empty -> State1;
+ {_MsgId, AckTag} -> drop_one(AckTag, State1)
+ end}.
ack(AckTags, State = #state { gm = GM,
backing_queue = BQ,
- backing_queue_state = BQS,
- ack_msg_id = AM }) ->
+ backing_queue_state = BQS }) ->
{MsgIds, BQS1} = BQ:ack(AckTags, BQS),
case MsgIds of
[] -> ok;
_ -> ok = gm:broadcast(GM, {ack, MsgIds})
end,
- AM1 = lists:foldl(fun dict:erase/2, AM, AckTags),
- {MsgIds, State #state { backing_queue_state = BQS1,
- ack_msg_id = AM1 }}.
-
-fold(MsgFun, State = #state { backing_queue = BQ,
- backing_queue_state = BQS }, AckTags) ->
- State #state { backing_queue_state = BQ:fold(MsgFun, BQS, AckTags) }.
+ {MsgIds, State #state { backing_queue_state = BQS1 }}.
requeue(AckTags, State = #state { gm = GM,
backing_queue = BQ,
@@ -312,6 +313,16 @@ requeue(AckTags, State = #state { gm = GM,
ok = gm:broadcast(GM, {requeue, MsgIds}),
{MsgIds, State #state { backing_queue_state = BQS1 }}.
+ackfold(MsgFun, Acc, State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }, AckTags) ->
+ {Acc1, BQS1} = BQ:ackfold(MsgFun, Acc, BQS, AckTags),
+ {Acc1, State #state { backing_queue_state = BQS1 }}.
+
+fold(Fun, Acc, State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ {Result, BQS1} = BQ:fold(Fun, Acc, BQS),
+ {Result, State #state { backing_queue_state = BQS1 }}.
+
len(#state { backing_queue = BQ, backing_queue_state = BQS }) ->
BQ:len(BQS).
@@ -399,20 +410,19 @@ is_duplicate(Message = #basic_message { id = MsgId },
%% Other exported functions
%% ---------------------------------------------------------------------------
-promote_backing_queue_state(CPid, BQ, BQS, GM, AckTags, SeenStatus, KS) ->
+promote_backing_queue_state(QName, CPid, BQ, BQS, GM, AckTags, Seen, KS) ->
{_MsgIds, BQS1} = BQ:requeue(AckTags, BQS),
Len = BQ:len(BQS1),
Depth = BQ:depth(BQS1),
true = Len == Depth, %% ASSERTION: everything must have been requeued
ok = gm:broadcast(GM, {depth, Depth}),
- #state { gm = GM,
+ #state { name = QName,
+ gm = GM,
coordinator = CPid,
backing_queue = BQ,
backing_queue_state = BQS1,
- set_delivered = Len,
- seen_status = SeenStatus,
+ seen_status = Seen,
confirmed = [],
- ack_msg_id = dict:new(),
known_senders = sets:from_list(KS) }.
sender_death_fun() ->
@@ -440,8 +450,25 @@ depth_fun() ->
end)
end.
-maybe_store_acktag(undefined, _MsgId, AM) -> AM;
-maybe_store_acktag(AckTag, MsgId, AM) -> dict:store(AckTag, MsgId, AM).
+%% ---------------------------------------------------------------------------
+%% Helpers
+%% ---------------------------------------------------------------------------
+
+drop_one(AckTag, State = #state { gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ ok = gm:broadcast(GM, {drop, BQ:len(BQS), 1, AckTag =/= undefined}),
+ State.
+
+drop(PrevLen, AckRequired, State = #state { gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ Len = BQ:len(BQS),
+ case PrevLen - Len of
+ 0 -> State;
+ Dropped -> ok = gm:broadcast(GM, {drop, Len, Dropped, AckRequired}),
+ State
+ end.
ensure_monitoring(ChPid, State = #state { coordinator = CPid,
known_senders = KS }) ->
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 5fbda9c521..feddf45a83 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -28,7 +28,7 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3, handle_pre_hibernate/1, prioritise_call/3,
- prioritise_cast/2, prioritise_info/2]).
+ prioritise_cast/2, prioritise_info/2, format_message_queue/2]).
-export([joined/2, members_changed/3, handle_msg/3]).
@@ -222,6 +222,29 @@ handle_cast({deliver, Delivery = #delivery{sender = Sender}, true, Flow},
end,
noreply(maybe_enqueue_message(Delivery, State));
+handle_cast({sync_start, Ref, Syncer},
+ State = #state { depth_delta = DD,
+ backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ State1 = #state{rate_timer_ref = TRef} = ensure_rate_timer(State),
+ S = fun({TRefN, BQSN}) -> State1#state{depth_delta = undefined,
+ rate_timer_ref = TRefN,
+ backing_queue_state = BQSN} end,
+ %% [0] We can only sync when there are no pending acks
+ case rabbit_mirror_queue_sync:slave(
+ DD, Ref, TRef, Syncer, BQ, BQS,
+ fun (BQN, BQSN) ->
+ BQSN1 = update_ram_duration(BQN, BQSN),
+ TRefN = erlang:send_after(?RAM_DURATION_UPDATE_INTERVAL,
+ self(), update_ram_duration),
+ {TRefN, BQSN1}
+ end) of
+ denied -> noreply(State1);
+ {ok, Res} -> noreply(set_delta(0, S(Res))); %% [0]
+ {failed, Res} -> noreply(S(Res));
+ {stop, Reason, Res} -> {stop, Reason, S(Res)}
+ end;
+
handle_cast({set_maximum_since_use, Age}, State) ->
ok = file_handle_cache:set_maximum_since_use(Age),
noreply(State);
@@ -232,17 +255,13 @@ handle_cast({set_ram_duration_target, Duration},
BQS1 = BQ:set_ram_duration_target(Duration, BQS),
noreply(State #state { backing_queue_state = BQS1 }).
-handle_info(update_ram_duration,
- State = #state { backing_queue = BQ,
- backing_queue_state = BQS }) ->
- {RamDuration, BQS1} = BQ:ram_duration(BQS),
- DesiredDuration =
- rabbit_memory_monitor:report_ram_duration(self(), RamDuration),
- BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1),
+handle_info(update_ram_duration, State = #state{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ BQS1 = update_ram_duration(BQ, BQS),
%% Don't call noreply/1, we don't want to set timers
{State1, Timeout} = next_state(State #state {
rate_timer_ref = undefined,
- backing_queue_state = BQS2 }),
+ backing_queue_state = BQS1 }),
{noreply, State1, Timeout};
handle_info(sync_timeout, State) ->
@@ -321,7 +340,6 @@ prioritise_cast(Msg, _State) ->
{set_maximum_since_use, _Age} -> 8;
{run_backing_queue, _Mod, _Fun} -> 6;
{gm, _Msg} -> 5;
- {post_commit, _Txn, _AckTags} -> 4;
_ -> 0
end.
@@ -332,6 +350,8 @@ prioritise_info(Msg, _State) ->
_ -> 0
end.
+format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ).
+
%% ---------------------------------------------------------------------------
%% GM
%% ---------------------------------------------------------------------------
@@ -359,6 +379,11 @@ handle_msg([_SPid], _From, process_death) ->
handle_msg([CPid], _From, {delete_and_terminate, _Reason} = Msg) ->
ok = gen_server2:cast(CPid, {gm, Msg}),
{stop, {shutdown, ring_shutdown}};
+handle_msg([SPid], _From, {sync_start, Ref, Syncer, SPids}) ->
+ case lists:member(SPid, SPids) of
+ true -> gen_server2:cast(SPid, {sync_start, Ref, Syncer});
+ false -> ok
+ end;
handle_msg([SPid], _From, Msg) ->
ok = gen_server2:cast(SPid, {gm, Msg}).
@@ -530,7 +555,7 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
AckTags = [AckTag || {_MsgId, AckTag} <- dict:to_list(MA)],
MasterState = rabbit_mirror_queue_master:promote_backing_queue_state(
- CPid, BQ, BQS, GM, AckTags, SS, MPids),
+ QName, CPid, BQ, BQS, GM, AckTags, SS, MPids),
MTC = dict:fold(fun (MsgId, {published, ChPid, MsgSeqNo}, MTC0) ->
gb_trees:insert(MsgId, {ChPid, MsgSeqNo}, MTC0);
@@ -699,7 +724,7 @@ process_instruction({publish, ChPid, MsgProps,
Msg = #basic_message { id = MsgId }}, State) ->
State1 = #state { backing_queue = BQ, backing_queue_state = BQS } =
publish_or_discard(published, ChPid, MsgId, State),
- BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS),
+ BQS1 = BQ:publish(Msg, MsgProps, true, ChPid, BQS),
{ok, State1 #state { backing_queue_state = BQS1 }};
process_instruction({publish_delivered, ChPid, MsgProps,
Msg = #basic_message { id = MsgId }}, State) ->
@@ -723,8 +748,7 @@ process_instruction({drop, Length, Dropped, AckRequired},
end,
State1 = lists:foldl(
fun (const, StateN = #state{backing_queue_state = BQSN}) ->
- {{#basic_message{id = MsgId}, _, AckTag, _}, BQSN1} =
- BQ:fetch(AckRequired, BQSN),
+ {{MsgId, AckTag}, BQSN1} = BQ:drop(AckRequired, BQSN),
maybe_store_ack(
AckRequired, MsgId, AckTag,
StateN #state { backing_queue_state = BQSN1 })
@@ -733,21 +757,6 @@ process_instruction({drop, Length, Dropped, AckRequired},
true -> State1;
false -> update_delta(ToDrop - Dropped, State1)
end};
-process_instruction({fetch, AckRequired, MsgId, Remaining},
- State = #state { backing_queue = BQ,
- backing_queue_state = BQS }) ->
- QLen = BQ:len(BQS),
- {ok, case QLen - 1 of
- Remaining ->
- {{#basic_message{id = MsgId}, _IsDelivered,
- AckTag, Remaining}, BQS1} = BQ:fetch(AckRequired, BQS),
- maybe_store_ack(AckRequired, MsgId, AckTag,
- State #state { backing_queue_state = BQS1 });
- _ when QLen =< Remaining andalso AckRequired ->
- State;
- _ when QLen =< Remaining ->
- update_delta(-1, State)
- end};
process_instruction({ack, MsgIds},
State = #state { backing_queue = BQ,
backing_queue_state = BQS,
@@ -828,6 +837,12 @@ update_delta( DeltaChange, State = #state { depth_delta = Delta }) ->
true = DeltaChange =< 0, %% assertion: we cannot become 'less' sync'ed
set_delta(Delta + DeltaChange, State #state { depth_delta = undefined }).
+update_ram_duration(BQ, BQS) ->
+ {RamDuration, BQS1} = BQ:ram_duration(BQS),
+ DesiredDuration =
+ rabbit_memory_monitor:report_ram_duration(self(), RamDuration),
+ BQ:set_ram_duration_target(DesiredDuration, BQS1).
+
record_synchronised(#amqqueue { name = QName }) ->
Self = self(),
rabbit_misc:execute_mnesia_transaction(
diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl
new file mode 100644
index 0000000000..508d46e966
--- /dev/null
+++ b/src/rabbit_mirror_queue_sync.erl
@@ -0,0 +1,232 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2010-2012 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_mirror_queue_sync).
+
+-include("rabbit.hrl").
+
+-export([master_prepare/3, master_go/5, slave/7]).
+
+-define(SYNC_PROGRESS_INTERVAL, 1000000).
+
+%% There are three processes around, the master, the syncer and the
+%% slave(s). The syncer is an intermediary, linked to the master in
+%% order to make sure we do not mess with the master's credit flow or
+%% set of monitors.
+%%
+%% Interactions
+%% ------------
+%%
+%% '*' indicates repeating messages. All are standard Erlang messages
+%% except sync_start which is sent over GM to flush out any other
+%% messages that we might have sent that way already. (credit) is the
+%% usual credit_flow bump message every so often.
+%%
+%% Master Syncer Slave(s)
+%% sync_mirrors -> || ||
+%% (from channel) || -- (spawns) --> || ||
+%% || --------- sync_start (over GM) -------> ||
+%% || || <--- sync_ready ---- ||
+%% || || (or) ||
+%% || || <--- sync_deny ----- ||
+%% || <--- ready ---- || ||
+%% || <--- next* ---- || || }
+%% || ---- msg* ----> || || } loop
+%% || || ---- sync_msg* ----> || }
+%% || || <--- (credit)* ----- || }
+%% || <--- next ---- || ||
+%% || ---- done ----> || ||
+%% || || -- sync_complete --> ||
+%% || (Dies) ||
+
+-ifdef(use_specs).
+
+-type(log_fun() :: fun ((string(), [any()]) -> 'ok')).
+-type(bq() :: atom()).
+-type(bqs() :: any()).
+
+-spec(master_prepare/3 :: (reference(), log_fun(), [pid()]) -> pid()).
+-spec(master_go/5 :: (pid(), reference(), log_fun(), bq(), bqs()) ->
+ {'already_synced', bqs()} | {'ok', bqs()} |
+ {'shutdown', any(), bqs()} |
+ {'sync_died', any(), bqs()}).
+-spec(slave/7 :: (non_neg_integer(), reference(), timer:tref(), pid(),
+ bq(), bqs(), fun((bq(), bqs()) -> {timer:tref(), bqs()})) ->
+ 'denied' |
+ {'ok' | 'failed', {timer:tref(), bqs()}} |
+ {'stop', any(), {timer:tref(), bqs()}}).
+
+-endif.
+
+%% ---------------------------------------------------------------------------
+%% Master
+
+master_prepare(Ref, Log, SPids) ->
+ MPid = self(),
+ spawn_link(fun () -> syncer(Ref, Log, MPid, SPids) end).
+
+master_go(Syncer, Ref, Log, BQ, BQS) ->
+ Args = {Syncer, Ref, Log, rabbit_misc:get_parent()},
+ receive
+ {'EXIT', Syncer, normal} -> {already_synced, BQS};
+ {'EXIT', Syncer, Reason} -> {sync_died, Reason, BQS};
+ {ready, Syncer} -> master_go0(Args, BQ, BQS)
+ end.
+
+master_go0(Args, BQ, BQS) ->
+ case BQ:fold(fun (Msg, MsgProps, Acc) ->
+ master_send(Msg, MsgProps, Args, Acc)
+ end, {0, erlang:now()}, BQS) of
+ {{shutdown, Reason}, BQS1} -> {shutdown, Reason, BQS1};
+ {{sync_died, Reason}, BQS1} -> {sync_died, Reason, BQS1};
+ {_, BQS1} -> master_done(Args, BQS1)
+ end.
+
+master_send(Msg, MsgProps, {Syncer, Ref, Log, Parent}, {I, Last}) ->
+ T = case timer:now_diff(erlang:now(), Last) > ?SYNC_PROGRESS_INTERVAL of
+ true -> Log("~p messages", [I]),
+ erlang:now();
+ false -> Last
+ end,
+ receive
+ {'$gen_cast', {set_maximum_since_use, Age}} ->
+ ok = file_handle_cache:set_maximum_since_use(Age)
+ after 0 ->
+ ok
+ end,
+ receive
+ {next, Ref} -> Syncer ! {msg, Ref, Msg, MsgProps},
+ {cont, {I + 1, T}};
+ {'EXIT', Parent, Reason} -> {stop, {shutdown, Reason}};
+ {'EXIT', Syncer, Reason} -> {stop, {sync_died, Reason}}
+ end.
+
+master_done({Syncer, Ref, _Log, Parent}, BQS) ->
+ receive
+ {next, Ref} -> unlink(Syncer),
+ Syncer ! {done, Ref},
+ receive {'EXIT', Syncer, _} -> ok
+ after 0 -> ok
+ end,
+ {ok, BQS};
+ {'EXIT', Parent, Reason} -> {shutdown, Reason, BQS};
+ {'EXIT', Syncer, Reason} -> {sync_died, Reason, BQS}
+ end.
+
+%% Master
+%% ---------------------------------------------------------------------------
+%% Syncer
+
+syncer(Ref, Log, MPid, SPids) ->
+ [erlang:monitor(process, SPid) || SPid <- SPids],
+ %% We wait for a reply from the slaves so that we know they are in
+ %% a receive block and will thus receive messages we send to them
+ %% *without* those messages ending up in their gen_server2 pqueue.
+ case [SPid || SPid <- SPids,
+ receive
+ {sync_ready, Ref, SPid} -> true;
+ {sync_deny, Ref, SPid} -> false;
+ {'DOWN', _, process, SPid, _} -> false
+ end] of
+ [] -> Log("all slaves already synced", []);
+ SPids1 -> MPid ! {ready, self()},
+ Log("mirrors ~p to sync", [[node(SPid) || SPid <- SPids1]]),
+ syncer_loop(Ref, MPid, SPids1)
+ end.
+
+syncer_loop(Ref, MPid, SPids) ->
+ MPid ! {next, Ref},
+ receive
+ {msg, Ref, Msg, MsgProps} ->
+ SPids1 = wait_for_credit(SPids),
+ [begin
+ credit_flow:send(SPid),
+ SPid ! {sync_msg, Ref, Msg, MsgProps}
+ end || SPid <- SPids1],
+ syncer_loop(Ref, MPid, SPids1);
+ {done, Ref} ->
+ [SPid ! {sync_complete, Ref} || SPid <- SPids]
+ end.
+
+wait_for_credit(SPids) ->
+ case credit_flow:blocked() of
+ true -> receive
+ {bump_credit, Msg} ->
+ credit_flow:handle_bump_msg(Msg),
+ wait_for_credit(SPids);
+ {'DOWN', _, process, SPid, _} ->
+ credit_flow:peer_down(SPid),
+ wait_for_credit(lists:delete(SPid, SPids))
+ end;
+ false -> SPids
+ end.
+
+%% Syncer
+%% ---------------------------------------------------------------------------
+%% Slave
+
+slave(0, Ref, _TRef, Syncer, _BQ, _BQS, _UpdateRamDuration) ->
+ Syncer ! {sync_deny, Ref, self()},
+ denied;
+
+slave(_DD, Ref, TRef, Syncer, BQ, BQS, UpdateRamDuration) ->
+ MRef = erlang:monitor(process, Syncer),
+ Syncer ! {sync_ready, Ref, self()},
+ {_MsgCount, BQS1} = BQ:purge(BQS),
+ slave_sync_loop({Ref, MRef, Syncer, BQ, UpdateRamDuration,
+ rabbit_misc:get_parent()}, TRef, BQS1).
+
+slave_sync_loop(Args = {Ref, MRef, Syncer, BQ, UpdateRamDuration, Parent},
+ TRef, BQS) ->
+ receive
+ {'DOWN', MRef, process, Syncer, _Reason} ->
+ %% If the master dies half way we are not in the usual
+ %% half-synced state (with messages nearer the tail of the
+ %% queue); instead we have ones nearer the head. If we then
+ %% sync with a newly promoted master, or even just receive
+ %% messages from it, we have a hole in the middle. So the
+ %% only thing to do here is purge.
+ {_MsgCount, BQS1} = BQ:purge(BQS),
+ credit_flow:peer_down(Syncer),
+ {failed, {TRef, BQS1}};
+ {bump_credit, Msg} ->
+ credit_flow:handle_bump_msg(Msg),
+ slave_sync_loop(Args, TRef, BQS);
+ {sync_complete, Ref} ->
+ erlang:demonitor(MRef, [flush]),
+ credit_flow:peer_down(Syncer),
+ {ok, {TRef, BQS}};
+ {'$gen_cast', {set_maximum_since_use, Age}} ->
+ ok = file_handle_cache:set_maximum_since_use(Age),
+ slave_sync_loop(Args, TRef, BQS);
+ {'$gen_cast', {set_ram_duration_target, Duration}} ->
+ BQS1 = BQ:set_ram_duration_target(Duration, BQS),
+ slave_sync_loop(Args, TRef, BQS1);
+ update_ram_duration ->
+ {TRef1, BQS1} = UpdateRamDuration(BQ, BQS),
+ slave_sync_loop(Args, TRef1, BQS1);
+ {sync_msg, Ref, Msg, Props} ->
+ credit_flow:ack(Syncer),
+ Props1 = Props#message_properties{needs_confirming = false},
+ BQS1 = BQ:publish(Msg, Props1, true, none, BQS),
+ slave_sync_loop(Args, TRef, BQS1);
+ {'EXIT', Parent, Reason} ->
+ {stop, Reason, {TRef, BQS}};
+ %% If the master throws an exception
+ {'$gen_cast', {gm, {delete_and_terminate, Reason}}} ->
+ BQ:delete_and_terminate(Reason, BQS),
+ {stop, Reason, {TRef, undefined}}
+ end.
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 4efde50ec5..ce3e380248 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -67,6 +67,7 @@
-export([check_expiry/1]).
-export([base64url/1]).
-export([interval_operation/4]).
+-export([get_parent/0]).
%% Horrible macro to use in guards
-define(IS_BENIGN_EXIT(R),
@@ -241,7 +242,7 @@
-spec(interval_operation/4 ::
({atom(), atom(), any()}, float(), non_neg_integer(), non_neg_integer())
-> {any(), non_neg_integer()}).
-
+-spec(get_parent/0 :: () -> pid()).
-endif.
%%----------------------------------------------------------------------------
@@ -352,13 +353,12 @@ set_table_value(Table, Key, Type, Value) ->
sort_field_table(
lists:keystore(Key, 1, Table, {Key, Type, Value})).
-r(#resource{virtual_host = VHostPath}, Kind, Name)
- when is_binary(Name) ->
+r(#resource{virtual_host = VHostPath}, Kind, Name) ->
#resource{virtual_host = VHostPath, kind = Kind, name = Name};
-r(VHostPath, Kind, Name) when is_binary(Name) andalso is_binary(VHostPath) ->
+r(VHostPath, Kind, Name) ->
#resource{virtual_host = VHostPath, kind = Kind, name = Name}.
-r(VHostPath, Kind) when is_binary(VHostPath) ->
+r(VHostPath, Kind) ->
#resource{virtual_host = VHostPath, kind = Kind, name = '_'}.
r_arg(#resource{virtual_host = VHostPath}, Kind, Table, Key) ->
@@ -1046,3 +1046,37 @@ interval_operation({M, F, A}, MaxRatio, IdealInterval, LastInterval) ->
{false, false} -> lists:max([IdealInterval,
round(LastInterval / 1.5)])
end}.
+
+%% -------------------------------------------------------------------------
+%% Begin copypasta from gen_server2.erl
+
+get_parent() ->
+ case get('$ancestors') of
+ [Parent | _] when is_pid (Parent) -> Parent;
+ [Parent | _] when is_atom(Parent) -> name_to_pid(Parent);
+ _ -> exit(process_was_not_started_by_proc_lib)
+ end.
+
+name_to_pid(Name) ->
+ case whereis(Name) of
+ undefined -> case whereis_name(Name) of
+ undefined -> exit(could_not_find_registerd_name);
+ Pid -> Pid
+ end;
+ Pid -> Pid
+ end.
+
+whereis_name(Name) ->
+ case ets:lookup(global_names, Name) of
+ [{_Name, Pid, _Method, _RPid, _Ref}] ->
+ if node(Pid) == node() -> case erlang:is_process_alive(Pid) of
+ true -> Pid;
+ false -> undefined
+ end;
+ true -> Pid
+ end;
+ [] -> undefined
+ end.
+
+%% End copypasta from gen_server2.erl
+%% -------------------------------------------------------------------------
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 6576ba5210..6a442fecf2 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -68,7 +68,8 @@
%% Various queries to get the status of the db
-spec(status/0 :: () -> [{'nodes', [{node_type(), [node()]}]} |
- {'running_nodes', [node()]}]).
+ {'running_nodes', [node()]} |
+ {'partitions', [{node(), [node()]}]}]).
-spec(is_clustered/0 :: () -> boolean()).
-spec(cluster_nodes/1 :: ('all' | 'disc' | 'ram' | 'running') -> [node()]).
-spec(node_type/0 :: () -> node_type()).
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index 8d0e4456df..258ac0ce4a 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -53,7 +53,7 @@
-spec(notify_joined_cluster/0 :: () -> 'ok').
-spec(notify_left_cluster/1 :: (node()) -> 'ok').
--spec(partitions/0 :: () -> {node(), [{atom(), node()}]}).
+-spec(partitions/0 :: () -> {node(), [node()]}).
-endif.
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 928786e983..13e8feff08 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -35,12 +35,16 @@
%%--------------------------------------------------------------------------
--record(v1, {parent, sock, name, connection, callback, recv_len, pending_recv,
+-record(v1, {parent, sock, connection, callback, recv_len, pending_recv,
connection_state, queue_collector, heartbeater, stats_timer,
- channel_sup_sup_pid, start_heartbeat_fun, buf, buf_len,
- auth_mechanism, auth_state, conserve_resources,
- last_blocked_by, last_blocked_at, host, peer_host,
- port, peer_port}).
+ channel_sup_sup_pid, start_heartbeat_fun, buf, buf_len, throttle}).
+
+-record(connection, {name, host, peer_host, port, peer_port,
+ protocol, user, timeout_sec, frame_max, vhost,
+ client_properties, capabilities,
+ auth_mechanism, auth_state}).
+
+-record(throttle, {conserve_resources, last_blocked_by, last_blocked_at}).
-define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt,
send_pend, state, last_blocked_by, last_blocked_age,
@@ -205,15 +209,21 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
{PeerHost, PeerPort, Host, Port} = socket_ends(Sock),
State = #v1{parent = Parent,
sock = ClientSock,
- name = list_to_binary(Name),
connection = #connection{
+ name = list_to_binary(Name),
+ host = Host,
+ peer_host = PeerHost,
+ port = Port,
+ peer_port = PeerPort,
protocol = none,
user = none,
timeout_sec = ?HANDSHAKE_TIMEOUT,
frame_max = ?FRAME_MIN_SIZE,
vhost = none,
client_properties = none,
- capabilities = []},
+ capabilities = [],
+ auth_mechanism = none,
+ auth_state = none},
callback = uninitialized_callback,
recv_len = 0,
pending_recv = false,
@@ -224,15 +234,10 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
start_heartbeat_fun = StartHeartbeatFun,
buf = [],
buf_len = 0,
- auth_mechanism = none,
- auth_state = none,
- conserve_resources = false,
- last_blocked_by = none,
- last_blocked_at = never,
- host = Host,
- peer_host = PeerHost,
- port = Port,
- peer_port = PeerPort},
+ throttle = #throttle{
+ conserve_resources = false,
+ last_blocked_by = none,
+ last_blocked_at = never}},
try
ok = inet_op(fun () -> rabbit_net:tune_buffer_size(ClientSock) end),
recvloop(Deb, switch_callback(rabbit_event:init_stats_timer(
@@ -288,8 +293,10 @@ mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) ->
{other, Other} -> handle_other(Other, Deb, State)
end.
-handle_other({conserve_resources, Conserve}, Deb, State) ->
- recvloop(Deb, control_throttle(State#v1{conserve_resources = Conserve}));
+handle_other({conserve_resources, Conserve}, Deb,
+ State = #v1{throttle = Throttle}) ->
+ Throttle1 = Throttle#throttle{conserve_resources = Conserve},
+ recvloop(Deb, control_throttle(State#v1{throttle = Throttle1}));
handle_other({channel_closing, ChPid}, Deb, State) ->
ok = rabbit_channel:ready_for_close(ChPid),
channel_cleanup(ChPid),
@@ -372,29 +379,31 @@ terminate(Explanation, State) when ?IS_RUNNING(State) ->
terminate(_Explanation, State) ->
{force, State}.
-control_throttle(State = #v1{connection_state = CS,
- conserve_resources = Mem}) ->
- case {CS, Mem orelse credit_flow:blocked()} of
+control_throttle(State = #v1{connection_state = CS, throttle = Throttle}) ->
+ case {CS, (Throttle#throttle.conserve_resources orelse
+ credit_flow:blocked())} of
{running, true} -> State#v1{connection_state = blocking};
{blocking, false} -> State#v1{connection_state = running};
{blocked, false} -> ok = rabbit_heartbeat:resume_monitor(
State#v1.heartbeater),
State#v1{connection_state = running};
- {blocked, true} -> update_last_blocked_by(State);
+ {blocked, true} -> State#v1{throttle = update_last_blocked_by(
+ Throttle)};
{_, _} -> State
end.
-maybe_block(State = #v1{connection_state = blocking}) ->
+maybe_block(State = #v1{connection_state = blocking, throttle = Throttle}) ->
ok = rabbit_heartbeat:pause_monitor(State#v1.heartbeater),
- update_last_blocked_by(State#v1{connection_state = blocked,
- last_blocked_at = erlang:now()});
+ State#v1{connection_state = blocked,
+ throttle = update_last_blocked_by(
+ Throttle#throttle{last_blocked_at = erlang:now()})};
maybe_block(State) ->
State.
-update_last_blocked_by(State = #v1{conserve_resources = true}) ->
- State#v1{last_blocked_by = resource};
-update_last_blocked_by(State = #v1{conserve_resources = false}) ->
- State#v1{last_blocked_by = flow}.
+update_last_blocked_by(Throttle = #throttle{conserve_resources = true}) ->
+ Throttle#throttle{last_blocked_by = resource};
+update_last_blocked_by(Throttle = #throttle{conserve_resources = false}) ->
+ Throttle#throttle{last_blocked_by = flow}.
%%--------------------------------------------------------------------------
%% error handling / termination
@@ -531,9 +540,10 @@ payload_snippet(<<Snippet:16/binary, _/binary>>) ->
%%--------------------------------------------------------------------------
create_channel(Channel, State) ->
- #v1{sock = Sock, name = Name, queue_collector = Collector,
+ #v1{sock = Sock, queue_collector = Collector,
channel_sup_sup_pid = ChanSupSup,
- connection = #connection{protocol = Protocol,
+ connection = #connection{name = Name,
+ protocol = Protocol,
frame_max = FrameMax,
user = User,
vhost = VHost,
@@ -594,40 +604,36 @@ handle_frame(Type, Channel, Payload, State) ->
unexpected_frame(Type, Channel, Payload, State).
process_frame(Frame, Channel, State) ->
- {ChPid, AState} = case get({channel, Channel}) of
+ ChKey = {channel, Channel},
+ {ChPid, AState} = case get(ChKey) of
undefined -> create_channel(Channel, State);
Other -> Other
end,
- case process_channel_frame(Frame, ChPid, AState) of
- {ok, NewAState} -> put({channel, Channel}, {ChPid, NewAState}),
- post_process_frame(Frame, ChPid, State);
- {error, Reason} -> handle_exception(State, Channel, Reason)
- end.
-
-process_channel_frame(Frame, ChPid, AState) ->
case rabbit_command_assembler:process(Frame, AState) of
- {ok, NewAState} -> {ok, NewAState};
- {ok, Method, NewAState} -> rabbit_channel:do(ChPid, Method),
- {ok, NewAState};
- {ok, Method, Content, NewAState} -> rabbit_channel:do_flow(
- ChPid, Method, Content),
- {ok, NewAState};
- {error, Reason} -> {error, Reason}
+ {ok, NewAState} ->
+ put(ChKey, {ChPid, NewAState}),
+ post_process_frame(Frame, ChPid, State);
+ {ok, Method, NewAState} ->
+ rabbit_channel:do(ChPid, Method),
+ put(ChKey, {ChPid, NewAState}),
+ post_process_frame(Frame, ChPid, State);
+ {ok, Method, Content, NewAState} ->
+ rabbit_channel:do_flow(ChPid, Method, Content),
+ put(ChKey, {ChPid, NewAState}),
+ post_process_frame(Frame, ChPid, control_throttle(State));
+ {error, Reason} ->
+ handle_exception(State, Channel, Reason)
end.
post_process_frame({method, 'channel.close_ok', _}, ChPid, State) ->
channel_cleanup(ChPid),
- control_throttle(State);
-post_process_frame({method, MethodName, _}, _ChPid,
- State = #v1{connection = #connection{
- protocol = Protocol}}) ->
- case Protocol:method_has_content(MethodName) of
- true -> erlang:bump_reductions(2000),
- maybe_block(control_throttle(State));
- false -> control_throttle(State)
- end;
+ State;
+post_process_frame({content_header, _, _, _, _}, _ChPid, State) ->
+ maybe_block(State);
+post_process_frame({content_body, _}, _ChPid, State) ->
+ maybe_block(State);
post_process_frame(_Frame, _ChPid, State) ->
- control_throttle(State).
+ State.
%%--------------------------------------------------------------------------
@@ -746,13 +752,13 @@ handle_method0(#'connection.start_ok'{mechanism = Mechanism,
{table, Capabilities1} -> Capabilities1;
_ -> []
end,
- State = State0#v1{auth_mechanism = AuthMechanism,
- auth_state = AuthMechanism:init(Sock),
- connection_state = securing,
+ State = State0#v1{connection_state = securing,
connection =
Connection#connection{
client_properties = ClientProperties,
- capabilities = Capabilities}},
+ capabilities = Capabilities,
+ auth_mechanism = AuthMechanism,
+ auth_state = AuthMechanism:init(Sock)}},
auth_phase(Response, State);
handle_method0(#'connection.secure_ok'{response = Response},
@@ -790,10 +796,11 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax,
handle_method0(#'connection.open'{virtual_host = VHostPath},
State = #v1{connection_state = opening,
- connection = Connection = #connection{
- user = User,
- protocol = Protocol},
- sock = Sock}) ->
+ connection = Connection = #connection{
+ user = User,
+ protocol = Protocol},
+ sock = Sock,
+ throttle = Throttle}) ->
ok = rabbit_access_control:check_vhost_access(User, VHostPath),
NewConnection = Connection#connection{vhost = VHostPath},
ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol),
@@ -801,7 +808,8 @@ handle_method0(#'connection.open'{virtual_host = VHostPath},
State1 = control_throttle(
State#v1{connection_state = running,
connection = NewConnection,
- conserve_resources = Conserve}),
+ throttle = Throttle#throttle{
+ conserve_resources = Conserve}}),
rabbit_event:notify(connection_created,
[{type, network} |
infos(?CREATION_EVENT_KEYS, State1)]),
@@ -870,10 +878,10 @@ auth_mechanisms_binary(Sock) ->
string:join([atom_to_list(A) || A <- auth_mechanisms(Sock)], " ")).
auth_phase(Response,
- State = #v1{auth_mechanism = AuthMechanism,
- auth_state = AuthState,
- connection = Connection =
- #connection{protocol = Protocol},
+ State = #v1{connection = Connection =
+ #connection{protocol = Protocol,
+ auth_mechanism = AuthMechanism,
+ auth_state = AuthState},
sock = Sock}) ->
case AuthMechanism:handle_response(Response, AuthState) of
{refused, Msg, Args} ->
@@ -886,14 +894,16 @@ auth_phase(Response,
{challenge, Challenge, AuthState1} ->
Secure = #'connection.secure'{challenge = Challenge},
ok = send_on_channel0(Sock, Secure, Protocol),
- State#v1{auth_state = AuthState1};
+ State#v1{connection = Connection#connection{
+ auth_state = AuthState1}};
{ok, User} ->
Tune = #'connection.tune'{channel_max = 0,
frame_max = server_frame_max(),
heartbeat = server_heartbeat()},
ok = send_on_channel0(Sock, Tune, Protocol),
State#v1{connection_state = tuning,
- connection = Connection#connection{user = User}}
+ connection = Connection#connection{user = User,
+ auth_state = none}}
end.
%%--------------------------------------------------------------------------
@@ -901,11 +911,6 @@ auth_phase(Response,
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
i(pid, #v1{}) -> self();
-i(name, #v1{name = Name}) -> Name;
-i(host, #v1{host = Host}) -> Host;
-i(peer_host, #v1{peer_host = PeerHost}) -> PeerHost;
-i(port, #v1{port = Port}) -> Port;
-i(peer_port, #v1{peer_port = PeerPort}) -> PeerPort;
i(SockStat, S) when SockStat =:= recv_oct;
SockStat =:= recv_cnt;
SockStat =:= send_oct;
@@ -922,36 +927,32 @@ i(peer_cert_issuer, S) -> cert_info(fun rabbit_ssl:peer_cert_issuer/1, S);
i(peer_cert_subject, S) -> cert_info(fun rabbit_ssl:peer_cert_subject/1, S);
i(peer_cert_validity, S) -> cert_info(fun rabbit_ssl:peer_cert_validity/1, S);
i(state, #v1{connection_state = CS}) -> CS;
-i(last_blocked_by, #v1{last_blocked_by = By}) -> By;
-i(last_blocked_age, #v1{last_blocked_at = never}) ->
+i(last_blocked_by, #v1{throttle = #throttle{last_blocked_by = By}}) -> By;
+i(last_blocked_age, #v1{throttle = #throttle{last_blocked_at = never}}) ->
infinity;
-i(last_blocked_age, #v1{last_blocked_at = T}) ->
+i(last_blocked_age, #v1{throttle = #throttle{last_blocked_at = T}}) ->
timer:now_diff(erlang:now(), T) / 1000000;
i(channels, #v1{}) -> length(all_channels());
-i(auth_mechanism, #v1{auth_mechanism = none}) ->
+i(Item, #v1{connection = Conn}) -> ic(Item, Conn).
+
+ic(name, #connection{name = Name}) -> Name;
+ic(host, #connection{host = Host}) -> Host;
+ic(peer_host, #connection{peer_host = PeerHost}) -> PeerHost;
+ic(port, #connection{port = Port}) -> Port;
+ic(peer_port, #connection{peer_port = PeerPort}) -> PeerPort;
+ic(protocol, #connection{protocol = none}) -> none;
+ic(protocol, #connection{protocol = P}) -> P:version();
+ic(user, #connection{user = none}) -> '';
+ic(user, #connection{user = U}) -> U#user.username;
+ic(vhost, #connection{vhost = VHost}) -> VHost;
+ic(timeout, #connection{timeout_sec = Timeout}) -> Timeout;
+ic(frame_max, #connection{frame_max = FrameMax}) -> FrameMax;
+ic(client_properties, #connection{client_properties = CP}) -> CP;
+ic(auth_mechanism, #connection{auth_mechanism = none}) ->
none;
-i(auth_mechanism, #v1{auth_mechanism = Mechanism}) ->
+ic(auth_mechanism, #connection{auth_mechanism = Mechanism}) ->
proplists:get_value(name, Mechanism:description());
-i(protocol, #v1{connection = #connection{protocol = none}}) ->
- none;
-i(protocol, #v1{connection = #connection{protocol = Protocol}}) ->
- Protocol:version();
-i(user, #v1{connection = #connection{user = none}}) ->
- '';
-i(user, #v1{connection = #connection{user = #user{
- username = Username}}}) ->
- Username;
-i(vhost, #v1{connection = #connection{vhost = VHost}}) ->
- VHost;
-i(timeout, #v1{connection = #connection{timeout_sec = Timeout}}) ->
- Timeout;
-i(frame_max, #v1{connection = #connection{frame_max = FrameMax}}) ->
- FrameMax;
-i(client_properties, #v1{connection = #connection{client_properties =
- ClientProperties}}) ->
- ClientProperties;
-i(Item, #v1{}) ->
- throw({bad_argument, Item}).
+ic(Item, #connection{}) -> throw({bad_argument, Item}).
socket_info(Get, Select, #v1{sock = Sock}) ->
case Get(Sock) of
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index a68caadbdc..09ed3d0890 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1297,8 +1297,7 @@ test_statistics() ->
QName = receive #'queue.declare_ok'{queue = Q0} -> Q0
after ?TIMEOUT -> throw(failed_to_receive_queue_declare_ok)
end,
- {ok, Q} = rabbit_amqqueue:lookup(rabbit_misc:r(<<"/">>, queue, QName)),
- QPid = Q#amqqueue.pid,
+ QRes = rabbit_misc:r(<<"/">>, queue, QName),
X = rabbit_misc:r(<<"/">>, exchange, <<"">>),
rabbit_tests_event_receiver:start(self(), [node()], [channel_stats]),
@@ -1322,9 +1321,9 @@ test_statistics() ->
length(proplists:get_value(
channel_queue_exchange_stats, E)) > 0
end),
- [{QPid,[{get,1}]}] = proplists:get_value(channel_queue_stats, Event2),
+ [{QRes, [{get,1}]}] = proplists:get_value(channel_queue_stats, Event2),
[{X,[{publish,1}]}] = proplists:get_value(channel_exchange_stats, Event2),
- [{{QPid,X},[{publish,1}]}] =
+ [{{QRes,X},[{publish,1}]}] =
proplists:get_value(channel_queue_exchange_stats, Event2),
%% Check the stats remove stuff on queue deletion
@@ -1349,31 +1348,31 @@ test_refresh_events(SecondaryNode) ->
[channel_created, queue_created]),
{_Writer, Ch} = test_spawn(),
- expect_events(Ch, channel_created),
+ expect_events(pid, Ch, channel_created),
rabbit_channel:shutdown(Ch),
{_Writer2, Ch2} = test_spawn(SecondaryNode),
- expect_events(Ch2, channel_created),
+ expect_events(pid, Ch2, channel_created),
rabbit_channel:shutdown(Ch2),
- {new, #amqqueue { pid = QPid } = Q} =
+ {new, #amqqueue{name = QName} = Q} =
rabbit_amqqueue:declare(test_queue(), false, false, [], none),
- expect_events(QPid, queue_created),
+ expect_events(name, QName, queue_created),
rabbit_amqqueue:delete(Q, false, false),
rabbit_tests_event_receiver:stop(),
passed.
-expect_events(Pid, Type) ->
- expect_event(Pid, Type),
+expect_events(Tag, Key, Type) ->
+ expect_event(Tag, Key, Type),
rabbit:force_event_refresh(),
- expect_event(Pid, Type).
+ expect_event(Tag, Key, Type).
-expect_event(Pid, Type) ->
+expect_event(Tag, Key, Type) ->
receive #event{type = Type, props = Props} ->
- case pget(pid, Props) of
- Pid -> ok;
- _ -> expect_event(Pid, Type)
+ case pget(Tag, Props) of
+ Key -> ok;
+ _ -> expect_event(Tag, Key, Type)
end
after ?TIMEOUT -> throw({failed_to_receive_event, Type})
end.
@@ -2228,6 +2227,10 @@ variable_queue_publish(IsPersistent, Count, VQ) ->
variable_queue_publish(IsPersistent, Count, fun (_N, P) -> P end, VQ).
variable_queue_publish(IsPersistent, Count, PropFun, VQ) ->
+ variable_queue_publish(IsPersistent, Count, PropFun,
+ fun (_N) -> <<>> end, VQ).
+
+variable_queue_publish(IsPersistent, Count, PropFun, PayloadFun, VQ) ->
lists:foldl(
fun (N, VQN) ->
rabbit_variable_queue:publish(
@@ -2236,16 +2239,18 @@ variable_queue_publish(IsPersistent, Count, PropFun, VQ) ->
<<>>, #'P_basic'{delivery_mode = case IsPersistent of
true -> 2;
false -> 1
- end}, <<>>),
- PropFun(N, #message_properties{}), self(), VQN)
+ end},
+ PayloadFun(N)),
+ PropFun(N, #message_properties{}), false, self(), VQN)
end, VQ, lists:seq(1, Count)).
variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) ->
lists:foldl(fun (N, {VQN, AckTagsAcc}) ->
Rem = Len - N,
{{#basic_message { is_persistent = IsPersistent },
- IsDelivered, AckTagN, Rem}, VQM} =
+ IsDelivered, AckTagN}, VQM} =
rabbit_variable_queue:fetch(true, VQN),
+ Rem = rabbit_variable_queue:len(VQM),
{VQM, [AckTagN | AckTagsAcc]}
end, {VQ, []}, lists:seq(1, Count)).
@@ -2311,13 +2316,40 @@ test_variable_queue() ->
fun test_variable_queue_partial_segments_delta_thing/1,
fun test_variable_queue_all_the_bits_not_covered_elsewhere1/1,
fun test_variable_queue_all_the_bits_not_covered_elsewhere2/1,
+ fun test_drop/1,
fun test_variable_queue_fold_msg_on_disk/1,
- fun test_dropwhile/1,
+ fun test_dropfetchwhile/1,
fun test_dropwhile_varying_ram_duration/1,
+ fun test_fetchwhile_varying_ram_duration/1,
fun test_variable_queue_ack_limiting/1,
- fun test_variable_queue_requeue/1]],
+ fun test_variable_queue_requeue/1,
+ fun test_variable_queue_fold/1]],
passed.
+test_variable_queue_fold(VQ0) ->
+ Count = rabbit_queue_index:next_segment_boundary(0) * 2 + 64,
+ VQ1 = rabbit_variable_queue:set_ram_duration_target(0, VQ0),
+ VQ2 = variable_queue_publish(
+ true, Count, fun (_, P) -> P end, fun erlang:term_to_binary/1, VQ1),
+ lists:foldl(
+ fun (Cut, VQ3) -> test_variable_queue_fold(Cut, Count, VQ3) end,
+ VQ2, [0, 1, 2, Count div 2, Count - 1, Count, Count + 1, Count * 2]).
+
+test_variable_queue_fold(Cut, Count, VQ0) ->
+ {Acc, VQ1} = rabbit_variable_queue:fold(
+ fun (M, _, A) ->
+ case msg2int(M) =< Cut of
+ true -> {cont, [M | A]};
+ false -> {stop, A}
+ end
+ end, [], VQ0),
+ true = [N || N <- lists:seq(lists:min([Cut, Count]), 1, -1)] ==
+ [msg2int(M) || M <- Acc],
+ VQ1.
+
+msg2int(#basic_message{content = #content{ payload_fragments_rev = P}}) ->
+ binary_to_term(list_to_binary(lists:reverse(P))).
+
test_variable_queue_requeue(VQ0) ->
Interval = 50,
Count = rabbit_queue_index:next_segment_boundary(0) + 2 * Interval,
@@ -2337,7 +2369,7 @@ test_variable_queue_requeue(VQ0) ->
VQM
end, VQ4, Subset),
VQ6 = lists:foldl(fun (AckTag, VQa) ->
- {{#basic_message{}, true, AckTag, _}, VQb} =
+ {{#basic_message{}, true, AckTag}, VQb} =
rabbit_variable_queue:fetch(true, VQa),
VQb
end, VQ5, lists:reverse(Acks)),
@@ -2373,41 +2405,86 @@ test_variable_queue_ack_limiting(VQ0) ->
VQ6.
-test_dropwhile(VQ0) ->
+test_drop(VQ0) ->
+ %% start by sending a messages
+ VQ1 = variable_queue_publish(false, 1, VQ0),
+ %% drop message with AckRequired = true
+ {{MsgId, AckTag}, VQ2} = rabbit_variable_queue:drop(true, VQ1),
+ true = rabbit_variable_queue:is_empty(VQ2),
+ true = AckTag =/= undefinded,
+ %% drop again -> empty
+ {empty, VQ3} = rabbit_variable_queue:drop(false, VQ2),
+ %% requeue
+ {[MsgId], VQ4} = rabbit_variable_queue:requeue([AckTag], VQ3),
+ %% drop message with AckRequired = false
+ {{MsgId, undefined}, VQ5} = rabbit_variable_queue:drop(false, VQ4),
+ true = rabbit_variable_queue:is_empty(VQ5),
+ VQ5.
+
+test_dropfetchwhile(VQ0) ->
Count = 10,
%% add messages with sequential expiry
VQ1 = variable_queue_publish(
false, Count,
- fun (N, Props) -> Props#message_properties{expiry = N} end, VQ0),
+ fun (N, Props) -> Props#message_properties{expiry = N} end,
+ fun erlang:term_to_binary/1, VQ0),
+
+ %% fetch the first 5 messages
+ {#message_properties{expiry = 6}, {Msgs, AckTags}, VQ2} =
+ rabbit_variable_queue:fetchwhile(
+ fun (#message_properties{expiry = Expiry}) -> Expiry =< 5 end,
+ fun (Msg, AckTag, {MsgAcc, AckAcc}) ->
+ {[Msg | MsgAcc], [AckTag | AckAcc]}
+ end, {[], []}, VQ1),
+ true = lists:seq(1, 5) == [msg2int(M) || M <- lists:reverse(Msgs)],
+
+ %% requeue them
+ {_MsgIds, VQ3} = rabbit_variable_queue:requeue(AckTags, VQ2),
%% drop the first 5 messages
- {_, undefined, VQ2} = rabbit_variable_queue:dropwhile(
- fun(#message_properties { expiry = Expiry }) ->
- Expiry =< 5
- end, false, VQ1),
-
- %% fetch five now
- VQ3 = lists:foldl(fun (_N, VQN) ->
- {{#basic_message{}, _, _, _}, VQM} =
+ {#message_properties{expiry = 6}, VQ4} =
+ rabbit_variable_queue:dropwhile(
+ fun (#message_properties {expiry = Expiry}) -> Expiry =< 5 end, VQ3),
+
+ %% fetch 5
+ VQ5 = lists:foldl(fun (N, VQN) ->
+ {{Msg, _, _}, VQM} =
rabbit_variable_queue:fetch(false, VQN),
+ true = msg2int(Msg) == N,
VQM
- end, VQ2, lists:seq(6, Count)),
+ end, VQ4, lists:seq(6, Count)),
%% should be empty now
- {empty, VQ4} = rabbit_variable_queue:fetch(false, VQ3),
+ true = rabbit_variable_queue:is_empty(VQ5),
- VQ4.
+ VQ5.
test_dropwhile_varying_ram_duration(VQ0) ->
+ test_dropfetchwhile_varying_ram_duration(
+ fun (VQ1) ->
+ {_, VQ2} = rabbit_variable_queue:dropwhile(
+ fun (_) -> false end, VQ1),
+ VQ2
+ end, VQ0).
+
+test_fetchwhile_varying_ram_duration(VQ0) ->
+ test_dropfetchwhile_varying_ram_duration(
+ fun (VQ1) ->
+ {_, ok, VQ2} = rabbit_variable_queue:fetchwhile(
+ fun (_) -> false end,
+ fun (_, _, A) -> A end,
+ ok, VQ1),
+ VQ2
+ end, VQ0).
+
+test_dropfetchwhile_varying_ram_duration(Fun, VQ0) ->
VQ1 = variable_queue_publish(false, 1, VQ0),
VQ2 = rabbit_variable_queue:set_ram_duration_target(0, VQ1),
- {_, undefined, VQ3} = rabbit_variable_queue:dropwhile(
- fun(_) -> false end, false, VQ2),
+ VQ3 = Fun(VQ2),
VQ4 = rabbit_variable_queue:set_ram_duration_target(infinity, VQ3),
VQ5 = variable_queue_publish(false, 1, VQ4),
- {_, undefined, VQ6} =
- rabbit_variable_queue:dropwhile(fun(_) -> false end, false, VQ5),
+ VQ6 = Fun(VQ5),
VQ6.
test_variable_queue_dynamic_duration_change(VQ0) ->
@@ -2442,7 +2519,8 @@ publish_fetch_and_ack(0, _Len, VQ0) ->
VQ0;
publish_fetch_and_ack(N, Len, VQ0) ->
VQ1 = variable_queue_publish(false, 1, VQ0),
- {{_Msg, false, AckTag, Len}, VQ2} = rabbit_variable_queue:fetch(true, VQ1),
+ {{_Msg, false, AckTag}, VQ2} = rabbit_variable_queue:fetch(true, VQ1),
+ Len = rabbit_variable_queue:len(VQ2),
{_Guids, VQ3} = rabbit_variable_queue:ack([AckTag], VQ2),
publish_fetch_and_ack(N-1, Len, VQ3).
@@ -2507,8 +2585,8 @@ test_variable_queue_all_the_bits_not_covered_elsewhere1(VQ0) ->
Count, VQ4),
_VQ6 = rabbit_variable_queue:terminate(shutdown, VQ5),
VQ7 = variable_queue_init(test_amqqueue(true), true),
- {{_Msg1, true, _AckTag1, Count1}, VQ8} =
- rabbit_variable_queue:fetch(true, VQ7),
+ {{_Msg1, true, _AckTag1}, VQ8} = rabbit_variable_queue:fetch(true, VQ7),
+ Count1 = rabbit_variable_queue:len(VQ8),
VQ9 = variable_queue_publish(false, 1, VQ8),
VQ10 = rabbit_variable_queue:set_ram_duration_target(0, VQ9),
{VQ11, _AckTags2} = variable_queue_fetch(Count1, true, true, Count, VQ10),
@@ -2530,7 +2608,8 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) ->
test_variable_queue_fold_msg_on_disk(VQ0) ->
VQ1 = variable_queue_publish(true, 1, VQ0),
{VQ2, AckTags} = variable_queue_fetch(1, true, false, 1, VQ1),
- VQ3 = rabbit_variable_queue:fold(fun (_M, _A) -> ok end, VQ2, AckTags),
+ {ok, VQ3} = rabbit_variable_queue:ackfold(fun (_M, _A, ok) -> ok end,
+ ok, VQ2, AckTags),
VQ3.
test_queue_recover() ->
@@ -2554,10 +2633,11 @@ test_queue_recover() ->
rabbit_amqqueue:basic_get(Q1, self(), false),
exit(QPid1, shutdown),
VQ1 = variable_queue_init(Q, true),
- {{_Msg1, true, _AckTag1, CountMinusOne}, VQ2} =
+ {{_Msg1, true, _AckTag1}, VQ2} =
rabbit_variable_queue:fetch(true, VQ1),
+ CountMinusOne = rabbit_variable_queue:len(VQ2),
_VQ3 = rabbit_variable_queue:delete_and_terminate(shutdown, VQ2),
- rabbit_amqqueue:internal_delete(QName, QPid1)
+ rabbit_amqqueue:internal_delete(QName)
end),
passed.
diff --git a/src/rabbit_trace.erl b/src/rabbit_trace.erl
index 3a5b96de4f..601656da1c 100644
--- a/src/rabbit_trace.erl
+++ b/src/rabbit_trace.erl
@@ -16,7 +16,7 @@
-module(rabbit_trace).
--export([init/1, tracing/1, tap_trace_in/2, tap_trace_out/2, start/1, stop/1]).
+-export([init/1, enabled/1, tap_in/2, tap_out/2, start/1, stop/1]).
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
@@ -31,9 +31,9 @@
-type(state() :: rabbit_types:exchange() | 'none').
-spec(init/1 :: (rabbit_types:vhost()) -> state()).
--spec(tracing/1 :: (rabbit_types:vhost()) -> boolean()).
--spec(tap_trace_in/2 :: (rabbit_types:basic_message(), state()) -> 'ok').
--spec(tap_trace_out/2 :: (rabbit_amqqueue:qmsg(), state()) -> 'ok').
+-spec(enabled/1 :: (rabbit_types:vhost()) -> boolean()).
+-spec(tap_in/2 :: (rabbit_types:basic_message(), state()) -> 'ok').
+-spec(tap_out/2 :: (rabbit_amqqueue:qmsg(), state()) -> 'ok').
-spec(start/1 :: (rabbit_types:vhost()) -> 'ok').
-spec(stop/1 :: (rabbit_types:vhost()) -> 'ok').
@@ -43,26 +43,26 @@
%%----------------------------------------------------------------------------
init(VHost) ->
- case tracing(VHost) of
+ case enabled(VHost) of
false -> none;
true -> {ok, X} = rabbit_exchange:lookup(
rabbit_misc:r(VHost, exchange, ?XNAME)),
X
end.
-tracing(VHost) ->
+enabled(VHost) ->
{ok, VHosts} = application:get_env(rabbit, ?TRACE_VHOSTS),
lists:member(VHost, VHosts).
-tap_trace_in(Msg = #basic_message{exchange_name = #resource{name = XName}},
- TraceX) ->
- maybe_trace(TraceX, Msg, <<"publish">>, XName, []).
+tap_in(_Msg, none) -> ok;
+tap_in(Msg = #basic_message{exchange_name = #resource{name = XName}}, TraceX) ->
+ trace(TraceX, Msg, <<"publish">>, XName, []).
-tap_trace_out({#resource{name = QName}, _QPid, _QMsgId, Redelivered, Msg},
- TraceX) ->
+tap_out(_Msg, none) -> ok;
+tap_out({#resource{name = QName}, _QPid, _QMsgId, Redelivered, Msg}, TraceX) ->
RedeliveredNum = case Redelivered of true -> 1; false -> 0 end,
- maybe_trace(TraceX, Msg, <<"deliver">>, QName,
- [{<<"redelivered">>, signedint, RedeliveredNum}]).
+ trace(TraceX, Msg, <<"deliver">>, QName,
+ [{<<"redelivered">>, signedint, RedeliveredNum}]).
%%----------------------------------------------------------------------------
@@ -83,14 +83,11 @@ update_config(Fun) ->
%%----------------------------------------------------------------------------
-maybe_trace(none, _Msg, _RKPrefix, _RKSuffix, _Extra) ->
+trace(#exchange{name = Name}, #basic_message{exchange_name = Name},
+ _RKPrefix, _RKSuffix, _Extra) ->
ok;
-maybe_trace(#exchange{name = Name}, #basic_message{exchange_name = Name},
- _RKPrefix, _RKSuffix, _Extra) ->
- ok;
-maybe_trace(X, Msg = #basic_message{content = #content{
- payload_fragments_rev = PFR}},
- RKPrefix, RKSuffix, Extra) ->
+trace(X, Msg = #basic_message{content = #content{payload_fragments_rev = PFR}},
+ RKPrefix, RKSuffix, Extra) ->
{ok, _, _} = rabbit_basic:publish(
X, <<RKPrefix/binary, ".", RKSuffix/binary>>,
#'P_basic'{headers = msg_to_table(Msg) ++ Extra}, PFR),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 6dc65bab31..90ee3439da 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -17,11 +17,12 @@
-module(rabbit_variable_queue).
-export([init/3, terminate/2, delete_and_terminate/2, purge/1,
- publish/4, publish_delivered/4, discard/3, drain_confirmed/1,
- dropwhile/3, fetch/2, ack/2, requeue/2, len/1, is_empty/1,
- depth/1, set_ram_duration_target/2, ram_duration/1,
+ publish/5, publish_delivered/4, discard/3, drain_confirmed/1,
+ dropwhile/2, fetchwhile/4,
+ fetch/2, drop/2, ack/2, requeue/2, ackfold/4, fold/3, len/1,
+ is_empty/1, depth/1, set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3,
- is_duplicate/2, multiple_routing_keys/0, fold/3]).
+ is_duplicate/2, multiple_routing_keys/0]).
-export([start/1, stop/0]).
@@ -254,9 +255,8 @@
q3,
q4,
next_seq_id,
- pending_ack,
- pending_ack_index,
- ram_ack_index,
+ ram_pending_ack,
+ disk_pending_ack,
index_state,
msg_store_clients,
durable,
@@ -348,8 +348,8 @@
q3 :: ?QUEUE:?QUEUE(),
q4 :: ?QUEUE:?QUEUE(),
next_seq_id :: seq_id(),
- pending_ack :: gb_tree(),
- ram_ack_index :: gb_tree(),
+ ram_pending_ack :: gb_tree(),
+ disk_pending_ack :: gb_tree(),
index_state :: any(),
msg_store_clients :: 'undefined' | {{any(), binary()},
{any(), binary()}},
@@ -521,16 +521,16 @@ purge(State = #vqstate { q4 = Q4,
publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
MsgProps = #message_properties { needs_confirming = NeedsConfirming },
- _ChPid, State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4,
- next_seq_id = SeqId,
- len = Len,
- in_counter = InCount,
- persistent_count = PCount,
- durable = IsDurable,
- ram_msg_count = RamMsgCount,
- unconfirmed = UC }) ->
+ IsDelivered, _ChPid, State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4,
+ next_seq_id = SeqId,
+ len = Len,
+ in_counter = InCount,
+ persistent_count = PCount,
+ durable = IsDurable,
+ ram_msg_count = RamMsgCount,
+ unconfirmed = UC }) ->
IsPersistent1 = IsDurable andalso IsPersistent,
- MsgStatus = msg_status(IsPersistent1, SeqId, Msg, MsgProps),
+ MsgStatus = msg_status(IsPersistent1, IsDelivered, SeqId, Msg, MsgProps),
{MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
State2 = case ?QUEUE:is_empty(Q3) of
false -> State1 #vqstate { q1 = ?QUEUE:in(m(MsgStatus1), Q1) };
@@ -557,8 +557,7 @@ publish_delivered(Msg = #basic_message { is_persistent = IsPersistent,
durable = IsDurable,
unconfirmed = UC }) ->
IsPersistent1 = IsDurable andalso IsPersistent,
- MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProps))
- #msg_status { is_delivered = true },
+ MsgStatus = msg_status(IsPersistent1, true, SeqId, Msg, MsgProps),
{MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
State2 = record_pending_ack(m(MsgStatus1), State1),
PCount1 = PCount + one_if(IsPersistent1),
@@ -579,27 +578,28 @@ drain_confirmed(State = #vqstate { confirmed = C }) ->
confirmed = gb_sets:new() }}
end.
-dropwhile(Pred, AckRequired, State) -> dropwhile(Pred, AckRequired, State, []).
+dropwhile(Pred, State) ->
+ case queue_out(State) of
+ {empty, State1} ->
+ {undefined, a(State1)};
+ {{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} ->
+ case Pred(MsgProps) of
+ true -> {_, State2} = remove(false, MsgStatus, State1),
+ dropwhile(Pred, State2);
+ false -> {MsgProps, a(in_r(MsgStatus, State1))}
+ end
+ end.
-dropwhile(Pred, AckRequired, State, Msgs) ->
- End = fun(Next, S) when AckRequired -> {Next, lists:reverse(Msgs), S};
- (Next, S) -> {Next, undefined, S}
- end,
+fetchwhile(Pred, Fun, Acc, State) ->
case queue_out(State) of
{empty, State1} ->
- End(undefined, a(State1));
+ {undefined, Acc, a(State1)};
{{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} ->
- case {Pred(MsgProps), AckRequired} of
- {true, true} ->
- {MsgStatus1, State2} = read_msg(MsgStatus, State1),
- {{Msg, _, AckTag, _}, State3} =
- internal_fetch(true, MsgStatus1, State2),
- dropwhile(Pred, AckRequired, State3, [{Msg, AckTag} | Msgs]);
- {true, false} ->
- {_, State2} = internal_fetch(false, MsgStatus, State1),
- dropwhile(Pred, AckRequired, State2, undefined);
- {false, _} ->
- End(MsgProps, a(in_r(MsgStatus, State1)))
+ case Pred(MsgProps) of
+ true -> {Msg, State2} = read_msg(MsgStatus, false, State1),
+ {AckTag, State3} = remove(true, MsgStatus, State2),
+ fetchwhile(Pred, Fun, Fun(Msg, AckTag, Acc), State3);
+ false -> {MsgProps, Acc, a(in_r(MsgStatus, State1))}
end
end.
@@ -610,9 +610,18 @@ fetch(AckRequired, State) ->
{{value, MsgStatus}, State1} ->
%% it is possible that the message wasn't read from disk
%% at this point, so read it in.
- {MsgStatus1, State2} = read_msg(MsgStatus, State1),
- {Res, State3} = internal_fetch(AckRequired, MsgStatus1, State2),
- {Res, a(State3)}
+ {Msg, State2} = read_msg(MsgStatus, false, State1),
+ {AckTag, State3} = remove(AckRequired, MsgStatus, State2),
+ {{Msg, MsgStatus#msg_status.is_delivered, AckTag}, a(State3)}
+ end.
+
+drop(AckRequired, State) ->
+ case queue_out(State) of
+ {empty, State1} ->
+ {empty, a(State1)};
+ {{value, MsgStatus}, State1} ->
+ {AckTag, State2} = remove(AckRequired, MsgStatus, State1),
+ {{MsgStatus#msg_status.msg_id, AckTag}, a(State2)}
end.
ack([], State) ->
@@ -638,16 +647,6 @@ ack(AckTags, State) ->
persistent_count = PCount1,
ack_out_counter = AckOutCount + length(AckTags) })}.
-fold(undefined, State, _AckTags) ->
- State;
-fold(MsgFun, State = #vqstate{pending_ack = PA}, AckTags) ->
- a(lists:foldl(fun(SeqId, State1) ->
- {MsgStatus, State2} =
- read_msg(gb_trees:get(SeqId, PA), false, State1),
- MsgFun(MsgStatus#msg_status.msg, SeqId),
- State2
- end, State, AckTags)).
-
requeue(AckTags, #vqstate { delta = Delta,
q3 = Q3,
q4 = Q4,
@@ -669,12 +668,41 @@ requeue(AckTags, #vqstate { delta = Delta,
in_counter = InCounter + MsgCount,
len = Len + MsgCount }))}.
+ackfold(MsgFun, Acc, State, AckTags) ->
+ {AccN, StateN} =
+ lists:foldl(fun(SeqId, {Acc0, State0}) ->
+ MsgStatus = lookup_pending_ack(SeqId, State0),
+ {Msg, State1} = read_msg(MsgStatus, false, State0),
+ {MsgFun(Msg, SeqId, Acc0), State1}
+ end, {Acc, State}, AckTags),
+ {AccN, a(StateN)}.
+
+fold(Fun, Acc, #vqstate { q1 = Q1,
+ q2 = Q2,
+ delta = #delta { start_seq_id = DeltaSeqId,
+ end_seq_id = DeltaSeqIdEnd },
+ q3 = Q3,
+ q4 = Q4 } = State) ->
+ QFun = fun(MsgStatus, {Acc0, State0}) ->
+ {Msg, State1} = read_msg(MsgStatus, false, State0),
+ {StopGo, AccNext} =
+ Fun(Msg, MsgStatus#msg_status.msg_props, Acc0),
+ {StopGo, {AccNext, State1}}
+ end,
+ {Cont1, {Acc1, State1}} = qfoldl(QFun, {cont, {Acc, State }}, Q4),
+ {Cont2, {Acc2, State2}} = qfoldl(QFun, {Cont1, {Acc1, State1}}, Q3),
+ {Cont3, {Acc3, State3}} = delta_fold(Fun, {Cont2, Acc2},
+ DeltaSeqId, DeltaSeqIdEnd, State2),
+ {Cont4, {Acc4, State4}} = qfoldl(QFun, {Cont3, {Acc3, State3}}, Q2),
+ {_, {Acc5, State5}} = qfoldl(QFun, {Cont4, {Acc4, State4}}, Q1),
+ {Acc5, State5}.
+
len(#vqstate { len = Len }) -> Len.
is_empty(State) -> 0 == len(State).
-depth(State = #vqstate { pending_ack = Ack }) ->
- len(State) + gb_trees:size(Ack).
+depth(State = #vqstate { ram_pending_ack = RPA, disk_pending_ack = DPA }) ->
+ len(State) + gb_trees:size(RPA) + gb_trees:size(DPA).
set_ram_duration_target(
DurationTarget, State = #vqstate {
@@ -711,7 +739,7 @@ ram_duration(State = #vqstate {
ack_out_counter = AckOutCount,
ram_msg_count = RamMsgCount,
ram_msg_count_prev = RamMsgCountPrev,
- ram_ack_index = RamAckIndex,
+ ram_pending_ack = RPA,
ram_ack_count_prev = RamAckCountPrev }) ->
Now = now(),
{AvgEgressRate, Egress1} = update_rate(Now, Timestamp, OutCount, Egress),
@@ -722,7 +750,7 @@ ram_duration(State = #vqstate {
{AvgAckIngressRate, AckIngress1} =
update_rate(Now, AckTimestamp, AckInCount, AckIngress),
- RamAckCount = gb_trees:size(RamAckIndex),
+ RamAckCount = gb_trees:size(RPA),
Duration = %% msgs+acks / (msgs+acks/sec) == sec
case (AvgEgressRate == 0 andalso AvgIngressRate == 0 andalso
@@ -754,19 +782,24 @@ ram_duration(State = #vqstate {
ram_msg_count_prev = RamMsgCount,
ram_ack_count_prev = RamAckCount }}.
-needs_timeout(State = #vqstate { index_state = IndexState }) ->
+needs_timeout(State = #vqstate { index_state = IndexState,
+ target_ram_count = TargetRamCount }) ->
case must_sync_index(State) of
true -> timed;
false ->
case rabbit_queue_index:needs_sync(IndexState) of
true -> idle;
- false -> case reduce_memory_use(
- fun (_Quota, State1) -> {0, State1} end,
- fun (_Quota, State1) -> State1 end,
- fun (_Quota, State1) -> {0, State1} end,
- State) of
- {true, _State} -> idle;
- {false, _State} -> false
+ false -> case TargetRamCount of
+ infinity -> false;
+ _ -> case
+ reduce_memory_use(
+ fun (_Quota, State1) -> {0, State1} end,
+ fun (_Quota, State1) -> State1 end,
+ fun (_Quota, State1) -> {0, State1} end,
+ State) of
+ {true, _State} -> idle;
+ {false, _State} -> false
+ end
end
end
end.
@@ -782,8 +815,8 @@ handle_pre_hibernate(State = #vqstate { index_state = IndexState }) ->
status(#vqstate {
q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
len = Len,
- pending_ack = PA,
- ram_ack_index = RAI,
+ ram_pending_ack = RPA,
+ disk_pending_ack = DPA,
target_ram_count = TargetRamCount,
ram_msg_count = RamMsgCount,
next_seq_id = NextSeqId,
@@ -798,10 +831,10 @@ status(#vqstate {
{q3 , ?QUEUE:len(Q3)},
{q4 , ?QUEUE:len(Q4)},
{len , Len},
- {pending_acks , gb_trees:size(PA)},
+ {pending_acks , gb_trees:size(RPA) + gb_trees:size(DPA)},
{target_ram_count , TargetRamCount},
{ram_msg_count , RamMsgCount},
- {ram_ack_count , gb_trees:size(RAI)},
+ {ram_ack_count , gb_trees:size(RPA)},
{next_seq_id , NextSeqId},
{persistent_count , PersistentCount},
{avg_ingress_rate , AvgIngressRate},
@@ -864,11 +897,10 @@ gb_sets_maybe_insert(false, _Val, Set) -> Set;
%% when requeueing, we re-add a msg_id to the unconfirmed set
gb_sets_maybe_insert(true, Val, Set) -> gb_sets:add(Val, Set).
-msg_status(IsPersistent, SeqId, Msg = #basic_message { id = MsgId },
- MsgProps = #message_properties { delivered = Delivered }) ->
- %% TODO would it make sense to remove #msg_status.is_delivered?
+msg_status(IsPersistent, IsDelivered, SeqId,
+ Msg = #basic_message { id = MsgId }, MsgProps) ->
#msg_status { seq_id = SeqId, msg_id = MsgId, msg = Msg,
- is_persistent = IsPersistent, is_delivered = Delivered,
+ is_persistent = IsPersistent, is_delivered = IsDelivered,
msg_on_disk = false, index_on_disk = false,
msg_props = MsgProps }.
@@ -934,7 +966,7 @@ maybe_write_delivered(false, _SeqId, IndexState) ->
maybe_write_delivered(true, SeqId, IndexState) ->
rabbit_queue_index:deliver([SeqId], IndexState).
-betas_from_index_entries(List, TransientThreshold, PA, IndexState) ->
+betas_from_index_entries(List, TransientThreshold, RPA, DPA, IndexState) ->
{Filtered, Delivers, Acks} =
lists:foldr(
fun ({MsgId, SeqId, MsgProps, IsPersistent, IsDelivered},
@@ -943,7 +975,8 @@ betas_from_index_entries(List, TransientThreshold, PA, IndexState) ->
true -> {Filtered1,
cons_if(not IsDelivered, SeqId, Delivers1),
[SeqId | Acks1]};
- false -> case gb_trees:is_defined(SeqId, PA) of
+ false -> case (gb_trees:is_defined(SeqId, RPA) orelse
+ gb_trees:is_defined(SeqId, DPA)) of
false ->
{?QUEUE:in_r(
m(#msg_status {
@@ -1005,8 +1038,8 @@ init(IsDurable, IndexState, DeltaCount, Terms, AsyncCallback,
q3 = ?QUEUE:new(),
q4 = ?QUEUE:new(),
next_seq_id = NextSeqId,
- pending_ack = gb_trees:empty(),
- ram_ack_index = gb_trees:empty(),
+ ram_pending_ack = gb_trees:empty(),
+ disk_pending_ack = gb_trees:empty(),
index_state = IndexState1,
msg_store_clients = {PersistentClient, TransientClient},
durable = IsDurable,
@@ -1044,9 +1077,10 @@ in_r(MsgStatus = #msg_status { msg = undefined },
State = #vqstate { q3 = Q3, q4 = Q4 }) ->
case ?QUEUE:is_empty(Q4) of
true -> State #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3) };
- false -> {MsgStatus1, State1 = #vqstate { q4 = Q4a }} =
- read_msg(MsgStatus, State),
- State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus1, Q4a) }
+ false -> {Msg, State1 = #vqstate { q4 = Q4a }} =
+ read_msg(MsgStatus, true, State),
+ State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus#msg_status {
+ msg = Msg }, Q4a) }
end;
in_r(MsgStatus, State = #vqstate { q4 = Q4 }) ->
State #vqstate { q4 = ?QUEUE:in_r(MsgStatus, Q4) }.
@@ -1062,35 +1096,33 @@ queue_out(State = #vqstate { q4 = Q4 }) ->
{{value, MsgStatus}, State #vqstate { q4 = Q4a }}
end.
-read_msg(MsgStatus, State) -> read_msg(MsgStatus, true, State).
-
-read_msg(MsgStatus = #msg_status { msg = undefined,
- msg_id = MsgId,
- is_persistent = IsPersistent },
+read_msg(#msg_status { msg = undefined,
+ msg_id = MsgId,
+ is_persistent = IsPersistent },
CountDiskToRam, State = #vqstate { ram_msg_count = RamMsgCount,
msg_store_clients = MSCState}) ->
{{ok, Msg = #basic_message {}}, MSCState1} =
msg_store_read(MSCState, IsPersistent, MsgId),
- {MsgStatus #msg_status { msg = Msg },
- State #vqstate { ram_msg_count = RamMsgCount + one_if(CountDiskToRam),
- msg_store_clients = MSCState1 }};
-read_msg(MsgStatus, _CountDiskToRam, State) ->
- {MsgStatus, State}.
-
-internal_fetch(AckRequired, MsgStatus = #msg_status {
- seq_id = SeqId,
- msg_id = MsgId,
- msg = Msg,
- is_persistent = IsPersistent,
- is_delivered = IsDelivered,
- msg_on_disk = MsgOnDisk,
- index_on_disk = IndexOnDisk },
- State = #vqstate {ram_msg_count = RamMsgCount,
- out_counter = OutCount,
- index_state = IndexState,
- msg_store_clients = MSCState,
- len = Len,
- persistent_count = PCount }) ->
+ RamMsgCount1 = RamMsgCount + one_if(CountDiskToRam),
+ {Msg, State #vqstate { ram_msg_count = RamMsgCount1,
+ msg_store_clients = MSCState1 }};
+read_msg(#msg_status { msg = Msg }, _CountDiskToRam, State) ->
+ {Msg, State}.
+
+remove(AckRequired, MsgStatus = #msg_status {
+ seq_id = SeqId,
+ msg_id = MsgId,
+ msg = Msg,
+ is_persistent = IsPersistent,
+ is_delivered = IsDelivered,
+ msg_on_disk = MsgOnDisk,
+ index_on_disk = IndexOnDisk },
+ State = #vqstate {ram_msg_count = RamMsgCount,
+ out_counter = OutCount,
+ index_state = IndexState,
+ msg_store_clients = MSCState,
+ len = Len,
+ persistent_count = PCount }) ->
%% 1. Mark it delivered if necessary
IndexState1 = maybe_write_delivered(
IndexOnDisk andalso not IsDelivered,
@@ -1101,12 +1133,11 @@ internal_fetch(AckRequired, MsgStatus = #msg_status {
ok = msg_store_remove(MSCState, IsPersistent, [MsgId])
end,
Ack = fun () -> rabbit_queue_index:ack([SeqId], IndexState1) end,
- IndexState2 =
- case {AckRequired, MsgOnDisk, IndexOnDisk} of
- {false, true, false} -> Rem(), IndexState1;
- {false, true, true} -> Rem(), Ack();
- _ -> IndexState1
- end,
+ IndexState2 = case {AckRequired, MsgOnDisk, IndexOnDisk} of
+ {false, true, false} -> Rem(), IndexState1;
+ {false, true, true} -> Rem(), Ack();
+ _ -> IndexState1
+ end,
%% 3. If an ack is required, add something sensible to PA
{AckTag, State1} = case AckRequired of
@@ -1117,16 +1148,14 @@ internal_fetch(AckRequired, MsgStatus = #msg_status {
false -> {undefined, State}
end,
- PCount1 = PCount - one_if(IsPersistent andalso not AckRequired),
- Len1 = Len - 1,
+ PCount1 = PCount - one_if(IsPersistent andalso not AckRequired),
RamMsgCount1 = RamMsgCount - one_if(Msg =/= undefined),
- {{Msg, IsDelivered, AckTag, Len1},
- State1 #vqstate { ram_msg_count = RamMsgCount1,
- out_counter = OutCount + 1,
- index_state = IndexState2,
- len = Len1,
- persistent_count = PCount1 }}.
+ {AckTag, State1 #vqstate { ram_msg_count = RamMsgCount1,
+ out_counter = OutCount + 1,
+ index_state = IndexState2,
+ len = Len - 1,
+ persistent_count = PCount1 }}.
purge_betas_and_deltas(LensByStore,
State = #vqstate { q3 = Q3,
@@ -1223,37 +1252,48 @@ maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus,
%% Internal gubbins for acks
%%----------------------------------------------------------------------------
-record_pending_ack(#msg_status { seq_id = SeqId,
- msg_id = MsgId,
- msg_on_disk = MsgOnDisk } = MsgStatus,
- State = #vqstate { pending_ack = PA,
- ram_ack_index = RAI,
- ack_in_counter = AckInCount}) ->
- {AckEntry, RAI1} =
- case MsgOnDisk of
- true -> {m(trim_msg_status(MsgStatus)), RAI};
- false -> {MsgStatus, gb_trees:insert(SeqId, MsgId, RAI)}
+record_pending_ack(#msg_status { seq_id = SeqId, msg = Msg } = MsgStatus,
+ State = #vqstate { ram_pending_ack = RPA,
+ disk_pending_ack = DPA,
+ ack_in_counter = AckInCount}) ->
+ {RPA1, DPA1} =
+ case Msg of
+ undefined -> {RPA, gb_trees:insert(SeqId, MsgStatus, DPA)};
+ _ -> {gb_trees:insert(SeqId, MsgStatus, RPA), DPA}
end,
- State #vqstate { pending_ack = gb_trees:insert(SeqId, AckEntry, PA),
- ram_ack_index = RAI1,
- ack_in_counter = AckInCount + 1}.
+ State #vqstate { ram_pending_ack = RPA1,
+ disk_pending_ack = DPA1,
+ ack_in_counter = AckInCount + 1}.
+
+lookup_pending_ack(SeqId, #vqstate { ram_pending_ack = RPA,
+ disk_pending_ack = DPA }) ->
+ case gb_trees:lookup(SeqId, RPA) of
+ {value, V} -> V;
+ none -> gb_trees:get(SeqId, DPA)
+ end.
-remove_pending_ack(SeqId, State = #vqstate { pending_ack = PA,
- ram_ack_index = RAI }) ->
- {gb_trees:get(SeqId, PA),
- State #vqstate { pending_ack = gb_trees:delete(SeqId, PA),
- ram_ack_index = gb_trees:delete_any(SeqId, RAI) }}.
+remove_pending_ack(SeqId, State = #vqstate { ram_pending_ack = RPA,
+ disk_pending_ack = DPA }) ->
+ case gb_trees:lookup(SeqId, RPA) of
+ {value, V} -> RPA1 = gb_trees:delete(SeqId, RPA),
+ {V, State #vqstate { ram_pending_ack = RPA1 }};
+ none -> DPA1 = gb_trees:delete(SeqId, DPA),
+ {gb_trees:get(SeqId, DPA),
+ State #vqstate { disk_pending_ack = DPA1 }}
+ end.
purge_pending_ack(KeepPersistent,
- State = #vqstate { pending_ack = PA,
+ State = #vqstate { ram_pending_ack = RPA,
+ disk_pending_ack = DPA,
index_state = IndexState,
msg_store_clients = MSCState }) ->
+ F = fun (_SeqId, MsgStatus, Acc) -> accumulate_ack(MsgStatus, Acc) end,
{IndexOnDiskSeqIds, MsgIdsByStore, _AllMsgIds} =
- rabbit_misc:gb_trees_fold(fun (_SeqId, MsgStatus, Acc) ->
- accumulate_ack(MsgStatus, Acc)
- end, accumulate_ack_init(), PA),
- State1 = State #vqstate { pending_ack = gb_trees:empty(),
- ram_ack_index = gb_trees:empty() },
+ rabbit_misc:gb_trees_fold(
+ F, rabbit_misc:gb_trees_fold(F, accumulate_ack_init(), RPA), DPA),
+ State1 = State #vqstate { ram_pending_ack = gb_trees:empty(),
+ disk_pending_ack = gb_trees:empty() },
+
case KeepPersistent of
true -> case orddict:find(false, MsgIdsByStore) of
error -> State1;
@@ -1346,11 +1386,12 @@ msg_indices_written_to_disk(Callback, MsgIdSet) ->
end).
%%----------------------------------------------------------------------------
-%% Internal plumbing for requeue
+%% Internal plumbing for requeue and fold
%%----------------------------------------------------------------------------
publish_alpha(#msg_status { msg = undefined } = MsgStatus, State) ->
- read_msg(MsgStatus, State);
+ {Msg, State1} = read_msg(MsgStatus, true, State),
+ {MsgStatus#msg_status { msg = Msg }, State1};
publish_alpha(MsgStatus, #vqstate {ram_msg_count = RamMsgCount } = State) ->
{MsgStatus, State #vqstate { ram_msg_count = RamMsgCount + 1 }}.
@@ -1415,6 +1456,41 @@ beta_limit(Q) ->
delta_limit(?BLANK_DELTA_PATTERN(_X)) -> undefined;
delta_limit(#delta { start_seq_id = StartSeqId }) -> StartSeqId.
+qfoldl(_Fun, {stop, _Acc} = A, _Q) -> A;
+qfoldl( Fun, {cont, Acc} = A, Q) ->
+ case ?QUEUE:out(Q) of
+ {empty, _Q} -> A;
+ {{value, V}, Q1} -> qfoldl(Fun, Fun(V, Acc), Q1)
+ end.
+
+lfoldl(_Fun, {stop, _Acc} = A, _L) -> A;
+lfoldl(_Fun, {cont, _Acc} = A, []) -> A;
+lfoldl( Fun, {cont, Acc}, [H | T]) -> lfoldl(Fun, Fun(H, Acc), T).
+
+delta_fold(_Fun, {stop, Acc}, _DeltaSeqId, _DeltaSeqIdEnd, State) ->
+ {stop, {Acc, State}};
+delta_fold(_Fun, {cont, Acc}, DeltaSeqIdEnd, DeltaSeqIdEnd, State) ->
+ {cont, {Acc, State}};
+delta_fold( Fun, {cont, Acc}, DeltaSeqId, DeltaSeqIdEnd,
+ #vqstate { index_state = IndexState,
+ msg_store_clients = MSCState } = State) ->
+ DeltaSeqId1 = lists:min(
+ [rabbit_queue_index:next_segment_boundary(DeltaSeqId),
+ DeltaSeqIdEnd]),
+ {List, IndexState1} = rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1,
+ IndexState),
+ {StopCont, {Acc1, MSCState1}} =
+ lfoldl(fun ({MsgId, _SeqId, MsgProps, IsPersistent, _IsDelivered},
+ {Acc0, MSCState0}) ->
+ {{ok, Msg = #basic_message {}}, MSCState1} =
+ msg_store_read(MSCState0, IsPersistent, MsgId),
+ {StopCont, AccNext} = Fun(Msg, MsgProps, Acc0),
+ {StopCont, {AccNext, MSCState1}}
+ end, {cont, {Acc, MSCState}}, List),
+ delta_fold(Fun, {StopCont, Acc1}, DeltaSeqId1, DeltaSeqIdEnd,
+ State #vqstate { index_state = IndexState1,
+ msg_store_clients = MSCState1 }).
+
%%----------------------------------------------------------------------------
%% Phase changes
%%----------------------------------------------------------------------------
@@ -1438,12 +1514,9 @@ delta_limit(#delta { start_seq_id = StartSeqId }) -> StartSeqId.
%% one segment's worth of messages in q3 - and thus would risk
%% perpetually reporting the need for a conversion when no such
%% conversion is needed. That in turn could cause an infinite loop.
-reduce_memory_use(_AlphaBetaFun, _BetaDeltaFun, _AckFun,
- State = #vqstate {target_ram_count = infinity}) ->
- {false, State};
reduce_memory_use(AlphaBetaFun, BetaDeltaFun, AckFun,
State = #vqstate {
- ram_ack_index = RamAckIndex,
+ ram_pending_ack = RPA,
ram_msg_count = RamMsgCount,
target_ram_count = TargetRamCount,
rates = #rates { avg_ingress = AvgIngress,
@@ -1453,8 +1526,7 @@ reduce_memory_use(AlphaBetaFun, BetaDeltaFun, AckFun,
}) ->
{Reduce, State1 = #vqstate { q2 = Q2, q3 = Q3 }} =
- case chunk_size(RamMsgCount + gb_trees:size(RamAckIndex),
- TargetRamCount) of
+ case chunk_size(RamMsgCount + gb_trees:size(RPA), TargetRamCount) of
0 -> {false, State};
%% Reduce memory of pending acks and alphas. The order is
%% determined based on which is growing faster. Whichever
@@ -1479,23 +1551,23 @@ reduce_memory_use(AlphaBetaFun, BetaDeltaFun, AckFun,
limit_ram_acks(0, State) ->
{0, State};
-limit_ram_acks(Quota, State = #vqstate { pending_ack = PA,
- ram_ack_index = RAI }) ->
- case gb_trees:is_empty(RAI) of
+limit_ram_acks(Quota, State = #vqstate { ram_pending_ack = RPA,
+ disk_pending_ack = DPA }) ->
+ case gb_trees:is_empty(RPA) of
true ->
{Quota, State};
false ->
- {SeqId, MsgId, RAI1} = gb_trees:take_largest(RAI),
- MsgStatus = #msg_status { msg_id = MsgId, is_persistent = false} =
- gb_trees:get(SeqId, PA),
+ {SeqId, MsgStatus, RPA1} = gb_trees:take_largest(RPA),
{MsgStatus1, State1} =
maybe_write_to_disk(true, false, MsgStatus, State),
- PA1 = gb_trees:update(SeqId, m(trim_msg_status(MsgStatus1)), PA),
+ DPA1 = gb_trees:insert(SeqId, m(trim_msg_status(MsgStatus1)), DPA),
limit_ram_acks(Quota - 1,
- State1 #vqstate { pending_ack = PA1,
- ram_ack_index = RAI1 })
+ State1 #vqstate { ram_pending_ack = RPA1,
+ disk_pending_ack = DPA1 })
end.
+reduce_memory_use(State = #vqstate { target_ram_count = infinity }) ->
+ State;
reduce_memory_use(State) ->
{_, State1} = reduce_memory_use(fun push_alphas_to_betas/2,
fun push_betas_to_deltas/2,
@@ -1561,7 +1633,8 @@ maybe_deltas_to_betas(State = #vqstate {
delta = Delta,
q3 = Q3,
index_state = IndexState,
- pending_ack = PA,
+ ram_pending_ack = RPA,
+ disk_pending_ack = DPA,
transient_threshold = TransientThreshold }) ->
#delta { start_seq_id = DeltaSeqId,
count = DeltaCount,
@@ -1569,10 +1642,10 @@ maybe_deltas_to_betas(State = #vqstate {
DeltaSeqId1 =
lists:min([rabbit_queue_index:next_segment_boundary(DeltaSeqId),
DeltaSeqIdEnd]),
- {List, IndexState1} =
- rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1, IndexState),
- {Q3a, IndexState2} =
- betas_from_index_entries(List, TransientThreshold, PA, IndexState1),
+ {List, IndexState1} = rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1,
+ IndexState),
+ {Q3a, IndexState2} = betas_from_index_entries(List, TransientThreshold,
+ RPA, DPA, IndexState1),
State1 = State #vqstate { index_state = IndexState2 },
case ?QUEUE:len(Q3a) of
0 ->
diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl
index 297fa56fe3..0bb18f4c7e 100644
--- a/src/rabbit_vhost.erl
+++ b/src/rabbit_vhost.erl
@@ -123,7 +123,7 @@ with(VHostPath, Thunk) ->
infos(Items, X) -> [{Item, i(Item, X)} || Item <- Items].
i(name, VHost) -> VHost;
-i(tracing, VHost) -> rabbit_trace:tracing(VHost);
+i(tracing, VHost) -> rabbit_trace:enabled(VHost);
i(Item, _) -> throw({bad_argument, Item}).
info(VHost) -> infos(?INFO_KEYS, VHost).