summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/delegate.erl21
-rw-r--r--src/delegate_sup.erl26
-rw-r--r--src/gen_server2.erl25
-rw-r--r--src/rabbit.erl40
-rw-r--r--src/rabbit_amqqueue.erl77
-rw-r--r--src/rabbit_amqqueue_process.erl223
-rw-r--r--src/rabbit_auth_mechanism_plain.erl35
-rw-r--r--src/rabbit_backing_queue.erl15
-rw-r--r--src/rabbit_binding.erl2
-rw-r--r--src/rabbit_channel.erl135
-rw-r--r--src/rabbit_channel_sup.erl20
-rw-r--r--src/rabbit_client_sup.erl (renamed from src/tcp_client_sup.erl)22
-rw-r--r--src/rabbit_control.erl31
-rw-r--r--src/rabbit_direct.erl75
-rw-r--r--src/rabbit_mirror_queue_coordinator.erl136
-rw-r--r--src/rabbit_mirror_queue_master.erl250
-rw-r--r--src/rabbit_mirror_queue_misc.erl46
-rw-r--r--src/rabbit_mirror_queue_slave.erl529
-rw-r--r--src/rabbit_mirror_queue_slave_sup.erl54
-rw-r--r--src/rabbit_misc.erl50
-rw-r--r--src/rabbit_mnesia.erl3
-rw-r--r--src/rabbit_msg_store.erl103
-rw-r--r--src/rabbit_multi.erl2
-rw-r--r--src/rabbit_networking.erl246
-rw-r--r--src/rabbit_node_monitor.erl53
-rw-r--r--src/rabbit_reader.erl29
-rw-r--r--src/rabbit_router.erl6
-rw-r--r--src/rabbit_tests.erl97
-rw-r--r--src/rabbit_types.erl3
-rw-r--r--src/rabbit_variable_queue.erl158
-rw-r--r--src/supervisor2.erl6
-rw-r--r--src/tcp_acceptor.erl4
-rw-r--r--src/tcp_listener.erl9
-rw-r--r--src/test_sup.erl14
34 files changed, 2076 insertions, 469 deletions
diff --git a/src/delegate.erl b/src/delegate.erl
index ff55a15b40..17046201ad 100644
--- a/src/delegate.erl
+++ b/src/delegate.erl
@@ -18,7 +18,7 @@
-behaviour(gen_server2).
--export([start_link/1, invoke_no_result/2, invoke/2, delegate_count/0]).
+-export([start_link/1, invoke_no_result/2, invoke/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -36,8 +36,6 @@
([pid()], fun ((pid()) -> A)) -> {[{pid(), A}],
[{pid(), term()}]}).
--spec(delegate_count/0 :: () -> non_neg_integer()).
-
-endif.
%%----------------------------------------------------------------------------
@@ -68,9 +66,9 @@ invoke(Pids, Fun) when is_list(Pids) ->
{Replies, BadNodes} =
case orddict:fetch_keys(Grouped) of
[] -> {[], []};
- RemoteNodes -> gen_server2:multi_call(RemoteNodes, delegate(),
- {invoke, Fun, Grouped},
- infinity)
+ RemoteNodes -> gen_server2:multi_call(
+ RemoteNodes, delegate(RemoteNodes),
+ {invoke, Fun, Grouped}, infinity)
end,
BadPids = [{Pid, {exit, {nodedown, BadNode}, []}} ||
BadNode <- BadNodes,
@@ -92,7 +90,7 @@ invoke_no_result(Pids, Fun) when is_list(Pids) ->
{LocalPids, Grouped} = group_pids_by_node(Pids),
case orddict:fetch_keys(Grouped) of
[] -> ok;
- RemoteNodes -> gen_server2:abcast(RemoteNodes, delegate(),
+ RemoteNodes -> gen_server2:abcast(RemoteNodes, delegate(RemoteNodes),
{invoke, Fun, Grouped})
end,
safe_invoke(LocalPids, Fun), %% must not die
@@ -111,17 +109,14 @@ group_pids_by_node(Pids) ->
node(Pid), fun (List) -> [Pid | List] end, [Pid], Remote)}
end, {[], orddict:new()}, Pids).
-delegate_count() ->
- {ok, Count} = application:get_env(rabbit, delegate_count),
- Count.
-
delegate_name(Hash) ->
list_to_atom("delegate_" ++ integer_to_list(Hash)).
-delegate() ->
+delegate(RemoteNodes) ->
case get(delegate) of
undefined -> Name = delegate_name(
- erlang:phash2(self(), delegate_count())),
+ erlang:phash2(self(),
+ delegate_sup:count(RemoteNodes))),
put(delegate, Name),
Name;
Name -> Name
diff --git a/src/delegate_sup.erl b/src/delegate_sup.erl
index 5274722145..fc693c7d3d 100644
--- a/src/delegate_sup.erl
+++ b/src/delegate_sup.erl
@@ -18,7 +18,7 @@
-behaviour(supervisor).
--export([start_link/0]).
+-export([start_link/1, count/1]).
-export([init/1]).
@@ -28,20 +28,32 @@
-ifdef(use_specs).
--spec(start_link/0 :: () -> {'ok', pid()} | {'error', any()}).
+-spec(start_link/1 :: (integer()) -> {'ok', pid()} | {'error', any()}).
+-spec(count/1 :: ([node()]) -> integer()).
-endif.
%%----------------------------------------------------------------------------
-start_link() ->
- supervisor:start_link({local, ?SERVER}, ?MODULE, []).
+start_link(Count) ->
+ supervisor:start_link({local, ?SERVER}, ?MODULE, [Count]).
+
+count([]) ->
+ 1;
+count([Node | Nodes]) ->
+ try
+ length(supervisor:which_children({?SERVER, Node}))
+ catch exit:{{R, _}, _} when R =:= nodedown; R =:= shutdown ->
+ count(Nodes);
+ exit:{R, _} when R =:= noproc; R =:= normal; R =:= shutdown;
+ R =:= nodedown ->
+ count(Nodes)
+ end.
%%----------------------------------------------------------------------------
-init(_Args) ->
- DCount = delegate:delegate_count(),
+init([Count]) ->
{ok, {{one_for_one, 10, 10},
[{Num, {delegate, start_link, [Num]},
transient, 16#ffffffff, worker, [delegate]} ||
- Num <- lists:seq(0, DCount - 1)]}}.
+ Num <- lists:seq(0, Count - 1)]}}.
diff --git a/src/gen_server2.erl b/src/gen_server2.erl
index a637ddddc8..94296f9751 100644
--- a/src/gen_server2.erl
+++ b/src/gen_server2.erl
@@ -58,6 +58,15 @@
%% hibernate the process immediately, as it would if backoff wasn't
%% being used. Instead it'll wait for the current timeout as described
%% above.
+%%
+%% 7) The callback module can return from any of the handle_*
+%% functions, a {become, Module, State} triple, or a {become, Module,
+%% State, Timeout} quadruple. This allows the gen_server to
+%% dynamically change the callback module. The State is the new state
+%% which will be passed into any of the callback functions in the new
+%% module. Note there is no form also encompassing a reply, thus if
+%% you wish to reply in handle_call/3 and change the callback module,
+%% you need to use gen_server2:reply/2 to issue the reply manually.
%% All modifications are (C) 2009-2011 VMware, Inc.
@@ -880,6 +889,22 @@ handle_common_reply(Reply, Msg, GS2State = #gs2_state { name = Name,
loop(GS2State #gs2_state { state = NState,
time = Time1,
debug = Debug1 });
+ {become, Mod, NState} ->
+ Debug1 = common_debug(Debug, fun print_event/3, Name,
+ {become, Mod, NState}),
+ loop(find_prioritisers(
+ GS2State #gs2_state { mod = Mod,
+ state = NState,
+ time = infinity,
+ debug = Debug1 }));
+ {become, Mod, NState, Time1} ->
+ Debug1 = common_debug(Debug, fun print_event/3, Name,
+ {become, Mod, NState}),
+ loop(find_prioritisers(
+ GS2State #gs2_state { mod = Mod,
+ state = NState,
+ time = Time1,
+ debug = Debug1 }));
_ ->
handle_common_termination(Reply, Msg, GS2State)
end.
diff --git a/src/rabbit.erl b/src/rabbit.erl
index b041a6372c..d967cfb9cd 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -27,7 +27,7 @@
%%---------------------------------------------------------------------------
%% Boot steps.
--export([maybe_insert_default_data/0]).
+-export([maybe_insert_default_data/0, boot_delegate/0]).
-rabbit_boot_step({codec_correctness_check,
[{description, "codec correctness check"},
@@ -36,6 +36,12 @@
[]}},
{enables, external_infrastructure}]}).
+-rabbit_boot_step({rabbit_registry,
+ [{description, "plugin registry"},
+ {mfa, {rabbit_sup, start_child,
+ [rabbit_registry]}},
+ {enables, external_infrastructure}]}).
+
-rabbit_boot_step({database,
[{mfa, {rabbit_mnesia, init, []}},
{enables, external_infrastructure}]}).
@@ -54,13 +60,6 @@
-rabbit_boot_step({external_infrastructure,
[{description, "external infrastructure ready"}]}).
--rabbit_boot_step({rabbit_registry,
- [{description, "plugin registry"},
- {mfa, {rabbit_sup, start_child,
- [rabbit_registry]}},
- {requires, external_infrastructure},
- {enables, kernel_ready}]}).
-
-rabbit_boot_step({rabbit_log,
[{description, "logging server"},
{mfa, {rabbit_sup, start_restartable_child,
@@ -101,8 +100,7 @@
-rabbit_boot_step({delegate_sup,
[{description, "cluster delegate"},
- {mfa, {rabbit_sup, start_child,
- [delegate_sup]}},
+ {mfa, {rabbit, boot_delegate, []}},
{requires, kernel_ready},
{enables, core_initialized}]}).
@@ -145,13 +143,18 @@
{requires, routing_ready},
{enables, networking}]}).
+-rabbit_boot_step({direct_client,
+ [{mfa, {rabbit_direct, boot, []}},
+ {requires, log_relay}]}).
+
-rabbit_boot_step({networking,
[{mfa, {rabbit_networking, boot, []}},
- {requires, log_relay},
- {enables, networking_listening}]}).
+ {requires, log_relay}]}).
--rabbit_boot_step({networking_listening,
- [{description, "network listeners available"}]}).
+-rabbit_boot_step({notify_cluster,
+ [{description, "notify cluster nodes"},
+ {mfa, {rabbit_node_monitor, notify_cluster, []}},
+ {requires, networking}]}).
%%---------------------------------------------------------------------------
@@ -179,6 +182,9 @@
{running_nodes, [node()]}]).
-spec(log_location/1 :: ('sasl' | 'kernel') -> log_location()).
+-spec(maybe_insert_default_data/0 :: () -> 'ok').
+-spec(boot_delegate/0 :: () -> 'ok').
+
-endif.
%%----------------------------------------------------------------------------
@@ -225,11 +231,11 @@ start(normal, []) ->
case erts_version_check() of
ok ->
{ok, SupPid} = rabbit_sup:start_link(),
+ true = register(rabbit, self()),
print_banner(),
[ok = run_boot_step(Step) || Step <- boot_steps()],
io:format("~nbroker running~n"),
-
{ok, SupPid};
Error ->
Error
@@ -448,6 +454,10 @@ ensure_working_log_handler(OldFHandler, NewFHandler, TTYHandler,
end
end.
+boot_delegate() ->
+ {ok, Count} = application:get_env(rabbit, delegate_count),
+ rabbit_sup:start_child(delegate_sup, [Count]).
+
maybe_insert_default_data() ->
case rabbit_mnesia:is_db_empty() of
true -> insert_default_data();
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index ad9e3ce609..4ef9750c82 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -18,8 +18,8 @@
-export([start/0, stop/0, declare/5, delete_immediately/1, delete/3, purge/1]).
-export([internal_declare/2, internal_delete/1,
- maybe_run_queue_via_backing_queue/2,
- maybe_run_queue_via_backing_queue_async/2,
+ maybe_run_queue_via_backing_queue/3,
+ maybe_run_queue_via_backing_queue_async/3,
sync_timeout/1, update_ram_duration/1, set_ram_duration_target/2,
set_maximum_since_use/2, maybe_expire/1, drop_expired/1]).
-export([pseudo_queue/2]).
@@ -33,6 +33,7 @@
-export([notify_sent/2, unblock/2, flush_all/2]).
-export([commit_all/3, rollback_all/3, notify_down_all/2, limit_all/3]).
-export([on_node_down/1]).
+-export([store_queue/1]).
-include("rabbit.hrl").
-include_lib("stdlib/include/qlc.hrl").
@@ -137,11 +138,13 @@
-> queue_or_not_found() | rabbit_misc:thunk(queue_or_not_found())).
-spec(internal_delete/1 ::
(name()) -> rabbit_types:ok_or_error('not_found') |
- rabbit_types:connection_exit()).
--spec(maybe_run_queue_via_backing_queue/2 ::
- (pid(), (fun ((A) -> {[rabbit_guid:guid()], A}))) -> 'ok').
--spec(maybe_run_queue_via_backing_queue_async/2 ::
- (pid(), (fun ((A) -> {[rabbit_guid:guid()], A}))) -> 'ok').
+ rabbit_types:connection_exit() |
+ fun ((boolean()) -> rabbit_types:ok_or_error('not_found') |
+ rabbit_types:connection_exit())).
+-spec(maybe_run_queue_via_backing_queue/3 ::
+ (pid(), atom(), (fun ((A) -> {[rabbit_guid:guid()], A}))) -> 'ok').
+-spec(maybe_run_queue_via_backing_queue_async/3 ::
+ (pid(), atom(), (fun ((A) -> {[rabbit_guid:guid()], A}))) -> 'ok').
-spec(sync_timeout/1 :: (pid()) -> 'ok').
-spec(update_ram_duration/1 :: (pid()) -> 'ok').
-spec(set_ram_duration_target/2 :: (pid(), number() | 'infinity') -> 'ok').
@@ -189,12 +192,13 @@ recover_durable_queues(DurableQueues) ->
declare(QueueName, Durable, AutoDelete, Args, Owner) ->
ok = check_declare_arguments(QueueName, Args),
- Q = start_queue_process(#amqqueue{name = QueueName,
- durable = Durable,
- auto_delete = AutoDelete,
- arguments = Args,
+ Q = start_queue_process(#amqqueue{name = QueueName,
+ durable = Durable,
+ auto_delete = AutoDelete,
+ arguments = Args,
exclusive_owner = Owner,
- pid = none}),
+ pid = none,
+ mirror_pids = []}),
case gen_server2:call(Q#amqqueue.pid, {init, false}) of
not_found -> rabbit_misc:not_found(QueueName);
Q1 -> Q1
@@ -215,8 +219,12 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) ->
[_] -> %% Q exists on stopped node
rabbit_misc:const(not_found)
end;
- [ExistingQ] ->
- rabbit_misc:const(ExistingQ)
+ [ExistingQ = #amqqueue{pid = QPid}] ->
+ case rabbit_misc:is_process_alive(QPid) of
+ true -> rabbit_misc:const(ExistingQ);
+ false -> TailFun = internal_delete(QueueName),
+ fun (Tx) -> TailFun(Tx), ExistingQ end
+ end
end
end).
@@ -350,7 +358,8 @@ consumers_all(VHostPath) ->
{ChPid, ConsumerTag, AckRequired} <- consumers(Q)]
end)).
-stat(#amqqueue{pid = QPid}) -> delegate_call(QPid, stat, infinity).
+stat(#amqqueue{pid = QPid}) ->
+ delegate_call(QPid, stat, infinity).
emit_stats(#amqqueue{pid = QPid}) ->
delegate_cast(QPid, emit_stats).
@@ -415,7 +424,7 @@ basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) ->
infinity).
notify_sent(QPid, ChPid) ->
- delegate_cast(QPid, {notify_sent, ChPid}).
+ gen_server2:cast(QPid, {notify_sent, ChPid}).
unblock(QPid, ChPid) ->
delegate_cast(QPid, {unblock, ChPid}).
@@ -432,24 +441,24 @@ internal_delete1(QueueName) ->
rabbit_binding:remove_for_destination(QueueName).
internal_delete(QueueName) ->
- rabbit_misc:execute_mnesia_transaction(
+ rabbit_misc:execute_mnesia_tx_with_tail(
fun () ->
case mnesia:wread({rabbit_queue, QueueName}) of
- [] -> {error, not_found};
- [_] -> internal_delete1(QueueName)
+ [] -> rabbit_misc:const({error, not_found});
+ [_] -> Deletions = internal_delete1(QueueName),
+ fun (Tx) -> ok = rabbit_binding:process_deletions(
+ Deletions, Tx)
+ end
end
- end,
- fun ({error, _} = Err, _Tx) ->
- Err;
- (Deletions, Tx) ->
- ok = rabbit_binding:process_deletions(Deletions, Tx)
end).
-maybe_run_queue_via_backing_queue(QPid, Fun) ->
- gen_server2:call(QPid, {maybe_run_queue_via_backing_queue, Fun}, infinity).
-maybe_run_queue_via_backing_queue_async(QPid, Fun) ->
- gen_server2:cast(QPid, {maybe_run_queue_via_backing_queue, Fun}).
+maybe_run_queue_via_backing_queue(QPid, Mod, Fun) ->
+ gen_server2:call(QPid, {maybe_run_queue_via_backing_queue, Mod, Fun},
+ infinity).
+
+maybe_run_queue_via_backing_queue_async(QPid, Mod, Fun) ->
+ gen_server2:cast(QPid, {maybe_run_queue_via_backing_queue, Mod, Fun}).
sync_timeout(QPid) ->
gen_server2:cast(QPid, sync_timeout).
@@ -472,7 +481,8 @@ drop_expired(QPid) ->
on_node_down(Node) ->
rabbit_misc:execute_mnesia_transaction(
fun () -> qlc:e(qlc:q([delete_queue(QueueName) ||
- #amqqueue{name = QueueName, pid = Pid}
+ #amqqueue{name = QueueName, pid = Pid,
+ mirror_pids = []}
<- mnesia:table(rabbit_queue),
node(Pid) == Node]))
end,
@@ -489,11 +499,12 @@ delete_queue(QueueName) ->
rabbit_binding:remove_transient_for_destination(QueueName).
pseudo_queue(QueueName, Pid) ->
- #amqqueue{name = QueueName,
- durable = false,
+ #amqqueue{name = QueueName,
+ durable = false,
auto_delete = false,
- arguments = [],
- pid = Pid}.
+ arguments = [],
+ pid = Pid,
+ mirror_pids = []}.
safe_delegate_call_ok(F, Pids) ->
case delegate:invoke(Pids, fun (Pid) ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 663977ba87..0b5ad05939 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -33,6 +33,8 @@
handle_info/2, handle_pre_hibernate/1, prioritise_call/3,
prioritise_cast/2, prioritise_info/2]).
+-export([init_with_backing_queue_state/6]).
+
% Queue's state
-record(q, {q,
exclusive_consumer,
@@ -72,7 +74,8 @@
messages,
consumers,
memory,
- backing_queue_status
+ backing_queue_status,
+ mirror_pids
]).
-define(CREATION_EVENT_KEYS,
@@ -97,12 +100,11 @@ info_keys() -> ?INFO_KEYS.
init(Q) ->
?LOGDEBUG("Queue starting - ~p~n", [Q]),
process_flag(trap_exit, true),
- {ok, BQ} = application:get_env(backing_queue_module),
{ok, #q{q = Q#amqqueue{pid = self()},
exclusive_consumer = none,
has_had_consumers = false,
- backing_queue = BQ,
+ backing_queue = backing_queue_module(Q),
backing_queue_state = undefined,
active_consumers = queue:new(),
blocked_consumers = queue:new(),
@@ -115,6 +117,36 @@ init(Q) ->
guid_to_channel = dict:new()}, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
+init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
+ RateTRef, AckTags, Deliveries) ->
+ ?LOGDEBUG("Queue starting - ~p~n", [Q]),
+ case Owner of
+ none -> ok;
+ _ -> erlang:monitor(process, Owner)
+ end,
+ State = requeue_and_run(
+ AckTags,
+ process_args(
+ #q{q = Q#amqqueue{pid = self()},
+ exclusive_consumer = none,
+ has_had_consumers = false,
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ active_consumers = queue:new(),
+ blocked_consumers = queue:new(),
+ expires = undefined,
+ sync_timer_ref = undefined,
+ rate_timer_ref = RateTRef,
+ expiry_timer_ref = undefined,
+ ttl = undefined,
+ stats_timer = rabbit_event:init_stats_timer(),
+ guid_to_channel = dict:new()})),
+ lists:foldl(
+ fun (Delivery, StateN) ->
+ {_Delivered, StateN1} = deliver_or_enqueue(Delivery, StateN),
+ StateN1
+ end, State, Deliveries).
+
terminate(shutdown, State = #q{backing_queue = BQ}) ->
terminate_shutdown(fun (BQS) -> BQ:terminate(BQS) end, State);
terminate({shutdown, _}, State = #q{backing_queue = BQ}) ->
@@ -135,8 +167,7 @@ code_change(_OldVsn, State, _Extra) ->
%%----------------------------------------------------------------------------
declare(Recover, From,
- State = #q{q = Q = #amqqueue{name = QName, durable = IsDurable},
- backing_queue = BQ, backing_queue_state = undefined,
+ State = #q{q = Q, backing_queue = BQ, backing_queue_state = undefined,
stats_timer = StatsTimer}) ->
case rabbit_amqqueue:internal_declare(Q, Recover) of
not_found -> {stop, normal, not_found, State};
@@ -147,7 +178,7 @@ declare(Recover, From,
ok = rabbit_memory_monitor:register(
self(), {rabbit_amqqueue,
set_ram_duration_target, [self()]}),
- BQS = BQ:init(QName, IsDurable, Recover),
+ BQS = BQ:init(Q, Recover),
State1 = process_args(State#q{backing_queue_state = BQS}),
rabbit_event:notify(queue_created,
infos(?CREATION_EVENT_KEYS, State1)),
@@ -209,6 +240,13 @@ next_state(State) ->
false -> {stop_sync_timer(State2), hibernate}
end.
+backing_queue_module(#amqqueue{arguments = Args}) ->
+ case rabbit_misc:table_lookup(Args, <<"x-mirror">>) of
+ undefined -> {ok, BQM} = application:get_env(backing_queue_module),
+ BQM;
+ _Nodes -> rabbit_mirror_queue_master
+ end.
+
ensure_sync_timer(State = #q{sync_timer_ref = undefined}) ->
{ok, TRef} = timer:apply_after(
?SYNC_INTERVAL, rabbit_amqqueue, sync_timeout, [self()]),
@@ -332,11 +370,6 @@ ch_record_state_transition(OldCR, NewCR) ->
true -> ok
end.
-record_current_channel_tx(ChPid, Txn) ->
- %% as a side effect this also starts monitoring the channel (if
- %% that wasn't happening already)
- store_ch_record((ch_record(ChPid))#cr{txn = Txn}).
-
deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
State = #q{q = #amqqueue{name = QName},
active_consumers = ActiveConsumers,
@@ -482,7 +515,7 @@ attempt_delivery(#delivery{txn = none,
AckRequired, Message,
(?BASE_MESSAGE_PROPERTIES)#message_properties{
needs_confirming = (NeedsConfirming =:= confirm)},
- BQS),
+ ChPid, BQS),
{{Message, false, AckTag}, true,
State1#q{backing_queue_state = BQS1}}
end,
@@ -495,11 +528,12 @@ attempt_delivery(#delivery{txn = Txn,
{NeedsConfirming,
State = #q{backing_queue = BQ,
backing_queue_state = BQS}}) ->
- record_current_channel_tx(ChPid, Txn),
+ store_ch_record((ch_record(ChPid))#cr{txn = Txn}),
{true,
NeedsConfirming,
State#q{backing_queue_state =
- BQ:tx_publish(Txn, Message, ?BASE_MESSAGE_PROPERTIES, BQS)}}.
+ BQ:tx_publish(Txn, Message, ?BASE_MESSAGE_PROPERTIES, ChPid,
+ BQS)}}.
deliver_or_enqueue(Delivery, State) ->
case attempt_delivery(Delivery, record_confirm_message(Delivery, State)) of
@@ -512,15 +546,17 @@ deliver_or_enqueue(Delivery, State) ->
(message_properties(State)) #message_properties{
needs_confirming =
(NeedsConfirming =:= confirm)},
- BQS),
+ Delivery #delivery.sender, BQS),
{false, ensure_ttl_timer(State1#q{backing_queue_state = BQS1})}
end.
-requeue_and_run(AckTags, State = #q{backing_queue = BQ, ttl=TTL}) ->
+requeue_and_run(AckTags, State = #q{backing_queue = BQ, ttl = TTL}) ->
maybe_run_queue_via_backing_queue(
- fun (BQS) ->
- {[], BQ:requeue(AckTags, reset_msg_expiry_fun(TTL), BQS)}
- end, State).
+ BQ, fun (BQS) ->
+ {_Guids, BQS1} =
+ BQ:requeue(AckTags, reset_msg_expiry_fun(TTL), BQS),
+ {[], BQS1}
+ end, State).
fetch(AckRequired, State = #q{backing_queue_state = BQS,
backing_queue = BQ}) ->
@@ -591,7 +627,7 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) ->
true -> {stop, State1};
false -> State2 = case Txn of
none -> State1;
- _ -> rollback_transaction(Txn, ChPid,
+ _ -> rollback_transaction(Txn, C,
State1)
end,
{ok, requeue_and_run(sets:to_list(ChAckTags),
@@ -622,31 +658,34 @@ maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg).
qname(#q{q = #amqqueue{name = QName}}) -> QName.
-maybe_run_queue_via_backing_queue(Fun, State = #q{backing_queue_state = BQS}) ->
- {Guids, BQS1} = Fun(BQS),
+backing_queue_idle_timeout(State = #q{backing_queue = BQ}) ->
+ maybe_run_queue_via_backing_queue(
+ BQ, fun (BQS) -> {[], BQ:idle_timeout(BQS)} end, State).
+
+maybe_run_queue_via_backing_queue(Mod, Fun,
+ State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ {Guids, BQS1} = BQ:invoke(Mod, Fun, BQS),
run_message_queue(
confirm_messages(Guids, State#q{backing_queue_state = BQS1})).
-commit_transaction(Txn, From, ChPid, State = #q{backing_queue = BQ,
- backing_queue_state = BQS,
- ttl = TTL}) ->
- {AckTags, BQS1} = BQ:tx_commit(Txn,
- fun () -> gen_server2:reply(From, ok) end,
- reset_msg_expiry_fun(TTL),
- BQS),
- %% ChPid must be known here because of the participant management
- %% by the channel.
- C = #cr{acktags = ChAckTags} = lookup_ch(ChPid),
+commit_transaction(Txn, From, C = #cr{acktags = ChAckTags},
+ State = #q{backing_queue = BQ,
+ backing_queue_state = BQS,
+ ttl = TTL}) ->
+ {AckTags, BQS1} = BQ:tx_commit(
+ Txn, fun () -> gen_server2:reply(From, ok) end,
+ reset_msg_expiry_fun(TTL), BQS),
ChAckTags1 = subtract_acks(ChAckTags, AckTags),
maybe_store_ch_record(C#cr{acktags = ChAckTags1, txn = none}),
State#q{backing_queue_state = BQS1}.
-rollback_transaction(Txn, ChPid, State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
+rollback_transaction(Txn, C, State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
{_AckTags, BQS1} = BQ:tx_rollback(Txn, BQS),
%% Iff we removed acktags from the channel record on ack+txn then
- %% we would add them back in here (would also require ChPid)
- record_current_channel_tx(ChPid, none),
+ %% we would add them back in here.
+ maybe_store_ch_record(C#cr{txn = none}),
State#q{backing_queue_state = BQS1}.
subtract_acks(A, B) when is_list(B) ->
@@ -724,6 +763,9 @@ i(memory, _) ->
M;
i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) ->
BQ:status(BQS);
+i(mirror_pids, #q{q = #amqqueue{name = Name}}) ->
+ {ok, #amqqueue{mirror_pids = MPids}} = rabbit_amqqueue:lookup(Name),
+ MPids;
i(Item, _) ->
throw({bad_argument, Item}).
@@ -759,29 +801,29 @@ emit_consumer_deleted(ChPid, ConsumerTag) ->
prioritise_call(Msg, _From, _State) ->
case Msg of
- info -> 9;
- {info, _Items} -> 9;
- consumers -> 9;
- {maybe_run_queue_via_backing_queue, _Fun} -> 6;
- _ -> 0
+ info -> 9;
+ {info, _Items} -> 9;
+ consumers -> 9;
+ {maybe_run_queue_via_backing_queue, _Mod, _Fun} -> 6;
+ _ -> 0
end.
prioritise_cast(Msg, _State) ->
case Msg of
- update_ram_duration -> 8;
- delete_immediately -> 8;
- {set_ram_duration_target, _Duration} -> 8;
- {set_maximum_since_use, _Age} -> 8;
- maybe_expire -> 8;
- drop_expired -> 8;
- emit_stats -> 7;
- {ack, _Txn, _MsgIds, _ChPid} -> 7;
- {reject, _MsgIds, _Requeue, _ChPid} -> 7;
- {notify_sent, _ChPid} -> 7;
- {unblock, _ChPid} -> 7;
- {maybe_run_queue_via_backing_queue, _Fun} -> 6;
- sync_timeout -> 6;
- _ -> 0
+ update_ram_duration -> 8;
+ delete_immediately -> 8;
+ {set_ram_duration_target, _Duration} -> 8;
+ {set_maximum_since_use, _Age} -> 8;
+ maybe_expire -> 8;
+ drop_expired -> 8;
+ emit_stats -> 7;
+ {ack, _Txn, _MsgIds, _ChPid} -> 7;
+ {reject, _MsgIds, _Requeue, _ChPid} -> 7;
+ {notify_sent, _ChPid} -> 7;
+ {unblock, _ChPid} -> 7;
+ {maybe_run_queue_via_backing_queue, _Mod, _Fun} -> 6;
+ sync_timeout -> 6;
+ _ -> 0
end.
prioritise_info({'DOWN', _MonitorRef, process, DownPid, _Reason},
@@ -794,20 +836,20 @@ handle_call({init, Recover}, From,
handle_call({init, Recover}, From,
State = #q{q = #amqqueue{exclusive_owner = Owner}}) ->
- case rpc:call(node(Owner), erlang, is_process_alive, [Owner]) of
- true -> erlang:monitor(process, Owner),
- declare(Recover, From, State);
- _ -> #q{q = #amqqueue{name = QName, durable = IsDurable},
- backing_queue = BQ, backing_queue_state = undefined} = State,
- gen_server2:reply(From, not_found),
- case Recover of
- true -> ok;
- _ -> rabbit_log:warning(
- "Queue ~p exclusive owner went away~n", [QName])
- end,
- BQS = BQ:init(QName, IsDurable, Recover),
- %% Rely on terminate to delete the queue.
- {stop, normal, State#q{backing_queue_state = BQS}}
+ case rabbit_misc:is_process_alive(Owner) of
+ true -> erlang:monitor(process, Owner),
+ declare(Recover, From, State);
+ false -> #q{backing_queue = BQ, backing_queue_state = undefined,
+ q = #amqqueue{name = QName, durable = IsDurable}} = State,
+ gen_server2:reply(From, not_found),
+ case Recover of
+ true -> ok;
+ _ -> rabbit_log:warning(
+ "Queue ~p exclusive owner went away~n", [QName])
+ end,
+ BQS = BQ:init(QName, IsDurable, Recover),
+ %% Rely on terminate to delete the queue.
+ {stop, normal, State#q{backing_queue_state = BQS}}
end;
handle_call(info, _From, State) ->
@@ -848,8 +890,11 @@ handle_call({deliver, Delivery}, From, State) ->
noreply(NewState);
handle_call({commit, Txn, ChPid}, From, State) ->
- NewState = commit_transaction(Txn, From, ChPid, State),
- noreply(run_message_queue(NewState));
+ case lookup_ch(ChPid) of
+ not_found -> reply(ok, State);
+ C -> noreply(run_message_queue(
+ commit_transaction(Txn, From, C, State)))
+ end;
handle_call({notify_down, ChPid}, _From, State) ->
%% we want to do this synchronously, so that auto_deleted queues
@@ -994,17 +1039,15 @@ handle_call({requeue, AckTags, ChPid}, From, State) ->
noreply(requeue_and_run(AckTags, State))
end;
-handle_call({maybe_run_queue_via_backing_queue, Fun}, _From, State) ->
- reply(ok, maybe_run_queue_via_backing_queue(Fun, State)).
+handle_call({maybe_run_queue_via_backing_queue, Mod, Fun}, _From, State) ->
+ reply(ok, maybe_run_queue_via_backing_queue(Mod, Fun, State)).
-handle_cast({maybe_run_queue_via_backing_queue, Fun}, State) ->
- noreply(maybe_run_queue_via_backing_queue(Fun, State));
+handle_cast({maybe_run_queue_via_backing_queue, Mod, Fun}, State) ->
+ noreply(maybe_run_queue_via_backing_queue(Mod, Fun, State));
-handle_cast(sync_timeout, State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
- noreply(State#q{backing_queue_state = BQ:idle_timeout(BQS),
- sync_timer_ref = undefined});
+handle_cast(sync_timeout, State) ->
+ noreply(backing_queue_idle_timeout(State#q{sync_timer_ref = undefined}));
handle_cast({deliver, Delivery}, State) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
@@ -1021,7 +1064,7 @@ handle_cast({ack, Txn, AckTags, ChPid},
case Txn of
none -> ChAckTags1 = subtract_acks(ChAckTags, AckTags),
NewC = C#cr{acktags = ChAckTags1},
- BQS1 = BQ:ack(AckTags, BQS),
+ {_Guids, BQS1} = BQ:ack(AckTags, BQS),
{NewC, State#q{backing_queue_state = BQS1}};
_ -> BQS1 = BQ:tx_ack(Txn, AckTags, BQS),
{C#cr{txn = Txn},
@@ -1042,13 +1085,16 @@ handle_cast({reject, AckTags, Requeue, ChPid},
maybe_store_ch_record(C#cr{acktags = ChAckTags1}),
noreply(case Requeue of
true -> requeue_and_run(AckTags, State);
- false -> BQS1 = BQ:ack(AckTags, BQS),
+ false -> {_Guids, BQS1} = BQ:ack(AckTags, BQS),
State#q{backing_queue_state = BQS1}
end)
end;
handle_cast({rollback, Txn, ChPid}, State) ->
- noreply(rollback_transaction(Txn, ChPid, State));
+ noreply(case lookup_ch(ChPid) of
+ not_found -> State;
+ C -> rollback_transaction(Txn, C, State)
+ end);
handle_cast(delete_immediately, State) ->
{stop, normal, State};
@@ -1135,9 +1181,8 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) ->
{stop, NewState} -> {stop, normal, NewState}
end;
-handle_info(timeout, State = #q{backing_queue = BQ}) ->
- noreply(maybe_run_queue_via_backing_queue(
- fun (BQS) -> {[], BQ:idle_timeout(BQS)} end, State));
+handle_info(timeout, State) ->
+ noreply(backing_queue_idle_timeout(State));
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State};
@@ -1151,15 +1196,15 @@ handle_pre_hibernate(State = #q{backing_queue_state = undefined}) ->
handle_pre_hibernate(State = #q{backing_queue = BQ,
backing_queue_state = BQS,
stats_timer = StatsTimer}) ->
- BQS1 = BQ:handle_pre_hibernate(BQS),
- %% no activity for a while == 0 egress and ingress rates
+ {RamDuration, BQS1} = BQ:ram_duration(BQS),
DesiredDuration =
- rabbit_memory_monitor:report_ram_duration(self(), infinity),
+ rabbit_memory_monitor:report_ram_duration(self(), RamDuration),
BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1),
+ BQS3 = BQ:handle_pre_hibernate(BQS2),
rabbit_event:if_enabled(StatsTimer,
fun () ->
emit_stats(State, [{idle_since, now()}])
end),
State1 = State#q{stats_timer = rabbit_event:stop_stats_timer(StatsTimer),
- backing_queue_state = BQS2},
+ backing_queue_state = BQS3},
{hibernate, stop_rate_timer(State1)}.
diff --git a/src/rabbit_auth_mechanism_plain.erl b/src/rabbit_auth_mechanism_plain.erl
index 7d9dcd2007..1ca07018e4 100644
--- a/src/rabbit_auth_mechanism_plain.erl
+++ b/src/rabbit_auth_mechanism_plain.erl
@@ -33,6 +33,10 @@
%% SASL PLAIN, as used by the Qpid Java client and our clients. Also,
%% apparently, by OpenAMQ.
+%% TODO: once the minimum erlang becomes R13B03, reimplement this
+%% using the binary module - that makes use of BIFs to do binary
+%% matching and will thus be much faster.
+
description() ->
[{name, <<"PLAIN">>},
{description, <<"SASL PLAIN authentication mechanism">>}].
@@ -41,11 +45,32 @@ init(_Sock) ->
[].
handle_response(Response, _State) ->
- %% The '%%"' at the end of the next line is for Emacs
- case re:run(Response, "^\\0([^\\0]*)\\0([^\\0]*)$",%%"
- [{capture, all_but_first, binary}]) of
- {match, [User, Pass]} ->
+ case extract_user_pass(Response) of
+ {ok, User, Pass} ->
rabbit_access_control:check_user_pass_login(User, Pass);
- _ ->
+ error ->
{protocol_error, "response ~p invalid", [Response]}
end.
+
+extract_user_pass(Response) ->
+ case extract_elem(Response) of
+ {ok, User, Response1} -> case extract_elem(Response1) of
+ {ok, Pass, <<>>} -> {ok, User, Pass};
+ _ -> error
+ end;
+ error -> error
+ end.
+
+extract_elem(<<0:8, Rest/binary>>) ->
+ Count = next_null_pos(Rest),
+ <<Elem:Count/binary, Rest1/binary>> = Rest,
+ {ok, Elem, Rest1};
+extract_elem(_) ->
+ error.
+
+next_null_pos(Bin) ->
+ next_null_pos(Bin, 0).
+
+next_null_pos(<<>>, Count) -> Count;
+next_null_pos(<<0:8, _Rest/binary>>, Count) -> Count;
+next_null_pos(<<_:8, Rest/binary>>, Count) -> next_null_pos(Rest, Count + 1).
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 6a21e10fd3..1aa6ea6745 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -33,7 +33,7 @@ behaviour_info(callbacks) ->
{stop, 0},
%% Initialise the backing queue and its state.
- {init, 3},
+ {init, 2},
%% Called on queue shutdown when queue isn't being deleted.
{terminate, 1},
@@ -47,12 +47,12 @@ behaviour_info(callbacks) ->
{purge, 1},
%% Publish a message.
- {publish, 3},
+ {publish, 4},
%% Called for messages which have already been passed straight
%% out to a client. The queue will be empty for these calls
%% (i.e. saves the round trip through the backing queue).
- {publish_delivered, 4},
+ {publish_delivered, 5},
%% Drop messages from the head of the queue while the supplied
%% predicate returns true.
@@ -66,7 +66,7 @@ behaviour_info(callbacks) ->
{ack, 2},
%% A publish, but in the context of a transaction.
- {tx_publish, 4},
+ {tx_publish, 5},
%% Acks, but in the context of a transaction.
{tx_ack, 3},
@@ -122,7 +122,12 @@ behaviour_info(callbacks) ->
%% Exists for debugging purposes, to be able to expose state via
%% rabbitmqctl list_queues backing_queue_status
- {status, 1}
+ {status, 1},
+
+ %% Passed a function to be invoked with the relevant backing
+ %% queue's state. Useful for when the backing queue or other
+ %% components need to pass functions into the backing queue.
+ {invoke, 3}
];
behaviour_info(_Other) ->
undefined.
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index b270927be3..96a22dcaf1 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -331,7 +331,7 @@ group_bindings_fold(Fun, SrcName, Acc, Removed, Bindings) ->
group_bindings_fold(Fun, Fun(SrcName, Bindings, Acc), Removed).
maybe_auto_delete(XName, Bindings, Deletions) ->
- case mnesia:read(rabbit_exchange, XName) of
+ case mnesia:read({rabbit_exchange, XName}) of
[] ->
add_deletion(XName, {undefined, not_deleted, Bindings}, Deletions);
[X] ->
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index b92206ad4e..a82e5eff3e 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -41,8 +41,10 @@
-define(STATISTICS_KEYS,
[pid,
transactional,
+ confirm,
consumer_count,
messages_unacknowledged,
+ messages_unconfirmed,
acks_uncommitted,
prefetch_count,
client_flow_blocked]).
@@ -275,17 +277,20 @@ handle_cast({confirm, MsgSeqNos, From}, State) ->
handle_info(timeout, State) ->
noreply(State);
-handle_info({'DOWN', _MRef, process, QPid, _Reason},
+handle_info({'DOWN', _MRef, process, QPid, Reason},
State = #ch{unconfirmed = UC}) ->
%% TODO: this does a complete scan and partial rebuild of the
%% tree, which is quite efficient. To do better we'd need to
%% maintain a secondary mapping, from QPids to MsgSeqNos.
- {MsgSeqNos, UC1} = remove_queue_unconfirmed(
- gb_trees:next(gb_trees:iterator(UC)), QPid,
- {[], UC}),
+ {MXs, UC1} = remove_queue_unconfirmed(
+ gb_trees:next(gb_trees:iterator(UC)), QPid,
+ {[], UC}, State),
erase_queue_stats(QPid),
- noreply(queue_blocked(QPid, record_confirms(MsgSeqNos,
- State#ch{unconfirmed = UC1}))).
+ State1 = case Reason of
+ normal -> record_confirms(MXs, State#ch{unconfirmed = UC1});
+ _ -> send_nacks(MXs, State#ch{unconfirmed = UC1})
+ end,
+ noreply(queue_blocked(QPid, State1)).
handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) ->
ok = clear_permission_cache(),
@@ -471,38 +476,44 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) ->
State#ch{blocking = Blocking1}
end.
-remove_queue_unconfirmed(none, _QPid, Acc) ->
+remove_queue_unconfirmed(none, _QPid, Acc, _State) ->
Acc;
-remove_queue_unconfirmed({MsgSeqNo, Qs, Next}, QPid, Acc) ->
+remove_queue_unconfirmed({MsgSeqNo, XQ, Next}, QPid, Acc, State) ->
remove_queue_unconfirmed(gb_trees:next(Next), QPid,
- remove_qmsg(MsgSeqNo, QPid, Qs, Acc)).
+ remove_qmsg(MsgSeqNo, QPid, XQ, Acc, State),
+ State).
-record_confirm(undefined, State) -> State;
-record_confirm(MsgSeqNo, State) -> record_confirms([MsgSeqNo], State).
+record_confirm(undefined, _, State) ->
+ State;
+record_confirm(MsgSeqNo, XName, State) ->
+ record_confirms([{MsgSeqNo, XName}], State).
record_confirms([], State) ->
State;
-record_confirms(MsgSeqNos, State = #ch{confirmed = C}) ->
- State#ch{confirmed = [MsgSeqNos | C]}.
+record_confirms(MXs, State = #ch{confirmed = C}) ->
+ State#ch{confirmed = [MXs | C]}.
confirm([], _QPid, State) ->
State;
confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) ->
- {DoneMessages, UC2} =
+ {MXs, UC1} =
lists:foldl(
fun(MsgSeqNo, {_DMs, UC0} = Acc) ->
case gb_trees:lookup(MsgSeqNo, UC0) of
none -> Acc;
- {value, Qs} -> remove_qmsg(MsgSeqNo, QPid, Qs, Acc)
+ {value, XQ} -> remove_qmsg(MsgSeqNo, QPid, XQ, Acc, State)
end
end, {[], UC}, MsgSeqNos),
- record_confirms(DoneMessages, State#ch{unconfirmed = UC2}).
+ record_confirms(MXs, State#ch{unconfirmed = UC1}).
-remove_qmsg(MsgSeqNo, QPid, Qs, {MsgSeqNos, UC}) ->
+remove_qmsg(MsgSeqNo, QPid, {XName, Qs}, {MXs, UC}, State) ->
Qs1 = sets:del_element(QPid, Qs),
+ %% these confirms will be emitted even when a queue dies, but that
+ %% should be fine, since the queue stats get erased immediately
+ maybe_incr_stats([{{QPid, XName}, 1}], confirm, State),
case sets:size(Qs1) of
- 0 -> {[MsgSeqNo | MsgSeqNos], gb_trees:delete(MsgSeqNo, UC)};
- _ -> {MsgSeqNos, gb_trees:update(MsgSeqNo, Qs1, UC)}
+ 0 -> {[{MsgSeqNo, XName} | MXs], gb_trees:delete(MsgSeqNo, UC)};
+ _ -> {MXs, gb_trees:update(MsgSeqNo, {XName, Qs1}, UC)}
end.
handle_method(#'channel.open'{}, _, State = #ch{state = starting}) ->
@@ -555,7 +566,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
Exchange,
rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message,
MsgSeqNo)),
- State2 = process_routing_result(RoutingRes, DeliveredQPids,
+ State2 = process_routing_result(RoutingRes, DeliveredQPids, ExchangeName,
MsgSeqNo, Message, State1),
maybe_incr_stats([{ExchangeName, 1} |
[{{QPid, ExchangeName}, 1} ||
@@ -729,16 +740,22 @@ handle_method(#'basic.qos'{prefetch_count = PrefetchCount},
{reply, #'basic.qos_ok'{}, State#ch{limiter_pid = LimiterPid2}};
handle_method(#'basic.recover_async'{requeue = true},
- _, State = #ch{unacked_message_q = UAMQ}) ->
+ _, State = #ch{unacked_message_q = UAMQ,
+ limiter_pid = LimiterPid}) ->
+ OkFun = fun () -> ok end,
ok = fold_per_queue(
fun (QPid, MsgIds, ok) ->
%% The Qpid python test suite incorrectly assumes
%% that messages will be requeued in their original
%% order. To keep it happy we reverse the id list
%% since we are given them in reverse order.
- rabbit_amqqueue:requeue(
- QPid, lists:reverse(MsgIds), self())
+ rabbit_misc:with_exit_handler(
+ OkFun, fun () ->
+ rabbit_amqqueue:requeue(
+ QPid, lists:reverse(MsgIds), self())
+ end)
end, ok, UAMQ),
+ ok = notify_limiter(LimiterPid, UAMQ),
%% No answer required - basic.recover is the newer, synchronous
%% variant of this method
{noreply, State#ch{unacked_message_q = queue:new()}};
@@ -1222,20 +1239,20 @@ is_message_persistent(Content) ->
IsPersistent
end.
-process_routing_result(unroutable, _, MsgSeqNo, Message, State) ->
- ok = basic_return(Message, State#ch.writer_pid, no_route),
- record_confirm(MsgSeqNo, State);
-process_routing_result(not_delivered, _, MsgSeqNo, Message, State) ->
- ok = basic_return(Message, State#ch.writer_pid, no_consumers),
- record_confirm(MsgSeqNo, State);
-process_routing_result(routed, [], MsgSeqNo, _, State) ->
- record_confirm(MsgSeqNo, State);
-process_routing_result(routed, _, undefined, _, State) ->
+process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) ->
+ ok = basic_return(Msg, State#ch.writer_pid, no_route),
+ record_confirm(MsgSeqNo, XName, State);
+process_routing_result(not_delivered, _, XName, MsgSeqNo, Msg, State) ->
+ ok = basic_return(Msg, State#ch.writer_pid, no_consumers),
+ record_confirm(MsgSeqNo, XName, State);
+process_routing_result(routed, [], XName, MsgSeqNo, _, State) ->
+ record_confirm(MsgSeqNo, XName, State);
+process_routing_result(routed, _, _, undefined, _, State) ->
State;
-process_routing_result(routed, QPids, MsgSeqNo, _, State) ->
+process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) ->
#ch{unconfirmed = UC} = State,
[maybe_monitor(QPid) || QPid <- QPids],
- UC1 = gb_trees:insert(MsgSeqNo, sets:from_list(QPids), UC),
+ UC1 = gb_trees:insert(MsgSeqNo, {XName, sets:from_list(QPids)}, UC),
State#ch{unconfirmed = UC1}.
lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) ->
@@ -1243,34 +1260,51 @@ lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) ->
lock_message(false, _MsgStruct, State) ->
State.
-send_confirms(State = #ch{confirmed = C}) ->
- send_confirms(lists:append(C), State #ch{confirmed = []}).
+send_nacks([], State) ->
+ State;
+send_nacks(MXs, State) ->
+ MsgSeqNos = [ MsgSeqNo || {MsgSeqNo, _} <- MXs ],
+ coalesce_and_send(MsgSeqNos,
+ fun(MsgSeqNo, Multiple) ->
+ #'basic.nack'{delivery_tag = MsgSeqNo,
+ multiple = Multiple}
+ end, State).
+send_confirms(State = #ch{confirmed = C}) ->
+ C1 = lists:append(C),
+ MsgSeqNos = [ begin maybe_incr_stats([{ExchangeName, 1}], confirm, State),
+ MsgSeqNo
+ end || {MsgSeqNo, ExchangeName} <- C1 ],
+ send_confirms(MsgSeqNos, State #ch{confirmed = []}).
send_confirms([], State) ->
State;
send_confirms([MsgSeqNo], State = #ch{writer_pid = WriterPid}) ->
- send_confirm(MsgSeqNo, WriterPid),
+ ok = rabbit_writer:send_command(WriterPid,
+ #'basic.ack'{delivery_tag = MsgSeqNo}),
State;
-send_confirms(Cs, State = #ch{writer_pid = WriterPid, unconfirmed = UC}) ->
- SCs = lists:usort(Cs),
+send_confirms(Cs, State) ->
+ coalesce_and_send(Cs, fun(MsgSeqNo, Multiple) ->
+ #'basic.ack'{delivery_tag = MsgSeqNo,
+ multiple = Multiple}
+ end, State).
+
+coalesce_and_send(MsgSeqNos, MkMsgFun,
+ State = #ch{writer_pid = WriterPid, unconfirmed = UC}) ->
+ SMsgSeqNos = lists:usort(MsgSeqNos),
CutOff = case gb_trees:is_empty(UC) of
- true -> lists:last(SCs) + 1;
- false -> {SeqNo, _Qs} = gb_trees:smallest(UC), SeqNo
+ true -> lists:last(SMsgSeqNos) + 1;
+ false -> {SeqNo, _XQ} = gb_trees:smallest(UC), SeqNo
end,
- {Ms, Ss} = lists:splitwith(fun(X) -> X < CutOff end, SCs),
+ {Ms, Ss} = lists:splitwith(fun(X) -> X < CutOff end, SMsgSeqNos),
case Ms of
[] -> ok;
_ -> ok = rabbit_writer:send_command(
- WriterPid, #'basic.ack'{delivery_tag = lists:last(Ms),
- multiple = true})
+ WriterPid, MkMsgFun(lists:last(Ms), true))
end,
- [ok = send_confirm(SeqNo, WriterPid) || SeqNo <- Ss],
+ [ok = rabbit_writer:send_command(
+ WriterPid, MkMsgFun(SeqNo, false)) || SeqNo <- Ss],
State.
-send_confirm(SeqNo, WriterPid) ->
- ok = rabbit_writer:send_command(WriterPid,
- #'basic.ack'{delivery_tag = SeqNo}).
-
terminate(_State) ->
pg_local:leave(rabbit_channels, self()),
rabbit_event:notify(channel_closed, [{pid, self()}]).
@@ -1283,8 +1317,11 @@ i(number, #ch{channel = Channel}) -> Channel;
i(user, #ch{user = User}) -> User#user.username;
i(vhost, #ch{virtual_host = VHost}) -> VHost;
i(transactional, #ch{transaction_id = TxnKey}) -> TxnKey =/= none;
+i(confirm, #ch{confirm_enabled = CE}) -> CE;
i(consumer_count, #ch{consumer_mapping = ConsumerMapping}) ->
dict:size(ConsumerMapping);
+i(messages_unconfirmed, #ch{unconfirmed = UC}) ->
+ gb_trees:size(UC);
i(messages_unacknowledged, #ch{unacked_message_q = UAMQ,
uncommitted_ack_q = UAQ}) ->
queue:len(UAMQ) + queue:len(UAQ);
diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl
index d426d55df5..d21cfdb7fb 100644
--- a/src/rabbit_channel_sup.erl
+++ b/src/rabbit_channel_sup.erl
@@ -31,9 +31,11 @@
-export_type([start_link_args/0]).
-type(start_link_args() ::
- {rabbit_types:protocol(), rabbit_net:socket(),
+ {'tcp', rabbit_types:protocol(), rabbit_net:socket(),
rabbit_channel:channel_number(), non_neg_integer(), pid(),
- rabbit_types:user(), rabbit_types:vhost(), pid()}).
+ rabbit_types:user(), rabbit_types:vhost(), pid()} |
+ {'direct', rabbit_channel:channel_number(), pid(), rabbit_types:user(),
+ rabbit_types:vhost(), pid()}).
-spec(start_link/1 :: (start_link_args()) -> {'ok', pid(), {pid(), any()}}).
@@ -41,7 +43,7 @@
%%----------------------------------------------------------------------------
-start_link({Protocol, Sock, Channel, FrameMax, ReaderPid, User, VHost,
+start_link({tcp, Protocol, Sock, Channel, FrameMax, ReaderPid, User, VHost,
Collector}) ->
{ok, SupPid} = supervisor2:start_link(?MODULE, []),
{ok, WriterPid} =
@@ -58,7 +60,17 @@ start_link({Protocol, Sock, Channel, FrameMax, ReaderPid, User, VHost,
Collector, start_limiter_fun(SupPid)]},
intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}),
{ok, AState} = rabbit_command_assembler:init(Protocol),
- {ok, SupPid, {ChannelPid, AState}}.
+ {ok, SupPid, {ChannelPid, AState}};
+start_link({direct, Channel, ClientChannelPid, User, VHost, Collector}) ->
+ {ok, SupPid} = supervisor2:start_link(?MODULE, []),
+ {ok, ChannelPid} =
+ supervisor2:start_child(
+ SupPid,
+ {channel, {rabbit_channel, start_link,
+ [Channel, ClientChannelPid, ClientChannelPid,
+ User, VHost, Collector, start_limiter_fun(SupPid)]},
+ intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}),
+ {ok, SupPid, {ChannelPid, none}}.
%%----------------------------------------------------------------------------
diff --git a/src/tcp_client_sup.erl b/src/rabbit_client_sup.erl
index 1c2bbb6548..dbdc6cd429 100644
--- a/src/tcp_client_sup.erl
+++ b/src/rabbit_client_sup.erl
@@ -14,7 +14,7 @@
%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
%%
--module(tcp_client_sup).
+-module(rabbit_client_sup).
-behaviour(supervisor2).
@@ -22,6 +22,21 @@
-export([init/1]).
+-include("rabbit.hrl").
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(start_link/1 :: (mfa()) ->
+ rabbit_types:ok_pid_or_error()).
+-spec(start_link/2 :: ({'local', atom()}, mfa()) ->
+ rabbit_types:ok_pid_or_error()).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
start_link(Callback) ->
supervisor2:start_link(?MODULE, Callback).
@@ -29,6 +44,5 @@ start_link(SupName, Callback) ->
supervisor2:start_link(SupName, ?MODULE, Callback).
init({M,F,A}) ->
- {ok, {{simple_one_for_one_terminate, 10, 10},
- [{tcp_client, {M,F,A},
- temporary, infinity, supervisor, [M]}]}}.
+ {ok, {{simple_one_for_one_terminate, 0, 1},
+ [{client, {M,F,A}, temporary, infinity, supervisor, [M]}]}}.
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 4228ff7fd3..15f1e77d77 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -77,24 +77,24 @@ start() ->
true -> ok;
false -> io:format("...done.~n")
end,
- halt();
+ quit(0);
{'EXIT', {function_clause, [{?MODULE, action, _} | _]}} ->
print_error("invalid command '~s'",
[string:join([atom_to_list(Command) | Args], " ")]),
usage();
{error, Reason} ->
print_error("~p", [Reason]),
- halt(2);
+ quit(2);
{badrpc, {'EXIT', Reason}} ->
print_error("~p", [Reason]),
- halt(2);
+ quit(2);
{badrpc, Reason} ->
print_error("unable to connect to node ~w: ~w", [Node, Reason]),
print_badrpc_diagnostics(Node),
- halt(2);
+ quit(2);
Other ->
print_error("~p", [Other]),
- halt(2)
+ quit(2)
end.
fmt_stderr(Format, Args) -> rabbit_misc:format_stderr(Format ++ "~n", Args).
@@ -130,7 +130,7 @@ stop() ->
usage() ->
io:format("~s", [rabbit_ctl_usage:usage()]),
- halt(1).
+ quit(1).
action(stop, Node, [], _Opts, Inform) ->
Inform("Stopping and halting node ~p", [Node]),
@@ -327,11 +327,11 @@ format_info_item(#resource{name = Name}) ->
escape(Name);
format_info_item({N1, N2, N3, N4} = Value) when
?IS_U8(N1), ?IS_U8(N2), ?IS_U8(N3), ?IS_U8(N4) ->
- inet_parse:ntoa(Value);
+ rabbit_misc:ntoa(Value);
format_info_item({K1, K2, K3, K4, K5, K6, K7, K8} = Value) when
?IS_U16(K1), ?IS_U16(K2), ?IS_U16(K3), ?IS_U16(K4),
?IS_U16(K5), ?IS_U16(K6), ?IS_U16(K7), ?IS_U16(K8) ->
- inet_parse:ntoa(Value);
+ rabbit_misc:ntoa(Value);
format_info_item(Value) when is_pid(Value) ->
rabbit_misc:pid_to_string(Value);
format_info_item(Value) when is_binary(Value) ->
@@ -342,6 +342,12 @@ format_info_item([{TableEntryKey, TableEntryType, _TableEntryValue} | _] =
Value) when is_binary(TableEntryKey) andalso
is_atom(TableEntryType) ->
io_lib:format("~1000000000000p", [prettify_amqp_table(Value)]);
+format_info_item([T | _] = Value)
+ when is_tuple(T) orelse is_pid(T) orelse is_binary(T) orelse is_atom(T) orelse
+ is_list(T) ->
+ "[" ++
+ lists:nthtail(2, lists:append(
+ [", " ++ format_info_item(E) || E <- Value])) ++ "]";
format_info_item(Value) ->
io_lib:format("~w", [Value]).
@@ -393,3 +399,12 @@ prettify_typed_amqp_value(Type, Value) ->
array -> [prettify_typed_amqp_value(T, V) || {T, V} <- Value];
_ -> Value
end.
+
+% the slower shutdown on windows required to flush stdout
+quit(Status) ->
+ case os:type() of
+ {unix, _} ->
+ halt(Status);
+ {win32, _} ->
+ init:stop(Status)
+ end.
diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl
new file mode 100644
index 0000000000..3b8c9fba39
--- /dev/null
+++ b/src/rabbit_direct.erl
@@ -0,0 +1,75 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_direct).
+
+-export([boot/0, connect/3, start_channel/5]).
+
+-include("rabbit.hrl").
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(boot/0 :: () -> 'ok').
+-spec(connect/3 :: (binary(), binary(), binary()) ->
+ {'ok', {rabbit_types:user(),
+ rabbit_framing:amqp_table()}}).
+-spec(start_channel/5 :: (rabbit_channel:channel_number(), pid(),
+ rabbit_types:user(), rabbit_types:vhost(), pid()) ->
+ {'ok', pid()}).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+boot() ->
+ {ok, _} =
+ supervisor2:start_child(
+ rabbit_sup,
+ {rabbit_direct_client_sup,
+ {rabbit_client_sup, start_link,
+ [{local, rabbit_direct_client_sup},
+ {rabbit_channel_sup, start_link, []}]},
+ transient, infinity, supervisor, [rabbit_client_sup]}),
+ ok.
+
+%%----------------------------------------------------------------------------
+
+connect(Username, Password, VHost) ->
+ case lists:keymember(rabbit, 1, application:which_applications()) of
+ true ->
+ try rabbit_access_control:user_pass_login(Username, Password) of
+ #user{} = User ->
+ try rabbit_access_control:check_vhost_access(User, VHost) of
+ ok -> {ok, {User, rabbit_reader:server_properties()}}
+ catch
+ exit:#amqp_error{name = access_refused} ->
+ {error, access_refused}
+ end
+ catch
+ exit:#amqp_error{name = access_refused} -> {error, auth_failure}
+ end;
+ false ->
+ {error, broker_not_found_on_node}
+ end.
+
+start_channel(Number, ClientChannelPid, User, VHost, Collector) ->
+ {ok, _, {ChannelPid, _}} =
+ supervisor2:start_child(
+ rabbit_direct_client_sup,
+ [{direct, Number, ClientChannelPid, User, VHost, Collector}]),
+ {ok, ChannelPid}.
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl
new file mode 100644
index 0000000000..30fd6ed34d
--- /dev/null
+++ b/src/rabbit_mirror_queue_coordinator.erl
@@ -0,0 +1,136 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2010 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_mirror_queue_coordinator).
+
+-export([start_link/2, add_slave/2, get_gm/1]).
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+ code_change/3]).
+
+-export([joined/2, members_changed/3, handle_msg/3]).
+
+-behaviour(gen_server2).
+-behaviour(gm).
+
+-include("rabbit.hrl").
+-include("gm_specs.hrl").
+
+-record(state, { q,
+ gm
+ }).
+
+-define(ONE_SECOND, 1000).
+
+start_link(Queue, GM) ->
+ gen_server2:start_link(?MODULE, [Queue, GM], []).
+
+add_slave(CPid, SlaveNode) ->
+ gen_server2:cast(CPid, {add_slave, SlaveNode}).
+
+get_gm(CPid) ->
+ gen_server2:call(CPid, get_gm, infinity).
+
+%% ---------------------------------------------------------------------------
+%% gen_server
+%% ---------------------------------------------------------------------------
+
+init([#amqqueue { name = QueueName } = Q, GM]) ->
+ GM1 = case GM of
+ undefined ->
+ ok = gm:create_tables(),
+ {ok, GM2} = gm:start_link(QueueName, ?MODULE, [self()]),
+ receive {joined, GM2, _Members} ->
+ ok
+ end,
+ GM2;
+ _ ->
+ true = link(GM),
+ GM
+ end,
+ {ok, _TRef} =
+ timer:apply_interval(?ONE_SECOND, gm, broadcast, [GM1, heartbeat]),
+ {ok, #state { q = Q, gm = GM1 }, hibernate,
+ {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
+
+handle_call(get_gm, _From, State = #state { gm = GM }) ->
+ reply(GM, State).
+
+handle_cast({add_slave, Node}, State = #state { q = Q }) ->
+ Nodes = nodes(),
+ case lists:member(Node, Nodes) of
+ true ->
+ Result = rabbit_mirror_queue_slave_sup:start_child(Node, [Q]),
+ rabbit_log:info("Adding slave node for ~s: ~p~n",
+ [rabbit_misc:rs(Q #amqqueue.name), Result]);
+ false ->
+ rabbit_log:info(
+ "Ignoring request to add slave on node ~p for ~s~n",
+ [Node, rabbit_misc:rs(Q #amqqueue.name)])
+ end,
+ noreply(State);
+
+handle_cast({gm_deaths, Deaths},
+ State = #state { q = #amqqueue { name = QueueName } }) ->
+ rabbit_log:info("Master ~p saw deaths ~p for ~s~n",
+ [self(), Deaths, rabbit_misc:rs(QueueName)]),
+ case rabbit_mirror_queue_misc:remove_from_queue(QueueName, Deaths) of
+ {ok, Pid} when node(Pid) =:= node() ->
+ noreply(State);
+ {error, not_found} ->
+ {stop, normal, State}
+ end.
+
+handle_info(Msg, State) ->
+ {stop, {unexpected_info, Msg}, State}.
+
+terminate(_Reason, #state{}) ->
+ %% gen_server case
+ ok;
+terminate([_CPid], _Reason) ->
+ %% gm case
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%% ---------------------------------------------------------------------------
+%% GM
+%% ---------------------------------------------------------------------------
+
+joined([CPid], Members) ->
+ CPid ! {joined, self(), Members},
+ ok.
+
+members_changed([_CPid], _Births, []) ->
+ ok;
+members_changed([CPid], _Births, Deaths) ->
+ ok = gen_server2:cast(CPid, {gm_deaths, Deaths}).
+
+handle_msg([_CPid], _From, heartbeat) ->
+ ok;
+handle_msg([_CPid], _From, _Msg) ->
+ ok.
+
+%% ---------------------------------------------------------------------------
+%% Others
+%% ---------------------------------------------------------------------------
+
+noreply(State) ->
+ {noreply, State, hibernate}.
+
+reply(Reply, State) ->
+ {reply, Reply, State, hibernate}.
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
new file mode 100644
index 0000000000..11831a2998
--- /dev/null
+++ b/src/rabbit_mirror_queue_master.erl
@@ -0,0 +1,250 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2010 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_mirror_queue_master).
+
+-export([init/2, terminate/1, delete_and_terminate/1,
+ purge/1, publish/4, publish_delivered/5, fetch/2, ack/2,
+ tx_publish/5, tx_ack/3, tx_rollback/2, tx_commit/4,
+ requeue/3, len/1, is_empty/1, dropwhile/2,
+ set_ram_duration_target/2, ram_duration/1,
+ needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1,
+ status/1, invoke/3]).
+
+-export([start/1, stop/0]).
+
+-export([promote_backing_queue_state/5]).
+
+-behaviour(rabbit_backing_queue).
+
+-include("rabbit.hrl").
+
+-record(state, { gm,
+ coordinator,
+ backing_queue,
+ backing_queue_state,
+ set_delivered,
+ seen
+ }).
+
+%% ---------------------------------------------------------------------------
+%% Backing queue
+%% ---------------------------------------------------------------------------
+
+start(_DurableQueues) ->
+ %% This will never get called as this module will never be
+ %% installed as the default BQ implementation.
+ exit({not_valid_for_generic_backing_queue, ?MODULE}).
+
+stop() ->
+ %% Same as start/1.
+ exit({not_valid_for_generic_backing_queue, ?MODULE}).
+
+init(#amqqueue { arguments = Args } = Q, Recover) ->
+ {ok, CPid} = rabbit_mirror_queue_coordinator:start_link(Q, undefined),
+ GM = rabbit_mirror_queue_coordinator:get_gm(CPid),
+ {_Type, Nodes} = rabbit_misc:table_lookup(Args, <<"x-mirror">>),
+ Nodes1 = case Nodes of
+ [] -> nodes();
+ _ -> [list_to_atom(binary_to_list(Node)) ||
+ {longstr, Node} <- Nodes]
+ end,
+ [rabbit_mirror_queue_coordinator:add_slave(CPid, Node) || Node <- Nodes1],
+ {ok, BQ} = application:get_env(backing_queue_module),
+ BQS = BQ:init(Q, Recover),
+ #state { gm = GM,
+ coordinator = CPid,
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ set_delivered = 0,
+ seen = sets:new() }.
+
+promote_backing_queue_state(CPid, BQ, BQS, GM, Seen) ->
+ #state { gm = GM,
+ coordinator = CPid,
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ set_delivered = BQ:len(BQS),
+ seen = Seen }.
+
+terminate(State = #state { backing_queue = BQ, backing_queue_state = BQS }) ->
+ %% Backing queue termination. The queue is going down but
+ %% shouldn't be deleted. Most likely safe shutdown of this
+ %% node. Thus just let some other slave take over.
+ State #state { backing_queue_state = BQ:terminate(BQS) }.
+
+delete_and_terminate(State = #state { gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ ok = gm:broadcast(GM, delete_and_terminate),
+ State #state { backing_queue_state = BQ:delete_and_terminate(BQS),
+ set_delivered = 0 }.
+
+purge(State = #state { gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ ok = gm:broadcast(GM, {set_length, 0}),
+ {Count, BQS1} = BQ:purge(BQS),
+ {Count, State #state { backing_queue_state = BQS1,
+ set_delivered = 0 }}.
+
+publish(Msg = #basic_message { guid = Guid }, MsgProps, ChPid,
+ State = #state { gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ seen = Seen }) ->
+ case sets:is_element(Guid, Seen) of
+ true -> State #state { seen = sets:del_element(Guid, Seen) };
+ false -> ok = gm:broadcast(GM, {publish, false, ChPid, MsgProps, Msg}),
+ BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS),
+ State #state { backing_queue_state = BQS1 }
+ end.
+
+publish_delivered(AckRequired, Msg = #basic_message { guid = Guid }, MsgProps,
+ ChPid, State = #state { gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ seen = Seen }) ->
+ case sets:is_element(Guid, Seen) of
+ true -> State #state { seen = sets:del_element(Guid, Seen) };
+ false -> ok = gm:broadcast(GM, {publish, {true, AckRequired}, ChPid,
+ MsgProps, Msg}),
+ {AckTag, BQS1} = BQ:publish_delivered(AckRequired, Msg,
+ MsgProps, ChPid, BQS),
+ {AckTag, State #state { backing_queue_state = BQS1 }}
+ end.
+
+dropwhile(Fun, State = #state { gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ set_delivered = SetDelivered }) ->
+ Len = BQ:len(BQS),
+ BQS1 = BQ:dropwhile(Fun, BQS),
+ Dropped = Len - BQ:len(BQS1),
+ SetDelivered1 = lists:max([0, SetDelivered - Dropped]),
+ ok = gm:broadcast(GM, {set_length, BQ:len(BQS1)}),
+ State #state { backing_queue_state = BQS1,
+ set_delivered = SetDelivered1 }.
+
+fetch(AckRequired, State = #state { gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ set_delivered = SetDelivered,
+ seen = Seen }) ->
+ {Result, BQS1} = BQ:fetch(AckRequired, BQS),
+ State1 = State #state { backing_queue_state = BQS1 },
+ case Result of
+ empty ->
+ {Result, State1};
+ {#basic_message { guid = Guid } = Message, IsDelivered, AckTag,
+ Remaining} ->
+ ok = gm:broadcast(GM, {fetch, AckRequired, Guid, Remaining}),
+ IsDelivered1 = IsDelivered orelse SetDelivered > 0,
+ SetDelivered1 = lists:max([0, SetDelivered - 1]),
+ Seen1 = case SetDelivered + SetDelivered1 of
+ 1 -> sets:new(); %% transition to empty
+ _ -> Seen
+ end,
+ {{Message, IsDelivered1, AckTag, Remaining},
+ State1 #state { set_delivered = SetDelivered1,
+ seen = Seen1 }}
+ end.
+
+ack(AckTags, State = #state { gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ {Guids, BQS1} = BQ:ack(AckTags, BQS),
+ case Guids of
+ [] -> ok;
+ _ -> ok = gm:broadcast(GM, {ack, Guids})
+ end,
+ {Guids, State #state { backing_queue_state = BQS1 }}.
+
+tx_publish(Txn, Msg, MsgProps, ChPid, #state {} = State) ->
+ %% gm:broadcast(GM, {tx_publish, Txn, Guid, MsgProps, ChPid})
+ State.
+
+tx_ack(Txn, AckTags, #state {} = State) ->
+ %% gm:broadcast(GM, {tx_ack, Txn, Guids})
+ State.
+
+tx_rollback(Txn, #state {} = State) ->
+ %% gm:broadcast(GM, {tx_rollback, Txn})
+ {[], State}.
+
+tx_commit(Txn, PostCommitFun, MsgPropsFun, #state {} = State) ->
+ %% Maybe don't want to transmit the MsgPropsFun but what choice do
+ %% we have? OTOH, on the slaves, things won't be expiring on their
+ %% own (props are interpreted by amqqueue, not vq), so if the msg
+ %% props aren't quite the same, that doesn't matter.
+ %%
+ %% The PostCommitFun is actually worse - we need to prevent that
+ %% from being invoked until we have confirmation from all the
+ %% slaves that they've done everything up to there.
+ %%
+ %% In fact, transactions are going to need work seeing as it's at
+ %% this point that VQ mentions amqqueue, which will thus not work
+ %% on the slaves - we need to make sure that all the slaves do the
+ %% tx_commit_post_msg_store at the same point, and then when they
+ %% all confirm that (scatter/gather), we can finally invoke the
+ %% PostCommitFun.
+ %%
+ %% Another idea is that the slaves are actually driven with
+ %% pubacks and thus only the master needs to support txns
+ %% directly.
+ {[], State}.
+
+requeue(AckTags, MsgPropsFun, State = #state { gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ {Guids, BQS1} = BQ:requeue(AckTags, MsgPropsFun, BQS),
+ ok = gm:broadcast(GM, {requeue, MsgPropsFun, Guids}),
+ {Guids, State #state { backing_queue_state = BQS1 }}.
+
+len(#state { backing_queue = BQ, backing_queue_state = BQS}) ->
+ BQ:len(BQS).
+
+is_empty(#state { backing_queue = BQ, backing_queue_state = BQS}) ->
+ BQ:is_empty(BQS).
+
+set_ram_duration_target(Target, State = #state { backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ State #state { backing_queue_state =
+ BQ:set_ram_duration_target(Target, BQS) }.
+
+ram_duration(State = #state { backing_queue = BQ, backing_queue_state = BQS}) ->
+ {Result, BQS1} = BQ:ram_duration(BQS),
+ {Result, State #state { backing_queue_state = BQS1 }}.
+
+needs_idle_timeout(#state { backing_queue = BQ, backing_queue_state = BQS}) ->
+ BQ:needs_idle_timeout(BQS).
+
+idle_timeout(#state { backing_queue = BQ, backing_queue_state = BQS}) ->
+ BQ:idle_timeout(BQS).
+
+handle_pre_hibernate(State = #state { backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ State #state { backing_queue_state = BQ:handle_pre_hibernate(BQS) }.
+
+status(#state { backing_queue = BQ, backing_queue_state = BQS}) ->
+ BQ:status(BQS).
+
+invoke(?MODULE, Fun, State) ->
+ Fun(State);
+invoke(Mod, Fun, State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ {Guids, BQS1} = BQ:invoke(Mod, Fun, BQS),
+ {Guids, State #state { backing_queue_state = BQS1 }}.
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
new file mode 100644
index 0000000000..090cb81203
--- /dev/null
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -0,0 +1,46 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2010 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_mirror_queue_misc).
+
+-export([remove_from_queue/2]).
+
+-include("rabbit.hrl").
+
+remove_from_queue(QueueName, DeadPids) ->
+ DeadNodes = [node(DeadPid) || DeadPid <- DeadPids],
+ rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ %% Someone else could have deleted the queue before we
+ %% get here.
+ case mnesia:read({rabbit_queue, QueueName}) of
+ [] -> {error, not_found};
+ [Q = #amqqueue { pid = QPid,
+ mirror_pids = MPids }] ->
+ [QPid1 | MPids1] =
+ [Pid || Pid <- [QPid | MPids],
+ not lists:member(node(Pid), DeadNodes)],
+ case {{QPid, MPids}, {QPid1, MPids1}} of
+ {Same, Same} ->
+ {ok, QPid};
+ _ ->
+ Q1 = Q #amqqueue { pid = QPid1,
+ mirror_pids = MPids1 },
+ ok = rabbit_amqqueue:store_queue(Q1),
+ {ok, QPid1}
+ end
+ end
+ end).
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
new file mode 100644
index 0000000000..4f9d2066be
--- /dev/null
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -0,0 +1,529 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2010 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_mirror_queue_slave).
+
+%% We join the GM group before we add ourselves to the amqqueue
+%% record. As a result:
+%% 1. We can receive msgs from GM that correspond to messages we will
+%% never receive from publishers.
+%% 2. When we receive a message from publishers, we must receive a
+%% message from the GM group for it.
+%% 3. However, that instruction from the GM group can arrive either
+%% before or after the actual message. We need to be able to
+%% distinguish between GM instructions arriving early, and case (1)
+%% above.
+%%
+%% All instructions from the GM group must be processed in the order
+%% in which they're received.
+%%
+%% Thus, we need a queue per sender, and a queue for GM instructions.
+%%
+%% On receipt of a GM group instruction, three things are possible:
+%% 1. The queue of publisher messages is empty. Thus store the GM
+%% instruction to the instrQ.
+%% 2. The head of the queue of publisher messages has a message that
+%% matches the GUID of the GM instruction. Remove the message, and
+%% route appropriately.
+%% 3. The head of the queue of publisher messages has a message that
+%% does not match the GUID of the GM instruction. Throw away the GM
+%% instruction: the GM instruction must correspond to a message
+%% that we'll never receive. If it did not, then before the current
+%% instruction, we would have received an instruction for the
+%% message at the head of this queue, thus the head of the queue
+%% would have been removed and processed.
+%%
+%% On receipt of a publisher message, three things are possible:
+%% 1. The queue of GM group instructions is empty. Add the message to
+%% the relevant queue and await instructions from the GM.
+%% 2. The head of the queue of GM group instructions has an
+%% instruction matching the GUID of the message. Remove that
+%% instruction and act on it. Attempt to process the rest of the
+%% instrQ.
+%% 3. The head of the queue of GM group instructions has an
+%% instruction that does not match the GUID of the message. If the
+%% message is from the same publisher as is referred to by the
+%% instruction then throw away the GM group instruction and repeat
+%% - attempt to match against the next instruction if there is one:
+%% The instruction thrown away was for a message we'll never
+%% receive.
+%%
+%% In all cases, we are relying heavily on order preserving messaging
+%% both from the GM group and from the publishers.
+
+-export([start_link/1, set_maximum_since_use/2]).
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+ code_change/3, handle_pre_hibernate/1]).
+
+-export([joined/2, members_changed/3, handle_msg/3]).
+
+-behaviour(gen_server2).
+-behaviour(gm).
+
+-include("rabbit.hrl").
+-include("gm_specs.hrl").
+
+-record(state, { q,
+ gm,
+ master_node,
+ backing_queue,
+ backing_queue_state,
+ rate_timer_ref,
+
+ sender_queues, %% :: Pid -> MsgQ
+ guid_ack, %% :: Guid -> AckTag
+ seen, %% Set Guid
+
+ guid_to_channel %% for confirms
+ }).
+
+-define(RAM_DURATION_UPDATE_INTERVAL, 5000).
+
+start_link(Q) ->
+ gen_server2:start_link(?MODULE, [Q], []).
+
+set_maximum_since_use(QPid, Age) ->
+ gen_server2:cast(QPid, {set_maximum_since_use, Age}).
+
+init([#amqqueue { name = QueueName } = Q]) ->
+ process_flag(trap_exit, true), %% amqqueue_process traps exits too.
+ ok = gm:create_tables(),
+ {ok, GM} = gm:start_link(QueueName, ?MODULE, [self()]),
+ receive {joined, GM} ->
+ ok
+ end,
+ Self = self(),
+ Node = node(),
+ case rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ [Q1 = #amqqueue { pid = QPid, mirror_pids = MPids }] =
+ mnesia:read({rabbit_queue, QueueName}),
+ case [Pid || Pid <- [QPid | MPids], node(Pid) =:= Node] of
+ [] ->
+ MPids1 = MPids ++ [Self],
+ mnesia:write(rabbit_queue,
+ Q1 #amqqueue { mirror_pids = MPids1 },
+ write),
+ {ok, QPid};
+ _ ->
+ {error, node_already_present}
+ end
+ end) of
+ {ok, MPid} ->
+ ok = file_handle_cache:register_callback(
+ rabbit_amqqueue, set_maximum_since_use, [self()]),
+ ok = rabbit_memory_monitor:register(
+ self(), {rabbit_amqqueue, set_ram_duration_target,
+ [self()]}),
+ {ok, BQ} = application:get_env(backing_queue_module),
+ BQS = BQ:init(Q, false),
+ {ok, #state { q = Q,
+ gm = GM,
+ master_node = node(MPid),
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ rate_timer_ref = undefined,
+
+ sender_queues = dict:new(),
+ guid_ack = dict:new(),
+ seen = sets:new(),
+
+ guid_to_channel = dict:new()
+ }, hibernate,
+ {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN,
+ ?DESIRED_HIBERNATE}};
+ {error, Error} ->
+ {stop, Error}
+ end.
+
+handle_call({deliver_immediately, Delivery = #delivery {}}, From, State) ->
+ %% Synchronous, "immediate" delivery mode
+ gen_server2:reply(From, false), %% master may deliver it, not us
+ noreply(maybe_enqueue_message(Delivery, State));
+
+handle_call({deliver, Delivery = #delivery {}}, From, State) ->
+ %% Synchronous, "mandatory" delivery mode
+ gen_server2:reply(From, true), %% amqqueue throws away the result anyway
+ noreply(maybe_enqueue_message(Delivery, State));
+
+handle_call({gm_deaths, Deaths}, From,
+ State = #state { q = #amqqueue { name = QueueName },
+ gm = GM,
+ master_node = MNode }) ->
+ rabbit_log:info("Slave ~p saw deaths ~p for ~s~n",
+ [self(), Deaths, rabbit_misc:rs(QueueName)]),
+ case rabbit_mirror_queue_misc:remove_from_queue(QueueName, Deaths) of
+ {ok, Pid} when node(Pid) =:= MNode ->
+ reply(ok, State);
+ {ok, Pid} when node(Pid) =:= node() ->
+ promote_me(From, State);
+ {ok, Pid} ->
+ gen_server2:reply(From, ok),
+ ok = gm:broadcast(GM, heartbeat),
+ noreply(State #state { master_node = node(Pid) });
+ {error, not_found} ->
+ gen_server2:reply(From, ok),
+ {stop, normal, State}
+ end;
+
+handle_call({maybe_run_queue_via_backing_queue, Mod, Fun}, _From, State) ->
+ reply(ok, maybe_run_queue_via_backing_queue(Mod, Fun, State)).
+
+
+handle_cast({maybe_run_queue_via_backing_queue, Mod, Fun}, State) ->
+ noreply(maybe_run_queue_via_backing_queue(Mod, Fun, State));
+
+handle_cast({gm, Instruction}, State) ->
+ handle_process_result(process_instruction(Instruction, State));
+
+handle_cast({deliver, Delivery = #delivery {}}, State) ->
+ %% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
+ noreply(maybe_enqueue_message(Delivery, State));
+
+handle_cast({set_maximum_since_use, Age}, State) ->
+ ok = file_handle_cache:set_maximum_since_use(Age),
+ noreply(State);
+
+handle_cast({set_ram_duration_target, Duration},
+ State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ BQS1 = BQ:set_ram_duration_target(Duration, BQS),
+ noreply(State #state { backing_queue_state = BQS1 });
+
+handle_cast(update_ram_duration,
+ State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ {RamDuration, BQS1} = BQ:ram_duration(BQS),
+ DesiredDuration =
+ rabbit_memory_monitor:report_ram_duration(self(), RamDuration),
+ BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1),
+ noreply(State #state { rate_timer_ref = just_measured,
+ backing_queue_state = BQS2 }).
+
+handle_info(Msg, State) ->
+ {stop, {unexpected_info, Msg}, State}.
+
+%% If the Reason is shutdown, or {shutdown, _}, it is not the queue
+%% being deleted: it's just the node going down. Even though we're a
+%% slave, we have no idea whether or not we'll be the only copy coming
+%% back up. Thus we must assume we will be, and preserve anything we
+%% have on disk.
+terminate(_Reason, #state { backing_queue_state = undefined }) ->
+ %% We've received a delete_and_terminate from gm, thus nothing to
+ %% do here.
+ ok;
+terminate(Reason, #state { q = Q,
+ gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ rate_timer_ref = RateTRef }) ->
+ ok = gm:leave(GM),
+ QueueState = rabbit_amqqueue_process:init_with_backing_queue_state(
+ Q, BQ, BQS, RateTRef, [], []),
+ rabbit_amqqueue_process:terminate(Reason, QueueState);
+terminate([_SPid], _Reason) ->
+ %% gm case
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+handle_pre_hibernate(State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ %% mainly copied from amqqueue_process
+ BQS1 = BQ:handle_pre_hibernate(BQS),
+ %% no activity for a while == 0 egress and ingress rates
+ DesiredDuration =
+ rabbit_memory_monitor:report_ram_duration(self(), infinity),
+ BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1),
+ {hibernate, stop_rate_timer(State #state { backing_queue_state = BQS2 })}.
+
+%% ---------------------------------------------------------------------------
+%% GM
+%% ---------------------------------------------------------------------------
+
+joined([SPid], _Members) ->
+ SPid ! {joined, self()},
+ ok.
+
+members_changed([_SPid], _Births, []) ->
+ ok;
+members_changed([SPid], _Births, Deaths) ->
+ rabbit_misc:with_exit_handler(
+ fun () -> {stop, normal} end,
+ fun () ->
+ case gen_server2:call(SPid, {gm_deaths, Deaths}, infinity) of
+ ok ->
+ ok;
+ {promote, CPid} ->
+ {become, rabbit_mirror_queue_coordinator, [CPid]}
+ end
+ end).
+
+handle_msg([_SPid], _From, heartbeat) ->
+ ok;
+handle_msg([SPid], _From, Msg) ->
+ ok = gen_server2:cast(SPid, {gm, Msg}).
+
+%% ---------------------------------------------------------------------------
+%% Others
+%% ---------------------------------------------------------------------------
+
+maybe_run_queue_via_backing_queue(
+ Mod, Fun, State = #state { backing_queue = BQ,
+ backing_queue_state = BQS,
+ guid_to_channel = GTC }) ->
+ {Guids, BQS1} = BQ:invoke(Mod, Fun, BQS),
+ GTC1 = lists:foldl(fun maybe_confirm_message/2, GTC, Guids),
+ State #state { backing_queue_state = BQS1,
+ guid_to_channel = GTC1 }.
+
+record_confirm_or_confirm(#delivery { msg_seq_no = undefined }, _Q, GTC) ->
+ GTC;
+record_confirm_or_confirm(
+ #delivery { sender = ChPid,
+ message = #basic_message { is_persistent = true,
+ guid = Guid },
+ msg_seq_no = MsgSeqNo }, #amqqueue { durable = true }, GTC) ->
+ dict:store(Guid, {ChPid, MsgSeqNo}, GTC);
+record_confirm_or_confirm(#delivery { sender = ChPid, msg_seq_no = MsgSeqNo },
+ _Q, GTC) ->
+ ok = rabbit_channel:confirm(ChPid, MsgSeqNo),
+ GTC.
+
+maybe_confirm_message(Guid, GTC) ->
+ case dict:find(Guid, GTC) of
+ {ok, {ChPid, MsgSeqNo}} when MsgSeqNo =/= undefined ->
+ ok = rabbit_channel:confirm(ChPid, MsgSeqNo),
+ dict:erase(Guid, GTC);
+ error ->
+ GTC
+ end.
+
+handle_process_result({ok, State}) -> noreply(State);
+handle_process_result({stop, State}) -> {stop, normal, State}.
+
+promote_me(From, #state { q = Q,
+ gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ rate_timer_ref = RateTRef,
+ sender_queues = SQ,
+ seen = Seen,
+ guid_ack = GA }) ->
+ rabbit_log:info("Promoting slave ~p for ~s~n",
+ [self(), rabbit_misc:rs(Q #amqqueue.name)]),
+ {ok, CPid} = rabbit_mirror_queue_coordinator:start_link(Q, GM),
+ true = unlink(GM),
+ gen_server2:reply(From, {promote, CPid}),
+ ok = gm:confirmed_broadcast(GM, heartbeat),
+ MasterState = rabbit_mirror_queue_master:promote_backing_queue_state(
+ CPid, BQ, BQS, GM, Seen),
+ %% We have to do the requeue via this init because otherwise we
+ %% don't have access to the relevent MsgPropsFun. Also, we are
+ %% already in mnesia as the master queue pid. Thus we cannot just
+ %% publish stuff by sending it to ourself - we must pass it
+ %% through to this init, otherwise we can violate ordering
+ %% constraints.
+ AckTags = [AckTag || {_Guid, AckTag} <- dict:to_list(GA)],
+ Deliveries = lists:append([queue:to_list(PubQ)
+ || {_ChPid, PubQ} <- dict:to_list(SQ)]),
+ QueueState = rabbit_amqqueue_process:init_with_backing_queue_state(
+ Q, rabbit_mirror_queue_master, MasterState, RateTRef,
+ AckTags, Deliveries),
+ {become, rabbit_amqqueue_process, QueueState, hibernate}.
+
+noreply(State) ->
+ {noreply, next_state(State), hibernate}.
+
+reply(Reply, State) ->
+ {reply, Reply, next_state(State), hibernate}.
+
+next_state(State) ->
+ ensure_rate_timer(State).
+
+%% copied+pasted from amqqueue_process
+ensure_rate_timer(State = #state { rate_timer_ref = undefined }) ->
+ {ok, TRef} = timer:apply_after(
+ ?RAM_DURATION_UPDATE_INTERVAL,
+ rabbit_amqqueue, update_ram_duration,
+ [self()]),
+ State #state { rate_timer_ref = TRef };
+ensure_rate_timer(State = #state { rate_timer_ref = just_measured }) ->
+ State #state { rate_timer_ref = undefined };
+ensure_rate_timer(State) ->
+ State.
+
+stop_rate_timer(State = #state { rate_timer_ref = undefined }) ->
+ State;
+stop_rate_timer(State = #state { rate_timer_ref = just_measured }) ->
+ State #state { rate_timer_ref = undefined };
+stop_rate_timer(State = #state { rate_timer_ref = TRef }) ->
+ {ok, cancel} = timer:cancel(TRef),
+ State #state { rate_timer_ref = undefined }.
+
+maybe_enqueue_message(
+ Delivery = #delivery { message = #basic_message { guid = Guid },
+ sender = ChPid },
+ State = #state { q = Q,
+ sender_queues = SQ,
+ seen = Seen,
+ guid_to_channel = GTC }) ->
+ case sets:is_element(Guid, Seen) of
+ true ->
+ GTC1 = record_confirm_or_confirm(Delivery, Q, GTC),
+ State #state { guid_to_channel = GTC1,
+ seen = sets:del_element(Guid, Seen) };
+ false ->
+ MQ = case dict:find(ChPid, SQ) of
+ {ok, MQ1} -> MQ1;
+ error -> queue:new()
+ end,
+ SQ1 = dict:store(ChPid, queue:in(Delivery, MQ), SQ),
+ State #state { sender_queues = SQ1 }
+ end.
+
+process_instruction(
+ {publish, Deliver, ChPid, MsgProps, Msg = #basic_message { guid = Guid }},
+ State = #state { q = Q,
+ sender_queues = SQ,
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ guid_ack = GA,
+ seen = Seen,
+ guid_to_channel = GTC }) ->
+ {SQ1, Seen1, GTC1} =
+ case dict:find(ChPid, SQ) of
+ error ->
+ {SQ, sets:add_element(Guid, Seen), GTC};
+ {ok, MQ} ->
+ case queue:out(MQ) of
+ {empty, _MQ} ->
+ {SQ, sets:add_element(Guid, Seen), GTC};
+ {{value, Delivery = #delivery {
+ message = #basic_message { guid = Guid } }},
+ MQ1} ->
+ GTC2 = record_confirm_or_confirm(Delivery, Q, GTC),
+ {dict:store(ChPid, MQ1, SQ), Seen, GTC2};
+ {{value, #delivery {}}, _MQ1} ->
+ %% The instruction was sent to us before we
+ %% were within the mirror_pids within the
+ %% amqqueue record. We'll never receive the
+ %% message directly.
+ {SQ, Seen, GTC}
+ end
+ end,
+ State1 = State #state { sender_queues = SQ1,
+ seen = Seen1,
+ guid_to_channel = GTC1 },
+ {ok,
+ case Deliver of
+ false ->
+ BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS),
+ State1 #state { backing_queue_state = BQS1 };
+ {true, AckRequired} ->
+ {AckTag, BQS1} = BQ:publish_delivered(AckRequired, Msg, MsgProps,
+ ChPid, BQS),
+ {GA1, GTC3} = case AckRequired of
+ true -> {dict:store(Guid, AckTag, GA), GTC1};
+ false -> {GA, maybe_confirm_message(Guid, GTC1)}
+ end,
+ State1 #state { backing_queue_state = BQS1,
+ guid_ack = GA1,
+ guid_to_channel = GTC3 }
+ end};
+process_instruction({set_length, Length},
+ State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ QLen = BQ:len(BQS),
+ ToDrop = QLen - Length,
+ {ok, case ToDrop > 0 of
+ true -> BQS1 =
+ lists:foldl(
+ fun (const, BQSN) ->
+ {{_Msg, _IsDelivered, _AckTag, _Remaining},
+ BQSN1} = BQ:fetch(false, BQSN),
+ BQSN1
+ end, BQS, lists:duplicate(ToDrop, const)),
+ State #state { backing_queue_state = BQS1 };
+ false -> State
+ end};
+process_instruction({fetch, AckRequired, Guid, Remaining},
+ State = #state { backing_queue = BQ,
+ backing_queue_state = BQS,
+ guid_ack = GA }) ->
+ QLen = BQ:len(BQS),
+ {ok, case QLen - 1 of
+ Remaining ->
+ {{_Msg, _IsDelivered, AckTag, Remaining}, BQS1} =
+ BQ:fetch(AckRequired, BQS),
+ GA1 = case AckRequired of
+ true -> dict:store(Guid, AckTag, GA);
+ false -> GA
+ end,
+ State #state { backing_queue_state = BQS1,
+ guid_ack = GA1 };
+ Other when Other < Remaining ->
+ %% we must be shorter than the master
+ State
+ end};
+process_instruction({ack, Guids},
+ State = #state { backing_queue = BQ,
+ backing_queue_state = BQS,
+ guid_ack = GA }) ->
+ {AckTags, GA1} = guids_to_acktags(Guids, GA),
+ {Guids1, BQS1} = BQ:ack(AckTags, BQS),
+ [] = Guids1 -- Guids, %% ASSERTION
+ {ok, State #state { guid_ack = GA1,
+ backing_queue_state = BQS1 }};
+process_instruction({requeue, MsgPropsFun, Guids},
+ State = #state { backing_queue = BQ,
+ backing_queue_state = BQS,
+ guid_ack = GA }) ->
+ {AckTags, GA1} = guids_to_acktags(Guids, GA),
+ {ok, case length(AckTags) =:= length(Guids) of
+ true ->
+ {Guids, BQS1} = BQ:requeue(AckTags, MsgPropsFun, BQS),
+ State #state { guid_ack = GA1,
+ backing_queue_state = BQS1 };
+ false ->
+ %% the only thing we can safely do is nuke out our BQ
+ %% and GA
+ {_Count, BQS1} = BQ:purge(BQS),
+ {Guids, BQS2} = ack_all(BQ, GA, BQS1),
+ State #state { guid_ack = dict:new(),
+ backing_queue_state = BQS2 }
+ end};
+process_instruction(delete_and_terminate,
+ State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ BQ:delete_and_terminate(BQS),
+ {stop, State #state { backing_queue_state = undefined }}.
+
+guids_to_acktags(Guids, GA) ->
+ {AckTags, GA1} =
+ lists:foldl(fun (Guid, {AckTagsN, GAN}) ->
+ case dict:find(Guid, GA) of
+ error -> {AckTagsN, GAN};
+ {ok, AckTag} -> {[AckTag | AckTagsN],
+ dict:erase(Guid, GAN)}
+ end
+ end, {[], GA}, Guids),
+ {lists:reverse(AckTags), GA1}.
+
+ack_all(BQ, GA, BQS) ->
+ BQ:ack([AckTag || {_Guid, AckTag} <- dict:to_list(GA)], BQS).
diff --git a/src/rabbit_mirror_queue_slave_sup.erl b/src/rabbit_mirror_queue_slave_sup.erl
new file mode 100644
index 0000000000..80c0520c08
--- /dev/null
+++ b/src/rabbit_mirror_queue_slave_sup.erl
@@ -0,0 +1,54 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2010 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_mirror_queue_slave_sup).
+
+-rabbit_boot_step({mirror_queue_slave_sup,
+ [{description, "mirror queue slave sup"},
+ {mfa, {rabbit_mirror_queue_slave_sup, start, []}},
+ {requires, queue_sup_queue_recovery},
+ {enables, routing_ready}]}).
+
+-behaviour(supervisor2).
+
+-export([start/0, start_link/0, start_child/2]).
+
+-export([init/1]).
+
+-include_lib("rabbit.hrl").
+
+-define(SERVER, ?MODULE).
+
+start() ->
+ {ok, _} =
+ supervisor:start_child(
+ rabbit_sup,
+ {rabbit_mirror_queue_slave_sup,
+ {rabbit_mirror_queue_slave_sup, start_link, []},
+ transient, infinity, supervisor, [rabbit_mirror_queue_slave_sup]}),
+ ok.
+
+start_link() ->
+ supervisor2:start_link({local, ?SERVER}, ?MODULE, []).
+
+start_child(Node, Args) ->
+ supervisor2:start_child({?SERVER, Node}, Args).
+
+init([]) ->
+ {ok, {{simple_one_for_one_terminate, 10, 10},
+ [{rabbit_mirror_queue_slave,
+ {rabbit_mirror_queue_slave, start_link, []},
+ temporary, ?MAX_WAIT, worker, [rabbit_mirror_queue_slave]}]}}.
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 03317d7050..abc27c5f7d 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -55,6 +55,8 @@
-export([now_ms/0]).
-export([lock_file/1]).
-export([const_ok/1, const/1]).
+-export([ntoa/1, ntoab/1]).
+-export([is_process_alive/1]).
%%----------------------------------------------------------------------------
@@ -191,6 +193,9 @@
-spec(lock_file/1 :: (file:filename()) -> rabbit_types:ok_or_error('eexist')).
-spec(const_ok/1 :: (any()) -> 'ok').
-spec(const/1 :: (A) -> const(A)).
+-spec(ntoa/1 :: (inet:ip_address()) -> string()).
+-spec(ntoab/1 :: (inet:ip_address()) -> string()).
+-spec(is_process_alive/1 :: (pid()) -> boolean()).
-endif.
@@ -237,11 +242,20 @@ assert_args_equivalence1(Orig, New, Name, Key) ->
{Same, Same} -> ok;
{Orig1, New1} -> protocol_error(
precondition_failed,
- "inequivalent arg '~s' for ~s: "
- "required ~w, received ~w",
- [Key, rabbit_misc:rs(Name), New1, Orig1])
+ "inequivalent arg '~s' for ~s: "
+ "received ~s but current is ~s",
+ [Key, rs(Name), val(New1), val(Orig1)])
end.
+val(undefined) ->
+ "none";
+val({Type, Value}) ->
+ Fmt = case is_binary(Value) of
+ true -> "the value '~s' of type '~s'";
+ false -> "the value '~w' of type '~s'"
+ end,
+ lists:flatten(io_lib:format(Fmt, [Value, Type])).
+
dirty_read(ReadSpec) ->
case mnesia:dirty_read(ReadSpec) of
[Result] -> {ok, Result};
@@ -338,8 +352,11 @@ throw_on_error(E, Thunk) ->
with_exit_handler(Handler, Thunk) ->
try
Thunk()
- catch exit:{R, _} when R =:= noproc; R =:= nodedown;
- R =:= normal; R =:= shutdown ->
+ catch
+ exit:{R, _} when R =:= noproc; R =:= nodedown;
+ R =:= normal; R =:= shutdown ->
+ Handler();
+ exit:{{R, _}, _} when R =:= nodedown; R =:= shutdown ->
Handler()
end.
@@ -832,3 +849,26 @@ lock_file(Path) ->
const_ok(_) -> ok.
const(X) -> fun (_) -> X end.
+
+%% Format IPv4-mapped IPv6 addresses as IPv4, since they're what we see
+%% when IPv6 is enabled but not used (i.e. 99% of the time).
+ntoa({0,0,0,0,0,16#ffff,AB,CD}) ->
+ inet_parse:ntoa({AB bsr 8, AB rem 256, CD bsr 8, CD rem 256});
+ntoa(IP) ->
+ inet_parse:ntoa(IP).
+
+ntoab(IP) ->
+ Str = ntoa(IP),
+ case string:str(Str, ":") of
+ 0 -> Str;
+ _ -> "[" ++ Str ++ "]"
+ end.
+
+is_process_alive(Pid) when node(Pid) =:= node() ->
+ erlang:is_process_alive(Pid);
+is_process_alive(Pid) ->
+ case rpc:call(node(Pid), erlang, is_process_alive, [Pid]) of
+ true -> true;
+ _ -> false
+ end.
+
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index a9b4e17745..1ad65759d4 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -204,7 +204,8 @@ table_definitions() ->
{rabbit_queue,
[{record_name, amqqueue},
{attributes, record_info(fields, amqqueue)},
- {match, #amqqueue{name = queue_name_match(), _='_'}}]}].
+ {match, #amqqueue{name = queue_name_match(), _='_'}}]}]
+ ++ gm:table_definitions().
binding_match() ->
#binding{source = exchange_name_match(),
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 529e3e0706..e9c356e1f6 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -738,45 +738,36 @@ handle_call({contains, Guid}, From, State) ->
handle_cast({client_dying, CRef},
State = #msstate { dying_clients = DyingClients }) ->
DyingClients1 = sets:add_element(CRef, DyingClients),
- write_message(CRef, <<>>, State #msstate { dying_clients = DyingClients1 });
+ noreply(write_message(CRef, <<>>,
+ State #msstate { dying_clients = DyingClients1 }));
handle_cast({client_delete, CRef}, State = #msstate { clients = Clients }) ->
State1 = State #msstate { clients = dict:erase(CRef, Clients) },
noreply(remove_message(CRef, CRef, clear_client(CRef, State1)));
handle_cast({write, CRef, Guid},
- State = #msstate { file_summary_ets = FileSummaryEts,
- cur_file_cache_ets = CurFileCacheEts }) ->
+ State = #msstate { cur_file_cache_ets = CurFileCacheEts }) ->
true = 0 =< ets:update_counter(CurFileCacheEts, Guid, {3, -1}),
[{Guid, Msg, _CacheRefCount}] = ets:lookup(CurFileCacheEts, Guid),
- case should_mask_action(CRef, Guid, State) of
- {true, _Location} ->
- noreply(State);
- {false, not_found} ->
- write_message(CRef, Guid, Msg, State);
- {Mask, #msg_location { ref_count = 0, file = File,
- total_size = TotalSize }} ->
- case {Mask, ets:lookup(FileSummaryEts, File)} of
- {false, [#file_summary { locked = true }]} ->
- ok = index_delete(Guid, State),
- write_message(CRef, Guid, Msg, State);
- {false_if_increment, [#file_summary { locked = true }]} ->
- %% The msg for Guid is older than the client death
- %% message, but as it is being GC'd currently,
- %% we'll have to write a new copy, which will then
- %% be younger, so ignore this write.
- noreply(State);
- {_Mask, [#file_summary {}]} ->
- ok = index_update_ref_count(Guid, 1, State),
- State1 = client_confirm_if_on_disk(CRef, Guid, File, State),
- noreply(adjust_valid_total_size(File, TotalSize, State1))
- end;
- {_Mask, #msg_location { ref_count = RefCount, file = File }} ->
- %% We already know about it, just update counter. Only
- %% update field otherwise bad interaction with concurrent GC
- ok = index_update_ref_count(Guid, RefCount + 1, State),
- noreply(client_confirm_if_on_disk(CRef, Guid, File, State))
- end;
+ noreply(
+ case write_action(should_mask_action(CRef, Guid, State), Guid, State) of
+ {write, State1} ->
+ write_message(CRef, Guid, Msg, State1);
+ {ignore, CurFile, State1 = #msstate { current_file = CurFile }} ->
+ State1;
+ {ignore, _File, State1} ->
+ true = ets:delete_object(CurFileCacheEts, {Guid, Msg, 0}),
+ State1;
+ {confirm, CurFile, State1 = #msstate { current_file = CurFile }}->
+ record_pending_confirm(CRef, Guid, State1);
+ {confirm, _File, State1} ->
+ true = ets:delete_object(CurFileCacheEts, {Guid, Msg, 0}),
+ update_pending_confirms(
+ fun (MsgOnDiskFun, CTG) ->
+ MsgOnDiskFun(gb_sets:singleton(Guid), written),
+ CTG
+ end, CRef, State1)
+ end);
handle_cast({remove, CRef, Guids}, State) ->
State1 = lists:foldl(
@@ -924,6 +915,37 @@ internal_sync(State = #msstate { current_file_handle = CurHdl,
[client_confirm(CRef, Guids, written, State1) || {CRef, Guids} <- CGs],
State1 #msstate { cref_to_guids = dict:new(), on_sync = [] }.
+write_action({true, not_found}, _Guid, State) ->
+ {ignore, undefined, State};
+write_action({true, #msg_location { file = File }}, _Guid, State) ->
+ {ignore, File, State};
+write_action({false, not_found}, _Guid, State) ->
+ {write, State};
+write_action({Mask, #msg_location { ref_count = 0, file = File,
+ total_size = TotalSize }},
+ Guid, State = #msstate { file_summary_ets = FileSummaryEts }) ->
+ case {Mask, ets:lookup(FileSummaryEts, File)} of
+ {false, [#file_summary { locked = true }]} ->
+ ok = index_delete(Guid, State),
+ {write, State};
+ {false_if_increment, [#file_summary { locked = true }]} ->
+ %% The msg for Guid is older than the client death
+ %% message, but as it is being GC'd currently we'll have
+ %% to write a new copy, which will then be younger, so
+ %% ignore this write.
+ {ignore, File, State};
+ {_Mask, [#file_summary {}]} ->
+ ok = index_update_ref_count(Guid, 1, State),
+ State1 = adjust_valid_total_size(File, TotalSize, State),
+ {confirm, File, State1}
+ end;
+write_action({_Mask, #msg_location { ref_count = RefCount, file = File }},
+ Guid, State) ->
+ ok = index_update_ref_count(Guid, RefCount + 1, State),
+ %% We already know about it, just update counter. Only update
+ %% field otherwise bad interaction with concurrent GC
+ {confirm, File, State}.
+
write_message(CRef, Guid, Msg, State) ->
write_message(Guid, Msg, record_pending_confirm(CRef, Guid, State)).
@@ -943,11 +965,10 @@ write_message(Guid, Msg,
[_,_] = ets:update_counter(FileSummaryEts, CurFile,
[{#file_summary.valid_total_size, TotalSize},
{#file_summary.file_size, TotalSize}]),
- NextOffset = CurOffset + TotalSize,
- noreply(maybe_roll_to_new_file(
- NextOffset, State #msstate {
- sum_valid_data = SumValid + TotalSize,
- sum_file_size = SumFileSize + TotalSize })).
+ maybe_roll_to_new_file(CurOffset + TotalSize,
+ State #msstate {
+ sum_valid_data = SumValid + TotalSize,
+ sum_file_size = SumFileSize + TotalSize }).
read_message(Guid, From,
State = #msstate { dedup_cache_ets = DedupCacheEts }) ->
@@ -1134,16 +1155,6 @@ record_pending_confirm(CRef, Guid, State) ->
gb_sets:singleton(Guid), CTG)
end, CRef, State).
-client_confirm_if_on_disk(CRef, Guid, CurFile,
- State = #msstate { current_file = CurFile }) ->
- record_pending_confirm(CRef, Guid, State);
-client_confirm_if_on_disk(CRef, Guid, _File, State) ->
- update_pending_confirms(
- fun (MsgOnDiskFun, CTG) ->
- MsgOnDiskFun(gb_sets:singleton(Guid), written),
- CTG
- end, CRef, State).
-
client_confirm(CRef, Guids, ActionTaken, State) ->
update_pending_confirms(
fun (MsgOnDiskFun, CTG) ->
diff --git a/src/rabbit_multi.erl b/src/rabbit_multi.erl
index 7c07c4fedb..ebd7fe8a06 100644
--- a/src/rabbit_multi.erl
+++ b/src/rabbit_multi.erl
@@ -336,6 +336,8 @@ get_node_tcp_listener() ->
case application:get_env(rabbit, tcp_listeners) of
{ok, [{_IpAddy, _Port} = Listener]} ->
Listener;
+ {ok, [Port]} when is_number(Port) ->
+ {"0.0.0.0", Port};
{ok, []} ->
undefined;
{ok, Other} ->
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 9788c922bf..36f61628b8 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -16,15 +16,15 @@
-module(rabbit_networking).
--export([boot/0, start/0, start_tcp_listener/2, start_ssl_listener/3,
- stop_tcp_listener/2, on_node_down/1, active_listeners/0,
+-export([boot/0, start/0, start_tcp_listener/1, start_ssl_listener/2,
+ stop_tcp_listener/1, on_node_down/1, active_listeners/0,
node_listeners/1, connections/0, connection_info_keys/0,
connection_info/1, connection_info/2,
connection_info_all/0, connection_info_all/1,
close_connection/2]).
%%used by TCP-based transports, e.g. STOMP adapter
--export([check_tcp_listener_address/3]).
+-export([check_tcp_listener_address/2]).
-export([tcp_listener_started/3, tcp_listener_stopped/3,
start_client/1, start_ssl_client/2]).
@@ -32,29 +32,26 @@
-include("rabbit.hrl").
-include_lib("kernel/include/inet.hrl").
--define(RABBIT_TCP_OPTS, [
- binary,
- {packet, raw}, % no packaging
- {reuseaddr, true}, % allow rebind without waiting
- {backlog, 128}, % use the maximum listen(2) backlog value
- %% {nodelay, true}, % TCP_NODELAY - disable Nagle's alg.
- %% {delay_send, true},
- {exit_on_close, false}
- ]).
-
-define(SSL_TIMEOUT, 5). %% seconds
+-define(FIRST_TEST_BIND_PORT, 10000).
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
-export_type([ip_port/0, hostname/0]).
+-type(family() :: atom()).
+-type(listener_config() :: ip_port() |
+ {hostname(), ip_port()} |
+ {hostname(), ip_port(), family()}).
+
-spec(start/0 :: () -> 'ok').
--spec(start_tcp_listener/2 :: (hostname(), ip_port()) -> 'ok').
--spec(start_ssl_listener/3 :: (hostname(), ip_port(), rabbit_types:infos())
- -> 'ok').
--spec(stop_tcp_listener/2 :: (hostname(), ip_port()) -> 'ok').
+-spec(start_tcp_listener/1 :: (listener_config()) -> 'ok').
+-spec(start_ssl_listener/2 ::
+ (listener_config(), rabbit_types:infos()) -> 'ok').
+-spec(stop_tcp_listener/1 :: (listener_config()) -> 'ok').
-spec(active_listeners/0 :: () -> [rabbit_types:listener()]).
-spec(node_listeners/1 :: (node()) -> [rabbit_types:listener()]).
-spec(connections/0 :: () -> [rabbit_types:connection()]).
@@ -69,8 +66,8 @@
(rabbit_types:info_keys()) -> [rabbit_types:infos()]).
-spec(close_connection/2 :: (pid(), string()) -> 'ok').
-spec(on_node_down/1 :: (node()) -> 'ok').
--spec(check_tcp_listener_address/3 ::
- (atom(), hostname(), ip_port()) -> {inet:ip_address(), atom()}).
+-spec(check_tcp_listener_address/2 :: (atom(), listener_config())
+ -> [{inet:ip_address(), ip_port(), family(), atom()}]).
-endif.
@@ -83,7 +80,7 @@ boot() ->
boot_tcp() ->
{ok, TcpListeners} = application:get_env(tcp_listeners),
- [ok = start_tcp_listener(Host, Port) || {Host, Port} <- TcpListeners],
+ [ok = start_tcp_listener(Listener) || Listener <- TcpListeners],
ok.
boot_ssl() ->
@@ -103,75 +100,111 @@ boot_ssl() ->
end}
| SslOptsConfig]
end,
- [start_ssl_listener(Host, Port, SslOpts) || {Host, Port} <- SslListeners],
+ [start_ssl_listener(Listener, SslOpts) || Listener <- SslListeners],
ok
end.
start() ->
- {ok,_} = supervisor:start_child(
+ {ok,_} = supervisor2:start_child(
rabbit_sup,
{rabbit_tcp_client_sup,
- {tcp_client_sup, start_link,
+ {rabbit_client_sup, start_link,
[{local, rabbit_tcp_client_sup},
{rabbit_connection_sup,start_link,[]}]},
- transient, infinity, supervisor, [tcp_client_sup]}),
+ transient, infinity, supervisor, [rabbit_client_sup]}),
ok.
-getaddr(Host) ->
- %% inet_parse:address takes care of ip string, like "0.0.0.0"
- %% inet:getaddr returns immediately for ip tuple {0,0,0,0},
- %% and runs 'inet_gethost' port process for dns lookups.
- %% On Windows inet:getaddr runs dns resolver for ip string, which may fail.
+%% inet_parse:address takes care of ip string, like "0.0.0.0"
+%% inet:getaddr returns immediately for ip tuple {0,0,0,0},
+%% and runs 'inet_gethost' port process for dns lookups.
+%% On Windows inet:getaddr runs dns resolver for ip string, which may fail.
+
+getaddr(Host, Family) ->
case inet_parse:address(Host) of
- {ok, IPAddress1} -> IPAddress1;
- {error, _} ->
- case inet:getaddr(Host, inet) of
- {ok, IPAddress2} -> IPAddress2;
- {error, Reason} ->
- error_logger:error_msg("invalid host ~p - ~p~n",
- [Host, Reason]),
- throw({error, {invalid_host, Host, Reason}})
- end
+ {ok, IPAddress} -> [{IPAddress, resolve_family(IPAddress, Family)}];
+ {error, _} -> gethostaddr(Host, Family)
end.
-check_tcp_listener_address(NamePrefix, Host, Port) ->
- IPAddress = getaddr(Host),
+gethostaddr(Host, auto) ->
+ Lookups = [{Family, inet:getaddr(Host, Family)} || Family <- [inet, inet6]],
+ case [{IP, Family} || {Family, {ok, IP}} <- Lookups] of
+ [] -> host_lookup_error(Host, Lookups);
+ IPs -> IPs
+ end;
+
+gethostaddr(Host, Family) ->
+ case inet:getaddr(Host, Family) of
+ {ok, IPAddress} -> [{IPAddress, Family}];
+ {error, Reason} -> host_lookup_error(Host, Reason)
+ end.
+
+host_lookup_error(Host, Reason) ->
+ error_logger:error_msg("invalid host ~p - ~p~n", [Host, Reason]),
+ throw({error, {invalid_host, Host, Reason}}).
+
+resolve_family({_,_,_,_}, auto) -> inet;
+resolve_family({_,_,_,_,_,_,_,_}, auto) -> inet6;
+resolve_family(IP, auto) -> throw({error, {strange_family, IP}});
+resolve_family(_, F) -> F.
+
+check_tcp_listener_address(NamePrefix, Port) when is_integer(Port) ->
+ check_tcp_listener_address_auto(NamePrefix, Port);
+
+check_tcp_listener_address(NamePrefix, {"auto", Port}) ->
+ %% Variant to prevent lots of hacking around in bash and batch files
+ check_tcp_listener_address_auto(NamePrefix, Port);
+
+check_tcp_listener_address(NamePrefix, {Host, Port}) ->
+ %% auto: determine family IPv4 / IPv6 after converting to IP address
+ check_tcp_listener_address(NamePrefix, {Host, Port, auto});
+
+check_tcp_listener_address(NamePrefix, {Host, Port, Family0}) ->
if is_integer(Port) andalso (Port >= 0) andalso (Port =< 65535) -> ok;
true -> error_logger:error_msg("invalid port ~p - not 0..65535~n",
[Port]),
throw({error, {invalid_port, Port}})
end,
- Name = rabbit_misc:tcp_name(NamePrefix, IPAddress, Port),
- {IPAddress, Name}.
+ [{IPAddress, Port, Family,
+ rabbit_misc:tcp_name(NamePrefix, IPAddress, Port)} ||
+ {IPAddress, Family} <- getaddr(Host, Family0)].
+
+check_tcp_listener_address_auto(NamePrefix, Port) ->
+ lists:append([check_tcp_listener_address(NamePrefix, Listener) ||
+ Listener <- port_to_listeners(Port)]).
-start_tcp_listener(Host, Port) ->
- start_listener(Host, Port, amqp, "TCP Listener",
+start_tcp_listener(Listener) ->
+ start_listener(Listener, amqp, "TCP Listener",
{?MODULE, start_client, []}).
-start_ssl_listener(Host, Port, SslOpts) ->
- start_listener(Host, Port, 'amqp/ssl', "SSL Listener",
+start_ssl_listener(Listener, SslOpts) ->
+ start_listener(Listener, 'amqp/ssl', "SSL Listener",
{?MODULE, start_ssl_client, [SslOpts]}).
-start_listener(Host, Port, Protocol, Label, OnConnect) ->
- {IPAddress, Name} =
- check_tcp_listener_address(rabbit_tcp_listener_sup, Host, Port),
+start_listener(Listener, Protocol, Label, OnConnect) ->
+ [start_listener0(Spec, Protocol, Label, OnConnect) ||
+ Spec <- check_tcp_listener_address(rabbit_tcp_listener_sup, Listener)],
+ ok.
+
+start_listener0({IPAddress, Port, Family, Name}, Protocol, Label, OnConnect) ->
{ok,_} = supervisor:start_child(
rabbit_sup,
{Name,
{tcp_listener_sup, start_link,
- [IPAddress, Port, ?RABBIT_TCP_OPTS ,
+ [IPAddress, Port, [Family | tcp_opts()],
{?MODULE, tcp_listener_started, [Protocol]},
{?MODULE, tcp_listener_stopped, [Protocol]},
OnConnect, Label]},
- transient, infinity, supervisor, [tcp_listener_sup]}),
+ transient, infinity, supervisor, [tcp_listener_sup]}).
+
+stop_tcp_listener(Listener) ->
+ [stop_tcp_listener0(Spec) ||
+ Spec <- check_tcp_listener_address(rabbit_tcp_listener_sup, Listener)],
ok.
-stop_tcp_listener(Host, Port) ->
- IPAddress = getaddr(Host),
+stop_tcp_listener0({IPAddress, Port, _Family, Name}) ->
Name = rabbit_misc:tcp_name(rabbit_tcp_listener_sup, IPAddress, Port),
ok = supervisor:terminate_child(rabbit_sup, Name),
- ok = supervisor:delete_child(rabbit_sup, Name),
- ok.
+ ok = supervisor:delete_child(rabbit_sup, Name).
tcp_listener_started(Protocol, IPAddress, Port) ->
%% We need the ip to distinguish e.g. 0.0.0.0 and 127.0.0.1
@@ -252,15 +285,106 @@ close_connection(Pid, Explanation) ->
%%--------------------------------------------------------------------
tcp_host({0,0,0,0}) ->
- {ok, Hostname} = inet:gethostname(),
- case inet:gethostbyname(Hostname) of
- {ok, #hostent{h_name = Name}} -> Name;
- {error, _Reason} -> Hostname
- end;
+ hostname();
+
+tcp_host({0,0,0,0,0,0,0,0}) ->
+ hostname();
+
tcp_host(IPAddress) ->
case inet:gethostbyaddr(IPAddress) of
{ok, #hostent{h_name = Name}} -> Name;
- {error, _Reason} -> inet_parse:ntoa(IPAddress)
+ {error, _Reason} -> rabbit_misc:ntoa(IPAddress)
+ end.
+
+hostname() ->
+ {ok, Hostname} = inet:gethostname(),
+ case inet:gethostbyname(Hostname) of
+ {ok, #hostent{h_name = Name}} -> Name;
+ {error, _Reason} -> Hostname
end.
cmap(F) -> rabbit_misc:filter_exit_map(F, connections()).
+
+tcp_opts() ->
+ {ok, Opts} = application:get_env(rabbit, tcp_listen_options),
+ Opts.
+
+%%--------------------------------------------------------------------
+
+%% There are three kinds of machine (for our purposes).
+%%
+%% * Those which treat IPv4 addresses as a special kind of IPv6 address
+%% ("Single stack")
+%% - Linux by default, Windows Vista and later
+%% - We also treat any (hypothetical?) IPv6-only machine the same way
+%% * Those which consider IPv6 and IPv4 to be completely separate things
+%% ("Dual stack")
+%% - OpenBSD, Windows XP / 2003, Linux if so configured
+%% * Those which do not support IPv6.
+%% - Ancient/weird OSes, Linux if so configured
+%%
+%% How to reconfigure Linux to test this:
+%% Single stack (default):
+%% echo 0 > /proc/sys/net/ipv6/bindv6only
+%% Dual stack:
+%% echo 1 > /proc/sys/net/ipv6/bindv6only
+%% IPv4 only:
+%% add ipv6.disable=1 to GRUB_CMDLINE_LINUX_DEFAULT in /etc/default/grub then
+%% sudo update-grub && sudo reboot
+%%
+%% This matters in (and only in) the case where the sysadmin (or the
+%% app descriptor) has only supplied a port and we wish to bind to
+%% "all addresses". This means different things depending on whether
+%% we're single or dual stack. On single stack binding to "::"
+%% implicitly includes all IPv4 addresses, and subsequently attempting
+%% to bind to "0.0.0.0" will fail. On dual stack, binding to "::" will
+%% only bind to IPv6 addresses, and we need another listener bound to
+%% "0.0.0.0" for IPv4. Finally, on IPv4-only systems we of course only
+%% want to bind to "0.0.0.0".
+%%
+%% Unfortunately it seems there is no way to detect single vs dual stack
+%% apart from attempting to bind to the port.
+port_to_listeners(Port) ->
+ IPv4 = {"0.0.0.0", Port, inet},
+ IPv6 = {"::", Port, inet6},
+ case ipv6_status(?FIRST_TEST_BIND_PORT) of
+ single_stack -> [IPv6];
+ ipv6_only -> [IPv6];
+ dual_stack -> [IPv6, IPv4];
+ ipv4_only -> [IPv4]
+ end.
+
+ipv6_status(TestPort) ->
+ IPv4 = [inet, {ip, {0,0,0,0}}],
+ IPv6 = [inet6, {ip, {0,0,0,0,0,0,0,0}}],
+ case gen_tcp:listen(TestPort, IPv6) of
+ {ok, LSock6} ->
+ case gen_tcp:listen(TestPort, IPv4) of
+ {ok, LSock4} ->
+ %% Dual stack
+ gen_tcp:close(LSock6),
+ gen_tcp:close(LSock4),
+ dual_stack;
+ %% Checking the error here would only let us
+ %% distinguish single stack IPv6 / IPv4 vs IPv6 only,
+ %% which we figure out below anyway.
+ {error, _} ->
+ gen_tcp:close(LSock6),
+ case gen_tcp:listen(TestPort, IPv4) of
+ %% Single stack
+ {ok, LSock4} -> gen_tcp:close(LSock4),
+ single_stack;
+ %% IPv6-only machine. Welcome to the future.
+ {error, eafnosupport} -> ipv6_only;
+ %% Dual stack machine with something already
+ %% on IPv4.
+ {error, _} -> ipv6_status(TestPort + 1)
+ end
+ end;
+ {error, eafnosupport} ->
+ %% IPv4-only machine. Welcome to the 90s.
+ ipv4_only;
+ {error, _} ->
+ %% Port in use
+ ipv6_status(TestPort + 1)
+ end.
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index e4bc1cdc5a..817abaa2bd 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -22,14 +22,41 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
+-export([notify_cluster/0, rabbit_running_on/1]).
-define(SERVER, ?MODULE).
+-define(RABBIT_UP_RPC_TIMEOUT, 2000).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(rabbit_running_on/1 :: (node()) -> 'ok').
+-spec(notify_cluster/0 :: () -> 'ok').
+
+-endif.
%%--------------------------------------------------------------------
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
+rabbit_running_on(Node) ->
+ gen_server:cast(rabbit_node_monitor, {rabbit_running_on, Node}).
+
+notify_cluster() ->
+ Node = node(),
+ Nodes = rabbit_mnesia:running_clustered_nodes() -- [Node],
+ %% notify other rabbits of this rabbit
+ case rpc:multicall(Nodes, rabbit_node_monitor, rabbit_running_on,
+ [Node], ?RABBIT_UP_RPC_TIMEOUT) of
+ {_, [] } -> ok;
+ {_, Bad} -> rabbit_log:info("failed to contact nodes ~p~n", [Bad])
+ end,
+ %% register other active rabbits with this rabbit
+ [ rabbit_node_monitor:rabbit_running_on(N) || N <- Nodes ],
+ ok.
+
%%--------------------------------------------------------------------
init([]) ->
@@ -39,19 +66,20 @@ init([]) ->
handle_call(_Request, _From, State) ->
{noreply, State}.
+handle_cast({rabbit_running_on, Node}, State) ->
+ rabbit_log:info("node ~p up~n", [Node]),
+ erlang:monitor(process, {rabbit, Node}),
+ {noreply, State};
handle_cast(_Msg, State) ->
{noreply, State}.
-handle_info({nodeup, Node}, State) ->
- rabbit_log:info("node ~p up", [Node]),
- {noreply, State};
handle_info({nodedown, Node}, State) ->
- rabbit_log:info("node ~p down", [Node]),
- %% TODO: This may turn out to be a performance hog when there are
- %% lots of nodes. We really only need to execute this code on
- %% *one* node, rather than all of them.
- ok = rabbit_networking:on_node_down(Node),
- ok = rabbit_amqqueue:on_node_down(Node),
+ rabbit_log:info("node ~p down~n", [Node]),
+ ok = handle_dead_rabbit(Node),
+ {noreply, State};
+handle_info({'DOWN', _MRef, process, {rabbit, Node}, _Reason}, State) ->
+ rabbit_log:info("node ~p lost 'rabbit'~n", [Node]),
+ ok = handle_dead_rabbit(Node),
{noreply, State};
handle_info(_Info, State) ->
{noreply, State}.
@@ -64,3 +92,10 @@ code_change(_OldVsn, State, _Extra) ->
%%--------------------------------------------------------------------
+%% TODO: This may turn out to be a performance hog when there are
+%% lots of nodes. We really only need to execute this code on
+%% *one* node, rather than all of them.
+handle_dead_rabbit(Node) ->
+ ok = rabbit_networking:on_node_down(Node),
+ ok = rabbit_amqqueue:on_node_down(Node).
+
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 696dc26588..1781469a13 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -72,7 +72,13 @@
%% pre-init:
%% receive protocol header -> send connection.start, *starting*
%% starting:
-%% receive connection.start_ok -> send connection.tune, *tuning*
+%% receive connection.start_ok -> *securing*
+%% securing:
+%% check authentication credentials
+%% if authentication success -> send connection.tune, *tuning*
+%% if more challenge needed -> send connection.secure,
+%% receive connection.secure_ok *securing*
+%% otherwise send close, *exit*
%% tuning:
%% receive connection.tune_ok -> start heartbeats, *opening*
%% opening:
@@ -256,7 +262,7 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
Sock, SockTransform) ->
process_flag(trap_exit, true),
{PeerAddress, PeerPort} = socket_op(Sock, fun rabbit_net:peername/1),
- PeerAddressS = inet_parse:ntoa(PeerAddress),
+ PeerAddressS = rabbit_misc:ntoab(PeerAddress),
rabbit_log:info("starting TCP connection ~p from ~s:~p~n",
[self(), PeerAddressS, PeerPort]),
ClientSock = socket_op(Sock, SockTransform),
@@ -351,7 +357,10 @@ mainloop(Deb, State = #v1{parent = Parent, sock= Sock, recv_ref = Ref}) ->
throw({handshake_timeout, State#v1.callback})
end;
timeout ->
- throw({timeout, State#v1.connection_state});
+ case State#v1.connection_state of
+ closed -> mainloop(Deb, State);
+ S -> throw({timeout, S})
+ end;
{'$gen_call', From, {shutdown, Explanation}} ->
{ForceTermination, NewState} = terminate(Explanation, State),
gen_server:reply(From, ok),
@@ -916,10 +925,14 @@ socket_info(Get, Select) ->
end.
ssl_info(F, Sock) ->
+ %% The first ok form is R14
+ %% The second is R13 - the extra term is exportability (by inspection,
+ %% the docs are wrong)
case rabbit_net:ssl_info(Sock) of
- nossl -> '';
- {error, _} -> '';
- {ok, Info} -> F(Info)
+ nossl -> '';
+ {error, _} -> '';
+ {ok, {P, {K, C, H}}} -> F({P, {K, C, H}});
+ {ok, {P, {K, C, H, _}}} -> F({P, {K, C, H}})
end.
cert_info(F, Sock) ->
@@ -940,8 +953,8 @@ send_to_new_channel(Channel, AnalyzedFrame, State) ->
vhost = VHost}} = State,
{ok, _ChSupPid, {ChPid, AState}} =
rabbit_channel_sup_sup:start_channel(
- ChanSupSup, {Protocol, Sock, Channel, FrameMax,
- self(), User, VHost, Collector}),
+ ChanSupSup, {tcp, Protocol, Sock, Channel, FrameMax, self(), User,
+ VHost, Collector}),
erlang:monitor(process, ChPid),
NewAState = process_channel_frame(AnalyzedFrame, self(),
Channel, ChPid, AState),
diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl
index 692d2473b8..309e0e6ed9 100644
--- a/src/rabbit_router.erl
+++ b/src/rabbit_router.erl
@@ -102,7 +102,9 @@ check_delivery(_ , _ , {_ , Qs}) -> {routed, Qs}.
lookup_qpids(QNames) ->
lists:foldl(fun (QName, QPids) ->
case mnesia:dirty_read({rabbit_queue, QName}) of
- [#amqqueue{pid = QPid}] -> [QPid | QPids];
- [] -> QPids
+ [#amqqueue{pid = QPid, mirror_pids = MPids}] ->
+ MPids ++ [QPid | QPids];
+ [] ->
+ QPids
end
end, [], QNames).
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 49b0950832..e895b0edb2 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -26,6 +26,7 @@
-define(PERSISTENT_MSG_STORE, msg_store_persistent).
-define(TRANSIENT_MSG_STORE, msg_store_transient).
+-define(CLEANUP_QUEUE_NAME, <<"cleanup-queue">>).
test_content_prop_roundtrip(Datum, Binary) ->
Types = [element(1, E) || E <- Datum],
@@ -80,20 +81,24 @@ run_cluster_dependent_tests(SecondaryNode) ->
io:format("Running cluster dependent tests with node ~p~n", [SecondaryNode]),
passed = test_delegates_async(SecondaryNode),
passed = test_delegates_sync(SecondaryNode),
+ passed = test_queue_cleanup(SecondaryNode),
+ passed = test_declare_on_dead_queue(SecondaryNode),
%% we now run the tests remotely, so that code coverage on the
%% local node picks up more of the delegate
Node = node(),
Self = self(),
Remote = spawn(SecondaryNode,
- fun () -> A = test_delegates_async(Node),
- B = test_delegates_sync(Node),
- Self ! {self(), {A, B}}
+ fun () -> Rs = [ test_delegates_async(Node),
+ test_delegates_sync(Node),
+ test_queue_cleanup(Node),
+ test_declare_on_dead_queue(Node) ],
+ Self ! {self(), Rs}
end),
receive
{Remote, Result} ->
- Result = {passed, passed}
- after 2000 ->
+ Result = lists:duplicate(length(Result), passed)
+ after 30000 ->
throw(timeout)
end,
@@ -1278,6 +1283,61 @@ test_delegates_sync(SecondaryNode) ->
passed.
+test_queue_cleanup_receiver(Pid) ->
+ receive
+ shutdown ->
+ ok;
+ {send_command, Method} ->
+ Pid ! Method,
+ test_queue_cleanup_receiver(Pid)
+ end.
+
+
+test_queue_cleanup(_SecondaryNode) ->
+ {_Writer, Ch} = test_spawn(fun test_queue_cleanup_receiver/1),
+ rabbit_channel:do(Ch, #'queue.declare'{ queue = ?CLEANUP_QUEUE_NAME }),
+ receive #'queue.declare_ok'{queue = ?CLEANUP_QUEUE_NAME} ->
+ ok
+ after 1000 -> throw(failed_to_receive_queue_declare_ok)
+ end,
+ rabbit:stop(),
+ rabbit:start(),
+ rabbit_channel:do(Ch, #'queue.declare'{ passive = true,
+ queue = ?CLEANUP_QUEUE_NAME }),
+ receive
+ {channel_exit, 1, {amqp_error, not_found, _, _}} ->
+ ok
+ after 2000 ->
+ throw(failed_to_receive_channel_exit)
+ end,
+ passed.
+
+test_declare_on_dead_queue(SecondaryNode) ->
+ QueueName = rabbit_misc:r(<<"/">>, queue, ?CLEANUP_QUEUE_NAME),
+ Self = self(),
+ Pid = spawn(SecondaryNode,
+ fun () ->
+ {new, #amqqueue{name = QueueName, pid = QPid}} =
+ rabbit_amqqueue:declare(QueueName, false, false, [],
+ none),
+ exit(QPid, kill),
+ Self ! {self(), killed, QPid}
+ end),
+ receive
+ {Pid, killed, QPid} ->
+ {existing, #amqqueue{name = QueueName,
+ pid = QPid}} =
+ rabbit_amqqueue:declare(QueueName, false, false, [], none),
+ false = rabbit_misc:is_process_alive(QPid),
+ {new, Q} = rabbit_amqqueue:declare(QueueName, false, false, [],
+ none),
+ true = rabbit_misc:is_process_alive(Q#amqqueue.pid),
+ {ok, 0} = rabbit_amqqueue:delete(Q, false, false),
+ passed
+ after 2000 ->
+ throw(failed_to_create_and_kill_queue)
+ end.
+
%---------------------------------------------------------------------
control_action(Command, Args) ->
@@ -1853,7 +1913,7 @@ variable_queue_publish(IsPersistent, Count, VQ) ->
true -> 2;
false -> 1
end}, <<>>),
- #message_properties{}, VQN)
+ #message_properties{}, self(), VQN)
end, VQ, lists:seq(1, Count)).
variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) ->
@@ -1871,9 +1931,13 @@ assert_prop(List, Prop, Value) ->
assert_props(List, PropVals) ->
[assert_prop(List, Prop, Value) || {Prop, Value} <- PropVals].
+test_amqqueue(Durable) ->
+ (rabbit_amqqueue:pseudo_queue(test_queue(), self()))
+ #amqqueue { durable = Durable }.
+
with_fresh_variable_queue(Fun) ->
ok = empty_test_queue(),
- VQ = rabbit_variable_queue:init(test_queue(), true, false,
+ VQ = rabbit_variable_queue:init(test_amqqueue(true), false,
fun nop/2, fun nop/1),
S0 = rabbit_variable_queue:status(VQ),
assert_props(S0, [{q1, 0}, {q2, 0},
@@ -1932,7 +1996,7 @@ test_dropwhile(VQ0) ->
rabbit_basic:message(
rabbit_misc:r(<<>>, exchange, <<>>),
<<>>, #'P_basic'{}, <<>>),
- #message_properties{expiry = N}, VQN)
+ #message_properties{expiry = N}, self(), VQN)
end, VQ0, lists:seq(1, Count)),
%% drop the first 5 messages
@@ -1976,7 +2040,7 @@ test_variable_queue_dynamic_duration_change(VQ0) ->
%% drain
{VQ8, AckTags} = variable_queue_fetch(Len, false, false, Len, VQ7),
- VQ9 = rabbit_variable_queue:ack(AckTags, VQ8),
+ {_Guids, VQ9} = rabbit_variable_queue:ack(AckTags, VQ8),
{empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9),
VQ10.
@@ -1986,7 +2050,7 @@ publish_fetch_and_ack(0, _Len, VQ0) ->
publish_fetch_and_ack(N, Len, VQ0) ->
VQ1 = variable_queue_publish(false, 1, VQ0),
{{_Msg, false, AckTag, Len}, VQ2} = rabbit_variable_queue:fetch(true, VQ1),
- VQ3 = rabbit_variable_queue:ack([AckTag], VQ2),
+ {_Guids, VQ3} = rabbit_variable_queue:ack([AckTag], VQ2),
publish_fetch_and_ack(N-1, Len, VQ3).
test_variable_queue_partial_segments_delta_thing(VQ0) ->
@@ -2020,7 +2084,7 @@ test_variable_queue_partial_segments_delta_thing(VQ0) ->
{len, HalfSegment + 1}]),
{VQ8, AckTags1} = variable_queue_fetch(HalfSegment + 1, true, false,
HalfSegment + 1, VQ7),
- VQ9 = rabbit_variable_queue:ack(AckTags ++ AckTags1, VQ8),
+ {_Guids, VQ9} = rabbit_variable_queue:ack(AckTags ++ AckTags1, VQ8),
%% should be empty now
{empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9),
VQ10.
@@ -2049,7 +2113,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere1(VQ0) ->
{VQ5, _AckTags1} = variable_queue_fetch(Count, false, false,
Count, VQ4),
_VQ6 = rabbit_variable_queue:terminate(VQ5),
- VQ7 = rabbit_variable_queue:init(test_queue(), true, true,
+ VQ7 = rabbit_variable_queue:init(test_amqqueue(true), true,
fun nop/2, fun nop/1),
{{_Msg1, true, _AckTag1, Count1}, VQ8} =
rabbit_variable_queue:fetch(true, VQ7),
@@ -2063,10 +2127,11 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) ->
VQ1 = rabbit_variable_queue:set_ram_duration_target(0, VQ0),
VQ2 = variable_queue_publish(false, 4, VQ1),
{VQ3, AckTags} = variable_queue_fetch(2, false, false, 4, VQ2),
- VQ4 = rabbit_variable_queue:requeue(AckTags, fun(X) -> X end, VQ3),
+ {_Guids, VQ4} =
+ rabbit_variable_queue:requeue(AckTags, fun(X) -> X end, VQ3),
VQ5 = rabbit_variable_queue:idle_timeout(VQ4),
_VQ6 = rabbit_variable_queue:terminate(VQ5),
- VQ7 = rabbit_variable_queue:init(test_queue(), true, true,
+ VQ7 = rabbit_variable_queue:init(test_amqqueue(true), true,
fun nop/2, fun nop/1),
{empty, VQ8} = rabbit_variable_queue:fetch(false, VQ7),
VQ8.
@@ -2074,7 +2139,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) ->
test_queue_recover() ->
Count = 2 * rabbit_queue_index:next_segment_boundary(0),
TxID = rabbit_guid:guid(),
- {new, #amqqueue { pid = QPid, name = QName }} =
+ {new, #amqqueue { pid = QPid, name = QName } = Q} =
rabbit_amqqueue:declare(test_queue(), true, false, [], none),
[begin
Msg = rabbit_basic:message(rabbit_misc:r(<<>>, exchange, <<>>),
@@ -2098,7 +2163,7 @@ test_queue_recover() ->
{ok, CountMinusOne, {QName, QPid1, _AckTag, true, _Msg}} =
rabbit_amqqueue:basic_get(Q1, self(), false),
exit(QPid1, shutdown),
- VQ1 = rabbit_variable_queue:init(QName, true, true,
+ VQ1 = rabbit_variable_queue:init(Q, true,
fun nop/2, fun nop/1),
{{_Msg1, true, _AckTag1, CountMinusOne}, VQ2} =
rabbit_variable_queue:fetch(true, VQ1),
diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl
index 3dbe740f27..bde336c000 100644
--- a/src/rabbit_types.erl
+++ b/src/rabbit_types.erl
@@ -123,7 +123,8 @@
auto_delete :: boolean(),
exclusive_owner :: rabbit_types:maybe(pid()),
arguments :: rabbit_framing:amqp_table(),
- pid :: rabbit_types:maybe(pid())}).
+ pid :: rabbit_types:maybe(pid()),
+ mirror_pids :: [pid()]}).
-type(exchange() ::
#exchange{name :: rabbit_exchange:name(),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index f39bc96426..74487ade05 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -16,18 +16,18 @@
-module(rabbit_variable_queue).
--export([init/3, terminate/1, delete_and_terminate/1,
- purge/1, publish/3, publish_delivered/4, fetch/2, ack/2,
- tx_publish/4, tx_ack/3, tx_rollback/2, tx_commit/4,
+-export([init/2, terminate/1, delete_and_terminate/1,
+ purge/1, publish/4, publish_delivered/5, fetch/2, ack/2,
+ tx_publish/5, tx_ack/3, tx_rollback/2, tx_commit/4,
requeue/3, len/1, is_empty/1, dropwhile/2,
set_ram_duration_target/2, ram_duration/1,
needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1,
- status/1]).
+ status/1, invoke/3]).
-export([start/1, stop/0]).
%% exported for testing only
--export([start_msg_store/2, stop_msg_store/0, init/5]).
+-export([start_msg_store/2, stop_msg_store/0, init/4]).
%%----------------------------------------------------------------------------
%% Definitions:
@@ -281,12 +281,11 @@
-record(sync, { acks_persistent, acks_all, pubs, funs }).
%% When we discover, on publish, that we should write some indices to
-%% disk for some betas, the RAM_INDEX_BATCH_SIZE sets the number of
-%% betas that we must be due to write indices for before we do any
-%% work at all. This is both a minimum and a maximum - we don't write
-%% fewer than RAM_INDEX_BATCH_SIZE indices out in one go, and we don't
-%% write more - we can always come back on the next publish to do
-%% more.
+%% disk for some betas, the IO_BATCH_SIZE sets the number of betas
+%% that we must be due to write indices for before we do any work at
+%% all. This is both a minimum and a maximum - we don't write fewer
+%% than IO_BATCH_SIZE indices out in one go, and we don't write more -
+%% we can always come back on the next publish to do more.
-define(IO_BATCH_SIZE, 64).
-define(PERSISTENT_MSG_STORE, msg_store_persistent).
-define(TRANSIENT_MSG_STORE, msg_store_transient).
@@ -299,7 +298,7 @@
-type(timestamp() :: {non_neg_integer(), non_neg_integer(), non_neg_integer()}).
-type(seq_id() :: non_neg_integer()).
--type(ack() :: seq_id() | 'blank_ack').
+-type(ack() :: seq_id()).
-type(rates() :: #rates { egress :: {timestamp(), non_neg_integer()},
ingress :: {timestamp(), non_neg_integer()},
@@ -394,15 +393,16 @@ stop_msg_store() ->
ok = rabbit_sup:stop_child(?PERSISTENT_MSG_STORE),
ok = rabbit_sup:stop_child(?TRANSIENT_MSG_STORE).
-init(QueueName, IsDurable, Recover) ->
+init(Queue, Recover) ->
Self = self(),
- init(QueueName, IsDurable, Recover,
+ init(Queue, Recover,
fun (Guids, ActionTaken) ->
msgs_written_to_disk(Self, Guids, ActionTaken)
end,
fun (Guids) -> msg_indices_written_to_disk(Self, Guids) end).
-init(QueueName, IsDurable, false, MsgOnDiskFun, MsgIdxOnDiskFun) ->
+init(#amqqueue { name = QueueName, durable = IsDurable }, false,
+ MsgOnDiskFun, MsgIdxOnDiskFun) ->
IndexState = rabbit_queue_index:init(QueueName, MsgIdxOnDiskFun),
init(IsDurable, IndexState, 0, [],
case IsDurable of
@@ -412,7 +412,8 @@ init(QueueName, IsDurable, false, MsgOnDiskFun, MsgIdxOnDiskFun) ->
end,
msg_store_client_init(?TRANSIENT_MSG_STORE, undefined));
-init(QueueName, true, true, MsgOnDiskFun, MsgIdxOnDiskFun) ->
+init(#amqqueue { name = QueueName }, true,
+ MsgOnDiskFun, MsgIdxOnDiskFun) ->
Terms = rabbit_queue_index:shutdown_terms(QueueName),
{PRef, TRef, Terms1} =
case [persistent_ref, transient_ref] -- proplists:get_keys(Terms) of
@@ -502,18 +503,19 @@ purge(State = #vqstate { q4 = Q4,
ram_index_count = 0,
persistent_count = PCount1 })}.
-publish(Msg, MsgProps, State) ->
+publish(Msg, MsgProps, _ChPid, State) ->
{_SeqId, State1} = publish(Msg, MsgProps, false, false, State),
a(reduce_memory_use(State1)).
-publish_delivered(false, #basic_message { guid = Guid },
- _MsgProps, State = #vqstate { len = 0 }) ->
+publish_delivered(false, #basic_message { guid = Guid }, _MsgProps, _ChPid,
+ State = #vqstate { len = 0 }) ->
blind_confirm(self(), gb_sets:singleton(Guid)),
- {blank_ack, a(State)};
+ {undefined, a(State)};
publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent,
guid = Guid },
MsgProps = #message_properties {
needs_confirming = NeedsConfirming },
+ _ChPid,
State = #vqstate { len = 0,
next_seq_id = SeqId,
out_counter = OutCount,
@@ -628,7 +630,7 @@ internal_fetch(AckRequired, MsgStatus = #msg_status {
MsgStatus #msg_status {
is_delivered = true }, State),
{SeqId, StateN};
- false -> {blank_ack, State}
+ false -> {undefined, State}
end,
PCount1 = PCount - one_if(IsPersistent andalso not AckRequired),
@@ -643,13 +645,14 @@ internal_fetch(AckRequired, MsgStatus = #msg_status {
persistent_count = PCount1 })}.
ack(AckTags, State) ->
- a(ack(fun msg_store_remove/3,
- fun (_, State0) -> State0 end,
- AckTags, State)).
+ {Guids, State1} = ack(fun msg_store_remove/3,
+ fun (_, State0) -> State0 end,
+ AckTags, State),
+ {Guids, a(State1)}.
tx_publish(Txn, Msg = #basic_message { is_persistent = IsPersistent }, MsgProps,
- State = #vqstate { durable = IsDurable,
- msg_store_clients = MSCState }) ->
+ _ChPid, State = #vqstate { durable = IsDurable,
+ msg_store_clients = MSCState }) ->
Tx = #tx { pending_messages = Pubs } = lookup_tx(Txn),
store_tx(Txn, Tx #tx { pending_messages = [{Msg, MsgProps} | Pubs] }),
case IsPersistent andalso IsDurable of
@@ -699,7 +702,7 @@ requeue(AckTags, MsgPropsFun, State) ->
(MsgPropsFun(MsgProps)) #message_properties {
needs_confirming = false }
end,
- a(reduce_memory_use(
+ {Guids, State1} =
ack(fun msg_store_release/3,
fun (#msg_status { msg = Msg, msg_props = MsgProps }, State1) ->
{_SeqId, State2} = publish(Msg, MsgPropsFun1(MsgProps),
@@ -714,7 +717,8 @@ requeue(AckTags, MsgPropsFun, State) ->
true, true, State2),
State3
end,
- AckTags, State))).
+ AckTags, State),
+ {Guids, a(reduce_memory_use(State1))}.
len(#vqstate { len = Len }) -> Len.
@@ -852,6 +856,9 @@ status(#vqstate {
{avg_ack_ingress_rate, AvgAckIngressRate},
{avg_ack_egress_rate , AvgAckEgressRate} ].
+invoke(?MODULE, Fun, State) ->
+ Fun(State).
+
%%----------------------------------------------------------------------------
%% Minor helpers
%%----------------------------------------------------------------------------
@@ -897,7 +904,7 @@ cons_if(true, E, L) -> [E | L];
cons_if(false, _E, L) -> L.
gb_sets_maybe_insert(false, _Val, Set) -> Set;
-%% when requeueing, we re-add a guid to the unconfimred set
+%% when requeueing, we re-add a guid to the unconfirmed set
gb_sets_maybe_insert(true, Val, Set) -> gb_sets:add(Val, Set).
msg_status(IsPersistent, SeqId, Msg = #basic_message { guid = Guid },
@@ -963,7 +970,7 @@ msg_store_close_fds_fun(IsPersistent) ->
Self = self(),
fun () ->
rabbit_amqqueue:maybe_run_queue_via_backing_queue_async(
- Self,
+ Self, ?MODULE,
fun (State = #vqstate { msg_store_clients = MSCState }) ->
{ok, MSCState1} =
msg_store_close_fds(MSCState, IsPersistent),
@@ -1109,10 +1116,11 @@ blank_rate(Timestamp, IngressLength) ->
msg_store_callback(PersistentGuids, Pubs, AckTags, Fun, MsgPropsFun) ->
Self = self(),
F = fun () -> rabbit_amqqueue:maybe_run_queue_via_backing_queue(
- Self, fun (StateN) -> {[], tx_commit_post_msg_store(
- true, Pubs, AckTags,
- Fun, MsgPropsFun, StateN)}
- end)
+ Self, ?MODULE,
+ fun (StateN) -> {[], tx_commit_post_msg_store(
+ true, Pubs, AckTags,
+ Fun, MsgPropsFun, StateN)}
+ end)
end,
fun () -> spawn(fun () -> ok = rabbit_misc:with_exit_handler(
fun () -> remove_persistent_messages(
@@ -1175,20 +1183,21 @@ tx_commit_index(State = #vqstate { on_sync = #sync {
Acks = lists:append(SAcks),
Pubs = [{Msg, Fun(MsgProps)} || {Fun, PubsN} <- lists:reverse(SPubs),
{Msg, MsgProps} <- lists:reverse(PubsN)],
- {SeqIds, State1 = #vqstate { index_state = IndexState }} =
+ {_Guids, State1} = ack(Acks, State),
+ {SeqIds, State2 = #vqstate { index_state = IndexState }} =
lists:foldl(
fun ({Msg = #basic_message { is_persistent = IsPersistent },
MsgProps},
- {SeqIdsAcc, State2}) ->
+ {SeqIdsAcc, State3}) ->
IsPersistent1 = IsDurable andalso IsPersistent,
- {SeqId, State3} =
- publish(Msg, MsgProps, false, IsPersistent1, State2),
- {cons_if(IsPersistent1, SeqId, SeqIdsAcc), State3}
- end, {PAcks, ack(Acks, State)}, Pubs),
+ {SeqId, State4} =
+ publish(Msg, MsgProps, false, IsPersistent1, State3),
+ {cons_if(IsPersistent1, SeqId, SeqIdsAcc), State4}
+ end, {PAcks, State1}, Pubs),
IndexState1 = rabbit_queue_index:sync(SeqIds, IndexState),
[ Fun() || Fun <- lists:reverse(SFuns) ],
reduce_memory_use(
- State1 #vqstate { index_state = IndexState1, on_sync = ?BLANK_SYNC }).
+ State2 #vqstate { index_state = IndexState1, on_sync = ?BLANK_SYNC }).
purge_betas_and_deltas(LensByStore,
State = #vqstate { q3 = Q3,
@@ -1335,7 +1344,7 @@ remove_pending_ack(KeepPersistent,
State = #vqstate { pending_ack = PA,
index_state = IndexState,
msg_store_clients = MSCState }) ->
- {PersistentSeqIds, GuidsByStore} =
+ {PersistentSeqIds, GuidsByStore, _AllGuids} =
dict:fold(fun accumulate_ack/3, accumulate_ack_init(), PA),
State1 = State #vqstate { pending_ack = dict:new(),
ram_ack_index = gb_trees:empty() },
@@ -1354,9 +1363,9 @@ remove_pending_ack(KeepPersistent,
end.
ack(_MsgStoreFun, _Fun, [], State) ->
- State;
+ {[], State};
ack(MsgStoreFun, Fun, AckTags, State) ->
- {{PersistentSeqIds, GuidsByStore},
+ {{PersistentSeqIds, GuidsByStore, AllGuids},
State1 = #vqstate { index_state = IndexState,
msg_store_clients = MSCState,
persistent_count = PCount,
@@ -1376,21 +1385,24 @@ ack(MsgStoreFun, Fun, AckTags, State) ->
|| {IsPersistent, Guids} <- orddict:to_list(GuidsByStore)],
PCount1 = PCount - find_persistent_count(sum_guids_by_store_to_len(
orddict:new(), GuidsByStore)),
- State1 #vqstate { index_state = IndexState1,
- persistent_count = PCount1,
- ack_out_counter = AckOutCount + length(AckTags) }.
+ {lists:reverse(AllGuids),
+ State1 #vqstate { index_state = IndexState1,
+ persistent_count = PCount1,
+ ack_out_counter = AckOutCount + length(AckTags) }}.
-accumulate_ack_init() -> {[], orddict:new()}.
+accumulate_ack_init() -> {[], orddict:new(), []}.
accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS
msg_on_disk = false,
- index_on_disk = false },
- {PersistentSeqIdsAcc, GuidsByStore}) ->
- {PersistentSeqIdsAcc, GuidsByStore};
+ index_on_disk = false,
+ guid = Guid },
+ {PersistentSeqIdsAcc, GuidsByStore, AllGuids}) ->
+ {PersistentSeqIdsAcc, GuidsByStore, [Guid | AllGuids]};
accumulate_ack(SeqId, {IsPersistent, Guid, _MsgProps},
- {PersistentSeqIdsAcc, GuidsByStore}) ->
+ {PersistentSeqIdsAcc, GuidsByStore, AllGuids}) ->
{cons_if(IsPersistent, SeqId, PersistentSeqIdsAcc),
- rabbit_misc:orddict_cons(IsPersistent, Guid, GuidsByStore)}.
+ rabbit_misc:orddict_cons(IsPersistent, Guid, GuidsByStore),
+ [Guid | AllGuids]}.
find_persistent_count(LensByStore) ->
case orddict:find(true, LensByStore) of
@@ -1436,33 +1448,35 @@ msgs_confirmed(GuidSet, State) ->
blind_confirm(QPid, GuidSet) ->
rabbit_amqqueue:maybe_run_queue_via_backing_queue_async(
- QPid, fun (State) -> msgs_confirmed(GuidSet, State) end).
+ QPid, ?MODULE, fun (State) -> msgs_confirmed(GuidSet, State) end).
msgs_written_to_disk(QPid, GuidSet, removed) ->
blind_confirm(QPid, GuidSet);
msgs_written_to_disk(QPid, GuidSet, written) ->
rabbit_amqqueue:maybe_run_queue_via_backing_queue_async(
- QPid, fun (State = #vqstate { msgs_on_disk = MOD,
- msg_indices_on_disk = MIOD,
- unconfirmed = UC }) ->
- msgs_confirmed(gb_sets:intersection(GuidSet, MIOD),
- State #vqstate {
- msgs_on_disk =
- gb_sets:intersection(
- gb_sets:union(MOD, GuidSet), UC) })
- end).
+ QPid, ?MODULE,
+ fun (State = #vqstate { msgs_on_disk = MOD,
+ msg_indices_on_disk = MIOD,
+ unconfirmed = UC }) ->
+ msgs_confirmed(gb_sets:intersection(GuidSet, MIOD),
+ State #vqstate {
+ msgs_on_disk =
+ gb_sets:intersection(
+ gb_sets:union(MOD, GuidSet), UC) })
+ end).
msg_indices_written_to_disk(QPid, GuidSet) ->
rabbit_amqqueue:maybe_run_queue_via_backing_queue_async(
- QPid, fun (State = #vqstate { msgs_on_disk = MOD,
- msg_indices_on_disk = MIOD,
- unconfirmed = UC }) ->
- msgs_confirmed(gb_sets:intersection(GuidSet, MOD),
- State #vqstate {
- msg_indices_on_disk =
- gb_sets:intersection(
- gb_sets:union(MIOD, GuidSet), UC) })
- end).
+ QPid, ?MODULE,
+ fun (State = #vqstate { msgs_on_disk = MOD,
+ msg_indices_on_disk = MIOD,
+ unconfirmed = UC }) ->
+ msgs_confirmed(gb_sets:intersection(GuidSet, MOD),
+ State #vqstate {
+ msg_indices_on_disk =
+ gb_sets:intersection(
+ gb_sets:union(MIOD, GuidSet), UC) })
+ end).
%%----------------------------------------------------------------------------
%% Phase changes
diff --git a/src/supervisor2.erl b/src/supervisor2.erl
index 18e2bdadb9..1a240856ce 100644
--- a/src/supervisor2.erl
+++ b/src/supervisor2.erl
@@ -359,8 +359,8 @@ handle_cast({delayed_restart, {RestartType, Reason, Child}}, State)
{noreply, NState};
handle_cast({delayed_restart, {RestartType, Reason, Child}}, State) ->
case get_child(Child#child.name, State) of
- {value, Child} ->
- {ok, NState} = do_restart(RestartType, Reason, Child, State),
+ {value, Child1} ->
+ {ok, NState} = do_restart(RestartType, Reason, Child1, State),
{noreply, NState};
_ ->
{noreply, State}
@@ -539,7 +539,7 @@ do_restart({RestartType, Delay}, Reason, Child, State) ->
{ok, _TRef} = timer:apply_after(
trunc(Delay*1000), ?MODULE, delayed_restart,
[self(), {{RestartType, Delay}, Reason, Child}]),
- {ok, NState}
+ {ok, state_del_child(Child, NState)}
end;
do_restart(permanent, Reason, Child, State) ->
report_error(child_terminated, Reason, Child, State#state.name),
diff --git a/src/tcp_acceptor.erl b/src/tcp_acceptor.erl
index 194389e32a..0d50683db7 100644
--- a/src/tcp_acceptor.erl
+++ b/src/tcp_acceptor.erl
@@ -59,8 +59,8 @@ handle_info({inet_async, LSock, Ref, {ok, Sock}},
{Address, Port} = inet_op(fun () -> inet:sockname(LSock) end),
{PeerAddress, PeerPort} = inet_op(fun () -> inet:peername(Sock) end),
error_logger:info_msg("accepted TCP connection on ~s:~p from ~s:~p~n",
- [inet_parse:ntoa(Address), Port,
- inet_parse:ntoa(PeerAddress), PeerPort]),
+ [rabbit_misc:ntoab(Address), Port,
+ rabbit_misc:ntoab(PeerAddress), PeerPort]),
%% In the event that somebody floods us with connections we can spew
%% the above message at error_logger faster than it can keep up.
%% So error_logger's mailbox grows unbounded until we eat all the
diff --git a/src/tcp_listener.erl b/src/tcp_listener.erl
index b1bfcafcdb..cd64696904 100644
--- a/src/tcp_listener.erl
+++ b/src/tcp_listener.erl
@@ -50,8 +50,9 @@ init({IPAddress, Port, SocketOpts,
end,
lists:duplicate(ConcurrentAcceptorCount, dummy)),
{ok, {LIPAddress, LPort}} = inet:sockname(LSock),
- error_logger:info_msg("started ~s on ~s:~p~n",
- [Label, inet_parse:ntoa(LIPAddress), LPort]),
+ error_logger:info_msg(
+ "started ~s on ~s:~p~n",
+ [Label, rabbit_misc:ntoab(LIPAddress), LPort]),
apply(M, F, A ++ [IPAddress, Port]),
{ok, #state{sock = LSock,
on_startup = OnStartup, on_shutdown = OnShutdown,
@@ -59,7 +60,7 @@ init({IPAddress, Port, SocketOpts,
{error, Reason} ->
error_logger:error_msg(
"failed to start ~s on ~s:~p - ~p~n",
- [Label, inet_parse:ntoa(IPAddress), Port, Reason]),
+ [Label, rabbit_misc:ntoab(IPAddress), Port, Reason]),
{stop, {cannot_listen, IPAddress, Port, Reason}}
end.
@@ -76,7 +77,7 @@ terminate(_Reason, #state{sock=LSock, on_shutdown = {M,F,A}, label=Label}) ->
{ok, {IPAddress, Port}} = inet:sockname(LSock),
gen_tcp:close(LSock),
error_logger:info_msg("stopped ~s on ~s:~p~n",
- [Label, inet_parse:ntoa(IPAddress), Port]),
+ [Label, rabbit_misc:ntoab(IPAddress), Port]),
apply(M, F, A ++ [IPAddress, Port]).
code_change(_OldVsn, State, _Extra) ->
diff --git a/src/test_sup.erl b/src/test_sup.erl
index 76be63d0cf..b4df1fd042 100644
--- a/src/test_sup.erl
+++ b/src/test_sup.erl
@@ -59,19 +59,21 @@ start_child() ->
ping_child(SupPid) ->
Ref = make_ref(),
- get_child_pid(SupPid) ! {ping, Ref, self()},
+ with_child_pid(SupPid, fun(ChildPid) -> ChildPid ! {ping, Ref, self()} end),
receive {pong, Ref} -> ok
after 1000 -> timeout
end.
exit_child(SupPid) ->
- true = exit(get_child_pid(SupPid), abnormal),
+ with_child_pid(SupPid, fun(ChildPid) -> exit(ChildPid, abnormal) end),
ok.
-get_child_pid(SupPid) ->
- [{_Id, ChildPid, worker, [test_sup]}] =
- supervisor2:which_children(SupPid),
- ChildPid.
+with_child_pid(SupPid, Fun) ->
+ case supervisor2:which_children(SupPid) of
+ [{_Id, undefined, worker, [test_sup]}] -> ok;
+ [{_Id, ChildPid, worker, [test_sup]}] -> Fun(ChildPid);
+ [] -> ok
+ end.
run_child() ->
receive {ping, Ref, Pid} -> Pid ! {pong, Ref},