summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2012-10-17 16:54:23 +0100
committerSimon MacMullen <simon@rabbitmq.com>2012-10-17 16:54:23 +0100
commit66217363cd194531b3fa345fcc037c62978e95ac (patch)
treef9c7fff94e8a8570fc006c876624c99f1c8d376c /src
parent7e0f44f2a8d4c602ad25d574ebb6fac2e9e3b1b5 (diff)
parentbbd45f3d83621c78df930ab03864de04f65daacf (diff)
downloadrabbitmq-server-git-66217363cd194531b3fa345fcc037c62978e95ac.tar.gz
Merge bug21413
Diffstat (limited to 'src')
-rw-r--r--src/gatherer.erl51
-rw-r--r--src/rabbit.erl6
-rw-r--r--src/rabbit_amqqueue.erl133
-rw-r--r--src/rabbit_amqqueue_process.erl186
-rw-r--r--src/rabbit_backing_queue.erl7
-rw-r--r--src/rabbit_control_main.erl10
-rw-r--r--src/rabbit_exchange.erl5
-rw-r--r--src/rabbit_mirror_queue_coordinator.erl58
-rw-r--r--src/rabbit_mirror_queue_master.erl114
-rw-r--r--src/rabbit_mirror_queue_misc.erl241
-rw-r--r--src/rabbit_mirror_queue_slave.erl330
-rw-r--r--src/rabbit_mnesia.erl316
-rw-r--r--src/rabbit_policy.erl17
-rw-r--r--src/rabbit_queue_index.erl2
-rw-r--r--src/rabbit_table.erl311
-rw-r--r--src/rabbit_tests.erl51
-rw-r--r--src/rabbit_types.erl3
-rw-r--r--src/rabbit_upgrade.erl5
-rw-r--r--src/rabbit_upgrade_functions.erl33
-rw-r--r--src/rabbit_variable_queue.erl2
-rw-r--r--src/rabbit_vm.erl129
21 files changed, 1173 insertions, 837 deletions
diff --git a/src/gatherer.erl b/src/gatherer.erl
index 98b360389a..29d2d71366 100644
--- a/src/gatherer.erl
+++ b/src/gatherer.erl
@@ -18,7 +18,7 @@
-behaviour(gen_server2).
--export([start_link/0, stop/1, fork/1, finish/1, in/2, out/1]).
+-export([start_link/0, stop/1, fork/1, finish/1, in/2, sync_in/2, out/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -32,6 +32,7 @@
-spec(fork/1 :: (pid()) -> 'ok').
-spec(finish/1 :: (pid()) -> 'ok').
-spec(in/2 :: (pid(), any()) -> 'ok').
+-spec(sync_in/2 :: (pid(), any()) -> 'ok').
-spec(out/1 :: (pid()) -> {'value', any()} | 'empty').
-endif.
@@ -62,6 +63,9 @@ finish(Pid) ->
in(Pid, Value) ->
gen_server2:cast(Pid, {in, Value}).
+sync_in(Pid, Value) ->
+ gen_server2:call(Pid, {in, Value}, infinity).
+
out(Pid) ->
gen_server2:call(Pid, out, infinity).
@@ -78,19 +82,22 @@ handle_call(stop, _From, State) ->
handle_call(fork, _From, State = #gstate { forks = Forks }) ->
{reply, ok, State #gstate { forks = Forks + 1 }, hibernate};
+handle_call({in, Value}, From, State) ->
+ {noreply, in(Value, From, State), hibernate};
+
handle_call(out, From, State = #gstate { forks = Forks,
values = Values,
blocked = Blocked }) ->
case queue:out(Values) of
+ {empty, _} when Forks == 0 ->
+ {reply, empty, State, hibernate};
{empty, _} ->
- case Forks of
- 0 -> {reply, empty, State, hibernate};
- _ -> {noreply,
- State #gstate { blocked = queue:in(From, Blocked) },
- hibernate}
- end;
- {{value, _Value} = V, NewValues} ->
- {reply, V, State #gstate { values = NewValues }, hibernate}
+ {noreply, State #gstate { blocked = queue:in(From, Blocked) },
+ hibernate};
+ {{value, {PendingIn, Value}}, NewValues} ->
+ reply(PendingIn, ok),
+ {reply, {value, Value}, State #gstate { values = NewValues },
+ hibernate}
end;
handle_call(Msg, _From, State) ->
@@ -107,15 +114,8 @@ handle_cast(finish, State = #gstate { forks = Forks, blocked = Blocked }) ->
{noreply, State #gstate { forks = NewForks, blocked = NewBlocked },
hibernate};
-handle_cast({in, Value}, State = #gstate { values = Values,
- blocked = Blocked }) ->
- {noreply, case queue:out(Blocked) of
- {empty, _} ->
- State #gstate { values = queue:in(Value, Values) };
- {{value, From}, NewBlocked} ->
- gen_server2:reply(From, {value, Value}),
- State #gstate { blocked = NewBlocked }
- end, hibernate};
+handle_cast({in, Value}, State) ->
+ {noreply, in(Value, undefined, State), hibernate};
handle_cast(Msg, State) ->
{stop, {unexpected_cast, Msg}, State}.
@@ -128,3 +128,18 @@ code_change(_OldVsn, State, _Extra) ->
terminate(_Reason, State) ->
State.
+
+%%----------------------------------------------------------------------------
+
+in(Value, From, State = #gstate { values = Values, blocked = Blocked }) ->
+ case queue:out(Blocked) of
+ {empty, _} ->
+ State #gstate { values = queue:in({From, Value}, Values) };
+ {{value, PendingOut}, NewBlocked} ->
+ reply(From, ok),
+ gen_server2:reply(PendingOut, {value, Value}),
+ State #gstate { blocked = NewBlocked }
+ end.
+
+reply(undefined, _Reply) -> ok;
+reply(From, Reply) -> gen_server2:reply(From, Reply).
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 3fe27cd901..93808f8413 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -364,7 +364,7 @@ status() ->
{running_applications, application:which_applications(infinity)},
{os, os:type()},
{erlang_version, erlang:system_info(system_version)},
- {memory, erlang:memory()}],
+ {memory, rabbit_vm:memory()}],
S2 = rabbit_misc:filter_exit_map(
fun ({Key, {M, F, A}}) -> {Key, erlang:apply(M, F, A)} end,
[{vm_memory_high_watermark, {vm_memory_monitor,
@@ -427,7 +427,7 @@ stop(_State) ->
ok = rabbit_alarm:stop(),
ok = case rabbit_mnesia:is_clustered() of
true -> rabbit_amqqueue:on_node_down(node());
- false -> rabbit_mnesia:empty_ram_only_tables()
+ false -> rabbit_table:clear_ram_only_tables()
end,
ok.
@@ -558,7 +558,7 @@ recover() ->
rabbit_binding:recover(rabbit_exchange:recover(), rabbit_amqqueue:start()).
maybe_insert_default_data() ->
- case rabbit_mnesia:is_db_empty() of
+ case rabbit_table:is_empty() of
true -> insert_default_data();
false -> ok
end.
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 4a20a1bcaf..6ad85b24f5 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -29,7 +29,7 @@
-export([notify_down_all/2, limit_all/3]).
-export([on_node_down/1]).
-export([update/2, store_queue/1, policy_changed/2]).
-
+-export([start_mirroring/1, stop_mirroring/1]).
%% internal
-export([internal_declare/2, internal_delete/2, run_backing_queue/3,
@@ -165,6 +165,8 @@
-spec(store_queue/1 :: (rabbit_types:amqqueue()) -> 'ok').
-spec(policy_changed/2 ::
(rabbit_types:amqqueue(), rabbit_types:amqqueue()) -> 'ok').
+-spec(start_mirroring/1 :: (pid()) -> 'ok').
+-spec(stop_mirroring/1 :: (pid()) -> 'ok').
-endif.
@@ -210,19 +212,20 @@ recover_durable_queues(DurableQueues) ->
declare(QueueName, Durable, AutoDelete, Args, Owner) ->
ok = check_declare_arguments(QueueName, Args),
- {Node, MNodes} = determine_queue_nodes(Args),
- Q = start_queue_process(Node, #amqqueue{name = QueueName,
- durable = Durable,
- auto_delete = AutoDelete,
- arguments = Args,
- exclusive_owner = Owner,
- pid = none,
- slave_pids = [],
- sync_slave_pids = [],
- mirror_nodes = MNodes}),
- case gen_server2:call(Q#amqqueue.pid, {init, false}, infinity) of
+ Q0 = rabbit_policy:set(#amqqueue{name = QueueName,
+ durable = Durable,
+ auto_delete = AutoDelete,
+ arguments = Args,
+ exclusive_owner = Owner,
+ pid = none,
+ slave_pids = [],
+ sync_slave_pids = [],
+ gm_pids = []}),
+ {Node, _MNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q0),
+ Q1 = start_queue_process(Node, Q0),
+ case gen_server2:call(Q1#amqqueue.pid, {init, false}, infinity) of
not_found -> rabbit_misc:not_found(QueueName);
- Q1 -> Q1
+ Q2 -> Q2
end.
internal_declare(Q, true) ->
@@ -271,24 +274,8 @@ store_queue(Q = #amqqueue{durable = false}) ->
ok = mnesia:write(rabbit_queue, Q, write),
ok.
-policy_changed(_Q1, _Q2) ->
- ok.
-
-determine_queue_nodes(Args) ->
- Policy = rabbit_misc:table_lookup(Args, <<"x-ha-policy">>),
- PolicyParams = rabbit_misc:table_lookup(Args, <<"x-ha-policy-params">>),
- case {Policy, PolicyParams} of
- {{_Type, <<"nodes">>}, {array, Nodes}} ->
- case [list_to_atom(binary_to_list(Node)) ||
- {longstr, Node} <- Nodes] of
- [Node] -> {Node, undefined};
- [First | Rest] -> {First, [First | Rest]}
- end;
- {{_Type, <<"all">>}, _} ->
- {node(), all};
- _ ->
- {node(), undefined}
- end.
+policy_changed(Q1, Q2) ->
+ rabbit_mirror_queue_misc:update_mirrors(Q1, Q2).
start_queue_process(Node, Q) ->
{ok, Pid} = rabbit_amqqueue_sup:start_child(Node, [Q]),
@@ -311,8 +298,6 @@ lookup(Name) ->
with(Name, F, E) ->
case lookup(Name) of
- {ok, Q = #amqqueue{slave_pids = []}} ->
- rabbit_misc:with_exit_handler(E, fun () -> F(Q) end);
{ok, Q = #amqqueue{pid = QPid}} ->
%% We check is_process_alive(QPid) in case we receive a
%% nodedown (for example) in F() that has nothing to do
@@ -364,13 +349,11 @@ with_exclusive_access_or_die(Name, ReaderPid, F) ->
assert_args_equivalence(#amqqueue{name = QueueName, arguments = Args},
RequiredArgs) ->
rabbit_misc:assert_args_equivalence(
- Args, RequiredArgs, QueueName,
- [<<"x-expires">>, <<"x-message-ttl">>, <<"x-ha-policy">>]).
+ Args, RequiredArgs, QueueName, [<<"x-expires">>, <<"x-message-ttl">>]).
check_declare_arguments(QueueName, Args) ->
Checks = [{<<"x-expires">>, fun check_positive_int_arg/2},
{<<"x-message-ttl">>, fun check_non_neg_int_arg/2},
- {<<"x-ha-policy">>, fun check_ha_policy_arg/2},
{<<"x-dead-letter-exchange">>, fun check_string_arg/2},
{<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2}],
[case rabbit_misc:table_lookup(Args, Key) of
@@ -421,29 +404,6 @@ check_dlxrk_arg({longstr, _}, Args) ->
check_dlxrk_arg({Type, _}, _Args) ->
{error, {unacceptable_type, Type}}.
-check_ha_policy_arg({longstr, <<"all">>}, _Args) ->
- ok;
-check_ha_policy_arg({longstr, <<"nodes">>}, Args) ->
- case rabbit_misc:table_lookup(Args, <<"x-ha-policy-params">>) of
- undefined ->
- {error, {require, 'x-ha-policy-params'}};
- {array, []} ->
- {error, {require_non_empty_list_of_nodes_for_ha}};
- {array, Ary} ->
- case lists:all(fun ({longstr, _Node}) -> true;
- (_ ) -> false
- end, Ary) of
- true -> ok;
- false -> {error, {require_node_list_as_longstrs_for_ha, Ary}}
- end;
- {Type, _} ->
- {error, {ha_nodes_policy_params_not_array_of_longstr, Type}}
- end;
-check_ha_policy_arg({longstr, Policy}, _Args) ->
- {error, {invalid_ha_policy, Policy}};
-check_ha_policy_arg({Type, _}, _Args) ->
- {error, {unacceptable_type, Type}}.
-
list() ->
mnesia:dirty_match_object(rabbit_queue, #amqqueue{_ = '_'}).
@@ -613,6 +573,9 @@ set_ram_duration_target(QPid, Duration) ->
set_maximum_since_use(QPid, Age) ->
gen_server2:cast(QPid, {set_maximum_since_use, Age}).
+start_mirroring(QPid) -> ok = delegate_call(QPid, start_mirroring).
+stop_mirroring(QPid) -> ok = delegate_call(QPid, stop_mirroring).
+
on_node_down(Node) ->
rabbit_misc:execute_mnesia_tx_with_tail(
fun () -> QsDels =
@@ -647,8 +610,7 @@ pseudo_queue(QueueName, Pid) ->
auto_delete = false,
arguments = [],
pid = Pid,
- slave_pids = [],
- mirror_nodes = undefined}.
+ slave_pids = []}.
deliver([], #delivery{mandatory = false}, _Flow) ->
%% /dev/null optimisation
@@ -661,29 +623,50 @@ deliver(Qs, Delivery = #delivery{mandatory = false}, Flow) ->
%% returned. It is therefore safe to use a fire-and-forget cast
%% here and return the QPids - the semantics is preserved. This
%% scales much better than the case below.
- QPids = qpids(Qs),
+ {MPids, SPids} = qpids(Qs),
+ QPids = MPids ++ SPids,
case Flow of
flow -> [credit_flow:send(QPid) || QPid <- QPids];
noflow -> ok
end,
- delegate:invoke_no_result(
- QPids, fun (QPid) ->
- gen_server2:cast(QPid, {deliver, Delivery, Flow})
- end),
+
+ %% We let slaves know that they were being addressed as slaves at
+ %% the time - if they receive such a message from the channel
+ %% after they have become master they should mark the message as
+ %% 'delivered' since they do not know what the master may have
+ %% done with it.
+ MMsg = {deliver, Delivery, false, Flow},
+ SMsg = {deliver, Delivery, true, Flow},
+ delegate:invoke_no_result(MPids,
+ fun (QPid) -> gen_server2:cast(QPid, MMsg) end),
+ delegate:invoke_no_result(SPids,
+ fun (QPid) -> gen_server2:cast(QPid, SMsg) end),
{routed, QPids};
deliver(Qs, Delivery, _Flow) ->
- case delegate:invoke(
- qpids(Qs), fun (QPid) ->
- ok = gen_server2:call(QPid, {deliver, Delivery},
- infinity)
- end) of
- {[], _} -> {unroutable, []};
- {R , _} -> {routed, [QPid || {QPid, ok} <- R]}
+ {MPids, SPids} = qpids(Qs),
+ %% see comment above
+ MMsg = {deliver, Delivery, false},
+ SMsg = {deliver, Delivery, true},
+ {MRouted, _} = delegate:invoke(
+ MPids, fun (QPid) ->
+ ok = gen_server2:call(QPid, MMsg, infinity)
+ end),
+ {SRouted, _} = delegate:invoke(
+ SPids, fun (QPid) ->
+ ok = gen_server2:call(QPid, SMsg, infinity)
+ end),
+ case MRouted ++ SRouted of
+ [] -> {unroutable, []};
+ R -> {routed, [QPid || {QPid, ok} <- R]}
end.
-qpids(Qs) -> lists:append([[QPid | SPids] ||
- #amqqueue{pid = QPid, slave_pids = SPids} <- Qs]).
+qpids(Qs) ->
+ {MPids, SPids} = lists:foldl(fun (#amqqueue{pid = QPid, slave_pids = SPids},
+ {MPidAcc, SPidAcc}) ->
+ {[QPid | MPidAcc], [SPids | SPidAcc]}
+ end, {[], []}, Qs),
+ {MPids, lists:append(SPids)}.
safe_delegate_call_ok(F, Pids) ->
{_, Bads} = delegate:invoke(Pids, fun (Pid) ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 10ac5bea60..7ce6958223 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -26,7 +26,7 @@
-export([start_link/1, info_keys/0]).
--export([init_with_backing_queue_state/8]).
+-export([init_with_backing_queue_state/7]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2, handle_pre_hibernate/1, prioritise_call/3,
@@ -76,8 +76,8 @@
-spec(start_link/1 ::
(rabbit_types:amqqueue()) -> rabbit_types:ok_pid_or_error()).
-spec(info_keys/0 :: () -> rabbit_types:info_keys()).
--spec(init_with_backing_queue_state/8 ::
- (rabbit_types:amqqueue(), atom(), tuple(), any(), [any()],
+-spec(init_with_backing_queue_state/7 ::
+ (rabbit_types:amqqueue(), atom(), tuple(), any(),
[rabbit_types:delivery()], pmon:pmon(), dict()) -> #q{}).
-endif.
@@ -86,12 +86,14 @@
-define(STATISTICS_KEYS,
[pid,
+ policy,
exclusive_consumer_pid,
exclusive_consumer_tag,
messages_ready,
messages_unacknowledged,
messages,
consumers,
+ active_consumers,
memory,
slave_pids,
synchronised_slave_pids,
@@ -144,7 +146,7 @@ init(Q) ->
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
- RateTRef, AckTags, Deliveries, Senders, MTC) ->
+ RateTRef, Deliveries, Senders, MTC) ->
case Owner of
none -> ok;
_ -> erlang:monitor(process, Owner)
@@ -166,9 +168,7 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
delayed_stop = undefined,
queue_monitors = pmon:new(),
msg_id_to_channel = MTC},
- State1 = requeue_and_run(AckTags, process_args(
- rabbit_event:init_stats_timer(
- State, #q.stats_timer))),
+ State1 = process_args(rabbit_event:init_stats_timer(State, #q.stats_timer)),
lists:foldl(fun (Delivery, StateN) ->
deliver_or_enqueue(Delivery, true, StateN)
end, State1, Deliveries).
@@ -179,7 +179,6 @@ terminate({shutdown, _} = R, State = #q{backing_queue = BQ}) ->
terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State);
terminate(Reason, State = #q{q = #amqqueue{name = QName},
backing_queue = BQ}) ->
- %% FIXME: How do we cancel active subscriptions?
terminate_shutdown(
fun (BQS) ->
BQS1 = BQ:delete_and_terminate(Reason, BQS),
@@ -230,8 +229,7 @@ matches(false, Q1, Q2) ->
Q1#amqqueue.exclusive_owner =:= Q2#amqqueue.exclusive_owner andalso
Q1#amqqueue.arguments =:= Q2#amqqueue.arguments andalso
Q1#amqqueue.pid =:= Q2#amqqueue.pid andalso
- Q1#amqqueue.slave_pids =:= Q2#amqqueue.slave_pids andalso
- Q1#amqqueue.mirror_nodes =:= Q2#amqqueue.mirror_nodes.
+ Q1#amqqueue.slave_pids =:= Q2#amqqueue.slave_pids.
bq_init(BQ, Q, Recover) ->
Self = self(),
@@ -296,11 +294,11 @@ next_state(State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
timed -> {ensure_sync_timer(State1), 0 }
end.
-backing_queue_module(#amqqueue{arguments = Args}) ->
- case rabbit_misc:table_lookup(Args, <<"x-ha-policy">>) of
- undefined -> {ok, BQM} = application:get_env(backing_queue_module),
- BQM;
- _Policy -> rabbit_mirror_queue_master
+backing_queue_module(Q) ->
+ case rabbit_mirror_queue_misc:is_mirrored(Q) of
+ false -> {ok, BQM} = application:get_env(backing_queue_module),
+ BQM;
+ true -> rabbit_mirror_queue_master
end.
ensure_sync_timer(State = #q{sync_timer_ref = undefined}) ->
@@ -499,32 +497,21 @@ confirm_messages(MsgIds, State = #q{msg_id_to_channel = MTC}) ->
rabbit_misc:gb_trees_foreach(fun rabbit_misc:confirm_to_sender/2, CMs),
State#q{msg_id_to_channel = MTC1}.
-should_confirm_message(#delivery{msg_seq_no = undefined}, _State) ->
- never;
-should_confirm_message(#delivery{sender = SenderPid,
+send_or_record_confirm(#delivery{msg_seq_no = undefined}, State) ->
+ {never, State};
+send_or_record_confirm(#delivery{sender = SenderPid,
msg_seq_no = MsgSeqNo,
message = #basic_message {
is_persistent = true,
id = MsgId}},
- #q{q = #amqqueue{durable = true}}) ->
- {eventually, SenderPid, MsgSeqNo, MsgId};
-should_confirm_message(#delivery{sender = SenderPid,
- msg_seq_no = MsgSeqNo},
- _State) ->
- {immediately, SenderPid, MsgSeqNo}.
-
-needs_confirming({eventually, _, _, _}) -> true;
-needs_confirming(_) -> false.
-
-maybe_record_confirm_message({eventually, SenderPid, MsgSeqNo, MsgId},
- State = #q{msg_id_to_channel = MTC}) ->
- State#q{msg_id_to_channel =
- gb_trees:insert(MsgId, {SenderPid, MsgSeqNo}, MTC)};
-maybe_record_confirm_message({immediately, SenderPid, MsgSeqNo}, State) ->
+ State = #q{q = #amqqueue{durable = true},
+ msg_id_to_channel = MTC}) ->
+ MTC1 = gb_trees:insert(MsgId, {SenderPid, MsgSeqNo}, MTC),
+ {eventually, State#q{msg_id_to_channel = MTC1}};
+send_or_record_confirm(#delivery{sender = SenderPid,
+ msg_seq_no = MsgSeqNo}, State) ->
rabbit_misc:confirm_to_sender(SenderPid, [MsgSeqNo]),
- State;
-maybe_record_confirm_message(_Confirm, State) ->
- State.
+ {immediately, State}.
run_message_queue(State) ->
State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
@@ -546,46 +533,39 @@ attempt_delivery(#delivery{sender = SenderPid, message = Message}, Props,
{{Message, Props#message_properties.delivered, AckTag},
true, State1#q{backing_queue_state = BQS3}}
end, false, State#q{backing_queue_state = BQS1});
- {Duplicate, BQS1} ->
- %% if the message has previously been seen by the BQ then
- %% it must have been seen under the same circumstances as
- %% now: i.e. if it is now a deliver_immediately then it
- %% must have been before.
- {case Duplicate of
- published -> true;
- discarded -> false
- end,
- State#q{backing_queue_state = BQS1}}
+ {published, BQS1} ->
+ {true, State#q{backing_queue_state = BQS1}};
+ {discarded, BQS1} ->
+ {false, State#q{backing_queue_state = BQS1}}
end.
-deliver_or_enqueue(Delivery = #delivery{message = Message,
- sender = SenderPid}, Delivered,
- State) ->
- Confirm = should_confirm_message(Delivery, State),
+deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid},
+ Delivered, State) ->
+ {Confirm, State1} = send_or_record_confirm(Delivery, State),
Props = message_properties(Confirm, Delivered, State),
- case attempt_delivery(Delivery, Props, State) of
- {true, State1} ->
- maybe_record_confirm_message(Confirm, State1);
- %% the next one is an optimisations
- %% TODO: optimise the Confirm =/= never case too
- {false, State1 = #q{ttl = 0, dlx = undefined}} when Confirm == never ->
- discard_delivery(Delivery, State1);
- {false, State1} ->
- State2 = #q{backing_queue = BQ, backing_queue_state = BQS} =
- maybe_record_confirm_message(Confirm, State1),
+ case attempt_delivery(Delivery, Props, State1) of
+ {true, State2} ->
+ State2;
+ %% The next one is an optimisation
+ {false, State2 = #q{ttl = 0, dlx = undefined}} ->
+ %% fake an 'eventual' confirm from BQ; noop if not needed
+ State3 = #q{backing_queue = BQ, backing_queue_state = BQS} =
+ confirm_messages([Message#basic_message.id], State2),
+ BQS1 = BQ:discard(Message, SenderPid, BQS),
+ State3#q{backing_queue_state = BQS1};
+ {false, State2 = #q{backing_queue = BQ, backing_queue_state = BQS}} ->
BQS1 = BQ:publish(Message, Props, SenderPid, BQS),
ensure_ttl_timer(Props#message_properties.expiry,
State2#q{backing_queue_state = BQS1})
end.
-requeue_and_run(AckTags, State = #q{backing_queue = BQ}) ->
- run_backing_queue(BQ, fun (M, BQS) ->
- {_MsgIds, BQS1} = M:requeue(AckTags, BQS),
- BQS1
- end, State).
+requeue_and_run(AckTags, State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ {_MsgIds, BQS1} = BQ:requeue(AckTags, BQS),
+ run_message_queue(State#q{backing_queue_state = BQS1}).
-fetch(AckRequired, State = #q{backing_queue_state = BQS,
- backing_queue = BQ}) ->
+fetch(AckRequired, State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
{Result, BQS1} = BQ:fetch(AckRequired, BQS),
{Result, State#q{backing_queue_state = BQS1}}.
@@ -679,12 +659,9 @@ maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg).
qname(#q{q = #amqqueue{name = QName}}) -> QName.
-backing_queue_timeout(State = #q{backing_queue = BQ}) ->
- run_backing_queue(BQ, fun (M, BQS) -> M:timeout(BQS) end, State).
-
-run_backing_queue(Mod, Fun, State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
- run_message_queue(State#q{backing_queue_state = BQ:invoke(Mod, Fun, BQS)}).
+backing_queue_timeout(State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ State#q{backing_queue_state = BQ:timeout(BQS)}.
subtract_acks(ChPid, AckTags, State, Fun) ->
case lookup_ch(ChPid) of
@@ -696,15 +673,9 @@ subtract_acks(ChPid, AckTags, State, Fun) ->
Fun(State)
end.
-discard_delivery(#delivery{sender = SenderPid,
- message = Message},
- State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
- State#q{backing_queue_state = BQ:discard(Message, SenderPid, BQS)}.
-
message_properties(Confirm, Delivered, #q{ttl = TTL}) ->
#message_properties{expiry = calculate_msg_expiry(TTL),
- needs_confirming = needs_confirming(Confirm),
+ needs_confirming = Confirm == eventually,
delivered = Delivered}.
calculate_msg_expiry(undefined) -> undefined;
@@ -897,6 +868,12 @@ i(owner_pid, #q{q = #amqqueue{exclusive_owner = none}}) ->
'';
i(owner_pid, #q{q = #amqqueue{exclusive_owner = ExclusiveOwner}}) ->
ExclusiveOwner;
+i(policy, #q{q = #amqqueue{name = Name}}) ->
+ {ok, Q} = rabbit_amqqueue:lookup(Name),
+ case rabbit_policy:name(Q) of
+ none -> '';
+ Policy -> Policy
+ end;
i(exclusive_consumer_pid, #q{exclusive_consumer = none}) ->
'';
i(exclusive_consumer_pid, #q{exclusive_consumer = {ChPid, _ConsumerTag}}) ->
@@ -914,18 +891,24 @@ i(messages, State) ->
messages_unacknowledged]]);
i(consumers, _) ->
consumer_count();
+i(active_consumers, _) ->
+ active_consumer_count();
i(memory, _) ->
{memory, M} = process_info(self(), memory),
M;
i(slave_pids, #q{q = #amqqueue{name = Name}}) ->
- case rabbit_amqqueue:lookup(Name) of
- {ok, #amqqueue{mirror_nodes = undefined}} -> '';
- {ok, #amqqueue{slave_pids = SPids}} -> SPids
+ {ok, Q = #amqqueue{slave_pids = SPids}} =
+ rabbit_amqqueue:lookup(Name),
+ case rabbit_mirror_queue_misc:is_mirrored(Q) of
+ false -> '';
+ true -> SPids
end;
i(synchronised_slave_pids, #q{q = #amqqueue{name = Name}}) ->
- case rabbit_amqqueue:lookup(Name) of
- {ok, #amqqueue{mirror_nodes = undefined}} -> '';
- {ok, #amqqueue{sync_slave_pids = SSPids}} -> SSPids
+ {ok, Q = #amqqueue{sync_slave_pids = SSPids}} =
+ rabbit_amqqueue:lookup(Name),
+ case rabbit_mirror_queue_misc:is_mirrored(Q) of
+ false -> '';
+ true -> SSPids
end;
i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) ->
BQ:status(BQS);
@@ -1030,10 +1013,10 @@ handle_call({info, Items}, _From, State) ->
handle_call(consumers, _From, State) ->
reply(consumers(State), State);
-handle_call({deliver, Delivery}, From, State) ->
+handle_call({deliver, Delivery, Delivered}, From, State) ->
%% Synchronous, "mandatory" deliver mode.
gen_server2:reply(From, ok),
- noreply(deliver_or_enqueue(Delivery, false, State));
+ noreply(deliver_or_enqueue(Delivery, Delivered, State));
handle_call({notify_down, ChPid}, From, State) ->
%% we want to do this synchronously, so that auto_deleted queues
@@ -1149,6 +1132,23 @@ handle_call({requeue, AckTags, ChPid}, From, State) ->
ChPid, AckTags, State,
fun (State1) -> requeue_and_run(AckTags, State1) end));
+handle_call(start_mirroring, _From, State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ %% lookup again to get policy for init_with_existing_bq
+ {ok, Q} = rabbit_amqqueue:lookup(qname(State)),
+ true = BQ =/= rabbit_mirror_queue_master, %% assertion
+ BQ1 = rabbit_mirror_queue_master,
+ BQS1 = BQ1:init_with_existing_bq(Q, BQ, BQS),
+ reply(ok, State#q{backing_queue = BQ1,
+ backing_queue_state = BQS1});
+
+handle_call(stop_mirroring, _From, State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ BQ = rabbit_mirror_queue_master, %% assertion
+ {BQ1, BQS1} = BQ:stop_mirroring(BQS),
+ reply(ok, State#q{backing_queue = BQ1,
+ backing_queue_state = BQS1});
+
handle_call(force_event_refresh, _From,
State = #q{exclusive_consumer = Exclusive}) ->
rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State)),
@@ -1173,10 +1173,12 @@ handle_cast({confirm, MsgSeqNos, QPid}, State = #q{unconfirmed = UC}) ->
handle_cast(_, State = #q{delayed_stop = DS}) when DS =/= undefined ->
noreply(State);
-handle_cast({run_backing_queue, Mod, Fun}, State) ->
- noreply(run_backing_queue(Mod, Fun, State));
+handle_cast({run_backing_queue, Mod, Fun},
+ State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
+ noreply(run_message_queue(
+ State#q{backing_queue_state = BQ:invoke(Mod, Fun, BQS)}));
-handle_cast({deliver, Delivery = #delivery{sender = Sender}, Flow},
+handle_cast({deliver, Delivery = #delivery{sender = Sender}, Delivered, Flow},
State = #q{senders = Senders}) ->
%% Asynchronous, non-"mandatory" deliver mode.
Senders1 = case Flow of
@@ -1185,7 +1187,7 @@ handle_cast({deliver, Delivery = #delivery{sender = Sender}, Flow},
noflow -> Senders
end,
State1 = State#q{senders = Senders1},
- noreply(deliver_or_enqueue(Delivery, false, State1));
+ noreply(deliver_or_enqueue(Delivery, Delivered, State1));
handle_cast({ack, AckTags, ChPid}, State) ->
noreply(subtract_acks(
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index d69a6c3b98..c6d1778532 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -24,6 +24,7 @@
-type(ack() :: any()).
-type(state() :: any()).
+-type(msg_ids() :: [rabbit_types:msg_id()]).
-type(fetch_result(Ack) ::
('empty' |
%% Message, IsDelivered, AckTag, Remaining_Len
@@ -117,7 +118,7 @@
%% first time the message id appears in the result of
%% drain_confirmed. All subsequent appearances of that message id will
%% be ignored.
--callback drain_confirmed(state()) -> {[rabbit_guid:guid()], state()}.
+-callback drain_confirmed(state()) -> {msg_ids(), state()}.
%% Drop messages from the head of the queue while the supplied predicate returns
%% true. Also accepts a boolean parameter that determines whether the messages
@@ -136,7 +137,7 @@
%% Acktags supplied are for messages which can now be forgotten
%% about. Must return 1 msg_id per Ack, in the same order as Acks.
--callback ack([ack()], state()) -> {[rabbit_guid:guid()], state()}.
+-callback ack([ack()], state()) -> {msg_ids(), state()}.
%% Acktags supplied are for messages which should be processed. The
%% provided callback function is called with each message.
@@ -144,7 +145,7 @@
%% Reinsert messages into the queue which have already been delivered
%% and were pending acknowledgement.
--callback requeue([ack()], state()) -> {[rabbit_guid:guid()], state()}.
+-callback requeue([ack()], state()) -> {msg_ids(), state()}.
%% How long is my queue?
-callback len(state()) -> non_neg_integer().
diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl
index e75e1f6f7c..a3cbf6e53c 100644
--- a/src/rabbit_control_main.erl
+++ b/src/rabbit_control_main.erl
@@ -170,8 +170,8 @@ start() ->
{error, Reason} ->
print_error("~p", [Reason]),
rabbit_misc:quit(2);
- {parse_error, {_Line, Mod, Err}} ->
- print_error("~s", [lists:flatten(Mod:format_error(Err))]),
+ {error_string, Reason} ->
+ print_error("~s", [Reason]),
rabbit_misc:quit(2);
{badrpc, {'EXIT', Reason}} ->
print_error("~p", [Reason]),
@@ -477,12 +477,14 @@ action(eval, Node, [Expr], _Opts, _Inform) ->
Node, erl_eval, exprs, [Parsed, []]),
io:format("~p~n", [Value]),
ok;
- {error, E} -> {parse_error, E}
+ {error, E} -> {error_string, format_parse_error(E)}
end;
{error, E, _} ->
- {parse_error, E}
+ {error_string, format_parse_error(E)}
end.
+format_parse_error({_Line, Mod, Err}) -> lists:flatten(Mod:format_error(Err)).
+
%%----------------------------------------------------------------------------
wait_for_application(Node, PidFile, Application, Inform) ->
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 4cc96ef552..a205b23d0b 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -298,7 +298,10 @@ i(durable, #exchange{durable = Durable}) -> Durable;
i(auto_delete, #exchange{auto_delete = AutoDelete}) -> AutoDelete;
i(internal, #exchange{internal = Internal}) -> Internal;
i(arguments, #exchange{arguments = Arguments}) -> Arguments;
-i(policy, X) -> rabbit_policy:name(X);
+i(policy, X) -> case rabbit_policy:name(X) of
+ none -> '';
+ Policy -> Policy
+ end;
i(Item, _) -> throw({bad_argument, Item}).
info(X = #exchange{}) -> infos(?INFO_KEYS, X).
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl
index 4455b4419f..16690693f1 100644
--- a/src/rabbit_mirror_queue_coordinator.erl
+++ b/src/rabbit_mirror_queue_coordinator.erl
@@ -33,14 +33,14 @@
gm,
monitors,
death_fun,
- length_fun
+ depth_fun
}).
-ifdef(use_specs).
-spec(start_link/4 :: (rabbit_types:amqqueue(), pid() | 'undefined',
rabbit_mirror_queue_master:death_fun(),
- rabbit_mirror_queue_master:length_fun()) ->
+ rabbit_mirror_queue_master:depth_fun()) ->
rabbit_types:ok_pid_or_error()).
-spec(get_gm/1 :: (pid()) -> pid()).
-spec(ensure_monitoring/2 :: (pid(), [pid()]) -> 'ok').
@@ -101,19 +101,25 @@
%% channel during a publish, only some of the mirrors may receive that
%% publish. As a result of this problem, the messages broadcast over
%% the gm contain published content, and thus slaves can operate
-%% successfully on messages that they only receive via the gm. The key
-%% purpose of also sending messages directly from the channels to the
-%% slaves is that without this, in the event of the death of the
-%% master, messages could be lost until a suitable slave is promoted.
+%% successfully on messages that they only receive via the gm.
%%
-%% However, that is not the only reason. For example, if confirms are
-%% in use, then there is no guarantee that every slave will see the
-%% delivery with the same msg_seq_no. As a result, the slaves have to
-%% wait until they've seen both the publish via gm, and the publish
-%% via the channel before they have enough information to be able to
-%% perform the publish to their own bq, and subsequently issue the
-%% confirm, if necessary. Either form of publish can arrive first, and
-%% a slave can be upgraded to the master at any point during this
+%% The key purpose of also sending messages directly from the channels
+%% to the slaves is that without this, in the event of the death of
+%% the master, messages could be lost until a suitable slave is
+%% promoted. However, that is not the only reason. A slave cannot send
+%% confirms for a message until it has seen it from the
+%% channel. Otherwise, it might send a confirm to a channel for a
+%% message that it might *never* receive from that channel. This can
+%% happen because new slaves join the gm ring (and thus receive
+%% messages from the master) before inserting themselves in the
+%% queue's mnesia record (which is what channels look at for routing).
+%% As it turns out, channels will simply ignore such bogus confirms,
+%% but relying on that would introduce a dangerously tight coupling.
+%%
+%% Hence the slaves have to wait until they've seen both the publish
+%% via gm, and the publish via the channel before they issue the
+%% confirm. Either form of publish can arrive first, and a slave can
+%% be upgraded to the master at any point during this
%% process. Confirms continue to be issued correctly, however.
%%
%% Because the slave is a full process, it impersonates parts of the
@@ -154,8 +160,8 @@
%% be able to work out when their head does not differ from the master
%% (and is much simpler and cheaper than getting the master to hang on
%% to the guid of the msg at the head of its queue). When a slave is
-%% promoted to a master, it unilaterally broadcasts its length, in
-%% order to solve the problem of length requests from new slaves being
+%% promoted to a master, it unilaterally broadcasts its depth, in
+%% order to solve the problem of depth requests from new slaves being
%% unanswered by a dead master.
%%
%% Obviously, due to the async nature of communication across gm, the
@@ -297,15 +303,15 @@
%% if they have no mirrored content at all. This is not surprising: to
%% achieve anything more sophisticated would require the master and
%% recovering slave to be able to check to see whether they agree on
-%% the last seen state of the queue: checking length alone is not
+%% the last seen state of the queue: checking depth alone is not
%% sufficient in this case.
%%
%% For more documentation see the comments in bug 23554.
%%
%%----------------------------------------------------------------------------
-start_link(Queue, GM, DeathFun, LengthFun) ->
- gen_server2:start_link(?MODULE, [Queue, GM, DeathFun, LengthFun], []).
+start_link(Queue, GM, DeathFun, DepthFun) ->
+ gen_server2:start_link(?MODULE, [Queue, GM, DeathFun, DepthFun], []).
get_gm(CPid) ->
gen_server2:call(CPid, get_gm, infinity).
@@ -317,7 +323,7 @@ ensure_monitoring(CPid, Pids) ->
%% gen_server
%% ---------------------------------------------------------------------------
-init([#amqqueue { name = QueueName } = Q, GM, DeathFun, LengthFun]) ->
+init([#amqqueue { name = QueueName } = Q, GM, DeathFun, DepthFun]) ->
GM1 = case GM of
undefined ->
{ok, GM2} = gm:start_link(QueueName, ?MODULE, [self()]),
@@ -333,7 +339,7 @@ init([#amqqueue { name = QueueName } = Q, GM, DeathFun, LengthFun]) ->
gm = GM1,
monitors = pmon:new(),
death_fun = DeathFun,
- length_fun = LengthFun },
+ depth_fun = DepthFun },
hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -343,7 +349,7 @@ handle_call(get_gm, _From, State = #state { gm = GM }) ->
handle_cast({gm_deaths, Deaths},
State = #state { q = #amqqueue { name = QueueName, pid = MPid } })
when node(MPid) =:= node() ->
- case rabbit_mirror_queue_misc:remove_from_queue(QueueName, Deaths) of
+ case rabbit_mirror_queue_misc:remove_from_queue(QueueName, MPid, Deaths) of
{ok, MPid, DeadPids} ->
rabbit_mirror_queue_misc:report_deaths(MPid, true, QueueName,
DeadPids),
@@ -352,8 +358,8 @@ handle_cast({gm_deaths, Deaths},
{stop, normal, State}
end;
-handle_cast(request_length, State = #state { length_fun = LengthFun }) ->
- ok = LengthFun(),
+handle_cast(request_depth, State = #state { depth_fun = DepthFun }) ->
+ ok = DepthFun(),
noreply(State);
handle_cast({ensure_monitoring, Pids}, State = #state { monitors = Mons }) ->
@@ -397,9 +403,7 @@ members_changed([_CPid], _Births, []) ->
members_changed([CPid], _Births, Deaths) ->
ok = gen_server2:cast(CPid, {gm_deaths, Deaths}).
-handle_msg([_CPid], _From, master_changed) ->
- ok;
-handle_msg([CPid], _From, request_length = Msg) ->
+handle_msg([CPid], _From, request_depth = Msg) ->
ok = gen_server2:cast(CPid, Msg);
handle_msg([CPid], _From, {ensure_monitoring, _Pids} = Msg) ->
ok = gen_server2:cast(CPid, Msg);
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 4cfb3dcbfa..377d51868d 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -25,7 +25,9 @@
-export([start/1, stop/0]).
--export([promote_backing_queue_state/6, sender_death_fun/0, length_fun/0]).
+-export([promote_backing_queue_state/7, sender_death_fun/0, depth_fun/0]).
+
+-export([init_with_existing_bq/3, stop_mirroring/1]).
-behaviour(rabbit_backing_queue).
@@ -44,10 +46,10 @@
-ifdef(use_specs).
--export_type([death_fun/0, length_fun/0]).
+-export_type([death_fun/0, depth_fun/0]).
-type(death_fun() :: fun ((pid()) -> 'ok')).
--type(length_fun() :: fun (() -> 'ok')).
+-type(depth_fun() :: fun (() -> 'ok')).
-type(master_state() :: #state { gm :: pid(),
coordinator :: pid(),
backing_queue :: atom(),
@@ -59,10 +61,14 @@
known_senders :: set()
}).
--spec(promote_backing_queue_state/6 ::
- (pid(), atom(), any(), pid(), dict(), [pid()]) -> master_state()).
+-spec(promote_backing_queue_state/7 ::
+ (pid(), atom(), any(), pid(), [any()], dict(), [pid()]) ->
+ master_state()).
-spec(sender_death_fun/0 :: () -> death_fun()).
--spec(length_fun/0 :: () -> length_fun()).
+-spec(depth_fun/0 :: () -> depth_fun()).
+-spec(init_with_existing_bq/3 :: (rabbit_types:amqqueue(), atom(), any()) ->
+ master_state()).
+-spec(stop_mirroring/1 :: (master_state()) -> {atom(), any()}).
-endif.
@@ -82,20 +88,27 @@ stop() ->
%% Same as start/1.
exit({not_valid_for_generic_backing_queue, ?MODULE}).
-init(#amqqueue { name = QName, mirror_nodes = MNodes } = Q, Recover,
- AsyncCallback) ->
- {ok, CPid} = rabbit_mirror_queue_coordinator:start_link(
- Q, undefined, sender_death_fun(), length_fun()),
- GM = rabbit_mirror_queue_coordinator:get_gm(CPid),
- MNodes1 = (case MNodes of
- all -> rabbit_mnesia:cluster_nodes(all);
- undefined -> [];
- _ -> MNodes
- end) -- [node()],
- [rabbit_mirror_queue_misc:add_mirror(QName, Node) || Node <- MNodes1],
+init(Q = #amqqueue{name = QName}, Recover, AsyncCallback) ->
{ok, BQ} = application:get_env(backing_queue_module),
BQS = BQ:init(Q, Recover, AsyncCallback),
+ State = #state{gm = GM} = init_with_existing_bq(Q, BQ, BQS),
+ {_MNode, SNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q),
+ rabbit_mirror_queue_misc:add_mirrors(QName, SNodes),
ok = gm:broadcast(GM, {depth, BQ:depth(BQS)}),
+ State.
+
+init_with_existing_bq(Q = #amqqueue{name = QName}, BQ, BQS) ->
+ {ok, CPid} = rabbit_mirror_queue_coordinator:start_link(
+ Q, undefined, sender_death_fun(), depth_fun()),
+ GM = rabbit_mirror_queue_coordinator:get_gm(CPid),
+ Self = self(),
+ ok = rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ [Q1 = #amqqueue{gm_pids = GMPids}]
+ = mnesia:read({rabbit_queue, QName}),
+ ok = rabbit_amqqueue:store_queue(
+ Q1#amqqueue{gm_pids = [{GM, Self} | GMPids]})
+ end),
#state { gm = GM,
coordinator = CPid,
backing_queue = BQ,
@@ -106,8 +119,16 @@ init(#amqqueue { name = QName, mirror_nodes = MNodes } = Q, Recover,
ack_msg_id = dict:new(),
known_senders = sets:new() }.
+stop_mirroring(State = #state { coordinator = CPid,
+ backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ unlink(CPid),
+ stop_all_slaves(shutdown, State),
+ {BQ, BQS}.
+
terminate({shutdown, dropped} = Reason,
- State = #state { backing_queue = BQ, backing_queue_state = BQS }) ->
+ State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }) ->
%% Backing queue termination - this node has been explicitly
%% dropped. Normally, non-durable queues would be tidied up on
%% startup, but there's a possibility that we will be added back
@@ -123,26 +144,30 @@ terminate(Reason,
%% node. Thus just let some other slave take over.
State #state { backing_queue_state = BQ:terminate(Reason, BQS) }.
-delete_and_terminate(Reason, State = #state { gm = GM,
- backing_queue = BQ,
+delete_and_terminate(Reason, State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
+ stop_all_slaves(Reason, State),
+ State #state { backing_queue_state = BQ:delete_and_terminate(Reason, BQS),
+ set_delivered = 0 }.
+
+stop_all_slaves(Reason, #state{gm = GM}) ->
Info = gm:info(GM),
Slaves = [Pid || Pid <- proplists:get_value(group_members, Info),
node(Pid) =/= node()],
MRefs = [erlang:monitor(process, S) || S <- Slaves],
ok = gm:broadcast(GM, {delete_and_terminate, Reason}),
- monitor_wait(MRefs),
- ok = gm:forget_group(proplists:get_value(group_name, Info)),
- State #state { backing_queue_state = BQ:delete_and_terminate(Reason, BQS),
- set_delivered = 0 }.
-
-monitor_wait([]) ->
- ok;
-monitor_wait([MRef | MRefs]) ->
- receive({'DOWN', MRef, process, _Pid, _Info}) ->
- ok
- end,
- monitor_wait(MRefs).
+ [receive {'DOWN', MRef, process, _Pid, _Info} -> ok end || MRef <- MRefs],
+ %% Normally when we remove a slave another slave or master will
+ %% notice and update Mnesia. But we just removed them all, and
+ %% have stopped listening ourselves. So manually clean up.
+ QName = proplists:get_value(group_name, Info),
+ rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ [Q] = mnesia:read({rabbit_queue, QName}),
+ rabbit_mirror_queue_misc:store_updated_slaves(
+ Q #amqqueue { gm_pids = [], slave_pids = [] })
+ end),
+ ok = gm:forget_group(QName).
purge(State = #state { gm = GM,
backing_queue = BQ,
@@ -362,8 +387,10 @@ discard(Msg = #basic_message { id = MsgId }, ChPid,
case dict:find(MsgId, SS) of
error ->
ok = gm:broadcast(GM, {discard, ChPid, Msg}),
- State #state { backing_queue_state = BQ:discard(Msg, ChPid, BQS),
- seen_status = dict:erase(MsgId, SS) };
+ ensure_monitoring(
+ ChPid, State #state {
+ backing_queue_state = BQ:discard(Msg, ChPid, BQS),
+ seen_status = dict:erase(MsgId, SS) });
{ok, discarded} ->
State
end.
@@ -372,13 +399,16 @@ discard(Msg = #basic_message { id = MsgId }, ChPid,
%% Other exported functions
%% ---------------------------------------------------------------------------
-promote_backing_queue_state(CPid, BQ, BQS, GM, SeenStatus, KS) ->
- Len = BQ:len(BQS),
- ok = gm:broadcast(GM, {depth, BQ:depth(BQS)}),
+promote_backing_queue_state(CPid, BQ, BQS, GM, AckTags, SeenStatus, KS) ->
+ {_MsgIds, BQS1} = BQ:requeue(AckTags, BQS),
+ Len = BQ:len(BQS1),
+ Depth = BQ:depth(BQS1),
+ true = Len == Depth, %% ASSERTION: everything must have been requeued
+ ok = gm:broadcast(GM, {depth, Depth}),
#state { gm = GM,
coordinator = CPid,
backing_queue = BQ,
- backing_queue_state = BQS,
+ backing_queue_state = BQS1,
set_delivered = Len,
seen_status = SeenStatus,
confirmed = [],
@@ -397,7 +427,7 @@ sender_death_fun() ->
end)
end.
-length_fun() ->
+depth_fun() ->
Self = self(),
fun () ->
rabbit_amqqueue:run_backing_queue(
@@ -410,10 +440,8 @@ length_fun() ->
end)
end.
-maybe_store_acktag(undefined, _MsgId, AM) ->
- AM;
-maybe_store_acktag(AckTag, MsgId, AM) ->
- dict:store(AckTag, MsgId, AM).
+maybe_store_acktag(undefined, _MsgId, AM) -> AM;
+maybe_store_acktag(AckTag, MsgId, AM) -> dict:store(AckTag, MsgId, AM).
ensure_monitoring(ChPid, State = #state { coordinator = CPid,
known_senders = KS }) ->
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index 89e334ddd2..4c8406d9cf 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -16,9 +16,12 @@
-module(rabbit_mirror_queue_misc).
--export([remove_from_queue/2, on_node_up/0,
- drop_mirror/2, drop_mirror/3, add_mirror/2, add_mirror/3,
- report_deaths/4, store_updated_slaves/1]).
+-export([remove_from_queue/3, on_node_up/0, add_mirrors/2, add_mirror/2,
+ report_deaths/4, store_updated_slaves/1, suggested_queue_nodes/1,
+ is_mirrored/1, update_mirrors/2]).
+
+%% for testing only
+-export([suggested_queue_nodes/4]).
-include("rabbit.hrl").
@@ -26,19 +29,21 @@
-ifdef(use_specs).
--spec(remove_from_queue/2 ::
- (rabbit_amqqueue:name(), [pid()])
+-spec(remove_from_queue/3 ::
+ (rabbit_amqqueue:name(), pid(), [pid()])
-> {'ok', pid(), [pid()]} | {'error', 'not_found'}).
-spec(on_node_up/0 :: () -> 'ok').
--spec(drop_mirror/2 ::
- (rabbit_amqqueue:name(), node()) -> rabbit_types:ok_or_error(any())).
+-spec(add_mirrors/2 :: (rabbit_amqqueue:name(), [node()]) -> 'ok').
-spec(add_mirror/2 ::
- (rabbit_amqqueue:name(), node()) -> rabbit_types:ok_or_error(any())).
--spec(add_mirror/3 ::
- (rabbit_types:vhost(), binary(), atom())
- -> rabbit_types:ok_or_error(any())).
+ (rabbit_amqqueue:name(), node()) ->
+ {'ok', atom()} | rabbit_types:error(any())).
-spec(store_updated_slaves/1 :: (rabbit_types:amqqueue()) ->
rabbit_types:amqqueue()).
+-spec(suggested_queue_nodes/1 :: (rabbit_types:amqqueue()) ->
+ {node(), [node()]}).
+-spec(is_mirrored/1 :: (rabbit_types:amqqueue()) -> boolean()).
+-spec(update_mirrors/2 ::
+ (rabbit_types:amqqueue(), rabbit_types:amqqueue()) -> 'ok').
-endif.
@@ -52,8 +57,7 @@
%% slave (now master) receives messages it's not ready for (for
%% example, new consumers).
%% Returns {ok, NewMPid, DeadPids}
-remove_from_queue(QueueName, DeadPids) ->
- DeadNodes = [node(DeadPid) || DeadPid <- DeadPids],
+remove_from_queue(QueueName, Self, DeadGMPids) ->
rabbit_misc:execute_mnesia_transaction(
fun () ->
%% Someone else could have deleted the queue before we
@@ -61,20 +65,27 @@ remove_from_queue(QueueName, DeadPids) ->
case mnesia:read({rabbit_queue, QueueName}) of
[] -> {error, not_found};
[Q = #amqqueue { pid = QPid,
- slave_pids = SPids }] ->
- [QPid1 | SPids1] = Alive =
- [Pid || Pid <- [QPid | SPids],
- not lists:member(node(Pid), DeadNodes)],
+ slave_pids = SPids,
+ gm_pids = GMPids }] ->
+ {Dead, GMPids1} = lists:partition(
+ fun ({GM, _}) ->
+ lists:member(GM, DeadGMPids)
+ end, GMPids),
+ DeadPids = [Pid || {_GM, Pid} <- Dead],
+ Alive = [QPid | SPids] -- DeadPids,
+ {QPid1, SPids1} = promote_slave(Alive),
case {{QPid, SPids}, {QPid1, SPids1}} of
{Same, Same} ->
+ GMPids = GMPids1, %% ASSERTION
{ok, QPid1, []};
- _ when QPid =:= QPid1 orelse node(QPid1) =:= node() ->
+ _ when QPid =:= QPid1 orelse QPid1 =:= Self ->
%% Either master hasn't changed, so
%% we're ok to update mnesia; or we have
%% become the master.
store_updated_slaves(
Q #amqqueue { pid = QPid1,
- slave_pids = SPids1 }),
+ slave_pids = SPids1,
+ gm_pids = GMPids1 }),
{ok, QPid1, [QPid | SPids] -- Alive};
_ ->
%% Master has changed, and we're not it,
@@ -87,32 +98,41 @@ remove_from_queue(QueueName, DeadPids) ->
end).
on_node_up() ->
- Qs =
+ QNames =
rabbit_misc:execute_mnesia_transaction(
fun () ->
mnesia:foldl(
- fun (#amqqueue { mirror_nodes = undefined }, QsN) ->
- QsN;
- (#amqqueue { name = QName,
- mirror_nodes = all }, QsN) ->
- [QName | QsN];
- (#amqqueue { name = QName,
- mirror_nodes = MNodes }, QsN) ->
- case lists:member(node(), MNodes) of
- true -> [QName | QsN];
- false -> QsN
+ fun (Q = #amqqueue{name = QName,
+ pid = Pid,
+ slave_pids = SPids}, QNames0) ->
+ %% We don't want to pass in the whole
+ %% cluster - we don't want a situation
+ %% where starting one node causes us to
+ %% decide to start a mirror on another
+ PossibleNodes0 = [node(P) || P <- [Pid | SPids]],
+ PossibleNodes =
+ case lists:member(node(), PossibleNodes0) of
+ true -> PossibleNodes0;
+ false -> [node() | PossibleNodes0]
+ end,
+ {_MNode, SNodes} = suggested_queue_nodes(
+ Q, PossibleNodes),
+ case lists:member(node(), SNodes) of
+ true -> [QName | QNames0];
+ false -> QNames0
end
end, [], rabbit_queue)
end),
- [add_mirror(Q, node()) || Q <- Qs],
+ [{ok, _} = add_mirror(QName, node()) || QName <- QNames],
ok.
-drop_mirror(VHostPath, QueueName, MirrorNode) ->
- drop_mirror(rabbit_misc:r(VHostPath, queue, QueueName), MirrorNode).
+drop_mirrors(QName, Nodes) ->
+ [ok = drop_mirror(QName, Node) || Node <- Nodes],
+ ok.
-drop_mirror(Queue, MirrorNode) ->
+drop_mirror(QName, MirrorNode) ->
if_mirrored_queue(
- Queue,
+ QName,
fun (#amqqueue { name = Name, pid = QPid, slave_pids = SPids }) ->
case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of
[] ->
@@ -128,56 +148,61 @@ drop_mirror(Queue, MirrorNode) ->
end
end).
-add_mirror(VHostPath, QueueName, MirrorNode) ->
- add_mirror(rabbit_misc:r(VHostPath, queue, QueueName), MirrorNode).
+add_mirrors(QName, Nodes) ->
+ [{ok, _} = add_mirror(QName, Node) || Node <- Nodes],
+ ok.
-add_mirror(Queue, MirrorNode) ->
+add_mirror(QName, MirrorNode) ->
if_mirrored_queue(
- Queue,
+ QName,
fun (#amqqueue { name = Name, pid = QPid, slave_pids = SPids } = Q) ->
case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of
[] ->
start_child(Name, MirrorNode, Q);
[SPid] ->
case rabbit_misc:is_process_alive(SPid) of
- true ->
- {error,{queue_already_mirrored_on_node,
- MirrorNode}};
- false ->
- start_child(Name, MirrorNode, Q)
+ true -> {ok, already_mirrored};
+ false -> start_child(Name, MirrorNode, Q)
end
end
end).
start_child(Name, MirrorNode, Q) ->
- case rabbit_mirror_queue_slave_sup:start_child(MirrorNode, [Q]) of
+ case rabbit_misc:with_exit_handler(
+ rabbit_misc:const({ok, down}),
+ fun () ->
+ rabbit_mirror_queue_slave_sup:start_child(MirrorNode, [Q])
+ end) of
{ok, undefined} ->
%% this means the mirror process was
%% already running on the given node.
- ok;
+ {ok, already_mirrored};
+ {ok, down} ->
+ %% Node went down between us deciding to start a mirror
+ %% and actually starting it. Which is fine.
+ {ok, node_down};
{ok, SPid} ->
rabbit_log:info("Adding mirror of ~s on node ~p: ~p~n",
[rabbit_misc:rs(Name), MirrorNode, SPid]),
- ok;
+ {ok, started};
{error, {{stale_master_pid, StalePid}, _}} ->
rabbit_log:warning("Detected stale HA master while adding "
"mirror of ~s on node ~p: ~p~n",
[rabbit_misc:rs(Name), MirrorNode, StalePid]),
- ok;
+ {ok, stale_master};
{error, {{duplicate_live_master, _}=Err, _}} ->
- throw(Err);
+ Err;
Other ->
Other
end.
-if_mirrored_queue(Queue, Fun) ->
- rabbit_amqqueue:with(
- Queue, fun (#amqqueue { arguments = Args } = Q) ->
- case rabbit_misc:table_lookup(Args, <<"x-ha-policy">>) of
- undefined -> ok;
- _ -> Fun(Q)
- end
- end).
+if_mirrored_queue(QName, Fun) ->
+ rabbit_amqqueue:with(QName, fun (Q) ->
+ case is_mirrored(Q) of
+ false -> ok;
+ true -> Fun(Q)
+ end
+ end).
report_deaths(_MirrorPid, _IsMaster, _QueueName, []) ->
ok;
@@ -201,3 +226,105 @@ store_updated_slaves(Q = #amqqueue{slave_pids = SPids,
%% Wake it up so that we emit a stats event
rabbit_amqqueue:wake_up(Q1),
Q1.
+
+%%----------------------------------------------------------------------------
+
+promote_slave([SPid | SPids]) ->
+ %% The slave pids are maintained in descending order of age, so
+ %% the one to promote is the oldest.
+ {SPid, SPids}.
+
+suggested_queue_nodes(Q) ->
+ suggested_queue_nodes(Q, rabbit_mnesia:cluster_nodes(running)).
+
+%% This variant exists so we can pull a call to
+%% rabbit_mnesia:cluster_nodes(running) out of a loop or
+%% transaction or both.
+suggested_queue_nodes(Q, PossibleNodes) ->
+ {MNode0, SNodes} = actual_queue_nodes(Q),
+ MNode = case MNode0 of
+ none -> node();
+ _ -> MNode0
+ end,
+ suggested_queue_nodes(policy(<<"ha-mode">>, Q), policy(<<"ha-params">>, Q),
+ {MNode, SNodes}, PossibleNodes).
+
+policy(Policy, Q) ->
+ case rabbit_policy:get(Policy, Q) of
+ {ok, P} -> P;
+ _ -> none
+ end.
+
+suggested_queue_nodes(<<"all">>, _Params, {MNode, _SNodes}, Possible) ->
+ {MNode, Possible -- [MNode]};
+suggested_queue_nodes(<<"nodes">>, Nodes0, {MNode, _SNodes}, Possible) ->
+ Nodes = [list_to_atom(binary_to_list(Node)) || Node <- Nodes0],
+ Unavailable = Nodes -- Possible,
+ Available = Nodes -- Unavailable,
+ case Available of
+ [] -> %% We have never heard of anything? Not much we can do but
+ %% keep the master alive.
+ {MNode, []};
+ _ -> case lists:member(MNode, Available) of
+ true -> {MNode, Available -- [MNode]};
+ false -> promote_slave(Available)
+ end
+ end;
+%% When we need to add nodes, we randomise our candidate list as a
+%% crude form of load-balancing. TODO it would also be nice to
+%% randomise the list of ones to remove when we have too many - but
+%% that would fail to take account of synchronisation...
+suggested_queue_nodes(<<"exactly">>, Count, {MNode, SNodes}, Possible) ->
+ SCount = Count - 1,
+ {MNode, case SCount > length(SNodes) of
+ true -> Cand = shuffle((Possible -- [MNode]) -- SNodes),
+ SNodes ++ lists:sublist(Cand, SCount - length(SNodes));
+ false -> lists:sublist(SNodes, SCount)
+ end};
+suggested_queue_nodes(_, _, {MNode, _}, _) ->
+ {MNode, []}.
+
+shuffle(L) ->
+ {A1,A2,A3} = now(),
+ random:seed(A1, A2, A3),
+ {_, L1} = lists:unzip(lists:keysort(1, [{random:uniform(), N} || N <- L])),
+ L1.
+
+actual_queue_nodes(#amqqueue{pid = MPid, slave_pids = SPids}) ->
+ {case MPid of
+ none -> none;
+ _ -> node(MPid)
+ end, [node(Pid) || Pid <- SPids]}.
+
+is_mirrored(Q) ->
+ case policy(<<"ha-mode">>, Q) of
+ <<"all">> -> true;
+ <<"nodes">> -> true;
+ <<"exactly">> -> true;
+ _ -> false
+ end.
+
+
+%% [1] - rabbit_amqqueue:start_mirroring/1 will turn unmirrored to
+%% master and start any needed slaves. However, if node(QPid) is not
+%% in the nodes for the policy, it won't switch it. So this is for the
+%% case where we kill the existing queue and restart elsewhere. TODO:
+%% is this TRTTD? All alternatives seem ugly.
+update_mirrors(OldQ = #amqqueue{pid = QPid},
+ NewQ = #amqqueue{pid = QPid}) ->
+ case {is_mirrored(OldQ), is_mirrored(NewQ)} of
+ {false, false} -> ok;
+ {true, false} -> rabbit_amqqueue:stop_mirroring(QPid);
+ {false, true} -> rabbit_amqqueue:start_mirroring(QPid),
+ update_mirrors0(OldQ, NewQ); %% [1]
+ {true, true} -> update_mirrors0(OldQ, NewQ)
+ end.
+
+update_mirrors0(OldQ = #amqqueue{name = QName},
+ NewQ = #amqqueue{name = QName}) ->
+ All = fun ({A,B}) -> [A|B] end,
+ OldNodes = All(actual_queue_nodes(OldQ)),
+ NewNodes = All(suggested_queue_nodes(NewQ)),
+ add_mirrors(QName, NewNodes -- OldNodes),
+ drop_mirrors(QName, OldNodes -- NewNodes),
+ ok.
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 625bcdffba..3d8bd8b408 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -64,7 +64,6 @@
-record(state, { q,
gm,
- master_pid,
backing_queue,
backing_queue_state,
sync_timer_ref,
@@ -72,7 +71,6 @@
sender_queues, %% :: Pid -> {Q Msg, Set MsgId}
msg_id_ack, %% :: MsgId -> AckTag
- ack_num,
msg_id_status,
known_senders,
@@ -88,7 +86,7 @@ set_maximum_since_use(QPid, Age) ->
info(QPid) -> gen_server2:call(QPid, info, infinity).
-init(#amqqueue { name = QueueName } = Q) ->
+init(Q = #amqqueue { name = QName }) ->
%% We join the GM group before we add ourselves to the amqqueue
%% record. As a result:
%% 1. We can receive msgs from GM that correspond to messages we will
@@ -101,23 +99,23 @@ init(#amqqueue { name = QueueName } = Q) ->
%% above.
%%
process_flag(trap_exit, true), %% amqqueue_process traps exits too.
- {ok, GM} = gm:start_link(QueueName, ?MODULE, [self()]),
+ {ok, GM} = gm:start_link(QName, ?MODULE, [self()]),
receive {joined, GM} -> ok end,
Self = self(),
Node = node(),
case rabbit_misc:execute_mnesia_transaction(
- fun() -> init_it(Self, Node, QueueName) end) of
- {new, MPid} ->
- erlang:monitor(process, MPid),
+ fun() -> init_it(Self, GM, Node, QName) end) of
+ {new, QPid} ->
+ erlang:monitor(process, QPid),
ok = file_handle_cache:register_callback(
rabbit_amqqueue, set_maximum_since_use, [Self]),
ok = rabbit_memory_monitor:register(
Self, {rabbit_amqqueue, set_ram_duration_target, [Self]}),
{ok, BQ} = application:get_env(backing_queue_module),
- BQS = bq_init(BQ, Q, false),
- State = #state { q = Q,
+ Q1 = Q #amqqueue { pid = QPid },
+ BQS = bq_init(BQ, Q1, false),
+ State = #state { q = Q1,
gm = GM,
- master_pid = MPid,
backing_queue = BQ,
backing_queue_state = BQS,
rate_timer_ref = undefined,
@@ -125,7 +123,6 @@ init(#amqqueue { name = QueueName } = Q) ->
sender_queues = dict:new(),
msg_id_ack = dict:new(),
- ack_num = 0,
msg_id_status = dict:new(),
known_senders = pmon:new(),
@@ -134,7 +131,7 @@ init(#amqqueue { name = QueueName } = Q) ->
},
rabbit_event:notify(queue_slave_created,
infos(?CREATION_EVENT_KEYS, State)),
- ok = gm:broadcast(GM, request_length),
+ ok = gm:broadcast(GM, request_depth),
{ok, State, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN,
?DESIRED_HIBERNATE}};
@@ -143,16 +140,15 @@ init(#amqqueue { name = QueueName } = Q) ->
duplicate_live_master ->
{stop, {duplicate_live_master, Node}};
existing ->
+ gm:leave(GM),
ignore
end.
-init_it(Self, Node, QueueName) ->
- [Q1 = #amqqueue { pid = QPid, slave_pids = MPids }] =
- mnesia:read({rabbit_queue, QueueName}),
- case [Pid || Pid <- [QPid | MPids], node(Pid) =:= Node] of
- [] -> MPids1 = MPids ++ [Self],
- rabbit_mirror_queue_misc:store_updated_slaves(
- Q1#amqqueue{slave_pids = MPids1}),
+init_it(Self, GM, Node, QName) ->
+ [Q = #amqqueue { pid = QPid, slave_pids = SPids, gm_pids = GMPids }] =
+ mnesia:read({rabbit_queue, QName}),
+ case [Pid || Pid <- [QPid | SPids], node(Pid) =:= Node] of
+ [] -> add_slave(Q, Self, GM),
{new, QPid};
[QPid] -> case rabbit_misc:is_process_alive(QPid) of
true -> duplicate_live_master;
@@ -160,48 +156,50 @@ init_it(Self, Node, QueueName) ->
end;
[SPid] -> case rabbit_misc:is_process_alive(SPid) of
true -> existing;
- false -> MPids1 = (MPids -- [SPid]) ++ [Self],
- rabbit_mirror_queue_misc:store_updated_slaves(
- Q1#amqqueue{slave_pids = MPids1}),
+ false -> Q1 = Q#amqqueue {
+ slave_pids = SPids -- [SPid],
+ gm_pids = [T || T = {_, S} <- GMPids,
+ S =/= SPid] },
+ add_slave(Q1, Self, GM),
{new, QPid}
end
end.
-handle_call({deliver, Delivery}, From, State) ->
+%% Add to the end, so they are in descending order of age, see
+%% rabbit_mirror_queue_misc:promote_slave/1
+add_slave(Q = #amqqueue { slave_pids = SPids, gm_pids = GMPids }, New, GM) ->
+ rabbit_mirror_queue_misc:store_updated_slaves(
+ Q#amqqueue{slave_pids = SPids ++ [New], gm_pids = [{GM, New} | GMPids]}).
+
+handle_call({deliver, Delivery, true}, From, State) ->
%% Synchronous, "mandatory" deliver mode.
gen_server2:reply(From, ok),
noreply(maybe_enqueue_message(Delivery, State));
handle_call({gm_deaths, Deaths}, From,
- State = #state { q = #amqqueue { name = QueueName },
- gm = GM,
- master_pid = MPid }) ->
- %% The GM has told us about deaths, which means we're not going to
- %% receive any more messages from GM
- case rabbit_mirror_queue_misc:remove_from_queue(QueueName, Deaths) of
+ State = #state { q = Q = #amqqueue { name = QName, pid = MPid }}) ->
+ Self = self(),
+ case rabbit_mirror_queue_misc:remove_from_queue(QName, Self, Deaths) of
{error, not_found} ->
gen_server2:reply(From, ok),
{stop, normal, State};
{ok, Pid, DeadPids} ->
- rabbit_mirror_queue_misc:report_deaths(self(), false, QueueName,
+ rabbit_mirror_queue_misc:report_deaths(Self, false, QName,
DeadPids),
- if node(Pid) =:= node(MPid) ->
+ case Pid of
+ MPid ->
%% master hasn't changed
- reply(ok, State);
- node(Pid) =:= node() ->
+ gen_server2:reply(From, ok),
+ noreply(State);
+ Self ->
%% we've become master
- promote_me(From, State);
- true ->
- %% master has changed to not us.
+ QueueState = promote_me(From, State),
+ {become, rabbit_amqqueue_process, QueueState, hibernate};
+ _ ->
+ %% master has changed to not us
gen_server2:reply(From, ok),
erlang:monitor(process, Pid),
- %% GM is lazy. So we know of the death of the
- %% slave since it is a neighbour of ours, but
- %% until a message is sent, not all members will
- %% know. That might include the new master. So
- %% broadcast a no-op message to wake everyone up.
- ok = gm:broadcast(GM, master_changed),
- noreply(State #state { master_pid = Pid })
+ noreply(State #state { q = Q #amqqueue { pid = Pid } })
end
end;
@@ -214,7 +212,8 @@ handle_cast({run_backing_queue, Mod, Fun}, State) ->
handle_cast({gm, Instruction}, State) ->
handle_process_result(process_instruction(Instruction, State));
-handle_cast({deliver, Delivery = #delivery{sender = Sender}, Flow}, State) ->
+handle_cast({deliver, Delivery = #delivery{sender = Sender}, true, Flow},
+ State) ->
%% Asynchronous, non-"mandatory", deliver mode.
case Flow of
flow -> credit_flow:ack(Sender);
@@ -250,8 +249,8 @@ handle_info(timeout, State) ->
noreply(backing_queue_timeout(State));
handle_info({'DOWN', _MonitorRef, process, MPid, _Reason},
- State = #state { gm = GM, master_pid = MPid }) ->
- ok = gm:broadcast(GM, {process_death, MPid}),
+ State = #state { gm = GM, q = #amqqueue { pid = MPid } }) ->
+ ok = gm:broadcast(GM, process_death),
noreply(State);
handle_info({'DOWN', _MonitorRef, process, ChPid, _Reason}, State) ->
@@ -287,7 +286,7 @@ terminate(Reason, #state { q = Q,
rate_timer_ref = RateTRef }) ->
ok = gm:leave(GM),
QueueState = rabbit_amqqueue_process:init_with_backing_queue_state(
- Q, BQ, BQS, RateTRef, [], [], pmon:new(), dict:new()),
+ Q, BQ, BQS, RateTRef, [], pmon:new(), dict:new()),
rabbit_amqqueue_process:terminate(Reason, QueueState);
terminate([_SPid], _Reason) ->
%% gm case
@@ -338,16 +337,21 @@ joined([SPid], _Members) -> SPid ! {joined, self()}, ok.
members_changed([_SPid], _Births, []) -> ok;
members_changed([ SPid], _Births, Deaths) -> inform_deaths(SPid, Deaths).
-handle_msg([_SPid], _From, master_changed) ->
- ok;
-handle_msg([_SPid], _From, request_length) ->
+handle_msg([_SPid], _From, request_depth) ->
%% This is only of value to the master
ok;
handle_msg([_SPid], _From, {ensure_monitoring, _Pid}) ->
%% This is only of value to the master
ok;
-handle_msg([SPid], _From, {process_death, Pid}) ->
- inform_deaths(SPid, [Pid]);
+handle_msg([_SPid], _From, process_death) ->
+ %% Since GM is by nature lazy we need to make sure there is some
+ %% traffic when a master dies, to make sure we get informed of the
+ %% death. That's all process_death does, create some traffic. We
+ %% must not take any notice of the master death here since it
+ %% comes without ordering guarantees - there could still be
+ %% messages from the master we have yet to receive. When we get
+ %% members_changed, then there will be no more messages.
+ ok;
handle_msg([CPid], _From, {delete_and_terminate, _Reason} = Msg) ->
ok = gen_server2:cast(CPid, {gm, Msg}),
{stop, {shutdown, ring_shutdown}};
@@ -368,7 +372,7 @@ infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
i(pid, _State) -> self();
i(name, #state { q = #amqqueue { name = Name } }) -> Name;
-i(master_pid, #state { master_pid = MPid }) -> MPid;
+i(master_pid, #state { q = #amqqueue { pid = MPid } }) -> MPid;
i(is_synchronised, #state { depth_delta = DD }) -> DD =:= 0;
i(Item, _State) -> throw({bad_argument, Item}).
@@ -387,14 +391,20 @@ run_backing_queue(Mod, Fun, State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
State #state { backing_queue_state = BQ:invoke(Mod, Fun, BQS) }.
-needs_confirming(#delivery{ msg_seq_no = undefined }, _State) ->
- never;
-needs_confirming(#delivery { message = #basic_message {
- is_persistent = true } },
- #state { q = #amqqueue { durable = true } }) ->
- eventually;
-needs_confirming(_Delivery, _State) ->
- immediately.
+send_or_record_confirm(_, #delivery{ msg_seq_no = undefined }, MS, _State) ->
+ MS;
+send_or_record_confirm(published, #delivery { sender = ChPid,
+ msg_seq_no = MsgSeqNo,
+ message = #basic_message {
+ id = MsgId,
+ is_persistent = true } },
+ MS, #state { q = #amqqueue { durable = true } }) ->
+ dict:store(MsgId, {published, ChPid, MsgSeqNo} , MS);
+send_or_record_confirm(_Status, #delivery { sender = ChPid,
+ msg_seq_no = MsgSeqNo },
+ MS, _State) ->
+ ok = rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]),
+ MS.
confirm_messages(MsgIds, State = #state { msg_id_status = MS }) ->
{CMs, MS1} =
@@ -406,16 +416,16 @@ confirm_messages(MsgIds, State = #state { msg_id_status = MS }) ->
%% If it needed confirming, it'll have
%% already been done.
Acc;
- {ok, {published, ChPid}} ->
+ {ok, published} ->
%% Still not seen it from the channel, just
%% record that it's been confirmed.
- {CMsN, dict:store(MsgId, {confirmed, ChPid}, MSN)};
+ {CMsN, dict:store(MsgId, confirmed, MSN)};
{ok, {published, ChPid, MsgSeqNo}} ->
%% Seen from both GM and Channel. Can now
%% confirm.
{rabbit_misc:gb_trees_cons(ChPid, MsgSeqNo, CMsN),
dict:erase(MsgId, MSN)};
- {ok, {confirmed, _ChPid}} ->
+ {ok, confirmed} ->
%% It's already been confirmed. This is
%% probably it's been both sync'd to disk
%% and then delivered and ack'd before we've
@@ -444,12 +454,9 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
Q1 = Q #amqqueue { pid = self() },
{ok, CPid} = rabbit_mirror_queue_coordinator:start_link(
Q1, GM, rabbit_mirror_queue_master:sender_death_fun(),
- rabbit_mirror_queue_master:length_fun()),
+ rabbit_mirror_queue_master:depth_fun()),
true = unlink(GM),
gen_server2:reply(From, {promote, CPid}),
- %% TODO this has been in here since the beginning, but it's not
- %% obvious if it is needed. Investigate...
- ok = gm:confirmed_broadcast(GM, master_changed),
%% Everything that we're monitoring, we need to ensure our new
%% coordinator is monitoring.
@@ -485,18 +492,18 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
%%
%% MS contains the following three entry types:
%%
- %% a) {published, ChPid}:
+ %% a) published:
%% published via gm only; pending arrival of publication from
%% channel, maybe pending confirm.
%%
%% b) {published, ChPid, MsgSeqNo}:
%% published via gm and channel; pending confirm.
%%
- %% c) {confirmed, ChPid}:
+ %% c) confirmed:
%% published via gm only, and confirmed; pending publication
%% from channel.
%%
- %% d) discarded
+ %% d) discarded:
%% seen via gm only as discarded. Pending publication from
%% channel
%%
@@ -513,34 +520,24 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
%% those messages are then requeued. However, as discussed above,
%% this does not affect MS, nor which bits go through to SS in
%% Master, or MTC in queue_process.
- %%
- %% Everything that's in MA gets requeued. Consequently the new
- %% master should start with a fresh AM as there are no messages
- %% pending acks.
- MSList = dict:to_list(MS),
- SS = dict:from_list(
- [E || E = {_MsgId, discarded} <- MSList] ++
- [{MsgId, Status}
- || {MsgId, {Status, _ChPid}} <- MSList,
- Status =:= published orelse Status =:= confirmed]),
+ St = [published, confirmed, discarded],
+ SS = dict:filter(fun (_MsgId, Status) -> lists:member(Status, St) end, MS),
+ AckTags = [AckTag || {_MsgId, AckTag} <- dict:to_list(MA)],
MasterState = rabbit_mirror_queue_master:promote_backing_queue_state(
- CPid, BQ, BQS, GM, SS, MPids),
-
- MTC = lists:foldl(fun ({MsgId, {published, ChPid, MsgSeqNo}}, MTC0) ->
- gb_trees:insert(MsgId, {ChPid, MsgSeqNo}, MTC0);
- (_, MTC0) ->
- MTC0
- end, gb_trees:empty(), MSList),
- NumAckTags = [NumAckTag || {_MsgId, NumAckTag} <- dict:to_list(MA)],
- AckTags = [AckTag || {_Num, AckTag} <- lists:sort(NumAckTags)],
+ CPid, BQ, BQS, GM, AckTags, SS, MPids),
+
+ MTC = dict:fold(fun (MsgId, {published, ChPid, MsgSeqNo}, MTC0) ->
+ gb_trees:insert(MsgId, {ChPid, MsgSeqNo}, MTC0);
+ (_Msgid, _Status, MTC0) ->
+ MTC0
+ end, gb_trees:empty(), MS),
Deliveries = [Delivery || {_ChPid, {PubQ, _PendCh}} <- dict:to_list(SQ),
Delivery <- queue:to_list(PubQ)],
- QueueState = rabbit_amqqueue_process:init_with_backing_queue_state(
- Q1, rabbit_mirror_queue_master, MasterState, RateTRef,
- AckTags, Deliveries, KS, MTC),
- {become, rabbit_amqqueue_process, QueueState, hibernate}.
+ rabbit_amqqueue_process:init_with_backing_queue_state(
+ Q1, rabbit_mirror_queue_master, MasterState, RateTRef, Deliveries, KS,
+ MTC).
noreply(State) ->
{NewState, Timeout} = next_state(State),
@@ -634,9 +631,8 @@ confirm_sender_death(Pid) ->
ok.
maybe_enqueue_message(
- Delivery = #delivery { message = #basic_message { id = MsgId },
- msg_seq_no = MsgSeqNo,
- sender = ChPid },
+ Delivery = #delivery { message = #basic_message { id = MsgId },
+ sender = ChPid },
State = #state { sender_queues = SQ, msg_id_status = MS }) ->
State1 = ensure_monitoring(ChPid, State),
%% We will never see {published, ChPid, MsgSeqNo} here.
@@ -646,34 +642,11 @@ maybe_enqueue_message(
MQ1 = queue:in(Delivery, MQ),
SQ1 = dict:store(ChPid, {MQ1, PendingCh}, SQ),
State1 #state { sender_queues = SQ1 };
- {ok, {confirmed, ChPid}} ->
- %% BQ has confirmed it but we didn't know what the
- %% msg_seq_no was at the time. We do now!
- ok = rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]),
+ {ok, Status} ->
+ MS1 = send_or_record_confirm(
+ Status, Delivery, dict:erase(MsgId, MS), State1),
SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ),
- State1 #state { msg_id_status = dict:erase(MsgId, MS),
- sender_queues = SQ1 };
- {ok, {published, ChPid}} ->
- %% It was published to the BQ and we didn't know the
- %% msg_seq_no so couldn't confirm it at the time.
- {MS1, SQ1} =
- case needs_confirming(Delivery, State1) of
- never -> {dict:erase(MsgId, MS),
- remove_from_pending_ch(MsgId, ChPid, SQ)};
- eventually -> MMS = {published, ChPid, MsgSeqNo},
- {dict:store(MsgId, MMS, MS), SQ};
- immediately -> ok = rabbit_misc:confirm_to_sender(
- ChPid, [MsgSeqNo]),
- {dict:erase(MsgId, MS),
- remove_from_pending_ch(MsgId, ChPid, SQ)}
- end,
State1 #state { msg_id_status = MS1,
- sender_queues = SQ1 };
- {ok, discarded} ->
- %% We've already heard from GM that the msg is to be
- %% discarded. We won't see this again.
- SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ),
- State1 #state { msg_id_status = dict:erase(MsgId, MS),
sender_queues = SQ1 }
end.
@@ -691,42 +664,26 @@ remove_from_pending_ch(MsgId, ChPid, SQ) ->
dict:store(ChPid, {MQ, sets:del_element(MsgId, PendingCh)}, SQ)
end.
-process_instruction(
- {publish, Deliver, ChPid, MsgProps, Msg = #basic_message { id = MsgId }},
- State = #state { sender_queues = SQ,
- backing_queue = BQ,
- backing_queue_state = BQS,
- msg_id_status = MS }) ->
-
- %% We really are going to do the publish right now, even though we
- %% may not have seen it directly from the channel. As a result, we
- %% may know that it needs confirming without knowing its
- %% msg_seq_no, which means that we can see the confirmation come
- %% back from the backing queue without knowing the msg_seq_no,
- %% which means that we're going to have to hang on to the fact
- %% that we've seen the msg_id confirmed until we can associate it
- %% with a msg_seq_no.
+publish_or_discard(Status, ChPid, MsgId,
+ State = #state { sender_queues = SQ, msg_id_status = MS }) ->
+ %% We really are going to do the publish/discard right now, even
+ %% though we may not have seen it directly from the channel. But
+ %% we cannot issues confirms until the latter has happened. So we
+ %% need to keep track of the MsgId and its confirmation status in
+ %% the meantime.
State1 = ensure_monitoring(ChPid, State),
{MQ, PendingCh} = get_sender_queue(ChPid, SQ),
{MQ1, PendingCh1, MS1} =
case queue:out(MQ) of
{empty, _MQ2} ->
{MQ, sets:add_element(MsgId, PendingCh),
- dict:store(MsgId, {published, ChPid}, MS)};
+ dict:store(MsgId, Status, MS)};
{{value, Delivery = #delivery {
- msg_seq_no = MsgSeqNo,
- message = #basic_message { id = MsgId } }}, MQ2} ->
+ message = #basic_message { id = MsgId } }}, MQ2} ->
{MQ2, PendingCh,
%% We received the msg from the channel first. Thus
%% we need to deal with confirms here.
- case needs_confirming(Delivery, State1) of
- never -> MS;
- eventually -> MMS = {published, ChPid, MsgSeqNo},
- dict:store(MsgId, MMS , MS);
- immediately -> ok = rabbit_misc:confirm_to_sender(
- ChPid, [MsgSeqNo]),
- MS
- end};
+ send_or_record_confirm(Status, Delivery, MS, State1)};
{{value, #delivery {}}, _MQ2} ->
%% The instruction was sent to us before we were
%% within the slave_pids within the #amqqueue{}
@@ -735,52 +692,30 @@ process_instruction(
%% expecting any confirms from us.
{MQ, PendingCh, MS}
end,
-
SQ1 = dict:store(ChPid, {MQ1, PendingCh1}, SQ),
- State2 = State1 #state { sender_queues = SQ1, msg_id_status = MS1 },
-
- {ok,
- case Deliver of
- false ->
- BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS),
- State2 #state { backing_queue_state = BQS1 };
- {true, AckRequired} ->
- {AckTag, BQS1} = BQ:publish_delivered(AckRequired, Msg, MsgProps,
- ChPid, BQS),
- maybe_store_ack(AckRequired, MsgId, AckTag,
- State2 #state { backing_queue_state = BQS1 })
- end};
+ State1 #state { sender_queues = SQ1, msg_id_status = MS1 }.
+
+
+process_instruction({publish, false, ChPid, MsgProps,
+ Msg = #basic_message { id = MsgId }}, State) ->
+ State1 = #state { backing_queue = BQ, backing_queue_state = BQS } =
+ publish_or_discard(published, ChPid, MsgId, State),
+ BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS),
+ {ok, State1 #state { backing_queue_state = BQS1 }};
+process_instruction({publish, {true, AckRequired}, ChPid, MsgProps,
+ Msg = #basic_message { id = MsgId }}, State) ->
+ State1 = #state { backing_queue = BQ, backing_queue_state = BQS } =
+ publish_or_discard(published, ChPid, MsgId, State),
+ {AckTag, BQS1} = BQ:publish_delivered(AckRequired, Msg, MsgProps,
+ ChPid, BQS),
+ {ok, maybe_store_ack(AckRequired, MsgId, AckTag,
+ State1 #state { backing_queue_state = BQS1 })};
process_instruction({discard, ChPid, Msg = #basic_message { id = MsgId }},
- State = #state { sender_queues = SQ,
- backing_queue = BQ,
- backing_queue_state = BQS,
- msg_id_status = MS }) ->
- %% Many of the comments around the publish head above apply here
- %% too.
- State1 = ensure_monitoring(ChPid, State),
- {MQ, PendingCh} = get_sender_queue(ChPid, SQ),
- {MQ1, PendingCh1, MS1} =
- case queue:out(MQ) of
- {empty, _MQ} ->
- {MQ, sets:add_element(MsgId, PendingCh),
- dict:store(MsgId, discarded, MS)};
- {{value, #delivery { message = #basic_message { id = MsgId } }},
- MQ2} ->
- %% We've already seen it from the channel, we're not
- %% going to see this again, so don't add it to MS
- {MQ2, PendingCh, MS};
- {{value, #delivery {}}, _MQ2} ->
- %% The instruction was sent to us before we were
- %% within the slave_pids within the #amqqueue{}
- %% record. We'll never receive the message directly
- %% from the channel.
- {MQ, PendingCh, MS}
- end,
- SQ1 = dict:store(ChPid, {MQ1, PendingCh1}, SQ),
+ State) ->
+ State1 = #state { backing_queue = BQ, backing_queue_state = BQS } =
+ publish_or_discard(discarded, ChPid, MsgId, State),
BQS1 = BQ:discard(Msg, ChPid, BQS),
- {ok, State1 #state { sender_queues = SQ1,
- msg_id_status = MS1,
- backing_queue_state = BQS1 }};
+ {ok, State1 #state { backing_queue_state = BQS1 }};
process_instruction({drop, Length, Dropped, AckRequired},
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
@@ -867,19 +802,16 @@ msg_ids_to_acktags(MsgIds, MA) ->
lists:foldl(
fun (MsgId, {Acc, MAN}) ->
case dict:find(MsgId, MA) of
- error -> {Acc, MAN};
- {ok, {_Num, AckTag}} -> {[AckTag | Acc],
- dict:erase(MsgId, MAN)}
+ error -> {Acc, MAN};
+ {ok, AckTag} -> {[AckTag | Acc], dict:erase(MsgId, MAN)}
end
end, {[], MA}, MsgIds),
{lists:reverse(AckTags), MA1}.
maybe_store_ack(false, _MsgId, _AckTag, State) ->
State;
-maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA,
- ack_num = Num }) ->
- State #state { msg_id_ack = dict:store(MsgId, {Num, AckTag}, MA),
- ack_num = Num + 1 }.
+maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA }) ->
+ State #state { msg_id_ack = dict:store(MsgId, AckTag, MA) }.
set_delta(0, State = #state { depth_delta = undefined }) ->
ok = record_synchronised(State#state.q),
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index ae36febb33..870692282f 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -14,7 +14,6 @@
%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-
-module(rabbit_mnesia).
-export([init/0,
@@ -26,18 +25,14 @@
forget_cluster_node/2,
status/0,
- is_db_empty/0,
is_clustered/0,
cluster_nodes/1,
node_type/0,
dir/0,
- table_names/0,
cluster_status_from_mnesia/0,
init_db_unchecked/2,
- empty_ram_only_tables/0,
copy_db/1,
- wait_for_tables/1,
check_cluster_consistency/0,
ensure_mnesia_dir/0,
@@ -51,10 +46,6 @@
is_running_remote/0
]).
-%% create_tables/0 exported for helping embed RabbitMQ in or alongside
-%% other mnesia-using Erlang applications, such as ejabberd
--export([create_tables/0]).
-
-include("rabbit.hrl").
%%----------------------------------------------------------------------------
@@ -78,21 +69,16 @@
%% Various queries to get the status of the db
-spec(status/0 :: () -> [{'nodes', [{node_type(), [node()]}]} |
{'running_nodes', [node()]}]).
--spec(is_db_empty/0 :: () -> boolean()).
-spec(is_clustered/0 :: () -> boolean()).
-spec(cluster_nodes/1 :: ('all' | 'disc' | 'ram' | 'running') -> [node()]).
-spec(node_type/0 :: () -> node_type()).
-spec(dir/0 :: () -> file:filename()).
--spec(table_names/0 :: () -> [atom()]).
-spec(cluster_status_from_mnesia/0 :: () -> rabbit_types:ok_or_error2(
cluster_status(), any())).
%% Operations on the db and utils, mainly used in `rabbit_upgrade' and `rabbit'
-spec(init_db_unchecked/2 :: ([node()], node_type()) -> 'ok').
--spec(empty_ram_only_tables/0 :: () -> 'ok').
--spec(create_tables/0 :: () -> 'ok').
-spec(copy_db/1 :: (file:filename()) -> rabbit_types:ok_or_error(any())).
--spec(wait_for_tables/1 :: ([atom()]) -> 'ok').
-spec(check_cluster_consistency/0 :: () -> 'ok').
-spec(ensure_mnesia_dir/0 :: () -> 'ok').
@@ -290,6 +276,9 @@ forget_cluster_node(Node, RemoveWhenOffline) ->
end.
remove_node_offline_node(Node) ->
+ %% We want the running nodes *now*, so we don't call
+ %% `cluster_nodes(running)' which will just get what's in the cluster status
+ %% file.
case {running_nodes(cluster_nodes(all)) -- [Node], node_type()} of
{[], disc} ->
%% Note that while we check if the nodes was the last to
@@ -303,9 +292,13 @@ remove_node_offline_node(Node) ->
case cluster_nodes(running) -- [node(), Node] of
[] -> start_mnesia(),
try
- [mnesia:force_load_table(T) || T <- table_names()],
- forget_cluster_node(Node, false),
- ensure_mnesia_running()
+ %% What we want to do here is replace the last node to
+ %% go down with the current node. The way we do this
+ %% is by force loading the table, and making sure that
+ %% they are loaded.
+ rabbit_table:force_load(),
+ rabbit_table:wait_for_replicated(),
+ forget_cluster_node(Node, false)
after
stop_mnesia()
end;
@@ -331,10 +324,6 @@ status() ->
no -> []
end.
-is_db_empty() ->
- lists:all(fun (Tab) -> mnesia:dirty_first(Tab) == '$end_of_table' end,
- table_names()).
-
is_clustered() -> AllNodes = cluster_nodes(all),
AllNodes =/= [] andalso AllNodes =/= [node()].
@@ -356,9 +345,7 @@ mnesia_nodes() ->
true -> disc;
false -> ram
end,
- Tables = mnesia:system_info(tables),
- [{Table, _} | _] = table_definitions(NodeType),
- case lists:member(Table, Tables) of
+ case rabbit_table:is_present() of
true -> AllNodes = mnesia:system_info(db_nodes),
DiscCopies = mnesia:table_info(schema, disc_copies),
DiscNodes = case NodeType of
@@ -413,8 +400,6 @@ node_type() ->
dir() -> mnesia:system_info(directory).
-table_names() -> [Tab || {Tab, _} <- table_definitions()].
-
%%----------------------------------------------------------------------------
%% Operations on the db
%%----------------------------------------------------------------------------
@@ -444,18 +429,8 @@ init_db(ClusterNodes, NodeType, CheckOtherNodes) ->
%% Subsequent node in cluster, catch up
ensure_version_ok(
rpc:call(AnotherNode, rabbit_version, recorded, [])),
- ok = wait_for_replicated_tables(),
- %% The sequence in which we delete the schema and then the
- %% other tables is important: if we delete the schema
- %% first when moving to RAM mnesia will loudly complain
- %% since it doesn't make much sense to do that. But when
- %% moving to disc, we need to move the schema first.
- case NodeType of
- disc -> create_local_table_copy(schema, disc_copies),
- create_local_table_copies(disc);
- ram -> create_local_table_copies(ram),
- create_local_table_copy(schema, ram_copies)
- end
+ ok = rabbit_table:wait_for_replicated(),
+ ok = rabbit_table:create_local_copy(NodeType)
end,
ensure_schema_integrity(),
rabbit_node_monitor:update_cluster_status(),
@@ -476,7 +451,7 @@ init_db_and_upgrade(ClusterNodes, NodeType, CheckOtherNodes) ->
case NodeType of
ram -> start_mnesia(),
change_extra_db_nodes(ClusterNodes, false),
- wait_for_replicated_tables();
+ rabbit_table:wait_for_replicated();
disc -> ok
end,
ok.
@@ -522,70 +497,17 @@ ensure_mnesia_not_running() ->
end.
ensure_schema_integrity() ->
- case check_schema_integrity() of
+ case rabbit_table:check_schema_integrity() of
ok ->
ok;
{error, Reason} ->
throw({error, {schema_integrity_check_failed, Reason}})
end.
-check_schema_integrity() ->
- Tables = mnesia:system_info(tables),
- case check_tables(fun (Tab, TabDef) ->
- case lists:member(Tab, Tables) of
- false -> {error, {table_missing, Tab}};
- true -> check_table_attributes(Tab, TabDef)
- end
- end) of
- ok -> ok = wait_for_tables(table_names()),
- check_tables(fun check_table_content/2);
- Other -> Other
- end.
-
-empty_ram_only_tables() ->
- Node = node(),
- lists:foreach(
- fun (TabName) ->
- case lists:member(Node, mnesia:table_info(TabName, ram_copies)) of
- true -> {atomic, ok} = mnesia:clear_table(TabName);
- false -> ok
- end
- end, table_names()),
- ok.
-
-create_tables() -> create_tables(disc).
-
-create_tables(Type) ->
- lists:foreach(fun ({Tab, TabDef}) ->
- TabDef1 = proplists:delete(match, TabDef),
- case mnesia:create_table(Tab, TabDef1) of
- {atomic, ok} -> ok;
- {aborted, Reason} ->
- throw({error, {table_creation_failed,
- Tab, TabDef1, Reason}})
- end
- end,
- table_definitions(Type)),
- ok.
-
copy_db(Destination) ->
ok = ensure_mnesia_not_running(),
rabbit_file:recursive_copy(dir(), Destination).
-wait_for_replicated_tables() ->
- wait_for_tables([Tab || {Tab, TabDef} <- table_definitions(),
- not lists:member({local_content, true}, TabDef)]).
-
-wait_for_tables(TableNames) ->
- case mnesia:wait_for_tables(TableNames, 30000) of
- ok ->
- ok;
- {timeout, BadTabs} ->
- throw({error, {timeout_waiting_for_tables, BadTabs}});
- {error, Reason} ->
- throw({error, {failed_waiting_for_tables, Reason}})
- end.
-
%% This does not guarantee us much, but it avoids some situations that
%% will definitely end up badly
check_cluster_consistency() ->
@@ -678,158 +600,8 @@ discover_cluster(Node) ->
end
end.
-%% The tables aren't supposed to be on disk on a ram node
-table_definitions(disc) ->
- table_definitions();
-table_definitions(ram) ->
- [{Tab, copy_type_to_ram(TabDef)} || {Tab, TabDef} <- table_definitions()].
-
-table_definitions() ->
- [{rabbit_user,
- [{record_name, internal_user},
- {attributes, record_info(fields, internal_user)},
- {disc_copies, [node()]},
- {match, #internal_user{_='_'}}]},
- {rabbit_user_permission,
- [{record_name, user_permission},
- {attributes, record_info(fields, user_permission)},
- {disc_copies, [node()]},
- {match, #user_permission{user_vhost = #user_vhost{_='_'},
- permission = #permission{_='_'},
- _='_'}}]},
- {rabbit_vhost,
- [{record_name, vhost},
- {attributes, record_info(fields, vhost)},
- {disc_copies, [node()]},
- {match, #vhost{_='_'}}]},
- {rabbit_listener,
- [{record_name, listener},
- {attributes, record_info(fields, listener)},
- {type, bag},
- {match, #listener{_='_'}}]},
- {rabbit_durable_route,
- [{record_name, route},
- {attributes, record_info(fields, route)},
- {disc_copies, [node()]},
- {match, #route{binding = binding_match(), _='_'}}]},
- {rabbit_semi_durable_route,
- [{record_name, route},
- {attributes, record_info(fields, route)},
- {type, ordered_set},
- {match, #route{binding = binding_match(), _='_'}}]},
- {rabbit_route,
- [{record_name, route},
- {attributes, record_info(fields, route)},
- {type, ordered_set},
- {match, #route{binding = binding_match(), _='_'}}]},
- {rabbit_reverse_route,
- [{record_name, reverse_route},
- {attributes, record_info(fields, reverse_route)},
- {type, ordered_set},
- {match, #reverse_route{reverse_binding = reverse_binding_match(),
- _='_'}}]},
- {rabbit_topic_trie_node,
- [{record_name, topic_trie_node},
- {attributes, record_info(fields, topic_trie_node)},
- {type, ordered_set},
- {match, #topic_trie_node{trie_node = trie_node_match(), _='_'}}]},
- {rabbit_topic_trie_edge,
- [{record_name, topic_trie_edge},
- {attributes, record_info(fields, topic_trie_edge)},
- {type, ordered_set},
- {match, #topic_trie_edge{trie_edge = trie_edge_match(), _='_'}}]},
- {rabbit_topic_trie_binding,
- [{record_name, topic_trie_binding},
- {attributes, record_info(fields, topic_trie_binding)},
- {type, ordered_set},
- {match, #topic_trie_binding{trie_binding = trie_binding_match(),
- _='_'}}]},
- {rabbit_durable_exchange,
- [{record_name, exchange},
- {attributes, record_info(fields, exchange)},
- {disc_copies, [node()]},
- {match, #exchange{name = exchange_name_match(), _='_'}}]},
- {rabbit_exchange,
- [{record_name, exchange},
- {attributes, record_info(fields, exchange)},
- {match, #exchange{name = exchange_name_match(), _='_'}}]},
- {rabbit_exchange_serial,
- [{record_name, exchange_serial},
- {attributes, record_info(fields, exchange_serial)},
- {match, #exchange_serial{name = exchange_name_match(), _='_'}}]},
- {rabbit_runtime_parameters,
- [{record_name, runtime_parameters},
- {attributes, record_info(fields, runtime_parameters)},
- {disc_copies, [node()]},
- {match, #runtime_parameters{_='_'}}]},
- {rabbit_durable_queue,
- [{record_name, amqqueue},
- {attributes, record_info(fields, amqqueue)},
- {disc_copies, [node()]},
- {match, #amqqueue{name = queue_name_match(), _='_'}}]},
- {rabbit_queue,
- [{record_name, amqqueue},
- {attributes, record_info(fields, amqqueue)},
- {match, #amqqueue{name = queue_name_match(), _='_'}}]}]
- ++ gm:table_definitions()
- ++ mirrored_supervisor:table_definitions().
-
-binding_match() ->
- #binding{source = exchange_name_match(),
- destination = binding_destination_match(),
- _='_'}.
-reverse_binding_match() ->
- #reverse_binding{destination = binding_destination_match(),
- source = exchange_name_match(),
- _='_'}.
-binding_destination_match() ->
- resource_match('_').
-trie_node_match() ->
- #trie_node{ exchange_name = exchange_name_match(), _='_'}.
-trie_edge_match() ->
- #trie_edge{ exchange_name = exchange_name_match(), _='_'}.
-trie_binding_match() ->
- #trie_binding{exchange_name = exchange_name_match(), _='_'}.
-exchange_name_match() ->
- resource_match(exchange).
-queue_name_match() ->
- resource_match(queue).
-resource_match(Kind) ->
- #resource{kind = Kind, _='_'}.
-
-check_table_attributes(Tab, TabDef) ->
- {_, ExpAttrs} = proplists:lookup(attributes, TabDef),
- case mnesia:table_info(Tab, attributes) of
- ExpAttrs -> ok;
- Attrs -> {error, {table_attributes_mismatch, Tab, ExpAttrs, Attrs}}
- end.
-
-check_table_content(Tab, TabDef) ->
- {_, Match} = proplists:lookup(match, TabDef),
- case mnesia:dirty_first(Tab) of
- '$end_of_table' ->
- ok;
- Key ->
- ObjList = mnesia:dirty_read(Tab, Key),
- MatchComp = ets:match_spec_compile([{Match, [], ['$_']}]),
- case ets:match_spec_run(ObjList, MatchComp) of
- ObjList -> ok;
- _ -> {error, {table_content_invalid, Tab, Match, ObjList}}
- end
- end.
-
-check_tables(Fun) ->
- case [Error || {Tab, TabDef} <- table_definitions(node_type()),
- case Fun(Tab, TabDef) of
- ok -> Error = none, false;
- {error, Error} -> true
- end] of
- [] -> ok;
- Errors -> {error, Errors}
- end.
-
schema_ok_or_move() ->
- case check_schema_integrity() of
+ case rabbit_table:check_schema_integrity() of
ok ->
ok;
{error, Reason} ->
@@ -858,7 +630,7 @@ create_schema() ->
stop_mnesia(),
rabbit_misc:ensure_ok(mnesia:create_schema([node()]), cannot_create_schema),
start_mnesia(),
- ok = create_tables(disc),
+ ok = rabbit_table:create(),
ensure_schema_integrity(),
ok = rabbit_version:record_desired().
@@ -883,47 +655,6 @@ move_db() ->
start_mnesia(),
ok.
-copy_type_to_ram(TabDef) ->
- [{disc_copies, []}, {ram_copies, [node()]}
- | proplists:delete(ram_copies, proplists:delete(disc_copies, TabDef))].
-
-table_has_copy_type(TabDef, DiscType) ->
- lists:member(node(), proplists:get_value(DiscType, TabDef, [])).
-
-create_local_table_copies(Type) ->
- lists:foreach(
- fun ({Tab, TabDef}) ->
- HasDiscCopies = table_has_copy_type(TabDef, disc_copies),
- HasDiscOnlyCopies = table_has_copy_type(TabDef, disc_only_copies),
- LocalTab = proplists:get_bool(local_content, TabDef),
- StorageType =
- if
- Type =:= disc orelse LocalTab ->
- if
- HasDiscCopies -> disc_copies;
- HasDiscOnlyCopies -> disc_only_copies;
- true -> ram_copies
- end;
- Type =:= ram ->
- ram_copies
- end,
- ok = create_local_table_copy(Tab, StorageType)
- end,
- table_definitions(Type)),
- ok.
-
-create_local_table_copy(Tab, Type) ->
- StorageType = mnesia:table_info(Tab, storage_type),
- {atomic, ok} =
- if
- StorageType == unknown ->
- mnesia:add_table_copy(Tab, node(), Type);
- StorageType /= Type ->
- mnesia:change_table_copy_type(Tab, node(), Type);
- true -> {atomic, ok}
- end,
- ok.
-
remove_node_if_mnesia_running(Node) ->
case mnesia:system_info(is_running) of
yes ->
@@ -942,13 +673,12 @@ remove_node_if_mnesia_running(Node) ->
end.
leave_cluster() ->
- RunningNodes = running_nodes(nodes_excl_me(cluster_nodes(all))),
- case not is_clustered() andalso RunningNodes =:= [] of
- true -> ok;
- false -> case lists:any(fun leave_cluster/1, RunningNodes) of
- true -> ok;
- false -> e(no_running_cluster_nodes)
- end
+ case nodes_excl_me(cluster_nodes(all)) of
+ [] -> ok;
+ AllNodes -> case lists:any(fun leave_cluster/1, AllNodes) of
+ true -> ok;
+ false -> e(no_running_cluster_nodes)
+ end
end.
leave_cluster(Node) ->
diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl
index 69480c9c11..f4c1f42b21 100644
--- a/src/rabbit_policy.erl
+++ b/src/rabbit_policy.erl
@@ -98,19 +98,17 @@ update_policies(VHost) ->
ok.
update_exchange(X = #exchange{name = XName, policy = OldPolicy}, Policies) ->
- NewPolicy = match(XName, Policies),
- case NewPolicy of
+ case match(XName, Policies) of
OldPolicy -> no_change;
- _ -> rabbit_exchange:update(
+ NewPolicy -> rabbit_exchange:update(
XName, fun(X1) -> X1#exchange{policy = NewPolicy} end),
{X, X#exchange{policy = NewPolicy}}
end.
update_queue(Q = #amqqueue{name = QName, policy = OldPolicy}, Policies) ->
- NewPolicy = match(QName, Policies),
- case NewPolicy of
+ case match(QName, Policies) of
OldPolicy -> no_change;
- _ -> rabbit_amqqueue:update(
+ NewPolicy -> rabbit_amqqueue:update(
QName, fun(Q1) -> Q1#amqqueue{policy = NewPolicy} end),
{Q, Q#amqqueue{policy = NewPolicy}}
end.
@@ -131,12 +129,11 @@ match(Name, Policies) ->
matches(#resource{name = Name}, Policy) ->
match =:= re:run(Name, pget(<<"pattern">>, Policy), [{capture, none}]).
-sort_pred(A, B) ->
- pget(<<"priority">>, A, 0) >= pget(<<"priority">>, B, 0).
+sort_pred(A, B) -> pget(<<"priority">>, A, 0) >= pget(<<"priority">>, B, 0).
%%----------------------------------------------------------------------------
policy_validation() ->
[{<<"priority">>, fun rabbit_parameter_validation:number/2, optional},
- {<<"pattern">>, fun rabbit_parameter_validation:regex/2, mandatory},
- {<<"policy">>, fun rabbit_parameter_validation:list/2, mandatory}].
+ {<<"pattern">>, fun rabbit_parameter_validation:regex/2, mandatory},
+ {<<"policy">>, fun rabbit_parameter_validation:list/2, mandatory}].
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 6d6c648acb..21f581548d 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -537,7 +537,7 @@ queue_index_walker_reader(QueueName, Gatherer) ->
State = blank_state(QueueName),
ok = scan_segments(
fun (_SeqId, MsgId, _MsgProps, true, _IsDelivered, no_ack, ok) ->
- gatherer:in(Gatherer, {MsgId, 1});
+ gatherer:sync_in(Gatherer, {MsgId, 1});
(_SeqId, _MsgId, _MsgProps, _IsPersistent, _IsDelivered,
_IsAcked, Acc) ->
Acc
diff --git a/src/rabbit_table.erl b/src/rabbit_table.erl
new file mode 100644
index 0000000000..fa1c5bbd01
--- /dev/null
+++ b/src/rabbit_table.erl
@@ -0,0 +1,311 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_table).
+
+-export([create/0, create_local_copy/1, wait_for_replicated/0, wait/1,
+ force_load/0, is_present/0, is_empty/0,
+ check_schema_integrity/0, clear_ram_only_tables/0]).
+
+-include("rabbit.hrl").
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(create/0 :: () -> 'ok').
+-spec(create_local_copy/1 :: ('disc' | 'ram') -> 'ok').
+-spec(wait_for_replicated/0 :: () -> 'ok').
+-spec(wait/1 :: ([atom()]) -> 'ok').
+-spec(force_load/0 :: () -> 'ok').
+-spec(is_present/0 :: () -> boolean()).
+-spec(is_empty/0 :: () -> boolean()).
+-spec(check_schema_integrity/0 :: () -> rabbit_types:ok_or_error(any())).
+-spec(clear_ram_only_tables/0 :: () -> 'ok').
+
+-endif.
+
+%%----------------------------------------------------------------------------
+%% Main interface
+%%----------------------------------------------------------------------------
+
+create() ->
+ lists:foreach(fun ({Tab, TabDef}) ->
+ TabDef1 = proplists:delete(match, TabDef),
+ case mnesia:create_table(Tab, TabDef1) of
+ {atomic, ok} -> ok;
+ {aborted, Reason} ->
+ throw({error, {table_creation_failed,
+ Tab, TabDef1, Reason}})
+ end
+ end, definitions()),
+ ok.
+
+%% The sequence in which we delete the schema and then the other
+%% tables is important: if we delete the schema first when moving to
+%% RAM mnesia will loudly complain since it doesn't make much sense to
+%% do that. But when moving to disc, we need to move the schema first.
+create_local_copy(disc) ->
+ create_local_copy(schema, disc_copies),
+ create_local_copies(disc);
+create_local_copy(ram) ->
+ create_local_copies(ram),
+ create_local_copy(schema, ram_copies).
+
+wait_for_replicated() ->
+ wait([Tab || {Tab, TabDef} <- definitions(),
+ not lists:member({local_content, true}, TabDef)]).
+
+wait(TableNames) ->
+ case mnesia:wait_for_tables(TableNames, 30000) of
+ ok ->
+ ok;
+ {timeout, BadTabs} ->
+ throw({error, {timeout_waiting_for_tables, BadTabs}});
+ {error, Reason} ->
+ throw({error, {failed_waiting_for_tables, Reason}})
+ end.
+
+force_load() -> [mnesia:force_load_table(T) || T <- names()], ok.
+
+is_present() -> names() -- mnesia:system_info(tables) =:= [].
+
+is_empty() ->
+ lists:all(fun (Tab) -> mnesia:dirty_first(Tab) == '$end_of_table' end,
+ names()).
+
+check_schema_integrity() ->
+ Tables = mnesia:system_info(tables),
+ case check(fun (Tab, TabDef) ->
+ case lists:member(Tab, Tables) of
+ false -> {error, {table_missing, Tab}};
+ true -> check_attributes(Tab, TabDef)
+ end
+ end) of
+ ok -> ok = wait(names()),
+ check(fun check_content/2);
+ Other -> Other
+ end.
+
+clear_ram_only_tables() ->
+ Node = node(),
+ lists:foreach(
+ fun (TabName) ->
+ case lists:member(Node, mnesia:table_info(TabName, ram_copies)) of
+ true -> {atomic, ok} = mnesia:clear_table(TabName);
+ false -> ok
+ end
+ end, names()),
+ ok.
+
+%%--------------------------------------------------------------------
+%% Internal helpers
+%%--------------------------------------------------------------------
+
+create_local_copies(Type) ->
+ lists:foreach(
+ fun ({Tab, TabDef}) ->
+ HasDiscCopies = has_copy_type(TabDef, disc_copies),
+ HasDiscOnlyCopies = has_copy_type(TabDef, disc_only_copies),
+ LocalTab = proplists:get_bool(local_content, TabDef),
+ StorageType =
+ if
+ Type =:= disc orelse LocalTab ->
+ if
+ HasDiscCopies -> disc_copies;
+ HasDiscOnlyCopies -> disc_only_copies;
+ true -> ram_copies
+ end;
+ Type =:= ram ->
+ ram_copies
+ end,
+ ok = create_local_copy(Tab, StorageType)
+ end, definitions(Type)),
+ ok.
+
+create_local_copy(Tab, Type) ->
+ StorageType = mnesia:table_info(Tab, storage_type),
+ {atomic, ok} =
+ if
+ StorageType == unknown ->
+ mnesia:add_table_copy(Tab, node(), Type);
+ StorageType /= Type ->
+ mnesia:change_table_copy_type(Tab, node(), Type);
+ true -> {atomic, ok}
+ end,
+ ok.
+
+has_copy_type(TabDef, DiscType) ->
+ lists:member(node(), proplists:get_value(DiscType, TabDef, [])).
+
+check_attributes(Tab, TabDef) ->
+ {_, ExpAttrs} = proplists:lookup(attributes, TabDef),
+ case mnesia:table_info(Tab, attributes) of
+ ExpAttrs -> ok;
+ Attrs -> {error, {table_attributes_mismatch, Tab, ExpAttrs, Attrs}}
+ end.
+
+check_content(Tab, TabDef) ->
+ {_, Match} = proplists:lookup(match, TabDef),
+ case mnesia:dirty_first(Tab) of
+ '$end_of_table' ->
+ ok;
+ Key ->
+ ObjList = mnesia:dirty_read(Tab, Key),
+ MatchComp = ets:match_spec_compile([{Match, [], ['$_']}]),
+ case ets:match_spec_run(ObjList, MatchComp) of
+ ObjList -> ok;
+ _ -> {error, {table_content_invalid, Tab, Match, ObjList}}
+ end
+ end.
+
+check(Fun) ->
+ case [Error || {Tab, TabDef} <- definitions(),
+ case Fun(Tab, TabDef) of
+ ok -> Error = none, false;
+ {error, Error} -> true
+ end] of
+ [] -> ok;
+ Errors -> {error, Errors}
+ end.
+
+%%--------------------------------------------------------------------
+%% Table definitions
+%%--------------------------------------------------------------------
+
+names() -> [Tab || {Tab, _} <- definitions()].
+
+%% The tables aren't supposed to be on disk on a ram node
+definitions(disc) ->
+ definitions();
+definitions(ram) ->
+ [{Tab, [{disc_copies, []}, {ram_copies, [node()]} |
+ proplists:delete(
+ ram_copies, proplists:delete(disc_copies, TabDef))]} ||
+ {Tab, TabDef} <- definitions()].
+
+definitions() ->
+ [{rabbit_user,
+ [{record_name, internal_user},
+ {attributes, record_info(fields, internal_user)},
+ {disc_copies, [node()]},
+ {match, #internal_user{_='_'}}]},
+ {rabbit_user_permission,
+ [{record_name, user_permission},
+ {attributes, record_info(fields, user_permission)},
+ {disc_copies, [node()]},
+ {match, #user_permission{user_vhost = #user_vhost{_='_'},
+ permission = #permission{_='_'},
+ _='_'}}]},
+ {rabbit_vhost,
+ [{record_name, vhost},
+ {attributes, record_info(fields, vhost)},
+ {disc_copies, [node()]},
+ {match, #vhost{_='_'}}]},
+ {rabbit_listener,
+ [{record_name, listener},
+ {attributes, record_info(fields, listener)},
+ {type, bag},
+ {match, #listener{_='_'}}]},
+ {rabbit_durable_route,
+ [{record_name, route},
+ {attributes, record_info(fields, route)},
+ {disc_copies, [node()]},
+ {match, #route{binding = binding_match(), _='_'}}]},
+ {rabbit_semi_durable_route,
+ [{record_name, route},
+ {attributes, record_info(fields, route)},
+ {type, ordered_set},
+ {match, #route{binding = binding_match(), _='_'}}]},
+ {rabbit_route,
+ [{record_name, route},
+ {attributes, record_info(fields, route)},
+ {type, ordered_set},
+ {match, #route{binding = binding_match(), _='_'}}]},
+ {rabbit_reverse_route,
+ [{record_name, reverse_route},
+ {attributes, record_info(fields, reverse_route)},
+ {type, ordered_set},
+ {match, #reverse_route{reverse_binding = reverse_binding_match(),
+ _='_'}}]},
+ {rabbit_topic_trie_node,
+ [{record_name, topic_trie_node},
+ {attributes, record_info(fields, topic_trie_node)},
+ {type, ordered_set},
+ {match, #topic_trie_node{trie_node = trie_node_match(), _='_'}}]},
+ {rabbit_topic_trie_edge,
+ [{record_name, topic_trie_edge},
+ {attributes, record_info(fields, topic_trie_edge)},
+ {type, ordered_set},
+ {match, #topic_trie_edge{trie_edge = trie_edge_match(), _='_'}}]},
+ {rabbit_topic_trie_binding,
+ [{record_name, topic_trie_binding},
+ {attributes, record_info(fields, topic_trie_binding)},
+ {type, ordered_set},
+ {match, #topic_trie_binding{trie_binding = trie_binding_match(),
+ _='_'}}]},
+ {rabbit_durable_exchange,
+ [{record_name, exchange},
+ {attributes, record_info(fields, exchange)},
+ {disc_copies, [node()]},
+ {match, #exchange{name = exchange_name_match(), _='_'}}]},
+ {rabbit_exchange,
+ [{record_name, exchange},
+ {attributes, record_info(fields, exchange)},
+ {match, #exchange{name = exchange_name_match(), _='_'}}]},
+ {rabbit_exchange_serial,
+ [{record_name, exchange_serial},
+ {attributes, record_info(fields, exchange_serial)},
+ {match, #exchange_serial{name = exchange_name_match(), _='_'}}]},
+ {rabbit_runtime_parameters,
+ [{record_name, runtime_parameters},
+ {attributes, record_info(fields, runtime_parameters)},
+ {disc_copies, [node()]},
+ {match, #runtime_parameters{_='_'}}]},
+ {rabbit_durable_queue,
+ [{record_name, amqqueue},
+ {attributes, record_info(fields, amqqueue)},
+ {disc_copies, [node()]},
+ {match, #amqqueue{name = queue_name_match(), _='_'}}]},
+ {rabbit_queue,
+ [{record_name, amqqueue},
+ {attributes, record_info(fields, amqqueue)},
+ {match, #amqqueue{name = queue_name_match(), _='_'}}]}]
+ ++ gm:table_definitions()
+ ++ mirrored_supervisor:table_definitions().
+
+binding_match() ->
+ #binding{source = exchange_name_match(),
+ destination = binding_destination_match(),
+ _='_'}.
+reverse_binding_match() ->
+ #reverse_binding{destination = binding_destination_match(),
+ source = exchange_name_match(),
+ _='_'}.
+binding_destination_match() ->
+ resource_match('_').
+trie_node_match() ->
+ #trie_node{ exchange_name = exchange_name_match(), _='_'}.
+trie_edge_match() ->
+ #trie_edge{ exchange_name = exchange_name_match(), _='_'}.
+trie_binding_match() ->
+ #trie_binding{exchange_name = exchange_name_match(), _='_'}.
+exchange_name_match() ->
+ resource_match(exchange).
+queue_name_match() ->
+ resource_match(queue).
+resource_match(Kind) ->
+ #resource{kind = Kind, _='_'}.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index df0ee72147..aa48f22833 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -54,6 +54,7 @@ all_tests() ->
passed = test_log_management_during_startup(),
passed = test_statistics(),
passed = test_arguments_parser(),
+ passed = test_dynamic_mirroring(),
passed = test_user_management(),
passed = test_runtime_parameters(),
passed = test_server_status(),
@@ -882,6 +883,52 @@ test_arguments_parser() ->
passed.
+test_dynamic_mirroring() ->
+ %% Just unit tests of the node selection logic, see multi node
+ %% tests for the rest...
+ Test = fun ({NewM, NewSs, ExtraSs}, Policy, Params, {OldM, OldSs}, All) ->
+ {NewM, NewSs0} =
+ rabbit_mirror_queue_misc:suggested_queue_nodes(
+ Policy, Params, {OldM, OldSs}, All),
+ NewSs1 = lists:sort(NewSs0),
+ case dm_list_match(NewSs, NewSs1, ExtraSs) of
+ ok -> ok;
+ error -> exit({no_match, NewSs, NewSs1, ExtraSs})
+ end
+ end,
+
+ Test({a,[b,c],0},<<"all">>,'_',{a,[]}, [a,b,c]),
+ Test({a,[b,c],0},<<"all">>,'_',{a,[b,c]},[a,b,c]),
+ Test({a,[b,c],0},<<"all">>,'_',{a,[d]}, [a,b,c]),
+
+ %% Add a node
+ Test({a,[b,c],0},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{a,[b]},[a,b,c,d]),
+ Test({b,[a,c],0},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{b,[a]},[a,b,c,d]),
+ %% Add two nodes and drop one
+ Test({a,[b,c],0},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{a,[d]},[a,b,c,d]),
+ %% Promote slave to master by policy
+ Test({a,[b,c],0},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{d,[a]},[a,b,c,d]),
+ %% Don't try to include nodes that are not running
+ Test({a,[b], 0},<<"nodes">>,[<<"a">>,<<"b">>,<<"f">>],{a,[b]},[a,b,c,d]),
+ %% If we can't find any of the nodes listed then just keep the master
+ Test({a,[], 0},<<"nodes">>,[<<"f">>,<<"g">>,<<"h">>],{a,[b]},[a,b,c,d]),
+
+ Test({a,[], 1},<<"exactly">>,2,{a,[]}, [a,b,c,d]),
+ Test({a,[], 2},<<"exactly">>,3,{a,[]}, [a,b,c,d]),
+ Test({a,[c], 0},<<"exactly">>,2,{a,[c]}, [a,b,c,d]),
+ Test({a,[c], 1},<<"exactly">>,3,{a,[c]}, [a,b,c,d]),
+ Test({a,[c], 0},<<"exactly">>,2,{a,[c,d]},[a,b,c,d]),
+ Test({a,[c,d],0},<<"exactly">>,3,{a,[c,d]},[a,b,c,d]),
+
+ passed.
+
+%% Does the first list match the second where the second is required
+%% to have exactly Extra superfluous items?
+dm_list_match([], [], 0) -> ok;
+dm_list_match(_, [], _Extra) -> error;
+dm_list_match([H|T1], [H |T2], Extra) -> dm_list_match(T1, T2, Extra);
+dm_list_match(L1, [_H|T2], Extra) -> dm_list_match(L1, T2, Extra - 1).
+
test_user_management() ->
%% lots if stuff that should fail
@@ -1053,8 +1100,8 @@ test_server_status() ->
ok = control_action(set_vm_memory_high_watermark, [float_to_list(HWM)]),
%% eval
- {parse_error, _} = control_action(eval, ["\""]),
- {parse_error, _} = control_action(eval, ["a("]),
+ {error_string, _} = control_action(eval, ["\""]),
+ {error_string, _} = control_action(eval, ["a("]),
ok = control_action(eval, ["a."]),
%% cleanup
diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl
index f488afb48a..5bc3d9f530 100644
--- a/src/rabbit_types.erl
+++ b/src/rabbit_types.erl
@@ -117,8 +117,7 @@
exclusive_owner :: rabbit_types:maybe(pid()),
arguments :: rabbit_framing:amqp_table(),
pid :: rabbit_types:maybe(pid()),
- slave_pids :: [pid()],
- mirror_nodes :: [node()] | 'undefined' | 'all'}).
+ slave_pids :: [pid()]}).
-type(exchange() ::
#exchange{name :: rabbit_exchange:name(),
diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl
index d037f954b4..455134da3f 100644
--- a/src/rabbit_upgrade.erl
+++ b/src/rabbit_upgrade.erl
@@ -201,7 +201,7 @@ primary_upgrade(Upgrades, Nodes) ->
mnesia,
Upgrades,
fun () ->
- force_tables(),
+ rabbit_table:force_load(),
case Others of
[] -> ok;
_ -> info("mnesia upgrades: Breaking cluster~n", []),
@@ -211,9 +211,6 @@ primary_upgrade(Upgrades, Nodes) ->
end),
ok.
-force_tables() ->
- [mnesia:force_load_table(T) || T <- rabbit_mnesia:table_names()].
-
secondary_upgrade(AllNodes) ->
%% must do this before we wipe out schema
NodeType = node_type_legacy(),
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index 47b22b98f6..21fdcd667b 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -41,6 +41,8 @@
-rabbit_upgrade({policy, mnesia,
[exchange_scratches, ha_mirrors]}).
-rabbit_upgrade({sync_slave_pids, mnesia, [policy]}).
+-rabbit_upgrade({no_mirror_nodes, mnesia, [sync_slave_pids]}).
+-rabbit_upgrade({gm_pids, mnesia, [no_mirror_nodes]}).
%% -------------------------------------------------------------------
@@ -64,6 +66,8 @@
-spec(runtime_parameters/0 :: () -> 'ok').
-spec(policy/0 :: () -> 'ok').
-spec(sync_slave_pids/0 :: () -> 'ok').
+-spec(no_mirror_nodes/0 :: () -> 'ok').
+-spec(gm_pids/0 :: () -> 'ok').
-endif.
@@ -254,16 +258,41 @@ sync_slave_pids() ->
|| T <- Tables],
ok.
+no_mirror_nodes() ->
+ Tables = [rabbit_queue, rabbit_durable_queue],
+ RemoveMirrorNodesFun =
+ fun ({amqqueue, N, D, AD, O, A, Pid, SPids, SSPids, _MNodes, Pol}) ->
+ {amqqueue, N, D, AD, O, A, Pid, SPids, SSPids, Pol}
+ end,
+ [ok = transform(T, RemoveMirrorNodesFun,
+ [name, durable, auto_delete, exclusive_owner, arguments,
+ pid, slave_pids, sync_slave_pids, policy])
+ || T <- Tables],
+ ok.
+
+gm_pids() ->
+ Tables = [rabbit_queue, rabbit_durable_queue],
+ AddGMPidsFun =
+ fun ({amqqueue, N, D, AD, O, A, Pid, SPids, SSPids, Pol}) ->
+ {amqqueue, N, D, AD, O, A, Pid, SPids, SSPids, Pol, []}
+ end,
+ [ok = transform(T, AddGMPidsFun,
+ [name, durable, auto_delete, exclusive_owner, arguments,
+ pid, slave_pids, sync_slave_pids, policy, gm_pids])
+ || T <- Tables],
+ ok.
+
+
%%--------------------------------------------------------------------
transform(TableName, Fun, FieldList) ->
- rabbit_mnesia:wait_for_tables([TableName]),
+ rabbit_table:wait([TableName]),
{atomic, ok} = mnesia:transform_table(TableName, Fun, FieldList),
ok.
transform(TableName, Fun, FieldList, NewRecordName) ->
- rabbit_mnesia:wait_for_tables([TableName]),
+ rabbit_table:wait([TableName]),
{atomic, ok} = mnesia:transform_table(TableName, Fun, FieldList,
NewRecordName),
ok.
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 68c659dfbe..ddb136a73d 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -874,7 +874,7 @@ gb_sets_maybe_insert(false, _Val, Set) -> Set;
gb_sets_maybe_insert(true, Val, Set) -> gb_sets:add(Val, Set).
msg_status(IsPersistent, SeqId, Msg = #basic_message { id = MsgId },
- MsgProps = #message_properties{delivered = Delivered}) ->
+ MsgProps = #message_properties { delivered = Delivered }) ->
%% TODO would it make sense to remove #msg_status.is_delivered?
#msg_status { seq_id = SeqId, msg_id = MsgId, msg = Msg,
is_persistent = IsPersistent, is_delivered = Delivered,
diff --git a/src/rabbit_vm.erl b/src/rabbit_vm.erl
new file mode 100644
index 0000000000..53f3df18b3
--- /dev/null
+++ b/src/rabbit_vm.erl
@@ -0,0 +1,129 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_vm).
+
+-export([memory/0]).
+
+-define(MAGIC_PLUGINS, ["mochiweb", "webmachine", "cowboy", "sockjs",
+ "rfc4627_jsonrpc"]).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(memory/0 :: () -> rabbit_types:infos()).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+%% Like erlang:memory(), but with awareness of rabbit-y things
+memory() ->
+ Conns = (sup_memory(rabbit_tcp_client_sup) +
+ sup_memory(ssl_connection_sup) +
+ sup_memory(amqp_sup)),
+ Qs = (sup_memory(rabbit_amqqueue_sup) +
+ sup_memory(rabbit_mirror_queue_slave_sup)),
+ Mnesia = mnesia_memory(),
+ MsgIndexETS = ets_memory(rabbit_msg_store_ets_index),
+ MsgIndexProc = (pid_memory(msg_store_transient) +
+ pid_memory(msg_store_persistent)),
+ MgmtDbETS = ets_memory(rabbit_mgmt_db),
+ MgmtDbProc = sup_memory(rabbit_mgmt_sup),
+ Plugins = plugin_memory() - MgmtDbProc,
+
+ [{total, Total},
+ {processes, Processes},
+ {ets, ETS},
+ {atom, Atom},
+ {binary, Bin},
+ {code, Code},
+ {system, System}] =
+ erlang:memory([total, processes, ets, atom, binary, code, system]),
+
+ OtherProc = Processes - Conns - Qs - MsgIndexProc - MgmtDbProc - Plugins,
+
+ [{total, Total},
+ {connection_procs, Conns},
+ {queue_procs, Qs},
+ {plugins, Plugins},
+ {other_proc, lists:max([0, OtherProc])}, %% [1]
+ {mnesia, Mnesia},
+ {mgmt_db, MgmtDbETS + MgmtDbProc},
+ {msg_index, MsgIndexETS + MsgIndexProc},
+ {other_ets, ETS - Mnesia - MsgIndexETS - MgmtDbETS},
+ {binary, Bin},
+ {code, Code},
+ {atom, Atom},
+ {other_system, System - ETS - Atom - Bin - Code}].
+
+%% [1] - erlang:memory(processes) can be less than the sum of its
+%% parts. Rather than display something nonsensical, just silence any
+%% claims about negative memory. See
+%% http://erlang.org/pipermail/erlang-questions/2012-September/069320.html
+
+%%----------------------------------------------------------------------------
+
+sup_memory(Sup) ->
+ lists:sum([child_memory(P, T) || {_, P, T, _} <- sup_children(Sup)]) +
+ pid_memory(Sup).
+
+sup_children(Sup) ->
+ rabbit_misc:with_exit_handler(
+ rabbit_misc:const([]), fun () -> supervisor:which_children(Sup) end).
+
+pid_memory(Pid) when is_pid(Pid) -> case process_info(Pid, memory) of
+ {memory, M} -> M;
+ _ -> 0
+ end;
+pid_memory(Name) when is_atom(Name) -> case whereis(Name) of
+ P when is_pid(P) -> pid_memory(P);
+ _ -> 0
+ end.
+
+child_memory(Pid, worker) when is_pid (Pid) -> pid_memory(Pid);
+child_memory(Pid, supervisor) when is_pid (Pid) -> sup_memory(Pid);
+child_memory(_, _) -> 0.
+
+mnesia_memory() ->
+ case mnesia:system_info(is_running) of
+ yes -> lists:sum([bytes(mnesia:table_info(Tab, memory)) ||
+ Tab <- mnesia:system_info(tables)]);
+ no -> 0
+ end.
+
+ets_memory(Name) ->
+ lists:sum([bytes(ets:info(T, memory)) || T <- ets:all(),
+ N <- [ets:info(T, name)],
+ N =:= Name]).
+
+bytes(Words) -> Words * erlang:system_info(wordsize).
+
+plugin_memory() ->
+ lists:sum([plugin_memory(App) ||
+ {App, _, _} <- application:which_applications(),
+ is_plugin(atom_to_list(App))]).
+
+plugin_memory(App) ->
+ case catch application_master:get_child(
+ application_controller:get_master(App)) of
+ {Pid, _} -> sup_memory(Pid);
+ _ -> 0
+ end.
+
+is_plugin("rabbitmq_" ++ _) -> true;
+is_plugin(App) -> lists:member(App, ?MAGIC_PLUGINS).