summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit.erl13
-rw-r--r--src/rabbit_amqqueue.erl46
-rw-r--r--src/rabbit_amqqueue_process.erl200
-rw-r--r--src/rabbit_backing_queue.erl18
-rw-r--r--src/rabbit_control.erl6
-rw-r--r--src/rabbit_mirror_queue_coordinator.erl136
-rw-r--r--src/rabbit_mirror_queue_master.erl285
-rw-r--r--src/rabbit_mirror_queue_misc.erl46
-rw-r--r--src/rabbit_mirror_queue_slave.erl625
-rw-r--r--src/rabbit_mirror_queue_slave_sup.erl54
-rw-r--r--src/rabbit_mnesia.erl3
-rw-r--r--src/rabbit_router.erl6
-rw-r--r--src/rabbit_tests.erl27
-rw-r--r--src/rabbit_types.erl3
-rw-r--r--src/rabbit_variable_queue.erl139
15 files changed, 1432 insertions, 175 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index c9a929ae00..0548d6bf09 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -36,6 +36,12 @@
[]}},
{enables, external_infrastructure}]}).
+-rabbit_boot_step({rabbit_registry,
+ [{description, "plugin registry"},
+ {mfa, {rabbit_sup, start_child,
+ [rabbit_registry]}},
+ {enables, external_infrastructure}]}).
+
-rabbit_boot_step({database,
[{mfa, {rabbit_mnesia, init, []}},
{requires, file_handle_cache},
@@ -55,13 +61,6 @@
-rabbit_boot_step({external_infrastructure,
[{description, "external infrastructure ready"}]}).
--rabbit_boot_step({rabbit_registry,
- [{description, "plugin registry"},
- {mfa, {rabbit_sup, start_child,
- [rabbit_registry]}},
- {requires, external_infrastructure},
- {enables, kernel_ready}]}).
-
-rabbit_boot_step({rabbit_log,
[{description, "logging server"},
{mfa, {rabbit_sup, start_restartable_child,
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 3aa20821b9..36b1662e9a 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -18,8 +18,8 @@
-export([start/0, stop/0, declare/5, delete_immediately/1, delete/3, purge/1]).
-export([internal_declare/2, internal_delete/1,
- maybe_run_queue_via_backing_queue/2,
- maybe_run_queue_via_backing_queue_async/2,
+ maybe_run_queue_via_backing_queue/3,
+ maybe_run_queue_via_backing_queue_async/3,
sync_timeout/1, update_ram_duration/1, set_ram_duration_target/2,
set_maximum_since_use/2, maybe_expire/1, drop_expired/1]).
-export([pseudo_queue/2]).
@@ -33,6 +33,7 @@
-export([notify_sent/2, unblock/2, flush_all/2]).
-export([commit_all/3, rollback_all/3, notify_down_all/2, limit_all/3]).
-export([on_node_down/1]).
+-export([store_queue/1]).
-include("rabbit.hrl").
-include_lib("stdlib/include/qlc.hrl").
@@ -140,10 +141,10 @@
rabbit_types:connection_exit() |
fun ((boolean()) -> rabbit_types:ok_or_error('not_found') |
rabbit_types:connection_exit())).
--spec(maybe_run_queue_via_backing_queue/2 ::
- (pid(), (fun ((A) -> {[rabbit_types:msg_id()], A}))) -> 'ok').
--spec(maybe_run_queue_via_backing_queue_async/2 ::
- (pid(), (fun ((A) -> {[rabbit_types:msg_id()], A}))) -> 'ok').
+-spec(maybe_run_queue_via_backing_queue/3 ::
+ (pid(), atom(), (fun ((A) -> {[rabbit_guid:msg_id()], A}))) -> 'ok').
+-spec(maybe_run_queue_via_backing_queue_async/3 ::
+ (pid(), atom(), (fun ((A) -> {[rabbit_guid:msg_id()], A}))) -> 'ok').
-spec(sync_timeout/1 :: (pid()) -> 'ok').
-spec(update_ram_duration/1 :: (pid()) -> 'ok').
-spec(set_ram_duration_target/2 :: (pid(), number() | 'infinity') -> 'ok').
@@ -191,12 +192,13 @@ recover_durable_queues(DurableQueues) ->
declare(QueueName, Durable, AutoDelete, Args, Owner) ->
ok = check_declare_arguments(QueueName, Args),
- Q = start_queue_process(#amqqueue{name = QueueName,
- durable = Durable,
- auto_delete = AutoDelete,
- arguments = Args,
+ Q = start_queue_process(#amqqueue{name = QueueName,
+ durable = Durable,
+ auto_delete = AutoDelete,
+ arguments = Args,
exclusive_owner = Owner,
- pid = none}),
+ pid = none,
+ mirror_pids = []}),
case gen_server2:call(Q#amqqueue.pid, {init, false}, infinity) of
not_found -> rabbit_misc:not_found(QueueName);
Q1 -> Q1
@@ -438,11 +440,13 @@ internal_delete(QueueName) ->
end
end).
-maybe_run_queue_via_backing_queue(QPid, Fun) ->
- gen_server2:call(QPid, {maybe_run_queue_via_backing_queue, Fun}, infinity).
-maybe_run_queue_via_backing_queue_async(QPid, Fun) ->
- gen_server2:cast(QPid, {maybe_run_queue_via_backing_queue, Fun}).
+maybe_run_queue_via_backing_queue(QPid, Mod, Fun) ->
+ gen_server2:call(QPid, {maybe_run_queue_via_backing_queue, Mod, Fun},
+ infinity).
+
+maybe_run_queue_via_backing_queue_async(QPid, Mod, Fun) ->
+ gen_server2:cast(QPid, {maybe_run_queue_via_backing_queue, Mod, Fun}).
sync_timeout(QPid) ->
gen_server2:cast(QPid, sync_timeout).
@@ -465,7 +469,8 @@ drop_expired(QPid) ->
on_node_down(Node) ->
rabbit_misc:execute_mnesia_transaction(
fun () -> qlc:e(qlc:q([delete_queue(QueueName) ||
- #amqqueue{name = QueueName, pid = Pid}
+ #amqqueue{name = QueueName, pid = Pid,
+ mirror_pids = []}
<- mnesia:table(rabbit_queue),
node(Pid) == Node]))
end,
@@ -482,11 +487,12 @@ delete_queue(QueueName) ->
rabbit_binding:remove_transient_for_destination(QueueName).
pseudo_queue(QueueName, Pid) ->
- #amqqueue{name = QueueName,
- durable = false,
+ #amqqueue{name = QueueName,
+ durable = false,
auto_delete = false,
- arguments = [],
- pid = Pid}.
+ arguments = [],
+ pid = Pid,
+ mirror_pids = []}.
safe_delegate_call_ok(F, Pids) ->
case delegate:invoke(Pids, fun (Pid) ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 54c92dc70d..eb3b13cc36 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -33,7 +33,9 @@
handle_info/2, handle_pre_hibernate/1, prioritise_call/3,
prioritise_cast/2, prioritise_info/2]).
-%% Queue's state
+-export([init_with_backing_queue_state/7]).
+
+% Queue's state
-record(q, {q,
exclusive_consumer,
has_had_consumers,
@@ -72,7 +74,8 @@
messages,
consumers,
memory,
- backing_queue_status
+ backing_queue_status,
+ mirror_pids
]).
-define(CREATION_EVENT_KEYS,
@@ -97,12 +100,11 @@ info_keys() -> ?INFO_KEYS.
init(Q) ->
?LOGDEBUG("Queue starting - ~p~n", [Q]),
process_flag(trap_exit, true),
- {ok, BQ} = application:get_env(backing_queue_module),
{ok, #q{q = Q#amqqueue{pid = self()},
exclusive_consumer = none,
has_had_consumers = false,
- backing_queue = BQ,
+ backing_queue = backing_queue_module(Q),
backing_queue_state = undefined,
active_consumers = queue:new(),
blocked_consumers = queue:new(),
@@ -115,6 +117,36 @@ init(Q) ->
msg_id_to_channel = dict:new()}, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
+init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
+ RateTRef, AckTags, Deliveries, MTC) ->
+ ?LOGDEBUG("Queue starting - ~p~n", [Q]),
+ case Owner of
+ none -> ok;
+ _ -> erlang:monitor(process, Owner)
+ end,
+ State = requeue_and_run(
+ AckTags,
+ process_args(
+ #q{q = Q#amqqueue{pid = self()},
+ exclusive_consumer = none,
+ has_had_consumers = false,
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ active_consumers = queue:new(),
+ blocked_consumers = queue:new(),
+ expires = undefined,
+ sync_timer_ref = undefined,
+ rate_timer_ref = RateTRef,
+ expiry_timer_ref = undefined,
+ ttl = undefined,
+ stats_timer = rabbit_event:init_stats_timer(),
+ msg_id_to_channel = MTC})),
+ lists:foldl(
+ fun (Delivery, StateN) ->
+ {_Delivered, StateN1} = deliver_or_enqueue(Delivery, StateN),
+ StateN1
+ end, State, Deliveries).
+
terminate(shutdown, State = #q{backing_queue = BQ}) ->
terminate_shutdown(fun (BQS) -> BQ:terminate(BQS) end, State);
terminate({shutdown, _}, State = #q{backing_queue = BQ}) ->
@@ -137,8 +169,7 @@ code_change(_OldVsn, State, _Extra) ->
%%----------------------------------------------------------------------------
declare(Recover, From,
- State = #q{q = Q = #amqqueue{name = QName, durable = IsDurable},
- backing_queue = BQ, backing_queue_state = undefined,
+ State = #q{q = Q, backing_queue = BQ, backing_queue_state = undefined,
stats_timer = StatsTimer}) ->
case rabbit_amqqueue:internal_declare(Q, Recover) of
not_found -> {stop, normal, not_found, State};
@@ -149,7 +180,7 @@ declare(Recover, From,
ok = rabbit_memory_monitor:register(
self(), {rabbit_amqqueue,
set_ram_duration_target, [self()]}),
- BQS = BQ:init(QName, IsDurable, Recover),
+ BQS = BQ:init(Q, Recover),
State1 = process_args(State#q{backing_queue_state = BQS}),
rabbit_event:notify(queue_created,
infos(?CREATION_EVENT_KEYS, State1)),
@@ -210,6 +241,13 @@ next_state(State) ->
false -> {stop_sync_timer(State2), hibernate}
end.
+backing_queue_module(#amqqueue{arguments = Args}) ->
+ case rabbit_misc:table_lookup(Args, <<"x-mirror">>) of
+ undefined -> {ok, BQM} = application:get_env(backing_queue_module),
+ BQM;
+ _Nodes -> rabbit_mirror_queue_master
+ end.
+
ensure_sync_timer(State = #q{sync_timer_ref = undefined}) ->
{ok, TRef} = timer:apply_after(
?SYNC_INTERVAL, rabbit_amqqueue, sync_timeout, [self()]),
@@ -448,58 +486,78 @@ attempt_delivery(#delivery{txn = none,
sender = ChPid,
message = Message,
msg_seq_no = MsgSeqNo},
- {NeedsConfirming, State = #q{backing_queue = BQ}}) ->
+ {NeedsConfirming, State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}}) ->
case NeedsConfirming of
immediately -> rabbit_channel:confirm(ChPid, [MsgSeqNo]);
_ -> ok
end,
- PredFun = fun (IsEmpty, _State) -> not IsEmpty end,
- DeliverFun =
- fun (AckRequired, false, State1 = #q{backing_queue_state = BQS}) ->
- %% we don't need an expiry here because messages are
- %% not being enqueued, so we use an empty
- %% message_properties.
- {AckTag, BQS1} =
- BQ:publish_delivered(
- AckRequired, Message,
- (?BASE_MESSAGE_PROPERTIES)#message_properties{
- needs_confirming = (NeedsConfirming =:= eventually)},
- BQS),
- {{Message, false, AckTag}, true,
- State1#q{backing_queue_state = BQS1}}
- end,
- {Delivered, State1} =
- deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State),
- {Delivered, NeedsConfirming, State1};
+ case BQ:validate_message(Message, BQS) of
+ {invalid, BQS1} ->
+ {invalid, NeedsConfirming, State#q{backing_queue_state = BQS1}};
+ {valid, BQS1} ->
+ PredFun = fun (IsEmpty, _State) -> not IsEmpty end,
+ DeliverFun =
+ fun (AckRequired, false,
+ State1 = #q{backing_queue_state = BQS2}) ->
+ %% we don't need an expiry here because
+ %% messages are not being enqueued, so we use
+ %% an empty message_properties.
+ {AckTag, BQS3} =
+ BQ:publish_delivered(
+ AckRequired, Message,
+ (?BASE_MESSAGE_PROPERTIES)#message_properties{
+ needs_confirming =
+ (NeedsConfirming =:= eventually)},
+ ChPid, BQS2),
+ {{Message, false, AckTag}, true,
+ State1#q{backing_queue_state = BQS3}}
+ end,
+ {Delivered, State2} =
+ deliver_msgs_to_consumers({ PredFun, DeliverFun }, false,
+ State#q{backing_queue_state = BQS1}),
+ {{valid, Delivered}, NeedsConfirming, State2}
+ end;
attempt_delivery(#delivery{txn = Txn,
sender = ChPid,
message = Message},
{NeedsConfirming, State = #q{backing_queue = BQ,
backing_queue_state = BQS}}) ->
- store_ch_record((ch_record(ChPid))#cr{txn = Txn}),
- BQS1 = BQ:tx_publish(Txn, Message, ?BASE_MESSAGE_PROPERTIES, BQS),
- {true, NeedsConfirming, State#q{backing_queue_state = BQS1}}.
+ case BQ:validate_message(Message, BQS) of
+ {invalid, BQS1} ->
+ {invalid, NeedsConfirming, State#q{backing_queue_state = BQS1}};
+ {valid, BQS1} ->
+ store_ch_record((ch_record(ChPid))#cr{txn = Txn}),
+ BQS2 = BQ:tx_publish(Txn, Message, ?BASE_MESSAGE_PROPERTIES, ChPid,
+ BQS1),
+ {{valid, true}, NeedsConfirming,
+ State#q{backing_queue_state = BQS2}}
+ end.
deliver_or_enqueue(Delivery, State) ->
case attempt_delivery(Delivery, record_confirm_message(Delivery, State)) of
- {true, _, State1} ->
+ {invalid, _, State1} ->
+ State1;
+ {{valid, true}, _, State1} ->
State1;
- {false, NeedsConfirming, State1 = #q{backing_queue = BQ,
- backing_queue_state = BQS}} ->
+ {{valid, false}, NeedsConfirming,
+ State1 = #q{backing_queue = BQ, backing_queue_state = BQS}} ->
#delivery{message = Message} = Delivery,
BQS1 = BQ:publish(Message,
(message_properties(State)) #message_properties{
needs_confirming =
(NeedsConfirming =:= eventually)},
- BQS),
+ Delivery #delivery.sender, BQS),
ensure_ttl_timer(State1#q{backing_queue_state = BQS1})
end.
-requeue_and_run(AckTags, State = #q{backing_queue = BQ, ttl=TTL}) ->
+requeue_and_run(AckTags, State = #q{backing_queue = BQ, ttl = TTL}) ->
maybe_run_queue_via_backing_queue(
- fun (BQS) ->
- {[], BQ:requeue(AckTags, reset_msg_expiry_fun(TTL), BQS)}
- end, State).
+ BQ, fun (BQS) ->
+ {_Guids, BQS1} =
+ BQ:requeue(AckTags, reset_msg_expiry_fun(TTL), BQS),
+ {[], BQS1}
+ end, State).
fetch(AckRequired, State = #q{backing_queue_state = BQS,
backing_queue = BQ}) ->
@@ -603,10 +661,12 @@ qname(#q{q = #amqqueue{name = QName}}) -> QName.
backing_queue_idle_timeout(State = #q{backing_queue = BQ}) ->
maybe_run_queue_via_backing_queue(
- fun (BQS) -> {[], BQ:idle_timeout(BQS)} end, State).
+ BQ, fun (BQS) -> {[], BQ:idle_timeout(BQS)} end, State).
-maybe_run_queue_via_backing_queue(Fun, State = #q{backing_queue_state = BQS}) ->
- {MsgIds, BQS1} = Fun(BQS),
+maybe_run_queue_via_backing_queue(Mod, Fun,
+ State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ {MsgIds, BQS1} = BQ:invoke(Mod, Fun, BQS),
run_message_queue(
confirm_messages(MsgIds, State#q{backing_queue_state = BQS1})).
@@ -703,6 +763,9 @@ i(memory, _) ->
M;
i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) ->
BQ:status(BQS);
+i(mirror_pids, #q{q = #amqqueue{name = Name}}) ->
+ {ok, #amqqueue{mirror_pids = MPids}} = rabbit_amqqueue:lookup(Name),
+ MPids;
i(Item, _) ->
throw({bad_argument, Item}).
@@ -738,29 +801,29 @@ emit_consumer_deleted(ChPid, ConsumerTag) ->
prioritise_call(Msg, _From, _State) ->
case Msg of
- info -> 9;
- {info, _Items} -> 9;
- consumers -> 9;
- {maybe_run_queue_via_backing_queue, _Fun} -> 6;
- _ -> 0
+ info -> 9;
+ {info, _Items} -> 9;
+ consumers -> 9;
+ {maybe_run_queue_via_backing_queue, _Mod, _Fun} -> 6;
+ _ -> 0
end.
prioritise_cast(Msg, _State) ->
case Msg of
- update_ram_duration -> 8;
- delete_immediately -> 8;
- {set_ram_duration_target, _Duration} -> 8;
- {set_maximum_since_use, _Age} -> 8;
- maybe_expire -> 8;
- drop_expired -> 8;
- emit_stats -> 7;
- {ack, _Txn, _AckTags, _ChPid} -> 7;
- {reject, _AckTags, _Requeue, _ChPid} -> 7;
- {notify_sent, _ChPid} -> 7;
- {unblock, _ChPid} -> 7;
- {maybe_run_queue_via_backing_queue, _Fun} -> 6;
- sync_timeout -> 6;
- _ -> 0
+ update_ram_duration -> 8;
+ delete_immediately -> 8;
+ {set_ram_duration_target, _Duration} -> 8;
+ {set_maximum_since_use, _Age} -> 8;
+ maybe_expire -> 8;
+ drop_expired -> 8;
+ emit_stats -> 7;
+ {ack, _Txn, _AckTags, _ChPid} -> 7;
+ {reject, _AckTags, _Requeue, _ChPid} -> 7;
+ {notify_sent, _ChPid} -> 7;
+ {unblock, _ChPid} -> 7;
+ {maybe_run_queue_via_backing_queue, _Mod, _Fun} -> 6;
+ sync_timeout -> 6;
+ _ -> 0
end.
prioritise_info({'DOWN', _MonitorRef, process, DownPid, _Reason},
@@ -815,9 +878,12 @@ handle_call({deliver_immediately, Delivery}, _From, State) ->
%% just all ready-to-consume queues get the message, with unready
%% queues discarding the message?
%%
- {Delivered, _NeedsConfirming, State1} =
+ {Valid, _NeedsConfirming, State1} =
attempt_delivery(Delivery, record_confirm_message(Delivery, State)),
- reply(Delivered, State1);
+ reply(case Valid of
+ valid -> true;
+ invalid -> false
+ end, State1);
handle_call({deliver, Delivery}, From, State) ->
%% Synchronous, "mandatory" delivery mode. Reply asap.
@@ -972,12 +1038,12 @@ handle_call({requeue, AckTags, ChPid}, From, State) ->
noreply(requeue_and_run(AckTags, State))
end;
-handle_call({maybe_run_queue_via_backing_queue, Fun}, _From, State) ->
- reply(ok, maybe_run_queue_via_backing_queue(Fun, State)).
+handle_call({maybe_run_queue_via_backing_queue, Mod, Fun}, _From, State) ->
+ reply(ok, maybe_run_queue_via_backing_queue(Mod, Fun, State)).
-handle_cast({maybe_run_queue_via_backing_queue, Fun}, State) ->
- noreply(maybe_run_queue_via_backing_queue(Fun, State));
+handle_cast({maybe_run_queue_via_backing_queue, Mod, Fun}, State) ->
+ noreply(maybe_run_queue_via_backing_queue(Mod, Fun, State));
handle_cast(sync_timeout, State) ->
noreply(backing_queue_idle_timeout(State#q{sync_timer_ref = undefined}));
@@ -996,7 +1062,7 @@ handle_cast({ack, Txn, AckTags, ChPid},
case Txn of
none -> ChAckTags1 = subtract_acks(ChAckTags, AckTags),
NewC = C#cr{acktags = ChAckTags1},
- BQS1 = BQ:ack(AckTags, BQS),
+ {_Guids, BQS1} = BQ:ack(AckTags, BQS),
{NewC, State#q{backing_queue_state = BQS1}};
_ -> BQS1 = BQ:tx_ack(Txn, AckTags, BQS),
{C#cr{txn = Txn},
@@ -1017,7 +1083,7 @@ handle_cast({reject, AckTags, Requeue, ChPid},
maybe_store_ch_record(C#cr{acktags = ChAckTags1}),
noreply(case Requeue of
true -> requeue_and_run(AckTags, State);
- false -> BQS1 = BQ:ack(AckTags, BQS),
+ false -> {_Guids, BQS1} = BQ:ack(AckTags, BQS),
State#q{backing_queue_state = BQS1}
end)
end;
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 03c1fdd1b9..726b9befb8 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -33,7 +33,7 @@ behaviour_info(callbacks) ->
{stop, 0},
%% Initialise the backing queue and its state.
- {init, 3},
+ {init, 2},
%% Called on queue shutdown when queue isn't being deleted.
{terminate, 1},
@@ -47,12 +47,12 @@ behaviour_info(callbacks) ->
{purge, 1},
%% Publish a message.
- {publish, 3},
+ {publish, 4},
%% Called for messages which have already been passed straight
%% out to a client. The queue will be empty for these calls
%% (i.e. saves the round trip through the backing queue).
- {publish_delivered, 4},
+ {publish_delivered, 5},
%% Drop messages from the head of the queue while the supplied
%% predicate returns true.
@@ -66,7 +66,7 @@ behaviour_info(callbacks) ->
{ack, 2},
%% A publish, but in the context of a transaction.
- {tx_publish, 4},
+ {tx_publish, 5},
%% Acks, but in the context of a transaction.
{tx_ack, 3},
@@ -122,7 +122,15 @@ behaviour_info(callbacks) ->
%% Exists for debugging purposes, to be able to expose state via
%% rabbitmqctl list_queues backing_queue_status
- {status, 1}
+ {status, 1},
+
+ %% Passed a function to be invoked with the relevant backing
+ %% queue's state. Useful for when the backing queue or other
+ %% components need to pass functions into the backing queue.
+ {invoke, 3},
+
+ %% TODO: document me
+ {validate_message, 2}
];
behaviour_info(_Other) ->
undefined.
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 8364ecd8d7..e2c050f5d8 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -358,6 +358,12 @@ format_info_item([{TableEntryKey, TableEntryType, _TableEntryValue} | _] =
Value) when is_binary(TableEntryKey) andalso
is_atom(TableEntryType) ->
io_lib:format("~1000000000000p", [prettify_amqp_table(Value)]);
+format_info_item([T | _] = Value)
+ when is_tuple(T) orelse is_pid(T) orelse is_binary(T) orelse is_atom(T) orelse
+ is_list(T) ->
+ "[" ++
+ lists:nthtail(2, lists:append(
+ [", " ++ format_info_item(E) || E <- Value])) ++ "]";
format_info_item(Value) ->
io_lib:format("~w", [Value]).
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl
new file mode 100644
index 0000000000..30fd6ed34d
--- /dev/null
+++ b/src/rabbit_mirror_queue_coordinator.erl
@@ -0,0 +1,136 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2010 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_mirror_queue_coordinator).
+
+-export([start_link/2, add_slave/2, get_gm/1]).
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+ code_change/3]).
+
+-export([joined/2, members_changed/3, handle_msg/3]).
+
+-behaviour(gen_server2).
+-behaviour(gm).
+
+-include("rabbit.hrl").
+-include("gm_specs.hrl").
+
+-record(state, { q,
+ gm
+ }).
+
+-define(ONE_SECOND, 1000).
+
+start_link(Queue, GM) ->
+ gen_server2:start_link(?MODULE, [Queue, GM], []).
+
+add_slave(CPid, SlaveNode) ->
+ gen_server2:cast(CPid, {add_slave, SlaveNode}).
+
+get_gm(CPid) ->
+ gen_server2:call(CPid, get_gm, infinity).
+
+%% ---------------------------------------------------------------------------
+%% gen_server
+%% ---------------------------------------------------------------------------
+
+init([#amqqueue { name = QueueName } = Q, GM]) ->
+ GM1 = case GM of
+ undefined ->
+ ok = gm:create_tables(),
+ {ok, GM2} = gm:start_link(QueueName, ?MODULE, [self()]),
+ receive {joined, GM2, _Members} ->
+ ok
+ end,
+ GM2;
+ _ ->
+ true = link(GM),
+ GM
+ end,
+ {ok, _TRef} =
+ timer:apply_interval(?ONE_SECOND, gm, broadcast, [GM1, heartbeat]),
+ {ok, #state { q = Q, gm = GM1 }, hibernate,
+ {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
+
+handle_call(get_gm, _From, State = #state { gm = GM }) ->
+ reply(GM, State).
+
+handle_cast({add_slave, Node}, State = #state { q = Q }) ->
+ Nodes = nodes(),
+ case lists:member(Node, Nodes) of
+ true ->
+ Result = rabbit_mirror_queue_slave_sup:start_child(Node, [Q]),
+ rabbit_log:info("Adding slave node for ~s: ~p~n",
+ [rabbit_misc:rs(Q #amqqueue.name), Result]);
+ false ->
+ rabbit_log:info(
+ "Ignoring request to add slave on node ~p for ~s~n",
+ [Node, rabbit_misc:rs(Q #amqqueue.name)])
+ end,
+ noreply(State);
+
+handle_cast({gm_deaths, Deaths},
+ State = #state { q = #amqqueue { name = QueueName } }) ->
+ rabbit_log:info("Master ~p saw deaths ~p for ~s~n",
+ [self(), Deaths, rabbit_misc:rs(QueueName)]),
+ case rabbit_mirror_queue_misc:remove_from_queue(QueueName, Deaths) of
+ {ok, Pid} when node(Pid) =:= node() ->
+ noreply(State);
+ {error, not_found} ->
+ {stop, normal, State}
+ end.
+
+handle_info(Msg, State) ->
+ {stop, {unexpected_info, Msg}, State}.
+
+terminate(_Reason, #state{}) ->
+ %% gen_server case
+ ok;
+terminate([_CPid], _Reason) ->
+ %% gm case
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%% ---------------------------------------------------------------------------
+%% GM
+%% ---------------------------------------------------------------------------
+
+joined([CPid], Members) ->
+ CPid ! {joined, self(), Members},
+ ok.
+
+members_changed([_CPid], _Births, []) ->
+ ok;
+members_changed([CPid], _Births, Deaths) ->
+ ok = gen_server2:cast(CPid, {gm_deaths, Deaths}).
+
+handle_msg([_CPid], _From, heartbeat) ->
+ ok;
+handle_msg([_CPid], _From, _Msg) ->
+ ok.
+
+%% ---------------------------------------------------------------------------
+%% Others
+%% ---------------------------------------------------------------------------
+
+noreply(State) ->
+ {noreply, State, hibernate}.
+
+reply(Reply, State) ->
+ {reply, Reply, State, hibernate}.
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
new file mode 100644
index 0000000000..dd2357bb48
--- /dev/null
+++ b/src/rabbit_mirror_queue_master.erl
@@ -0,0 +1,285 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2010 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_mirror_queue_master).
+
+-export([init/2, terminate/1, delete_and_terminate/1,
+ purge/1, publish/4, publish_delivered/5, fetch/2, ack/2,
+ tx_publish/5, tx_ack/3, tx_rollback/2, tx_commit/4,
+ requeue/3, len/1, is_empty/1, dropwhile/2,
+ set_ram_duration_target/2, ram_duration/1,
+ needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1,
+ status/1, invoke/3]).
+
+-export([start/1, stop/0]).
+
+-export([promote_backing_queue_state/5]).
+
+-behaviour(rabbit_backing_queue).
+
+-include("rabbit.hrl").
+
+-record(state, { gm,
+ coordinator,
+ backing_queue,
+ backing_queue_state,
+ set_delivered,
+ seen_status
+ }).
+
+%% ---------------------------------------------------------------------------
+%% Backing queue
+%% ---------------------------------------------------------------------------
+
+start(_DurableQueues) ->
+ %% This will never get called as this module will never be
+ %% installed as the default BQ implementation.
+ exit({not_valid_for_generic_backing_queue, ?MODULE}).
+
+stop() ->
+ %% Same as start/1.
+ exit({not_valid_for_generic_backing_queue, ?MODULE}).
+
+init(#amqqueue { arguments = Args } = Q, Recover) ->
+ {ok, CPid} = rabbit_mirror_queue_coordinator:start_link(Q, undefined),
+ GM = rabbit_mirror_queue_coordinator:get_gm(CPid),
+ {_Type, Nodes} = rabbit_misc:table_lookup(Args, <<"x-mirror">>),
+ Nodes1 = case Nodes of
+ [] -> nodes();
+ _ -> [list_to_atom(binary_to_list(Node)) ||
+ {longstr, Node} <- Nodes]
+ end,
+ [rabbit_mirror_queue_coordinator:add_slave(CPid, Node) || Node <- Nodes1],
+ {ok, BQ} = application:get_env(backing_queue_module),
+ BQS = BQ:init(Q, Recover),
+ #state { gm = GM,
+ coordinator = CPid,
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ set_delivered = 0,
+ seen_status = dict:new() }.
+
+promote_backing_queue_state(CPid, BQ, BQS, GM, SeenStatus) ->
+ #state { gm = GM,
+ coordinator = CPid,
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ set_delivered = BQ:len(BQS),
+ seen_status = SeenStatus }.
+
+terminate(State = #state { backing_queue = BQ, backing_queue_state = BQS }) ->
+ %% Backing queue termination. The queue is going down but
+ %% shouldn't be deleted. Most likely safe shutdown of this
+ %% node. Thus just let some other slave take over.
+ State #state { backing_queue_state = BQ:terminate(BQS) }.
+
+delete_and_terminate(State = #state { gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ ok = gm:broadcast(GM, delete_and_terminate),
+ State #state { backing_queue_state = BQ:delete_and_terminate(BQS),
+ set_delivered = 0 }.
+
+purge(State = #state { gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ ok = gm:broadcast(GM, {set_length, 0}),
+ {Count, BQS1} = BQ:purge(BQS),
+ {Count, State #state { backing_queue_state = BQS1,
+ set_delivered = 0 }}.
+
+publish(Msg = #basic_message { id = MsgId }, MsgProps, ChPid,
+ State = #state { gm = GM,
+ seen_status = SS,
+ backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ false = dict:is_key(MsgId, SS), %% ASSERTION
+ ok = gm:broadcast(GM, {publish, false, ChPid, MsgProps, Msg}),
+ BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS),
+ State #state { backing_queue_state = BQS1 }.
+
+publish_delivered(AckRequired, Msg = #basic_message { id = MsgId }, MsgProps,
+ ChPid, State = #state { gm = GM,
+ backing_queue = BQ,
+ seen_status = SS,
+ backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ false = dict:is_key(MsgId, SS), %% ASSERTION
+ %% Must use confirmed_broadcast here in order to guarantee that
+ %% all slaves are forced to interpret this publish_delivered at
+ %% the same point, especially if we die and a slave is promoted.
+ BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS),
+ ok = gm:confirmed_broadcast(
+ GM, {publish, {true, AckRequired}, ChPid, MsgProps, Msg}),
+ BQS1 = BQ:publish_delivered(AckRequired, Msg, MsgProps, ChPid, BQS),
+ State #state { backing_queue_state = BQS1 }.
+
+dropwhile(Fun, State = #state { gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ set_delivered = SetDelivered }) ->
+ Len = BQ:len(BQS),
+ BQS1 = BQ:dropwhile(Fun, BQS),
+ Dropped = Len - BQ:len(BQS1),
+ SetDelivered1 = lists:max([0, SetDelivered - Dropped]),
+ ok = gm:broadcast(GM, {set_length, BQ:len(BQS1)}),
+ State #state { backing_queue_state = BQS1,
+ set_delivered = SetDelivered1 }.
+
+fetch(AckRequired, State = #state { gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ set_delivered = SetDelivered,
+ seen_status = SS }) ->
+ {Result, BQS1} = BQ:fetch(AckRequired, BQS),
+ State1 = State #state { backing_queue_state = BQS1 },
+ case Result of
+ empty ->
+ {Result, State1};
+ {#basic_message { id = MsgId } = Message, IsDelivered, AckTag,
+ Remaining} ->
+ ok = gm:broadcast(GM, {fetch, AckRequired, MsgId, Remaining}),
+ IsDelivered1 = IsDelivered orelse SetDelivered > 0,
+ SetDelivered1 = lists:max([0, SetDelivered - 1]),
+ SS1 = case SetDelivered + SetDelivered1 of
+ 1 -> dict:new(); %% transition to empty
+ _ -> SS
+ end,
+ {{Message, IsDelivered1, AckTag, Remaining},
+ State1 #state { set_delivered = SetDelivered1,
+ seen_status = SS1 }}
+ end.
+
+ack(AckTags, State = #state { gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ {MsgIds, BQS1} = BQ:ack(AckTags, BQS),
+ case MsgIds of
+ [] -> ok;
+ _ -> ok = gm:broadcast(GM, {ack, MsgIds})
+ end,
+ {MsgIds, State #state { backing_queue_state = BQS1 }}.
+
+tx_publish(Txn, Msg, MsgProps, ChPid, #state {} = State) ->
+ %% gm:broadcast(GM, {tx_publish, Txn, MsgId, MsgProps, ChPid})
+ State.
+
+tx_ack(Txn, AckTags, #state {} = State) ->
+ %% gm:broadcast(GM, {tx_ack, Txn, MsgIds})
+ State.
+
+tx_rollback(Txn, #state {} = State) ->
+ %% gm:broadcast(GM, {tx_rollback, Txn})
+ {[], State}.
+
+tx_commit(Txn, PostCommitFun, MsgPropsFun, #state {} = State) ->
+ %% Maybe don't want to transmit the MsgPropsFun but what choice do
+ %% we have? OTOH, on the slaves, things won't be expiring on their
+ %% own (props are interpreted by amqqueue, not vq), so if the msg
+ %% props aren't quite the same, that doesn't matter.
+ %%
+ %% The PostCommitFun is actually worse - we need to prevent that
+ %% from being invoked until we have confirmation from all the
+ %% slaves that they've done everything up to there.
+ %%
+ %% In fact, transactions are going to need work seeing as it's at
+ %% this point that VQ mentions amqqueue, which will thus not work
+ %% on the slaves - we need to make sure that all the slaves do the
+ %% tx_commit_post_msg_store at the same point, and then when they
+ %% all confirm that (scatter/gather), we can finally invoke the
+ %% PostCommitFun.
+ %%
+ %% Another idea is that the slaves are actually driven with
+ %% pubacks and thus only the master needs to support txns
+ %% directly.
+ {[], State}.
+
+requeue(AckTags, MsgPropsFun, State = #state { gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ {MsgIds, BQS1} = BQ:requeue(AckTags, MsgPropsFun, BQS),
+ ok = gm:broadcast(GM, {requeue, MsgPropsFun, MsgIds}),
+ {MsgIds, State #state { backing_queue_state = BQS1 }}.
+
+len(#state { backing_queue = BQ, backing_queue_state = BQS}) ->
+ BQ:len(BQS).
+
+is_empty(#state { backing_queue = BQ, backing_queue_state = BQS}) ->
+ BQ:is_empty(BQS).
+
+set_ram_duration_target(Target, State = #state { backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ State #state { backing_queue_state =
+ BQ:set_ram_duration_target(Target, BQS) }.
+
+ram_duration(State = #state { backing_queue = BQ, backing_queue_state = BQS}) ->
+ {Result, BQS1} = BQ:ram_duration(BQS),
+ {Result, State #state { backing_queue_state = BQS1 }}.
+
+needs_idle_timeout(#state { backing_queue = BQ, backing_queue_state = BQS}) ->
+ BQ:needs_idle_timeout(BQS).
+
+idle_timeout(State = #state { backing_queue = BQ, backing_queue_state = BQS}) ->
+ State #state { backing_queue_state = BQ:idle_timeout(BQS) }.
+
+handle_pre_hibernate(State = #state { backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ State #state { backing_queue_state = BQ:handle_pre_hibernate(BQS) }.
+
+status(#state { backing_queue = BQ, backing_queue_state = BQS}) ->
+ BQ:status(BQS).
+
+invoke(?MODULE, Fun, State) ->
+ Fun(State);
+invoke(Mod, Fun, State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ {MsgIds, BQS1} = BQ:invoke(Mod, Fun, BQS),
+ {MsgIds, State #state { backing_queue_state = BQS1 }}.
+
+validate_message(Message = #basic_message { id = MsgId },
+ State = #state { seen_status = SS,
+ backing_queue = BQ,
+ backing_queue_state = BSQ }) ->
+ %% Here, we need to deal with the possibility that we're about to
+ %% receive a message that we've already seen when we were a slave
+ %% (we received it via gm). Thus if we do receive such message now
+ %% via the channel, there may be a confirm waiting to issue for
+ %% it.
+
+ %% We will never see {published, ChPid, MsgSeqNo} here.
+ case dict:find(MsgId, SS) of
+ error ->
+ %% We permit the underlying BQ to have a peek at it, but
+ %% only if we ourselves are not filtering out the msg.
+ {Result, BQS1} = BQ:validate_message(Message, BQS),
+ {Result, State #state { backing_queue_state = BQS1 }};
+ {ok, {published, ChPid}} ->
+ %% It already got published when we were a slave and no
+ %% confirmation is waiting. amqqueue_process will have, in
+ %% its msg_id_to_channel mapping, the entry for dealing
+ %% with the confirm when that comes back in. The msg is
+ %% invalid. We will not see this again, so erase.
+ {invalid, State #state { seen_status = dict:erase(MsgId, SS) }};
+ {ok, {confirmed, ChPid}} ->
+ %% It got confirmed before we became master, but we've
+ %% only just received the publish from the channel, so
+ %% couldn't previously know what the msg_seq_no was. Thus
+ %% confirm now. As above, amqqueue_process will have the
+ %% entry for the msg_id_to_channel mapping.
+ ok = rabbit_amqqueue:maybe_run_queue_via_backing_queue_async(
+ self(), ?MODULE, fun (State1) -> {[MsgId], State1} end),
+ {invalid, State #state { seen_status = dict:erase(MsgId, SS) }}
+ end.
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
new file mode 100644
index 0000000000..090cb81203
--- /dev/null
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -0,0 +1,46 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2010 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_mirror_queue_misc).
+
+-export([remove_from_queue/2]).
+
+-include("rabbit.hrl").
+
+remove_from_queue(QueueName, DeadPids) ->
+ DeadNodes = [node(DeadPid) || DeadPid <- DeadPids],
+ rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ %% Someone else could have deleted the queue before we
+ %% get here.
+ case mnesia:read({rabbit_queue, QueueName}) of
+ [] -> {error, not_found};
+ [Q = #amqqueue { pid = QPid,
+ mirror_pids = MPids }] ->
+ [QPid1 | MPids1] =
+ [Pid || Pid <- [QPid | MPids],
+ not lists:member(node(Pid), DeadNodes)],
+ case {{QPid, MPids}, {QPid1, MPids1}} of
+ {Same, Same} ->
+ {ok, QPid};
+ _ ->
+ Q1 = Q #amqqueue { pid = QPid1,
+ mirror_pids = MPids1 },
+ ok = rabbit_amqqueue:store_queue(Q1),
+ {ok, QPid1}
+ end
+ end
+ end).
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
new file mode 100644
index 0000000000..87ce31d8df
--- /dev/null
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -0,0 +1,625 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2010 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_mirror_queue_slave).
+
+%% We join the GM group before we add ourselves to the amqqueue
+%% record. As a result:
+%% 1. We can receive msgs from GM that correspond to messages we will
+%% never receive from publishers.
+%% 2. When we receive a message from publishers, we must receive a
+%% message from the GM group for it.
+%% 3. However, that instruction from the GM group can arrive either
+%% before or after the actual message. We need to be able to
+%% distinguish between GM instructions arriving early, and case (1)
+%% above.
+%%
+%% All instructions from the GM group must be processed in the order
+%% in which they're received.
+
+-export([start_link/1, set_maximum_since_use/2]).
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+ code_change/3, handle_pre_hibernate/1, prioritise_call/3,
+ prioritise_cast/2]).
+
+-export([joined/2, members_changed/3, handle_msg/3]).
+
+-behaviour(gen_server2).
+-behaviour(gm).
+
+-include("rabbit.hrl").
+-include("gm_specs.hrl").
+
+-record(state, { q,
+ gm,
+ master_node,
+ backing_queue,
+ backing_queue_state,
+ sync_timer_ref,
+ rate_timer_ref,
+
+ sender_queues, %% :: Pid -> MsgQ
+ msg_id_ack, %% :: MsgId -> AckTag
+
+ msg_id_status
+ }).
+
+-define(SYNC_INTERVAL, 25). %% milliseconds
+-define(RAM_DURATION_UPDATE_INTERVAL, 5000).
+
+start_link(Q) ->
+ gen_server2:start_link(?MODULE, [Q], []).
+
+set_maximum_since_use(QPid, Age) ->
+ gen_server2:cast(QPid, {set_maximum_since_use, Age}).
+
+init([#amqqueue { name = QueueName } = Q]) ->
+ process_flag(trap_exit, true), %% amqqueue_process traps exits too.
+ ok = gm:create_tables(),
+ {ok, GM} = gm:start_link(QueueName, ?MODULE, [self()]),
+ receive {joined, GM} ->
+ ok
+ end,
+ Self = self(),
+ Node = node(),
+ case rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ [Q1 = #amqqueue { pid = QPid, mirror_pids = MPids }] =
+ mnesia:read({rabbit_queue, QueueName}),
+ case [Pid || Pid <- [QPid | MPids], node(Pid) =:= Node] of
+ [] ->
+ MPids1 = MPids ++ [Self],
+ mnesia:write(rabbit_queue,
+ Q1 #amqqueue { mirror_pids = MPids1 },
+ write),
+ {ok, QPid};
+ _ ->
+ {error, node_already_present}
+ end
+ end) of
+ {ok, MPid} ->
+ ok = file_handle_cache:register_callback(
+ rabbit_amqqueue, set_maximum_since_use, [self()]),
+ ok = rabbit_memory_monitor:register(
+ self(), {rabbit_amqqueue, set_ram_duration_target,
+ [self()]}),
+ {ok, BQ} = application:get_env(backing_queue_module),
+ BQS = BQ:init(Q, false),
+ {ok, #state { q = Q,
+ gm = GM,
+ master_node = node(MPid),
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ rate_timer_ref = undefined,
+ sync_timer_ref = undefined,
+
+ sender_queues = dict:new(),
+ msg_id_ack = dict:new(),
+ msg_id_status = dict:new()
+ }, hibernate,
+ {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN,
+ ?DESIRED_HIBERNATE}};
+ {error, Error} ->
+ {stop, Error}
+ end.
+
+handle_call({deliver_immediately, Delivery = #delivery {}}, From, State) ->
+ %% Synchronous, "immediate" delivery mode
+
+ %% It is safe to reply 'false' here even if a) we've not seen the
+ %% msg via gm, or b) the master dies before we receive the msg via
+ %% gm. In the case of (a), we will eventually receive the msg via
+ %% gm, and it's only the master's result to the channel that is
+ %% important. In the case of (b), if the master does die and we do
+ %% get promoted then at that point we have no consumers, thus
+ %% 'false' is precisely the correct answer. However, we must be
+ %% careful to _not_ enqueue the message in this case.
+ gen_server2:reply(From, false), %% master may deliver it, not us
+ noreply(maybe_enqueue_message(Delivery, State));
+
+handle_call({deliver, Delivery = #delivery {}}, From, State) ->
+ %% Synchronous, "mandatory" delivery mode
+ gen_server2:reply(From, true), %% amqqueue throws away the result anyway
+ noreply(maybe_enqueue_message(Delivery, State));
+
+handle_call({gm_deaths, Deaths}, From,
+ State = #state { q = #amqqueue { name = QueueName },
+ gm = GM,
+ master_node = MNode }) ->
+ rabbit_log:info("Slave ~p saw deaths ~p for ~s~n",
+ [self(), Deaths, rabbit_misc:rs(QueueName)]),
+ %% The GM has told us about deaths, which means we're not going to
+ %% receive any more messages from GM
+ case rabbit_mirror_queue_misc:remove_from_queue(QueueName, Deaths) of
+ {ok, Pid} when node(Pid) =:= MNode ->
+ reply(ok, State);
+ {ok, Pid} when node(Pid) =:= node() ->
+ promote_me(From, State);
+ {ok, Pid} ->
+ gen_server2:reply(From, ok),
+ ok = gm:broadcast(GM, heartbeat),
+ noreply(State #state { master_node = node(Pid) });
+ {error, not_found} ->
+ gen_server2:reply(From, ok),
+ {stop, normal, State}
+ end;
+
+handle_call({maybe_run_queue_via_backing_queue, Mod, Fun}, _From, State) ->
+ reply(ok, maybe_run_queue_via_backing_queue(Mod, Fun, State)).
+
+
+handle_cast({maybe_run_queue_via_backing_queue, Mod, Fun}, State) ->
+ noreply(maybe_run_queue_via_backing_queue(Mod, Fun, State));
+
+handle_cast({gm, Instruction}, State) ->
+ handle_process_result(process_instruction(Instruction, State));
+
+handle_cast({deliver, Delivery = #delivery {}}, State) ->
+ %% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
+ noreply(maybe_enqueue_message(Delivery, State));
+
+handle_cast({set_maximum_since_use, Age}, State) ->
+ ok = file_handle_cache:set_maximum_since_use(Age),
+ noreply(State);
+
+handle_cast({set_ram_duration_target, Duration},
+ State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ BQS1 = BQ:set_ram_duration_target(Duration, BQS),
+ noreply(State #state { backing_queue_state = BQS1 });
+
+handle_cast(update_ram_duration,
+ State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ {RamDuration, BQS1} = BQ:ram_duration(BQS),
+ DesiredDuration =
+ rabbit_memory_monitor:report_ram_duration(self(), RamDuration),
+ BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1),
+ noreply(State #state { rate_timer_ref = just_measured,
+ backing_queue_state = BQS2 });
+
+handle_cast(sync_timeout, State) ->
+ noreply(backing_queue_idle_timeout(
+ State #state { sync_timer_ref = undefined })).
+
+handle_info(timeout, State) ->
+ noreply(backing_queue_idle_timeout(State));
+
+handle_info(Msg, State) ->
+ {stop, {unexpected_info, Msg}, State}.
+
+%% If the Reason is shutdown, or {shutdown, _}, it is not the queue
+%% being deleted: it's just the node going down. Even though we're a
+%% slave, we have no idea whether or not we'll be the only copy coming
+%% back up. Thus we must assume we will be, and preserve anything we
+%% have on disk.
+terminate(_Reason, #state { backing_queue_state = undefined }) ->
+ %% We've received a delete_and_terminate from gm, thus nothing to
+ %% do here.
+ ok;
+terminate(Reason, #state { q = Q,
+ gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ rate_timer_ref = RateTRef }) ->
+ ok = gm:leave(GM),
+ QueueState = rabbit_amqqueue_process:init_with_backing_queue_state(
+ Q, BQ, BQS, RateTRef, [], []),
+ rabbit_amqqueue_process:terminate(Reason, QueueState);
+terminate([_SPid], _Reason) ->
+ %% gm case
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+handle_pre_hibernate(State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ %% mainly copied from amqqueue_process
+ {RamDuration, BQS1} = BQ:ram_duration(BQS),
+ DesiredDuration =
+ rabbit_memory_monitor:report_ram_duration(self(), RamDuration),
+ BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1),
+ BQS3 = BQ:handle_pre_hibernate(BQS2),
+ {hibernate, stop_rate_timer(State #state { backing_queue_state = BQS3 })}.
+
+prioritise_call(Msg, _From, _State) ->
+ case Msg of
+ {maybe_run_queue_via_backing_queue, _Mod, _Fun} -> 6;
+ {gm_deaths, _Deaths} -> 5;
+ _ -> 0
+ end.
+
+prioritise_cast(Msg, _State) ->
+ case Msg of
+ update_ram_duration -> 8;
+ {set_ram_duration_target, _Duration} -> 8;
+ {set_maximum_since_use, _Age} -> 8;
+ {maybe_run_queue_via_backing_queue, _Mod, _Fun} -> 6;
+ sync_timeout -> 6;
+ {gm, _Msg} -> 5;
+ _ -> 0
+ end.
+
+%% ---------------------------------------------------------------------------
+%% GM
+%% ---------------------------------------------------------------------------
+
+joined([SPid], _Members) ->
+ SPid ! {joined, self()},
+ ok.
+
+members_changed([_SPid], _Births, []) ->
+ ok;
+members_changed([SPid], _Births, Deaths) ->
+ rabbit_misc:with_exit_handler(
+ fun () -> {stop, normal} end,
+ fun () ->
+ case gen_server2:call(SPid, {gm_deaths, Deaths}, infinity) of
+ ok ->
+ ok;
+ {promote, CPid} ->
+ {become, rabbit_mirror_queue_coordinator, [CPid]}
+ end
+ end).
+
+handle_msg([_SPid], _From, heartbeat) ->
+ ok;
+handle_msg([SPid], _From, Msg) ->
+ ok = gen_server2:cast(SPid, {gm, Msg}).
+
+%% ---------------------------------------------------------------------------
+%% Others
+%% ---------------------------------------------------------------------------
+
+maybe_run_queue_via_backing_queue(
+ Mod, Fun, State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ {MsgIds, BQS1} = BQ:invoke(Mod, Fun, BQS),
+ confirm_messages(MsgIds, State #state { backing_queue_state = BQS1 }).
+
+
+needs_confirming(#delivery{ msg_seq_no = undefined }, _State) ->
+ never;
+needs_confirming(#delivery { message = #basic_message {
+ is_persistent = true } },
+ #state { q = #amqqueue { durable = true } }) ->
+ eventually;
+needs_confirming(_Delivery, _State) ->
+ immediately.
+
+confirm_messages(MsgIds, State = #state { msg_id_status = MS }) ->
+ {MS1, CMs} =
+ lists:foldl(
+ fun (MsgId, {MSN, CMsN} = Acc) ->
+ %% We will never see {confirmed, ChPid} here.
+ case dict:find(MsgId, MSN) of
+ error ->
+ %% If it needed confirming, it'll have
+ %% already been done.
+ Acc;
+ {ok, {published, ChPid}} ->
+ %% Still not seen it from the channel, just
+ %% record that it's been confirmed.
+ {dict:store(MsgId, {confirmed, ChPid}, MSN), CMsN};
+ {ok, {published, ChPid, MsgSeqNo}} ->
+ %% Seen from both GM and Channel. Can now
+ %% confirm.
+ {dict:erase(MsgId, MSN),
+ gb_trees_cons(ChPid, MsgSeqNo, CMsN)}
+ end
+ end, {MS, gb_trees:empty()}, MsgIds),
+ gb_trees:map(fun (ChPid, MsgSeqNos) ->
+ ok = rabbit_channel:confirm(ChPid, MsgSeqNos)
+ end, CMs),
+ State #state { msg_id_status = MS1 }.
+
+gb_trees_cons(Key, Value, Tree) ->
+ case gb_trees:lookup(Key, Tree) of
+ {value, Values} -> gb_trees:update(Key, [Value | Values], Tree);
+ none -> gb_trees:insert(Key, [Value], Tree)
+ end.
+
+handle_process_result({ok, State}) -> noreply(State);
+handle_process_result({stop, State}) -> {stop, normal, State}.
+
+promote_me(From, #state { q = Q,
+ gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ rate_timer_ref = RateTRef,
+ sender_queues = SQ,
+ msg_id_ack = MA,
+ msg_id_status = MS }) ->
+ rabbit_log:info("Promoting slave ~p for ~s~n",
+ [self(), rabbit_misc:rs(Q #amqqueue.name)]),
+ {ok, CPid} = rabbit_mirror_queue_coordinator:start_link(Q, GM),
+ true = unlink(GM),
+ gen_server2:reply(From, {promote, CPid}),
+ ok = gm:confirmed_broadcast(GM, heartbeat),
+ MasterState = rabbit_mirror_queue_master:promote_backing_queue_state(
+ CPid, BQ, BQS, GM, MS),
+ %% We have to do the requeue via this init because otherwise we
+ %% don't have access to the relevent MsgPropsFun. Also, we are
+ %% already in mnesia as the master queue pid. Thus we cannot just
+ %% publish stuff by sending it to ourself - we must pass it
+ %% through to this init, otherwise we can violate ordering
+ %% constraints.
+
+ %% MTC should contain only entries for which we are still
+ %% expecting confirms to come back to use from the underlying BQ.
+
+ %% TODO: what do we do with entries in MS that are 'confirmed'
+ %% already? Well they should end up in the master queue's state,
+ %% and the confirms should be issued either by the
+ %% amqqueue_process if 'immediately', or otherwise by the master
+ %% queue on validate_message?! That's disgusting. There's no way
+ %% validate_message should be side-effecting... though we could at
+ %% least ensure it's idempotent. Hmm.
+ MTC = dict:from_list(
+ [{MsgId, {ChPid, MsgSeqNo}} ||
+ {MsgId, {published, ChPid, MsgSeqNo}} <- dict:to_list(MS)]),
+ AckTags = [AckTag || {_MsgId, AckTag} <- dict:to_list(MA)],
+ Deliveries = lists:append([queue:to_list(PubQ)
+ || {_ChPid, PubQ} <- dict:to_list(SQ)]),
+ QueueState = rabbit_amqqueue_process:init_with_backing_queue_state(
+ Q, rabbit_mirror_queue_master, MasterState, RateTRef,
+ AckTags, Deliveries, MTC),
+ {become, rabbit_amqqueue_process, QueueState, hibernate}.
+
+noreply(State) ->
+ {NewState, Timeout} = next_state(State),
+ {noreply, NewState, Timeout}.
+
+reply(Reply, State) ->
+ {NewState, Timeout} = next_state(State),
+ {reply, Reply, NewState, Timeout}.
+
+next_state(State) ->
+ State1 = #state { backing_queue = BQ, backing_queue_state = BQS } =
+ ensure_rate_timer(State),
+ case BQ:needs_idle_timeout(BQS) of
+ true -> {ensure_sync_timer(State1), 0};
+ false -> {stop_sync_timer(State1), hibernate}
+ end.
+
+%% copied+pasted from amqqueue_process
+backing_queue_idle_timeout(State = #state { backing_queue = BQ }) ->
+ maybe_run_queue_via_backing_queue(
+ BQ, fun (BQS) -> {[], BQ:idle_timeout(BQS)} end, State).
+
+ensure_sync_timer(State = #state { sync_timer_ref = undefined }) ->
+ {ok, TRef} = timer:apply_after(
+ ?SYNC_INTERVAL, rabbit_amqqueue, sync_timeout, [self()]),
+ State #state { sync_timer_ref = TRef };
+ensure_sync_timer(State) ->
+ State.
+
+stop_sync_timer(State = #state { sync_timer_ref = undefined }) ->
+ State;
+stop_sync_timer(State = #state { sync_timer_ref = TRef }) ->
+ {ok, cancel} = timer:cancel(TRef),
+ State #state { sync_timer_ref = undefined }.
+
+ensure_rate_timer(State = #state { rate_timer_ref = undefined }) ->
+ {ok, TRef} = timer:apply_after(
+ ?RAM_DURATION_UPDATE_INTERVAL,
+ rabbit_amqqueue, update_ram_duration,
+ [self()]),
+ State #state { rate_timer_ref = TRef };
+ensure_rate_timer(State = #state { rate_timer_ref = just_measured }) ->
+ State #state { rate_timer_ref = undefined };
+ensure_rate_timer(State) ->
+ State.
+
+stop_rate_timer(State = #state { rate_timer_ref = undefined }) ->
+ State;
+stop_rate_timer(State = #state { rate_timer_ref = just_measured }) ->
+ State #state { rate_timer_ref = undefined };
+stop_rate_timer(State = #state { rate_timer_ref = TRef }) ->
+ {ok, cancel} = timer:cancel(TRef),
+ State #state { rate_timer_ref = undefined }.
+
+maybe_enqueue_message(
+ Delivery = #delivery { message = #basic_message { id = MsgId },
+ msg_seq_no = MsgSeqNo,
+ sender = ChPid },
+ State = #state { sender_queues = SQ,
+ msg_id_status = MS }) ->
+ %% We will never see {published, ChPid, MsgSeqNo} here.
+ case dict:find(MsgId, MS) of
+ error ->
+ MQ = case dict:find(ChPid, SQ) of
+ {ok, MQ1} -> MQ1;
+ error -> queue:new()
+ end,
+ SQ1 = dict:store(ChPid, queue:in(Delivery, MQ), SQ),
+ State #state { sender_queues = SQ1 };
+ {ok, {confirmed, ChPid}} ->
+ %% BQ has confirmed it but we didn't know what the
+ %% msg_seq_no was at the time. We do now!
+ ok = rabbit_channel:confirm(ChPid, [MsgSeqNo]),
+ State #state { msg_id_status = dict:erase(MsgId, MS) };
+ {ok, {published, ChPid}} ->
+ %% It was published to the BQ and we didn't know the
+ %% msg_seq_no so couldn't confirm it at the time.
+ case needs_confirming(Delivery, State) of
+ never ->
+ State #state { msg_id_status = dict:erase(MsgId, MS) };
+ eventually ->
+ State #state {
+ msg_id_status =
+ dict:store(MsgId, {published, ChPid, MsgSeqNo}, MS) };
+ immediately ->
+ ok = rabbit_channel:confirm(ChPid, [MsgSeqNo]),
+ State #state { msg_id_status = dict:erase(MsgId, MS) }
+ end
+ end.
+
+process_instruction(
+ {publish, Deliver, ChPid, MsgProps, Msg = #basic_message { id = MsgId }},
+ State = #state { sender_queues = SQ,
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ msg_id_ack = MA,
+ msg_id_status = MS }) ->
+
+ %% We really are going to do the publish right now, even though we
+ %% may not have seen it directly from the channel. As a result, we
+ %% may know that it needs confirming without knowing its
+ %% msg_seq_no, which means that we can see the confirmation come
+ %% back from the backing queue without knowing the msg_seq_no,
+ %% which means that we're going to have to hang on to the fact
+ %% that we've seen the msg_id confirmed until we can associate it
+ %% with a msg_seq_no.
+ MS1 = dict:store(MsgId, {published, ChPid}, MS),
+ {SQ1, MS2} =
+ case dict:find(ChPid, SQ) of
+ error ->
+ {SQ, MS1};
+ {ok, MQ} ->
+ case queue:out(MQ) of
+ {empty, _MQ} ->
+ {SQ, MS1};
+ {{value, Delivery = #delivery {
+ msg_seq_no = MsgSeqNo,
+ message = #basic_message { id = MsgId } }},
+ MQ1} ->
+ %% We received the msg from the channel
+ %% first. Thus we need to deal with confirms
+ %% here.
+ {dict:store(ChPid, MQ1, SQ),
+ case needs_confirming(Delivery, State) of
+ never ->
+ MS;
+ eventually ->
+ dict:store(
+ MsgId, {published, ChPid, MsgSeqNo}, MS);
+ immediately ->
+ ok = rabbit_channel:confirm(ChPid, [MsgSeqNo]),
+ MS
+ end};
+ {{value, #delivery {}}, _MQ1} ->
+ %% The instruction was sent to us before we
+ %% were within the mirror_pids within the
+ %% #amqqueue{} record. We'll never receive the
+ %% message directly from the channel. And the
+ %% channel will not be expecting any confirms
+ %% from us.
+ {SQ, MS}
+ end
+ end,
+
+ State1 = State #state { sender_queues = SQ1,
+ msg_id_status = MS2 },
+ %% we probably want to work in BQ:validate_message here
+ {ok,
+ case Deliver of
+ false ->
+ BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS),
+ State1 #state { backing_queue_state = BQS1 };
+ {true, AckRequired} ->
+ {AckTag, BQS1} = BQ:publish_delivered(AckRequired, Msg, MsgProps,
+ ChPid, BQS),
+ MA1 = case AckRequired of
+ true -> dict:store(MsgId, AckTag, MA);
+ false -> MA
+ end,
+ State1 #state { backing_queue_state = BQS1,
+ msg_id_ack = MA1 }
+ end};
+process_instruction({set_length, Length},
+ State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ QLen = BQ:len(BQS),
+ ToDrop = QLen - Length,
+ {ok, case ToDrop > 0 of
+ true -> BQS1 =
+ lists:foldl(
+ fun (const, BQSN) ->
+ {{_Msg, _IsDelivered, _AckTag, _Remaining},
+ BQSN1} = BQ:fetch(false, BQSN),
+ BQSN1
+ end, BQS, lists:duplicate(ToDrop, const)),
+ State #state { backing_queue_state = BQS1 };
+ false -> State
+ end};
+process_instruction({fetch, AckRequired, MsgId, Remaining},
+ State = #state { backing_queue = BQ,
+ backing_queue_state = BQS,
+ msg_id_ack = MA }) ->
+ QLen = BQ:len(BQS),
+ {ok, case QLen - 1 of
+ Remaining ->
+ {{_Msg, _IsDelivered, AckTag, Remaining}, BQS1} =
+ BQ:fetch(AckRequired, BQS),
+ MA1 = case AckRequired of
+ true -> dict:store(MsgId, AckTag, MA);
+ false -> MA
+ end,
+ State #state { backing_queue_state = BQS1,
+ msg_id_ack = MA1 };
+ Other when Other < Remaining ->
+ %% we must be shorter than the master
+ State
+ end};
+process_instruction({ack, MsgIds},
+ State = #state { backing_queue = BQ,
+ backing_queue_state = BQS,
+ msg_id_ack = MA }) ->
+ {AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA),
+ {MsgIds1, BQS1} = BQ:ack(AckTags, BQS),
+ [] = MsgIds1 -- MsgIds, %% ASSERTION
+ {ok, State #state { msg_id_ack = MA1,
+ backing_queue_state = BQS1 }};
+process_instruction({requeue, MsgPropsFun, MsgIds},
+ State = #state { backing_queue = BQ,
+ backing_queue_state = BQS,
+ msg_id_ack = MA }) ->
+ {AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA),
+ {ok, case length(AckTags) =:= length(MsgIds) of
+ true ->
+ {MsgIds, BQS1} = BQ:requeue(AckTags, MsgPropsFun, BQS),
+ State #state { msg_id_ack = MA1,
+ backing_queue_state = BQS1 };
+ false ->
+ %% the only thing we can safely do is nuke out our BQ
+ %% and MA
+ {_Count, BQS1} = BQ:purge(BQS),
+ {MsgIds, BQS2} = ack_all(BQ, MA, BQS1),
+ State #state { msg_id_ack = dict:new(),
+ backing_queue_state = BQS2 }
+ end};
+process_instruction(delete_and_terminate,
+ State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ BQ:delete_and_terminate(BQS),
+ {stop, State #state { backing_queue_state = undefined }}.
+
+msg_ids_to_acktags(MsgIds, MA) ->
+ {AckTags, MA1} =
+ lists:foldl(fun (MsgId, {AckTagsN, MAN}) ->
+ case dict:find(MsgId, MA) of
+ error -> {AckTagsN, MAN};
+ {ok, AckTag} -> {[AckTag | AckTagsN],
+ dict:erase(MsgId, MAN)}
+ end
+ end, {[], MA}, MsgIds),
+ {lists:reverse(AckTags), MA1}.
+
+ack_all(BQ, MA, BQS) ->
+ BQ:ack([AckTag || {_MsgId, AckTag} <- dict:to_list(MA)], BQS).
diff --git a/src/rabbit_mirror_queue_slave_sup.erl b/src/rabbit_mirror_queue_slave_sup.erl
new file mode 100644
index 0000000000..80c0520c08
--- /dev/null
+++ b/src/rabbit_mirror_queue_slave_sup.erl
@@ -0,0 +1,54 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2010 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_mirror_queue_slave_sup).
+
+-rabbit_boot_step({mirror_queue_slave_sup,
+ [{description, "mirror queue slave sup"},
+ {mfa, {rabbit_mirror_queue_slave_sup, start, []}},
+ {requires, queue_sup_queue_recovery},
+ {enables, routing_ready}]}).
+
+-behaviour(supervisor2).
+
+-export([start/0, start_link/0, start_child/2]).
+
+-export([init/1]).
+
+-include_lib("rabbit.hrl").
+
+-define(SERVER, ?MODULE).
+
+start() ->
+ {ok, _} =
+ supervisor:start_child(
+ rabbit_sup,
+ {rabbit_mirror_queue_slave_sup,
+ {rabbit_mirror_queue_slave_sup, start_link, []},
+ transient, infinity, supervisor, [rabbit_mirror_queue_slave_sup]}),
+ ok.
+
+start_link() ->
+ supervisor2:start_link({local, ?SERVER}, ?MODULE, []).
+
+start_child(Node, Args) ->
+ supervisor2:start_child({?SERVER, Node}, Args).
+
+init([]) ->
+ {ok, {{simple_one_for_one_terminate, 10, 10},
+ [{rabbit_mirror_queue_slave,
+ {rabbit_mirror_queue_slave, start_link, []},
+ temporary, ?MAX_WAIT, worker, [rabbit_mirror_queue_slave]}]}}.
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 66436920d4..884db799f7 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -216,7 +216,8 @@ table_definitions() ->
{rabbit_queue,
[{record_name, amqqueue},
{attributes, record_info(fields, amqqueue)},
- {match, #amqqueue{name = queue_name_match(), _='_'}}]}].
+ {match, #amqqueue{name = queue_name_match(), _='_'}}]}]
+ ++ gm:table_definitions().
binding_match() ->
#binding{source = exchange_name_match(),
diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl
index f6a1c92fcc..4f68356440 100644
--- a/src/rabbit_router.erl
+++ b/src/rabbit_router.erl
@@ -113,7 +113,9 @@ check_delivery(_ , _ , {_ , Qs}) -> {routed, Qs}.
lookup_qpids(QNames) ->
lists:foldl(fun (QName, QPids) ->
case mnesia:dirty_read({rabbit_queue, QName}) of
- [#amqqueue{pid = QPid}] -> [QPid | QPids];
- [] -> QPids
+ [#amqqueue{pid = QPid, mirror_pids = MPids}] ->
+ MPids ++ [QPid | QPids];
+ [] ->
+ QPids
end
end, [], QNames).
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 9547cae5f6..e9b8a02063 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -2090,7 +2090,7 @@ variable_queue_publish(IsPersistent, Count, VQ) ->
true -> 2;
false -> 1
end}, <<>>),
- #message_properties{}, VQN)
+ #message_properties{}, self(), VQN)
end, VQ, lists:seq(1, Count)).
variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) ->
@@ -2108,9 +2108,13 @@ assert_prop(List, Prop, Value) ->
assert_props(List, PropVals) ->
[assert_prop(List, Prop, Value) || {Prop, Value} <- PropVals].
+test_amqqueue(Durable) ->
+ (rabbit_amqqueue:pseudo_queue(test_queue(), self()))
+ #amqqueue { durable = Durable }.
+
with_fresh_variable_queue(Fun) ->
ok = empty_test_queue(),
- VQ = rabbit_variable_queue:init(test_queue(), true, false,
+ VQ = rabbit_variable_queue:init(test_amqqueue(true), false,
fun nop/2, fun nop/1),
S0 = rabbit_variable_queue:status(VQ),
assert_props(S0, [{q1, 0}, {q2, 0},
@@ -2169,7 +2173,7 @@ test_dropwhile(VQ0) ->
rabbit_basic:message(
rabbit_misc:r(<<>>, exchange, <<>>),
<<>>, #'P_basic'{}, <<>>),
- #message_properties{expiry = N}, VQN)
+ #message_properties{expiry = N}, self(), VQN)
end, VQ0, lists:seq(1, Count)),
%% drop the first 5 messages
@@ -2213,7 +2217,7 @@ test_variable_queue_dynamic_duration_change(VQ0) ->
%% drain
{VQ8, AckTags} = variable_queue_fetch(Len, false, false, Len, VQ7),
- VQ9 = rabbit_variable_queue:ack(AckTags, VQ8),
+ {_Guids, VQ9} = rabbit_variable_queue:ack(AckTags, VQ8),
{empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9),
VQ10.
@@ -2223,7 +2227,7 @@ publish_fetch_and_ack(0, _Len, VQ0) ->
publish_fetch_and_ack(N, Len, VQ0) ->
VQ1 = variable_queue_publish(false, 1, VQ0),
{{_Msg, false, AckTag, Len}, VQ2} = rabbit_variable_queue:fetch(true, VQ1),
- VQ3 = rabbit_variable_queue:ack([AckTag], VQ2),
+ {_Guids, VQ3} = rabbit_variable_queue:ack([AckTag], VQ2),
publish_fetch_and_ack(N-1, Len, VQ3).
test_variable_queue_partial_segments_delta_thing(VQ0) ->
@@ -2257,7 +2261,7 @@ test_variable_queue_partial_segments_delta_thing(VQ0) ->
{len, HalfSegment + 1}]),
{VQ8, AckTags1} = variable_queue_fetch(HalfSegment + 1, true, false,
HalfSegment + 1, VQ7),
- VQ9 = rabbit_variable_queue:ack(AckTags ++ AckTags1, VQ8),
+ {_Guids, VQ9} = rabbit_variable_queue:ack(AckTags ++ AckTags1, VQ8),
%% should be empty now
{empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9),
VQ10.
@@ -2286,7 +2290,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere1(VQ0) ->
{VQ5, _AckTags1} = variable_queue_fetch(Count, false, false,
Count, VQ4),
_VQ6 = rabbit_variable_queue:terminate(VQ5),
- VQ7 = rabbit_variable_queue:init(test_queue(), true, true,
+ VQ7 = rabbit_variable_queue:init(test_amqqueue(true), true,
fun nop/2, fun nop/1),
{{_Msg1, true, _AckTag1, Count1}, VQ8} =
rabbit_variable_queue:fetch(true, VQ7),
@@ -2300,10 +2304,11 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) ->
VQ1 = rabbit_variable_queue:set_ram_duration_target(0, VQ0),
VQ2 = variable_queue_publish(false, 4, VQ1),
{VQ3, AckTags} = variable_queue_fetch(2, false, false, 4, VQ2),
- VQ4 = rabbit_variable_queue:requeue(AckTags, fun(X) -> X end, VQ3),
+ {_Guids, VQ4} =
+ rabbit_variable_queue:requeue(AckTags, fun(X) -> X end, VQ3),
VQ5 = rabbit_variable_queue:idle_timeout(VQ4),
_VQ6 = rabbit_variable_queue:terminate(VQ5),
- VQ7 = rabbit_variable_queue:init(test_queue(), true, true,
+ VQ7 = rabbit_variable_queue:init(test_amqqueue(true), true,
fun nop/2, fun nop/1),
{empty, VQ8} = rabbit_variable_queue:fetch(false, VQ7),
VQ8.
@@ -2311,7 +2316,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) ->
test_queue_recover() ->
Count = 2 * rabbit_queue_index:next_segment_boundary(0),
TxID = rabbit_guid:guid(),
- {new, #amqqueue { pid = QPid, name = QName }} =
+ {new, #amqqueue { pid = QPid, name = QName } = Q} =
rabbit_amqqueue:declare(test_queue(), true, false, [], none),
[begin
Msg = rabbit_basic:message(rabbit_misc:r(<<>>, exchange, <<>>),
@@ -2335,7 +2340,7 @@ test_queue_recover() ->
{ok, CountMinusOne, {QName, QPid1, _AckTag, true, _Msg}} =
rabbit_amqqueue:basic_get(Q1, self(), false),
exit(QPid1, shutdown),
- VQ1 = rabbit_variable_queue:init(QName, true, true,
+ VQ1 = rabbit_variable_queue:init(Q, true,
fun nop/2, fun nop/1),
{{_Msg1, true, _AckTag1, CountMinusOne}, VQ2} =
rabbit_variable_queue:fetch(true, VQ1),
diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl
index 1f0f8bbeac..aa174e96ab 100644
--- a/src/rabbit_types.erl
+++ b/src/rabbit_types.erl
@@ -124,7 +124,8 @@
auto_delete :: boolean(),
exclusive_owner :: rabbit_types:maybe(pid()),
arguments :: rabbit_framing:amqp_table(),
- pid :: rabbit_types:maybe(pid())}).
+ pid :: rabbit_types:maybe(pid()),
+ mirror_pids :: [pid()]}).
-type(exchange() ::
#exchange{name :: rabbit_exchange:name(),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index be6691e9af..c9d96db753 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -16,18 +16,18 @@
-module(rabbit_variable_queue).
--export([init/3, terminate/1, delete_and_terminate/1,
- purge/1, publish/3, publish_delivered/4, fetch/2, ack/2,
- tx_publish/4, tx_ack/3, tx_rollback/2, tx_commit/4,
+-export([init/2, terminate/1, delete_and_terminate/1,
+ purge/1, publish/4, publish_delivered/5, fetch/2, ack/2,
+ tx_publish/5, tx_ack/3, tx_rollback/2, tx_commit/4,
requeue/3, len/1, is_empty/1, dropwhile/2,
set_ram_duration_target/2, ram_duration/1,
needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1,
- status/1, multiple_routing_keys/0]).
+ status/1, invoke/3, validate_message/2, multiple_routing_keys/0]).
-export([start/1, stop/0]).
%% exported for testing only
--export([start_msg_store/2, stop_msg_store/0, init/5]).
+-export([start_msg_store/2, stop_msg_store/0, init/4]).
%%----------------------------------------------------------------------------
%% Definitions:
@@ -397,15 +397,16 @@ stop_msg_store() ->
ok = rabbit_sup:stop_child(?PERSISTENT_MSG_STORE),
ok = rabbit_sup:stop_child(?TRANSIENT_MSG_STORE).
-init(QueueName, IsDurable, Recover) ->
+init(Queue, Recover) ->
Self = self(),
- init(QueueName, IsDurable, Recover,
+ init(Queue, Recover,
fun (MsgIds, ActionTaken) ->
msgs_written_to_disk(Self, MsgIds, ActionTaken)
end,
fun (MsgIds) -> msg_indices_written_to_disk(Self, MsgIds) end).
-init(QueueName, IsDurable, false, MsgOnDiskFun, MsgIdxOnDiskFun) ->
+init(#amqqueue { name = QueueName, durable = IsDurable }, false,
+ MsgOnDiskFun, MsgIdxOnDiskFun) ->
IndexState = rabbit_queue_index:init(QueueName, MsgIdxOnDiskFun),
init(IsDurable, IndexState, 0, [],
case IsDurable of
@@ -415,7 +416,8 @@ init(QueueName, IsDurable, false, MsgOnDiskFun, MsgIdxOnDiskFun) ->
end,
msg_store_client_init(?TRANSIENT_MSG_STORE, undefined));
-init(QueueName, true, true, MsgOnDiskFun, MsgIdxOnDiskFun) ->
+init(#amqqueue { name = QueueName }, true,
+ MsgOnDiskFun, MsgIdxOnDiskFun) ->
Terms = rabbit_queue_index:shutdown_terms(QueueName),
{PRef, TRef, Terms1} =
case [persistent_ref, transient_ref] -- proplists:get_keys(Terms) of
@@ -505,13 +507,13 @@ purge(State = #vqstate { q4 = Q4,
ram_index_count = 0,
persistent_count = PCount1 })}.
-publish(Msg, MsgProps, State) ->
+publish(Msg, MsgProps, _ChPid, State) ->
{_SeqId, State1} = publish(Msg, MsgProps, false, false, State),
a(reduce_memory_use(State1)).
publish_delivered(false, #basic_message { id = MsgId },
#message_properties { needs_confirming = NeedsConfirming },
- State = #vqstate { len = 0 }) ->
+ _ChPid, State = #vqstate { len = 0 }) ->
case NeedsConfirming of
true -> blind_confirm(self(), gb_sets:singleton(MsgId));
false -> ok
@@ -521,6 +523,7 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent,
id = MsgId },
MsgProps = #message_properties {
needs_confirming = NeedsConfirming },
+ _ChPid,
State = #vqstate { len = 0,
next_seq_id = SeqId,
out_counter = OutCount,
@@ -650,13 +653,14 @@ internal_fetch(AckRequired, MsgStatus = #msg_status {
persistent_count = PCount1 })}.
ack(AckTags, State) ->
- a(ack(fun msg_store_remove/3,
- fun (_, State0) -> State0 end,
- AckTags, State)).
+ {MsgIds, State1} = ack(fun msg_store_remove/3,
+ fun (_, State0) -> State0 end,
+ AckTags, State),
+ {MsgIds, a(State1)}.
tx_publish(Txn, Msg = #basic_message { is_persistent = IsPersistent }, MsgProps,
- State = #vqstate { durable = IsDurable,
- msg_store_clients = MSCState }) ->
+ _ChPid, State = #vqstate { durable = IsDurable,
+ msg_store_clients = MSCState }) ->
Tx = #tx { pending_messages = Pubs } = lookup_tx(Txn),
store_tx(Txn, Tx #tx { pending_messages = [{Msg, MsgProps} | Pubs] }),
case IsPersistent andalso IsDurable of
@@ -707,7 +711,7 @@ requeue(AckTags, MsgPropsFun, State) ->
(MsgPropsFun(MsgProps)) #message_properties {
needs_confirming = false }
end,
- a(reduce_memory_use(
+ {MsgIds, State1} =
ack(fun msg_store_release/3,
fun (#msg_status { msg = Msg, msg_props = MsgProps }, State1) ->
{_SeqId, State2} = publish(Msg, MsgPropsFun1(MsgProps),
@@ -722,7 +726,8 @@ requeue(AckTags, MsgPropsFun, State) ->
true, true, State2),
State3
end,
- AckTags, State))).
+ AckTags, State),
+ {MsgIds, a(reduce_memory_use(State1))}.
len(#vqstate { len = Len }) -> Len.
@@ -860,6 +865,11 @@ status(#vqstate {
{avg_ack_ingress_rate, AvgAckIngressRate},
{avg_ack_egress_rate , AvgAckEgressRate} ].
+invoke(?MODULE, Fun, State) ->
+ Fun(State).
+
+validate_message(_Msg, State) -> {valid, State}.
+
%%----------------------------------------------------------------------------
%% Minor helpers
%%----------------------------------------------------------------------------
@@ -971,7 +981,7 @@ msg_store_close_fds_fun(IsPersistent) ->
Self = self(),
fun () ->
rabbit_amqqueue:maybe_run_queue_via_backing_queue_async(
- Self,
+ Self, ?MODULE,
fun (State = #vqstate { msg_store_clients = MSCState }) ->
{ok, MSCState1} =
msg_store_close_fds(MSCState, IsPersistent),
@@ -1117,10 +1127,11 @@ blank_rate(Timestamp, IngressLength) ->
msg_store_callback(PersistentMsgIds, Pubs, AckTags, Fun, MsgPropsFun) ->
Self = self(),
F = fun () -> rabbit_amqqueue:maybe_run_queue_via_backing_queue(
- Self, fun (StateN) -> {[], tx_commit_post_msg_store(
- true, Pubs, AckTags,
- Fun, MsgPropsFun, StateN)}
- end)
+ Self, ?MODULE,
+ fun (StateN) -> {[], tx_commit_post_msg_store(
+ true, Pubs, AckTags,
+ Fun, MsgPropsFun, StateN)}
+ end)
end,
fun () -> spawn(fun () -> ok = rabbit_misc:with_exit_handler(
fun () -> remove_persistent_messages(
@@ -1183,20 +1194,21 @@ tx_commit_index(State = #vqstate { on_sync = #sync {
Acks = lists:append(SAcks),
Pubs = [{Msg, Fun(MsgProps)} || {Fun, PubsN} <- lists:reverse(SPubs),
{Msg, MsgProps} <- lists:reverse(PubsN)],
- {SeqIds, State1 = #vqstate { index_state = IndexState }} =
+ {_MsgIds, State1} = ack(Acks, State),
+ {SeqIds, State2 = #vqstate { index_state = IndexState }} =
lists:foldl(
fun ({Msg = #basic_message { is_persistent = IsPersistent },
MsgProps},
- {SeqIdsAcc, State2}) ->
+ {SeqIdsAcc, State3}) ->
IsPersistent1 = IsDurable andalso IsPersistent,
- {SeqId, State3} =
- publish(Msg, MsgProps, false, IsPersistent1, State2),
- {cons_if(IsPersistent1, SeqId, SeqIdsAcc), State3}
- end, {PAcks, ack(Acks, State)}, Pubs),
+ {SeqId, State4} =
+ publish(Msg, MsgProps, false, IsPersistent1, State3),
+ {cons_if(IsPersistent1, SeqId, SeqIdsAcc), State4}
+ end, {PAcks, State1}, Pubs),
IndexState1 = rabbit_queue_index:sync(SeqIds, IndexState),
[ Fun() || Fun <- lists:reverse(SFuns) ],
reduce_memory_use(
- State1 #vqstate { index_state = IndexState1, on_sync = ?BLANK_SYNC }).
+ State2 #vqstate { index_state = IndexState1, on_sync = ?BLANK_SYNC }).
purge_betas_and_deltas(LensByStore,
State = #vqstate { q3 = Q3,
@@ -1343,7 +1355,7 @@ remove_pending_ack(KeepPersistent,
State = #vqstate { pending_ack = PA,
index_state = IndexState,
msg_store_clients = MSCState }) ->
- {PersistentSeqIds, MsgIdsByStore} =
+ {PersistentSeqIds, MsgIdsByStore, _AllMsgIds} =
dict:fold(fun accumulate_ack/3, accumulate_ack_init(), PA),
State1 = State #vqstate { pending_ack = dict:new(),
ram_ack_index = gb_trees:empty() },
@@ -1362,9 +1374,9 @@ remove_pending_ack(KeepPersistent,
end.
ack(_MsgStoreFun, _Fun, [], State) ->
- State;
+ {[], State};
ack(MsgStoreFun, Fun, AckTags, State) ->
- {{PersistentSeqIds, MsgIdsByStore},
+ {{PersistentSeqIds, MsgIdsByStore, AllMsgIds},
State1 = #vqstate { index_state = IndexState,
msg_store_clients = MSCState,
persistent_count = PCount,
@@ -1384,21 +1396,24 @@ ack(MsgStoreFun, Fun, AckTags, State) ->
|| {IsPersistent, MsgIds} <- orddict:to_list(MsgIdsByStore)],
PCount1 = PCount - find_persistent_count(sum_msg_ids_by_store_to_len(
orddict:new(), MsgIdsByStore)),
- State1 #vqstate { index_state = IndexState1,
- persistent_count = PCount1,
- ack_out_counter = AckOutCount + length(AckTags) }.
+ {lists:reverse(AllMsgIds),
+ State1 #vqstate { index_state = IndexState1,
+ persistent_count = PCount1,
+ ack_out_counter = AckOutCount + length(AckTags) }}.
-accumulate_ack_init() -> {[], orddict:new()}.
+accumulate_ack_init() -> {[], orddict:new(), []}.
accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS
msg_on_disk = false,
- index_on_disk = false },
- {PersistentSeqIdsAcc, MsgIdsByStore}) ->
- {PersistentSeqIdsAcc, MsgIdsByStore};
+ index_on_disk = false,
+ msg_id = MsgId },
+ {PersistentSeqIdsAcc, MsgIdsByStore, AllMsgIds}) ->
+ {PersistentSeqIdsAcc, MsgIdsByStore, [MsgId | AllMsgIds]};
accumulate_ack(SeqId, {IsPersistent, MsgId, _MsgProps},
- {PersistentSeqIdsAcc, MsgIdsByStore}) ->
+ {PersistentSeqIdsAcc, MsgIdsByStore, AllMsgIds}) ->
{cons_if(IsPersistent, SeqId, PersistentSeqIdsAcc),
- rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore)}.
+ rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore),
+ [MsgId | AllMsgIds]}.
find_persistent_count(LensByStore) ->
case orddict:find(true, LensByStore) of
@@ -1444,33 +1459,35 @@ msgs_confirmed(MsgIdSet, State) ->
blind_confirm(QPid, MsgIdSet) ->
rabbit_amqqueue:maybe_run_queue_via_backing_queue_async(
- QPid, fun (State) -> msgs_confirmed(MsgIdSet, State) end).
+ QPid, ?MODULE, fun (State) -> msgs_confirmed(MsgIdSet, State) end).
msgs_written_to_disk(QPid, MsgIdSet, removed) ->
blind_confirm(QPid, MsgIdSet);
msgs_written_to_disk(QPid, MsgIdSet, written) ->
rabbit_amqqueue:maybe_run_queue_via_backing_queue_async(
- QPid, fun (State = #vqstate { msgs_on_disk = MOD,
- msg_indices_on_disk = MIOD,
- unconfirmed = UC }) ->
- Written = gb_sets:intersection(UC, MsgIdSet),
- msgs_confirmed(gb_sets:intersection(MsgIdSet, MIOD),
- State #vqstate {
- msgs_on_disk =
- gb_sets:union(MOD, Written) })
- end).
+ QPid, ?MODULE,
+ fun (State = #vqstate { msgs_on_disk = MOD,
+ msg_indices_on_disk = MIOD,
+ unconfirmed = UC }) ->
+ msgs_confirmed(gb_sets:intersection(MsgIdSet, MIOD),
+ State #vqstate {
+ msgs_on_disk =
+ gb_sets:union(
+ MOD, gb_sets:intersection(UC, MsgIdSet)) })
+ end).
msg_indices_written_to_disk(QPid, MsgIdSet) ->
rabbit_amqqueue:maybe_run_queue_via_backing_queue_async(
- QPid, fun (State = #vqstate { msgs_on_disk = MOD,
- msg_indices_on_disk = MIOD,
- unconfirmed = UC }) ->
- Written = gb_sets:intersection(UC, MsgIdSet),
- msgs_confirmed(gb_sets:intersection(MsgIdSet, MOD),
- State #vqstate {
- msg_indices_on_disk =
- gb_sets:union(MIOD, Written) })
- end).
+ QPid, ?MODULE,
+ fun (State = #vqstate { msgs_on_disk = MOD,
+ msg_indices_on_disk = MIOD,
+ unconfirmed = UC }) ->
+ msgs_confirmed(gb_sets:intersection(MsgIdSet, MOD),
+ State #vqstate {
+ msg_indices_on_disk =
+ gb_sets:union(
+ MIOD, gb_sets:intersection(UC, MsgIdSet)) })
+ end).
%%----------------------------------------------------------------------------
%% Phase changes