summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/gm_specs.hrl28
-rw-r--r--include/rabbit.hrl2
-rw-r--r--include/rabbit_backing_queue_spec.hrl25
-rw-r--r--src/gm.erl1308
-rw-r--r--src/gm_test.erl126
-rw-r--r--src/rabbit.erl13
-rw-r--r--src/rabbit_amqqueue.erl46
-rw-r--r--src/rabbit_amqqueue_process.erl128
-rw-r--r--src/rabbit_backing_queue.erl15
-rw-r--r--src/rabbit_control.erl6
-rw-r--r--src/rabbit_mirror_queue_coordinator.erl136
-rw-r--r--src/rabbit_mirror_queue_master.erl250
-rw-r--r--src/rabbit_mirror_queue_misc.erl46
-rw-r--r--src/rabbit_mirror_queue_slave.erl529
-rw-r--r--src/rabbit_mirror_queue_slave_sup.erl54
-rw-r--r--src/rabbit_mnesia.erl3
-rw-r--r--src/rabbit_router.erl6
-rw-r--r--src/rabbit_tests.erl27
-rw-r--r--src/rabbit_types.erl3
-rw-r--r--src/rabbit_variable_queue.erl139
20 files changed, 2728 insertions, 162 deletions
diff --git a/include/gm_specs.hrl b/include/gm_specs.hrl
new file mode 100644
index 0000000000..987866db38
--- /dev/null
+++ b/include/gm_specs.hrl
@@ -0,0 +1,28 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%%
+
+-ifdef(use_specs).
+
+-type(callback_result() :: 'ok' | {'stop', any()}).
+-type(args() :: [any()]).
+-type(members() :: [pid()]).
+
+-spec(joined/2 :: (args(), members()) -> callback_result()).
+-spec(members_changed/3 :: (args(), members(), members()) -> callback_result()).
+-spec(handle_msg/3 :: (args(), pid(), any()) -> callback_result()).
+-spec(terminate/2 :: (args(), term()) -> any()).
+
+-endif.
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 15f5d7c5b0..f79a8106b0 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -45,7 +45,7 @@
-record(exchange, {name, type, durable, auto_delete, internal, arguments}).
-record(amqqueue, {name, durable, auto_delete, exclusive_owner = none,
- arguments, pid}).
+ arguments, pid, mirror_pids}).
%% mnesia doesn't like unary records, so we add a dummy 'value' field
-record(route, {binding, value = const}).
diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl
index accb2c0e6c..9f4f76ca16 100644
--- a/include/rabbit_backing_queue_spec.hrl
+++ b/include/rabbit_backing_queue_spec.hrl
@@ -28,34 +28,35 @@
-spec(start/1 :: ([rabbit_amqqueue:name()]) -> 'ok').
-spec(stop/0 :: () -> 'ok').
--spec(init/3 :: (rabbit_amqqueue:name(), is_durable(), attempt_recovery()) ->
- state()).
+-spec(init/2 :: (rabbit_types:amqqueue(), attempt_recovery()) -> state()).
-spec(terminate/1 :: (state()) -> state()).
-spec(delete_and_terminate/1 :: (state()) -> state()).
-spec(purge/1 :: (state()) -> {purged_msg_count(), state()}).
--spec(publish/3 :: (rabbit_types:basic_message(),
- rabbit_types:message_properties(), state()) -> state()).
--spec(publish_delivered/4 :: (true, rabbit_types:basic_message(),
- rabbit_types:message_properties(), state())
+-spec(publish/4 :: (rabbit_types:basic_message(),
+ rabbit_types:message_properties(), pid(), state()) ->
+ state()).
+-spec(publish_delivered/5 :: (true, rabbit_types:basic_message(),
+ rabbit_types:message_properties(), pid(), state())
-> {ack(), state()};
(false, rabbit_types:basic_message(),
- rabbit_types:message_properties(), state())
+ rabbit_types:message_properties(), pid(), state())
-> {undefined, state()}).
-spec(dropwhile/2 ::
(fun ((rabbit_types:message_properties()) -> boolean()), state())
-> state()).
-spec(fetch/2 :: (true, state()) -> {fetch_result(ack()), state()};
(false, state()) -> {fetch_result(undefined), state()}).
--spec(ack/2 :: ([ack()], state()) -> state()).
--spec(tx_publish/4 :: (rabbit_types:txn(), rabbit_types:basic_message(),
- rabbit_types:message_properties(), state()) -> state()).
+-spec(ack/2 :: ([ack()], state()) -> {[rabbit_guid:guid()], state()}).
+-spec(tx_publish/5 :: (rabbit_types:txn(), rabbit_types:basic_message(),
+ rabbit_types:message_properties(), pid(), state()) ->
+ state()).
-spec(tx_ack/3 :: (rabbit_types:txn(), [ack()], state()) -> state()).
-spec(tx_rollback/2 :: (rabbit_types:txn(), state()) -> {[ack()], state()}).
-spec(tx_commit/4 ::
(rabbit_types:txn(), fun (() -> any()),
message_properties_transformer(), state()) -> {[ack()], state()}).
-spec(requeue/3 :: ([ack()], message_properties_transformer(), state())
- -> state()).
+ -> {[rabbit_guid:guid()], state()}).
-spec(len/1 :: (state()) -> non_neg_integer()).
-spec(is_empty/1 :: (state()) -> boolean()).
-spec(set_ram_duration_target/2 ::
@@ -65,3 +66,5 @@
-spec(idle_timeout/1 :: (state()) -> state()).
-spec(handle_pre_hibernate/1 :: (state()) -> state()).
-spec(status/1 :: (state()) -> [{atom(), any()}]).
+-spec(invoke/3 :: (atom(), fun ((A) -> A), state()) ->
+ {[rabbit_guid:guid()], state()}).
diff --git a/src/gm.erl b/src/gm.erl
new file mode 100644
index 0000000000..8fea919667
--- /dev/null
+++ b/src/gm.erl
@@ -0,0 +1,1308 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%%
+
+-module(gm).
+
+%% Guaranteed Multicast
+%% ====================
+%%
+%% This module provides the ability to create named groups of
+%% processes to which members can be dynamically added and removed,
+%% and for messages to be broadcast within the group that are
+%% guaranteed to reach all members of the group during the lifetime of
+%% the message. The lifetime of a message is defined as being, at a
+%% minimum, the time from which the message is first sent to any
+%% member of the group, up until the time at which it is known by the
+%% member who published the message that the message has reached all
+%% group members.
+%%
+%% The guarantee given is that provided a message, once sent, makes it
+%% to members who do not all leave the group, the message will
+%% continue to propagate to all group members.
+%%
+%% Another way of stating the guarantee is that if member P publishes
+%% messages m and m', then for all members P', if P' is a member of
+%% the group prior to the publication of m, and P' receives m', then
+%% P' will receive m.
+%%
+%% Note that only local-ordering is enforced: i.e. if member P sends
+%% message m and then message m', then for-all members P', if P'
+%% receives m and m', then they will receive m' after m. Causality
+%% ordering is _not_ enforced. I.e. if member P receives message m
+%% and as a result publishes message m', there is no guarantee that
+%% other members P' will receive m before m'.
+%%
+%%
+%% API Use
+%% -------
+%%
+%% Mnesia must be started. Use the idempotent create_tables/0 function
+%% to create the tables required.
+%%
+%% start_link/3
+%% Provide the group name, the callback module name, and a list of any
+%% arguments you wish to be passed into the callback module's
+%% functions. The joined/1 will be called when we have joined the
+%% group, and the list of arguments will have appended to it a list of
+%% the current members of the group. See the comments in
+%% behaviour_info/1 below for further details of the callback
+%% functions.
+%%
+%% leave/1
+%% Provide the Pid. Removes the Pid from the group. The callback
+%% terminate/1 function will be called.
+%%
+%% broadcast/2
+%% Provide the Pid and a Message. The message will be sent to all
+%% members of the group as per the guarantees given above. This is a
+%% cast and the function call will return immediately. There is no
+%% guarantee that the message will reach any member of the group.
+%%
+%% confirmed_broadcast/2
+%% Provide the Pid and a Message. As per broadcast/2 except that this
+%% is a call, not a cast, and only returns 'ok' once the Message has
+%% reached every member of the group. Do not call
+%% confirmed_broadcast/2 directly from the callback module otherwise
+%% you will deadlock the entire group.
+%%
+%% group_members/1
+%% Provide the Pid. Returns a list of the current group members.
+%%
+%%
+%% Implementation Overview
+%% -----------------------
+%%
+%% One possible means of implementation would be a fan-out from the
+%% sender to every member of the group. This would require that the
+%% group is fully connected, and, in the event that the original
+%% sender of the message disappears from the group before the message
+%% has made it to every member of the group, raises questions as to
+%% who is responsible for sending on the message to new group members.
+%% In particular, the issue is with [ Pid ! Msg || Pid <- Members ] -
+%% if the sender dies part way through, who is responsible for
+%% ensuring that the remaining Members receive the Msg? In the event
+%% that within the group, messages sent are broadcast from a subset of
+%% the members, the fan-out arrangement has the potential to
+%% substantially impact the CPU and network workload of such members,
+%% as such members would have to accommodate the cost of sending each
+%% message to every group member.
+%%
+%% Instead, if the members of the group are arranged in a chain, then
+%% it becomes easier to reason about who within the group has received
+%% each message and who has not. It eases issues of responsibility: in
+%% the event of a group member disappearing, the nearest upstream
+%% member of the chain is responsible for ensuring that messages
+%% continue to propagate down the chain. It also results in equal
+%% distribution of sending and receiving workload, even if all
+%% messages are being sent from just a single group member. This
+%% configuration has the further advantage that it is not necessary
+%% for every group member to know of every other group member, and
+%% even that a group member does not have to be accessible from all
+%% other group members.
+%%
+%% Performance is kept high by permitting pipelining and all
+%% communication between joined group members is asynchronous. In the
+%% chain A -> B -> C -> D, if A sends a message to the group, it will
+%% not directly contact C or D. However, it must know that D receives
+%% the message (in addition to B and C) before it can consider the
+%% message fully sent. A simplistic implementation would require that
+%% D replies to C, C replies to B and B then replies to A. This would
+%% result in a propagation delay of twice the length of the chain. It
+%% would also require, in the event of the failure of C, that D knows
+%% to directly contact B and issue the necessary replies. Instead, the
+%% chain forms a ring: D sends the message on to A: D does not
+%% distinguish A as the sender, merely as the next member (downstream)
+%% within the chain (which has now become a ring). When A receives
+%% from D messages that A sent, it knows that all members have
+%% received the message. However, the message is not dead yet: if C
+%% died as B was sending to C, then B would need to detect the death
+%% of C and forward the message on to D instead: thus every node has
+%% to remember every message published until it is told that it can
+%% forget about the message. This is essential not just for dealing
+%% with failure of members, but also for the addition of new members.
+%%
+%% Thus once A receives the message back again, it then sends to B an
+%% acknowledgement for the message, indicating that B can now forget
+%% about the message. B does so, and forwards the ack to C. C forgets
+%% the message, and forwards the ack to D, which forgets the message
+%% and finally forwards the ack back to A. At this point, A takes no
+%% further action: the message and its acknowledgement have made it to
+%% every member of the group. The message is now dead, and any new
+%% member joining the group at this point will not receive the
+%% message.
+%%
+%% We therefore have two roles:
+%%
+%% 1. The sender, who upon receiving their own messages back, must
+%% then send out acknowledgements, and upon receiving their own
+%% acknowledgements back perform no further action.
+%%
+%% 2. The other group members who upon receiving messages and
+%% acknowledgements must update their own internal state accordingly
+%% (the sending member must also do this in order to be able to
+%% accommodate failures), and forwards messages on to their downstream
+%% neighbours.
+%%
+%%
+%% Implementation: It gets trickier
+%% --------------------------------
+%%
+%% Chain A -> B -> C -> D
+%%
+%% A publishes a message which B receives. A now dies. B and D will
+%% detect the death of A, and will link up, thus the chain is now B ->
+%% C -> D. B forwards A's message on to C, who forwards it to D, who
+%% forwards it to B. Thus B is now responsible for A's messages - both
+%% publications and acknowledgements that were in flight at the point
+%% at which A died. Even worse is that this is transitive: after B
+%% forwards A's message to C, B dies as well. Now C is not only
+%% responsible for B's in-flight messages, but is also responsible for
+%% A's in-flight messages.
+%%
+%% Lemma 1: A member can only determine which dead members they have
+%% inherited responsibility for if there is a total ordering on the
+%% conflicting additions and subtractions of members from the group.
+%%
+%% Consider the simultaneous death of B and addition of B' that
+%% transitions a chain from A -> B -> C to A -> B' -> C. Either B' or
+%% C is responsible for in-flight messages from B. It is easy to
+%% ensure that at least one of them thinks they have inherited B, but
+%% if we do not ensure that exactly one of them inherits B, then we
+%% could have B' converting publishes to acks, which then will crash C
+%% as C does not believe it has issued acks for those messages.
+%%
+%% More complex scenarios are easy to concoct: A -> B -> C -> D -> E
+%% becoming A -> C' -> E. Who has inherited which of B, C and D?
+%%
+%% However, for non-conflicting membership changes, only a partial
+%% ordering is required. For example, A -> B -> C becoming A -> A' ->
+%% B. The addition of A', between A and B can have no conflicts with
+%% the death of C: it is clear that A has inherited C's messages.
+%%
+%% For ease of implementation, we adopt the simple solution, of
+%% imposing a total order on all membership changes.
+%%
+%% On the death of a member, it is ensured the dead member's
+%% neighbours become aware of the death, and the upstream neighbour
+%% now sends to its new downstream neighbour its state, including the
+%% messages pending acknowledgement. The downstream neighbour can then
+%% use this to calculate which publishes and acknowledgements it has
+%% missed out on, due to the death of its old upstream. Thus the
+%% downstream can catch up, and continues the propagation of messages
+%% through the group.
+%%
+%% Lemma 2: When a member is joining, it must synchronously
+%% communicate with its upstream member in order to receive its
+%% starting state atomically with its addition to the group.
+%%
+%% New members must start with the same state as their nearest
+%% upstream neighbour. This ensures that it is not surprised by
+%% acknowledgements they are sent, and that should their downstream
+%% neighbour die, they are able to send the correct state to their new
+%% downstream neighbour to ensure it can catch up. Thus in the
+%% transition A -> B -> C becomes A -> A' -> B -> C becomes A -> A' ->
+%% C, A' must start with the state of A, so that it can send C the
+%% correct state when B dies, allowing C to detect any missed
+%% messages.
+%%
+%% If A' starts by adding itself to the group membership, A could then
+%% die, without A' having received the necessary state from A. This
+%% would leave A' responsible for in-flight messages from A, but
+%% having the least knowledge of all, of those messages. Thus A' must
+%% start by synchronously calling A, which then immediately sends A'
+%% back its state. A then adds A' to the group. If A dies at this
+%% point then A' will be able to see this (as A' will fail to appear
+%% in the group membership), and thus A' will ignore the state it
+%% receives from A, and will simply repeat the process, trying to now
+%% join downstream from some other member. This ensures that should
+%% the upstream die as soon as the new member has been joined, the new
+%% member is guaranteed to receive the correct state, allowing it to
+%% correctly process messages inherited due to the death of its
+%% upstream neighbour.
+%%
+%% The canonical definition of the group membership is held by a
+%% distributed database. Whilst this allows the total ordering of
+%% changes to be achieved, it is nevertheless undesirable to have to
+%% query this database for the current view, upon receiving each
+%% message. Instead, we wish for members to be able to cache a view of
+%% the group membership, which then requires a cache invalidation
+%% mechanism. Each member maintains its own view of the group
+%% membership. Thus when the group's membership changes, members may
+%% need to become aware of such changes in order to be able to
+%% accurately process messages they receive. Because of the
+%% requirement of a total ordering of conflicting membership changes,
+%% it is not possible to use the guaranteed broadcast mechanism to
+%% communicate these changes: to achieve the necessary ordering, it
+%% would be necessary for such messages to be published by exactly one
+%% member, which can not be guaranteed given that such a member could
+%% die.
+%%
+%% The total ordering we enforce on membership changes gives rise to a
+%% view version number: every change to the membership creates a
+%% different view, and the total ordering permits a simple
+%% monotonically increasing view version number.
+%%
+%% Lemma 3: If a message is sent from a member that holds view version
+%% N, it can be correctly processed by any member receiving the
+%% message with a view version >= N.
+%%
+%% Initially, let us suppose that each view contains the ordering of
+%% every member that was ever part of the group. Dead members are
+%% marked as such. Thus we have a ring of members, some of which are
+%% dead, and are thus inherited by the nearest alive downstream
+%% member.
+%%
+%% In the chain A -> B -> C, all three members initially have view
+%% version 1, which reflects reality. B publishes a message, which is
+%% forward by C to A. B now dies, which A notices very quickly. Thus A
+%% updates the view, creating version 2. It now forwards B's
+%% publication, sending that message to its new downstream neighbour,
+%% C. This happens before C is aware of the death of B. C must become
+%% aware of the view change before it interprets the message its
+%% received, otherwise it will fail to learn of the death of B, and
+%% thus will not realise it has inherited B's messages (and will
+%% likely crash).
+%%
+%% Thus very simply, we have that each subsequent view contains more
+%% information than the preceding view.
+%%
+%% However, to avoid the views growing indefinitely, we need to be
+%% able to delete members which have died _and_ for which no messages
+%% are in-flight. This requires that upon inheriting a dead member, we
+%% know the last publication sent by the dead member (this is easy: we
+%% inherit a member because we are the nearest downstream member which
+%% implies that we know at least as much than everyone else about the
+%% publications of the dead member), and we know the earliest message
+%% for which the acknowledgement is still in flight.
+%%
+%% In the chain A -> B -> C, when B dies, A will send to C its state
+%% (as C is the new downstream from A), allowing C to calculate which
+%% messages it has missed out on (described above). At this point, C
+%% also inherits B's messages. If that state from A also includes the
+%% last message published by B for which an acknowledgement has been
+%% seen, then C knows exactly which further acknowledgements it must
+%% receive (also including issuing acknowledgements for publications
+%% still in-flight that it receives), after which it is known there
+%% are no more messages in flight for B, thus all evidence that B was
+%% ever part of the group can be safely removed from the canonical
+%% group membership.
+%%
+%% Thus, for every message that a member sends, it includes with that
+%% message its view version. When a member receives a message it will
+%% update its view from the canonical copy, should its view be older
+%% than the view version included in the message it has received.
+%%
+%% The state held by each member therefore includes the messages from
+%% each publisher pending acknowledgement, the last publication seen
+%% from that publisher, and the last acknowledgement from that
+%% publisher. In the case of the member's own publications or
+%% inherited members, this last acknowledgement seen state indicates
+%% the last acknowledgement retired, rather than sent.
+%%
+%%
+%% Proof sketch
+%% ------------
+%%
+%% We need to prove that with the provided operational semantics, we
+%% can never reach a state that is not well formed from a well-formed
+%% starting state.
+%%
+%% Operational semantics (small step): straight-forward message
+%% sending, process monitoring, state updates.
+%%
+%% Well formed state: dead members inherited by exactly one non-dead
+%% member; for every entry in anyone's pending-acks, either (the
+%% publication of the message is in-flight downstream from the member
+%% and upstream from the publisher) or (the acknowledgement of the
+%% message is in-flight downstream from the publisher and upstream
+%% from the member).
+%%
+%% Proof by induction on the applicable operational semantics.
+%%
+%%
+%% Related work
+%% ------------
+%%
+%% The ring configuration and double traversal of messages around the
+%% ring is similar (though developed independently) to the LCR
+%% protocol by [Levy 2008]. However, LCR differs in several
+%% ways. Firstly, by using vector clocks, it enforces a total order of
+%% message delivery, which is unnecessary for our purposes. More
+%% significantly, it is built on top of a "group communication system"
+%% which performs the group management functions, taking
+%% responsibility away from the protocol as to how to cope with safely
+%% adding and removing members. When membership changes do occur, the
+%% protocol stipulates that every member must perform communication
+%% with every other member of the group, to ensure all outstanding
+%% deliveries complete, before the entire group transitions to the new
+%% view. This, in total, requires two sets of all-to-all synchronous
+%% communications.
+%%
+%% This is not only rather inefficient, but also does not explain what
+%% happens upon the failure of a member during this process. It does
+%% though entirely avoid the need for inheritance of responsibility of
+%% dead members that our protocol incorporates.
+%%
+%% In [Marandi et al 2010], a Paxos-based protocol is described. This
+%% work explicitly focuses on the efficiency of communication. LCR
+%% (and our protocol too) are more efficient, but at the cost of
+%% higher latency. The Ring-Paxos protocol is itself built on top of
+%% IP-multicast, which rules it out for many applications where
+%% point-to-point communication is all that can be required. They also
+%% have an excellent related work section which I really ought to
+%% read...
+%%
+%%
+%% [Levy 2008] The Complexity of Reliable Distributed Storage, 2008.
+%% [Marandi et al 2010] Ring Paxos: A High-Throughput Atomic Broadcast
+%% Protocol
+
+
+-behaviour(gen_server2).
+
+-export([create_tables/0, start_link/3, leave/1, broadcast/2,
+ confirmed_broadcast/2, group_members/1]).
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+ code_change/3, prioritise_info/2]).
+
+-export([behaviour_info/1]).
+
+-export([table_definitions/0]).
+
+-define(GROUP_TABLE, gm_group).
+-define(HIBERNATE_AFTER_MIN, 1000).
+-define(DESIRED_HIBERNATE, 10000).
+-define(SETS, ordsets).
+-define(DICT, orddict).
+
+-record(state,
+ { self,
+ left,
+ right,
+ group_name,
+ module,
+ view,
+ pub_count,
+ members_state,
+ callback_args,
+ confirms
+ }).
+
+-record(gm_group, { name, version, members }).
+
+-record(view_member, { id, aliases, left, right }).
+
+-record(member, { pending_ack, last_pub, last_ack }).
+
+-define(TABLE, {?GROUP_TABLE, [{record_name, gm_group},
+ {attributes, record_info(fields, gm_group)}]}).
+-define(TABLE_MATCH, {match, #gm_group { _ = '_' }}).
+
+-define(TAG, '$gm').
+
+-ifdef(use_specs).
+
+-export_type([group_name/0]).
+
+-type(group_name() :: any()).
+
+-spec(create_tables/0 :: () -> 'ok').
+-spec(start_link/3 :: (group_name(), atom(), [any()]) ->
+ {'ok', pid()} | {'error', any()}).
+-spec(leave/1 :: (pid()) -> 'ok').
+-spec(broadcast/2 :: (pid(), any()) -> 'ok').
+-spec(confirmed_broadcast/2 :: (pid(), any()) -> 'ok').
+-spec(group_members/1 :: (pid()) -> [pid()]).
+
+-endif.
+
+behaviour_info(callbacks) ->
+ [
+ %% Called when we've successfully joined the group. Supplied with
+ %% Args provided in start_link, plus current group members.
+ {joined, 2},
+
+ %% Supplied with Args provided in start_link, the list of new
+ %% members and the list of members previously known to us that
+ %% have since died. Note that if a member joins and dies very
+ %% quickly, it's possible that we will never see that member
+ %% appear in either births or deaths. However we are guaranteed
+ %% that (1) we will see a member joining either in the births
+ %% here, or in the members passed to joined/1 before receiving
+ %% any messages from it; and (2) we will not see members die that
+ %% we have not seen born (or supplied in the members to
+ %% joined/1).
+ {members_changed, 3},
+
+ %% Supplied with Args provided in start_link, the sender, and the
+ %% message. This does get called for messages injected by this
+ %% member, however, in such cases, there is no special
+ %% significance of this call: it does not indicate that the
+ %% message has made it to any other members, let alone all other
+ %% members.
+ {handle_msg, 3},
+
+ %% Called on gm member termination as per rules in gen_server,
+ %% with the Args provided in start_link plus the termination
+ %% Reason.
+ {terminate, 2}
+ ];
+behaviour_info(_Other) ->
+ undefined.
+
+create_tables() ->
+ create_tables([?TABLE]).
+
+create_tables([]) ->
+ ok;
+create_tables([{Table, Attributes} | Tables]) ->
+ case mnesia:create_table(Table, Attributes) of
+ {atomic, ok} -> create_tables(Tables);
+ {aborted, {already_exists, gm_group}} -> create_tables(Tables);
+ Err -> Err
+ end.
+
+table_definitions() ->
+ {Name, Attributes} = ?TABLE,
+ [{Name, [?TABLE_MATCH | Attributes]}].
+
+start_link(GroupName, Module, Args) ->
+ gen_server2:start_link(?MODULE, [GroupName, Module, Args], []).
+
+leave(Server) ->
+ gen_server2:cast(Server, leave).
+
+broadcast(Server, Msg) ->
+ gen_server2:cast(Server, {broadcast, Msg}).
+
+confirmed_broadcast(Server, Msg) ->
+ gen_server2:call(Server, {confirmed_broadcast, Msg}, infinity).
+
+group_members(Server) ->
+ gen_server2:call(Server, group_members, infinity).
+
+
+init([GroupName, Module, Args]) ->
+ random:seed(now()),
+ gen_server2:cast(self(), join),
+ Self = self(),
+ {ok, #state { self = Self,
+ left = {Self, undefined},
+ right = {Self, undefined},
+ group_name = GroupName,
+ module = Module,
+ view = undefined,
+ pub_count = 0,
+ members_state = undefined,
+ callback_args = Args,
+ confirms = queue:new() }, hibernate,
+ {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
+
+
+handle_call({confirmed_broadcast, _Msg}, _From,
+ State = #state { members_state = undefined }) ->
+ reply(not_joined, State);
+
+handle_call({confirmed_broadcast, Msg}, _From,
+ State = #state { self = Self,
+ right = {Self, undefined},
+ module = Module,
+ callback_args = Args }) ->
+ handle_callback_result({Module:handle_msg(Args, Self, Msg), ok, State});
+
+handle_call({confirmed_broadcast, Msg}, From, State) ->
+ internal_broadcast(Msg, From, State);
+
+handle_call(group_members, _From,
+ State = #state { members_state = undefined }) ->
+ reply(not_joined, State);
+
+handle_call(group_members, _From, State = #state { view = View }) ->
+ reply(alive_view_members(View), State);
+
+handle_call({add_on_right, _NewMember}, _From,
+ State = #state { members_state = undefined }) ->
+ reply(not_ready, State);
+
+handle_call({add_on_right, NewMember}, _From,
+ State = #state { self = Self,
+ group_name = GroupName,
+ view = View,
+ members_state = MembersState,
+ module = Module,
+ callback_args = Args }) ->
+ Group = record_new_member_in_group(
+ GroupName, Self, NewMember,
+ fun (Group1) ->
+ View1 = group_to_view(Group1),
+ ok = send_right(NewMember, View1,
+ {catchup, Self, prepare_members_state(
+ MembersState)})
+ end),
+ View2 = group_to_view(Group),
+ State1 = check_neighbours(State #state { view = View2 }),
+ Result = callback_view_changed(Args, Module, View, View2),
+ handle_callback_result({Result, {ok, Group}, State1}).
+
+
+handle_cast({?TAG, ReqVer, Msg},
+ State = #state { view = View,
+ group_name = GroupName,
+ module = Module,
+ callback_args = Args }) ->
+ {Result, State1} =
+ case needs_view_update(ReqVer, View) of
+ true ->
+ View1 = group_to_view(read_group(GroupName)),
+ {callback_view_changed(Args, Module, View, View1),
+ check_neighbours(State #state { view = View1 })};
+ false ->
+ {ok, State}
+ end,
+ handle_callback_result(
+ if_callback_success(
+ Result, fun handle_msg_true/3, fun handle_msg_false/3, Msg, State1));
+
+handle_cast({broadcast, _Msg}, State = #state { members_state = undefined }) ->
+ noreply(State);
+
+handle_cast({broadcast, Msg},
+ State = #state { self = Self,
+ right = {Self, undefined},
+ module = Module,
+ callback_args = Args }) ->
+ handle_callback_result({Module:handle_msg(Args, Self, Msg), State});
+
+handle_cast({broadcast, Msg}, State) ->
+ internal_broadcast(Msg, none, State);
+
+handle_cast(join, State = #state { self = Self,
+ group_name = GroupName,
+ members_state = undefined,
+ module = Module,
+ callback_args = Args }) ->
+ View = join_group(Self, GroupName),
+ MembersState =
+ case alive_view_members(View) of
+ [Self] -> blank_member_state();
+ _ -> undefined
+ end,
+ State1 = check_neighbours(State #state { view = View,
+ members_state = MembersState }),
+ handle_callback_result(
+ {Module:joined(Args, all_known_members(View)), State1});
+
+handle_cast(leave, State) ->
+ {stop, normal, State}.
+
+
+handle_info({'DOWN', MRef, process, _Pid, _Reason},
+ State = #state { self = Self,
+ left = Left,
+ right = Right,
+ group_name = GroupName,
+ view = View,
+ module = Module,
+ callback_args = Args,
+ confirms = Confirms }) ->
+ Member = case {Left, Right} of
+ {{Member1, MRef}, _} -> Member1;
+ {_, {Member1, MRef}} -> Member1;
+ _ -> undefined
+ end,
+ case Member of
+ undefined ->
+ noreply(State);
+ _ ->
+ View1 =
+ group_to_view(record_dead_member_in_group(Member, GroupName)),
+ State1 = State #state { view = View1 },
+ {Result, State2} =
+ case alive_view_members(View1) of
+ [Self] ->
+ maybe_erase_aliases(
+ State1 #state {
+ members_state = blank_member_state(),
+ confirms = purge_confirms(Confirms) });
+ _ ->
+ %% here we won't be pointing out any deaths:
+ %% the concern is that there maybe births
+ %% which we'd otherwise miss.
+ {callback_view_changed(Args, Module, View, View1),
+ State1}
+ end,
+ handle_callback_result({Result, check_neighbours(State2)})
+ end.
+
+
+terminate(Reason, #state { module = Module,
+ callback_args = Args }) ->
+ Module:terminate(Args, Reason).
+
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+
+prioritise_info({'DOWN', _MRef, process, _Pid, _Reason}, _State) -> 1;
+prioritise_info(_ , _State) -> 0.
+
+
+handle_msg(check_neighbours, State) ->
+ %% no-op - it's already been done by the calling handle_cast
+ {ok, State};
+
+handle_msg({catchup, Left, MembersStateLeft},
+ State = #state { self = Self,
+ left = {Left, _MRefL},
+ right = {Right, _MRefR},
+ view = View,
+ members_state = undefined }) ->
+ ok = send_right(Right, View, {catchup, Self, MembersStateLeft}),
+ MembersStateLeft1 = build_members_state(MembersStateLeft),
+ {ok, State #state { members_state = MembersStateLeft1 }};
+
+handle_msg({catchup, Left, MembersStateLeft},
+ State = #state { self = Self,
+ left = {Left, _MRefL},
+ view = View,
+ members_state = MembersState })
+ when MembersState =/= undefined ->
+ MembersStateLeft1 = build_members_state(MembersStateLeft),
+ AllMembers = lists:usort(?DICT:fetch_keys(MembersState) ++
+ ?DICT:fetch_keys(MembersStateLeft1)),
+ {MembersState1, Activity} =
+ lists:foldl(
+ fun (Id, MembersStateActivity) ->
+ #member { pending_ack = PALeft, last_ack = LA } =
+ find_member_or_blank(Id, MembersStateLeft1),
+ with_member_acc(
+ fun (#member { pending_ack = PA } = Member, Activity1) ->
+ case is_member_alias(Id, Self, View) of
+ true ->
+ {_AcksInFlight, Pubs, _PA1} =
+ find_prefix_common_suffix(PALeft, PA),
+ {Member #member { last_ack = LA },
+ activity_cons(Id, pubs_from_queue(Pubs),
+ [], Activity1)};
+ false ->
+ {Acks, _Common, Pubs} =
+ find_prefix_common_suffix(PA, PALeft),
+ {Member,
+ activity_cons(Id, pubs_from_queue(Pubs),
+ acks_from_queue(Acks),
+ Activity1)}
+ end
+ end, Id, MembersStateActivity)
+ end, {MembersState, activity_nil()}, AllMembers),
+ handle_msg({activity, Left, activity_finalise(Activity)},
+ State #state { members_state = MembersState1 });
+
+handle_msg({catchup, _NotLeft, _MembersState}, State) ->
+ {ok, State};
+
+handle_msg({activity, Left, Activity},
+ State = #state { self = Self,
+ left = {Left, _MRefL},
+ view = View,
+ members_state = MembersState,
+ confirms = Confirms })
+ when MembersState =/= undefined ->
+ {MembersState1, {Confirms1, Activity1}} =
+ lists:foldl(
+ fun ({Id, Pubs, Acks}, MembersStateConfirmsActivity) ->
+ with_member_acc(
+ fun (Member = #member { pending_ack = PA,
+ last_pub = LP,
+ last_ack = LA },
+ {Confirms2, Activity2}) ->
+ case is_member_alias(Id, Self, View) of
+ true ->
+ {ToAck, PA1} =
+ find_common(queue_from_pubs(Pubs), PA,
+ queue:new()),
+ LA1 = last_ack(Acks, LA),
+ AckNums = acks_from_queue(ToAck),
+ Confirms3 = maybe_confirm(
+ Self, Id, Confirms2, AckNums),
+ {Member #member { pending_ack = PA1,
+ last_ack = LA1 },
+ {Confirms3,
+ activity_cons(
+ Id, [], AckNums, Activity2)}};
+ false ->
+ PA1 = apply_acks(Acks, join_pubs(PA, Pubs)),
+ LA1 = last_ack(Acks, LA),
+ LP1 = last_pub(Pubs, LP),
+ {Member #member { pending_ack = PA1,
+ last_pub = LP1,
+ last_ack = LA1 },
+ {Confirms2,
+ activity_cons(Id, Pubs, Acks, Activity2)}}
+ end
+ end, Id, MembersStateConfirmsActivity)
+ end, {MembersState, {Confirms, activity_nil()}}, Activity),
+ State1 = State #state { members_state = MembersState1,
+ confirms = Confirms1 },
+ Activity3 = activity_finalise(Activity1),
+ {Result, State2} = maybe_erase_aliases(State1),
+ ok = maybe_send_activity(Activity3, State2),
+ if_callback_success(
+ Result, fun activity_true/3, fun activity_false/3, Activity3, State2);
+
+handle_msg({activity, _NotLeft, _Activity}, State) ->
+ {ok, State}.
+
+
+noreply(State) ->
+ {noreply, State, hibernate}.
+
+reply(Reply, State) ->
+ {reply, Reply, State, hibernate}.
+
+internal_broadcast(Msg, From, State = #state { self = Self,
+ pub_count = PubCount,
+ members_state = MembersState,
+ module = Module,
+ confirms = Confirms,
+ callback_args = Args }) ->
+ PubMsg = {PubCount, Msg},
+ Activity = activity_cons(Self, [PubMsg], [], activity_nil()),
+ ok = maybe_send_activity(activity_finalise(Activity), State),
+ MembersState1 =
+ with_member(
+ fun (Member = #member { pending_ack = PA }) ->
+ Member #member { pending_ack = queue:in(PubMsg, PA) }
+ end, Self, MembersState),
+ Confirms1 = case From of
+ none -> Confirms;
+ _ -> queue:in({PubCount, From}, Confirms)
+ end,
+ handle_callback_result({Module:handle_msg(Args, Self, Msg),
+ State #state { pub_count = PubCount + 1,
+ members_state = MembersState1,
+ confirms = Confirms1 }}).
+
+
+%% ---------------------------------------------------------------------------
+%% View construction and inspection
+%% ---------------------------------------------------------------------------
+
+needs_view_update(ReqVer, {Ver, _View}) ->
+ Ver < ReqVer.
+
+view_version({Ver, _View}) ->
+ Ver.
+
+is_member_alive({dead, _Member}) -> false;
+is_member_alive(_) -> true.
+
+is_member_alias(Self, Self, _View) ->
+ true;
+is_member_alias(Member, Self, View) ->
+ ?SETS:is_element(Member,
+ ((fetch_view_member(Self, View)) #view_member.aliases)).
+
+dead_member_id({dead, Member}) -> Member.
+
+store_view_member(VMember = #view_member { id = Id }, {Ver, View}) ->
+ {Ver, ?DICT:store(Id, VMember, View)}.
+
+with_view_member(Fun, View, Id) ->
+ store_view_member(Fun(fetch_view_member(Id, View)), View).
+
+fetch_view_member(Id, {_Ver, View}) ->
+ ?DICT:fetch(Id, View).
+
+find_view_member(Id, {_Ver, View}) ->
+ ?DICT:find(Id, View).
+
+blank_view(Ver) ->
+ {Ver, ?DICT:new()}.
+
+alive_view_members({_Ver, View}) ->
+ ?DICT:fetch_keys(View).
+
+all_known_members({_Ver, View}) ->
+ ?DICT:fold(
+ fun (Member, #view_member { aliases = Aliases }, Acc) ->
+ ?SETS:to_list(Aliases) ++ [Member | Acc]
+ end, [], View).
+
+group_to_view(#gm_group { members = Members, version = Ver }) ->
+ Alive = lists:filter(fun is_member_alive/1, Members),
+ [_|_] = Alive, %% ASSERTION - can't have all dead members
+ add_aliases(link_view(Alive ++ Alive ++ Alive, blank_view(Ver)), Members).
+
+link_view([Left, Middle, Right | Rest], View) ->
+ case find_view_member(Middle, View) of
+ error ->
+ link_view(
+ [Middle, Right | Rest],
+ store_view_member(#view_member { id = Middle,
+ aliases = ?SETS:new(),
+ left = Left,
+ right = Right }, View));
+ {ok, _} ->
+ View
+ end;
+link_view(_, View) ->
+ View.
+
+add_aliases(View, Members) ->
+ Members1 = ensure_alive_suffix(Members),
+ {EmptyDeadSet, View1} =
+ lists:foldl(
+ fun (Member, {DeadAcc, ViewAcc}) ->
+ case is_member_alive(Member) of
+ true ->
+ {?SETS:new(),
+ with_view_member(
+ fun (VMember =
+ #view_member { aliases = Aliases }) ->
+ VMember #view_member {
+ aliases = ?SETS:union(Aliases, DeadAcc) }
+ end, ViewAcc, Member)};
+ false ->
+ {?SETS:add_element(dead_member_id(Member), DeadAcc),
+ ViewAcc}
+ end
+ end, {?SETS:new(), View}, Members1),
+ 0 = ?SETS:size(EmptyDeadSet), %% ASSERTION
+ View1.
+
+ensure_alive_suffix(Members) ->
+ queue:to_list(ensure_alive_suffix1(queue:from_list(Members))).
+
+ensure_alive_suffix1(MembersQ) ->
+ {{value, Member}, MembersQ1} = queue:out_r(MembersQ),
+ case is_member_alive(Member) of
+ true -> MembersQ;
+ false -> ensure_alive_suffix1(queue:in_r(Member, MembersQ1))
+ end.
+
+
+%% ---------------------------------------------------------------------------
+%% View modification
+%% ---------------------------------------------------------------------------
+
+join_group(Self, GroupName) ->
+ join_group(Self, GroupName, read_group(GroupName)).
+
+join_group(Self, GroupName, {error, not_found}) ->
+ join_group(Self, GroupName, prune_or_create_group(Self, GroupName));
+join_group(Self, _GroupName, #gm_group { members = [Self] } = Group) ->
+ group_to_view(Group);
+join_group(Self, GroupName, #gm_group { members = Members } = Group) ->
+ case lists:member(Self, Members) of
+ true ->
+ group_to_view(Group);
+ false ->
+ case lists:filter(fun is_member_alive/1, Members) of
+ [] ->
+ join_group(Self, GroupName,
+ prune_or_create_group(Self, GroupName));
+ Alive ->
+ Left = lists:nth(random:uniform(length(Alive)), Alive),
+ try
+ case gen_server2:call(
+ Left, {add_on_right, Self}, infinity) of
+ {ok, Group1} -> group_to_view(Group1);
+ not_ready -> join_group(Self, GroupName)
+ end
+ catch
+ exit:{R, _}
+ when R =:= noproc; R =:= normal; R =:= shutdown ->
+ join_group(
+ Self, GroupName,
+ record_dead_member_in_group(Left, GroupName))
+ end
+ end
+ end.
+
+read_group(GroupName) ->
+ case mnesia:dirty_read(?GROUP_TABLE, GroupName) of
+ [] -> {error, not_found};
+ [Group] -> Group
+ end.
+
+prune_or_create_group(Self, GroupName) ->
+ {atomic, Group} =
+ mnesia:sync_transaction(
+ fun () -> GroupNew = #gm_group { name = GroupName,
+ members = [Self],
+ version = 0 },
+ case mnesia:read(?GROUP_TABLE, GroupName) of
+ [] ->
+ mnesia:write(GroupNew),
+ GroupNew;
+ [Group1 = #gm_group { members = Members }] ->
+ case lists:any(fun is_member_alive/1, Members) of
+ true -> Group1;
+ false -> mnesia:write(GroupNew),
+ GroupNew
+ end
+ end
+ end),
+ Group.
+
+record_dead_member_in_group(Member, GroupName) ->
+ {atomic, Group} =
+ mnesia:sync_transaction(
+ fun () -> [Group1 = #gm_group { members = Members, version = Ver }] =
+ mnesia:read(?GROUP_TABLE, GroupName),
+ case lists:splitwith(
+ fun (Member1) -> Member1 =/= Member end, Members) of
+ {_Members1, []} -> %% not found - already recorded dead
+ Group1;
+ {Members1, [Member | Members2]} ->
+ Members3 = Members1 ++ [{dead, Member} | Members2],
+ Group2 = Group1 #gm_group { members = Members3,
+ version = Ver + 1 },
+ mnesia:write(Group2),
+ Group2
+ end
+ end),
+ Group.
+
+record_new_member_in_group(GroupName, Left, NewMember, Fun) ->
+ {atomic, Group} =
+ mnesia:sync_transaction(
+ fun () ->
+ [#gm_group { members = Members, version = Ver } = Group1] =
+ mnesia:read(?GROUP_TABLE, GroupName),
+ {Prefix, [Left | Suffix]} =
+ lists:splitwith(fun (M) -> M =/= Left end, Members),
+ Members1 = Prefix ++ [Left, NewMember | Suffix],
+ Group2 = Group1 #gm_group { members = Members1,
+ version = Ver + 1 },
+ ok = Fun(Group2),
+ mnesia:write(Group2),
+ Group2
+ end),
+ Group.
+
+erase_members_in_group(Members, GroupName) ->
+ DeadMembers = [{dead, Id} || Id <- Members],
+ {atomic, Group} =
+ mnesia:sync_transaction(
+ fun () ->
+ [Group1 = #gm_group { members = [_|_] = Members1,
+ version = Ver }] =
+ mnesia:read(?GROUP_TABLE, GroupName),
+ case Members1 -- DeadMembers of
+ Members1 -> Group1;
+ Members2 -> Group2 =
+ Group1 #gm_group { members = Members2,
+ version = Ver + 1 },
+ mnesia:write(Group2),
+ Group2
+ end
+ end),
+ Group.
+
+maybe_erase_aliases(State = #state { self = Self,
+ group_name = GroupName,
+ view = View,
+ members_state = MembersState,
+ module = Module,
+ callback_args = Args }) ->
+ #view_member { aliases = Aliases } = fetch_view_member(Self, View),
+ {Erasable, MembersState1}
+ = ?SETS:fold(
+ fun (Id, {ErasableAcc, MembersStateAcc} = Acc) ->
+ #member { last_pub = LP, last_ack = LA } =
+ find_member_or_blank(Id, MembersState),
+ case can_erase_view_member(Self, Id, LA, LP) of
+ true -> {[Id | ErasableAcc],
+ erase_member(Id, MembersStateAcc)};
+ false -> Acc
+ end
+ end, {[], MembersState}, Aliases),
+ State1 = State #state { members_state = MembersState1 },
+ case Erasable of
+ [] -> {ok, State1};
+ _ -> View1 = group_to_view(
+ erase_members_in_group(Erasable, GroupName)),
+ {callback_view_changed(Args, Module, View, View1),
+ State1 #state { view = View1 }}
+ end.
+
+can_erase_view_member(Self, Self, _LA, _LP) -> false;
+can_erase_view_member(_Self, _Id, N, N) -> true;
+can_erase_view_member(_Self, _Id, _LA, _LP) -> false.
+
+
+%% ---------------------------------------------------------------------------
+%% View monitoring and maintanence
+%% ---------------------------------------------------------------------------
+
+ensure_neighbour(_Ver, Self, {Self, undefined}, Self) ->
+ {Self, undefined};
+ensure_neighbour(Ver, Self, {Self, undefined}, RealNeighbour) ->
+ ok = gen_server2:cast(RealNeighbour, {?TAG, Ver, check_neighbours}),
+ {RealNeighbour, maybe_monitor(RealNeighbour, Self)};
+ensure_neighbour(_Ver, _Self, {RealNeighbour, MRef}, RealNeighbour) ->
+ {RealNeighbour, MRef};
+ensure_neighbour(Ver, Self, {RealNeighbour, MRef}, Neighbour) ->
+ true = erlang:demonitor(MRef),
+ Msg = {?TAG, Ver, check_neighbours},
+ ok = gen_server2:cast(RealNeighbour, Msg),
+ ok = case Neighbour of
+ Self -> ok;
+ _ -> gen_server2:cast(Neighbour, Msg)
+ end,
+ {Neighbour, maybe_monitor(Neighbour, Self)}.
+
+maybe_monitor(Self, Self) ->
+ undefined;
+maybe_monitor(Other, _Self) ->
+ erlang:monitor(process, Other).
+
+check_neighbours(State = #state { self = Self,
+ left = Left,
+ right = Right,
+ view = View }) ->
+ #view_member { left = VLeft, right = VRight }
+ = fetch_view_member(Self, View),
+ Ver = view_version(View),
+ Left1 = ensure_neighbour(Ver, Self, Left, VLeft),
+ Right1 = ensure_neighbour(Ver, Self, Right, VRight),
+ State1 = State #state { left = Left1, right = Right1 },
+ ok = maybe_send_catchup(Right, State1),
+ State1.
+
+maybe_send_catchup(Right, #state { right = Right }) ->
+ ok;
+maybe_send_catchup(_Right, #state { self = Self,
+ right = {Self, undefined} }) ->
+ ok;
+maybe_send_catchup(_Right, #state { members_state = undefined }) ->
+ ok;
+maybe_send_catchup(_Right, #state { self = Self,
+ right = {Right, _MRef},
+ view = View,
+ members_state = MembersState }) ->
+ send_right(Right, View,
+ {catchup, Self, prepare_members_state(MembersState)}).
+
+
+%% ---------------------------------------------------------------------------
+%% Catch_up delta detection
+%% ---------------------------------------------------------------------------
+
+find_prefix_common_suffix(A, B) ->
+ {Prefix, A1} = find_prefix(A, B, queue:new()),
+ {Common, Suffix} = find_common(A1, B, queue:new()),
+ {Prefix, Common, Suffix}.
+
+%% Returns the elements of A that occur before the first element of B,
+%% plus the remainder of A.
+find_prefix(A, B, Prefix) ->
+ case {queue:out(A), queue:out(B)} of
+ {{{value, Val}, _A1}, {{value, Val}, _B1}} ->
+ {Prefix, A};
+ {{empty, A1}, {{value, _A}, _B1}} ->
+ {Prefix, A1};
+ {{{value, {NumA, _MsgA} = Val}, A1},
+ {{value, {NumB, _MsgB}}, _B1}} when NumA < NumB ->
+ find_prefix(A1, B, queue:in(Val, Prefix));
+ {_, {empty, _B1}} ->
+ {A, Prefix} %% Prefix well be empty here
+ end.
+
+%% A should be a prefix of B. Returns the commonality plus the
+%% remainder of B.
+find_common(A, B, Common) ->
+ case {queue:out(A), queue:out(B)} of
+ {{{value, Val}, A1}, {{value, Val}, B1}} ->
+ find_common(A1, B1, queue:in(Val, Common));
+ {{empty, _A}, _} ->
+ {Common, B}
+ end.
+
+
+%% ---------------------------------------------------------------------------
+%% Members helpers
+%% ---------------------------------------------------------------------------
+
+with_member(Fun, Id, MembersState) ->
+ store_member(
+ Id, Fun(find_member_or_blank(Id, MembersState)), MembersState).
+
+with_member_acc(Fun, Id, {MembersState, Acc}) ->
+ {MemberState, Acc1} = Fun(find_member_or_blank(Id, MembersState), Acc),
+ {store_member(Id, MemberState, MembersState), Acc1}.
+
+find_member_or_blank(Id, MembersState) ->
+ case ?DICT:find(Id, MembersState) of
+ {ok, Result} -> Result;
+ error -> blank_member()
+ end.
+
+erase_member(Id, MembersState) ->
+ ?DICT:erase(Id, MembersState).
+
+blank_member() ->
+ #member { pending_ack = queue:new(), last_pub = -1, last_ack = -1 }.
+
+blank_member_state() ->
+ ?DICT:new().
+
+store_member(Id, MemberState, MembersState) ->
+ ?DICT:store(Id, MemberState, MembersState).
+
+prepare_members_state(MembersState) ->
+ ?DICT:to_list(MembersState).
+
+build_members_state(MembersStateList) ->
+ ?DICT:from_list(MembersStateList).
+
+
+%% ---------------------------------------------------------------------------
+%% Activity assembly
+%% ---------------------------------------------------------------------------
+
+activity_nil() ->
+ queue:new().
+
+activity_cons(_Id, [], [], Tail) ->
+ Tail;
+activity_cons(Sender, Pubs, Acks, Tail) ->
+ queue:in({Sender, Pubs, Acks}, Tail).
+
+activity_finalise(Activity) ->
+ queue:to_list(Activity).
+
+maybe_send_activity([], _State) ->
+ ok;
+maybe_send_activity(Activity, #state { self = Self,
+ right = {Right, _MRefR},
+ view = View }) ->
+ send_right(Right, View, {activity, Self, Activity}).
+
+send_right(Right, View, Msg) ->
+ ok = gen_server2:cast(Right, {?TAG, view_version(View), Msg}).
+
+callback(Args, Module, Activity) ->
+ lists:foldl(
+ fun ({Id, Pubs, _Acks}, ok) ->
+ lists:foldl(fun ({_PubNum, Pub}, ok) ->
+ Module:handle_msg(Args, Id, Pub);
+ (_, Error) ->
+ Error
+ end, ok, Pubs);
+ (_, Error) ->
+ Error
+ end, ok, Activity).
+
+callback_view_changed(Args, Module, OldView, NewView) ->
+ OldMembers = all_known_members(OldView),
+ NewMembers = all_known_members(NewView),
+ Births = NewMembers -- OldMembers,
+ Deaths = OldMembers -- NewMembers,
+ case {Births, Deaths} of
+ {[], []} -> ok;
+ _ -> Module:members_changed(Args, Births, Deaths)
+ end.
+
+handle_callback_result({Result, State}) ->
+ if_callback_success(
+ Result, fun no_reply_true/3, fun no_reply_false/3, undefined, State);
+handle_callback_result({Result, Reply, State}) ->
+ if_callback_success(
+ Result, fun reply_true/3, fun reply_false/3, Reply, State).
+
+no_reply_true (_Result, _Undefined, State) -> noreply(State).
+no_reply_false({stop, Reason}, _Undefined, State) -> {stop, Reason, State}.
+
+reply_true (_Result, Reply, State) -> reply(Reply, State).
+reply_false({stop, Reason}, Reply, State) -> {stop, Reason, Reply, State}.
+
+handle_msg_true (_Result, Msg, State) -> handle_msg(Msg, State).
+handle_msg_false(Result, _Msg, State) -> {Result, State}.
+
+activity_true(_Result, Activity, State = #state { module = Module,
+ callback_args = Args }) ->
+ {callback(Args, Module, Activity), State}.
+activity_false(Result, _Activity, State) ->
+ {Result, State}.
+
+if_callback_success(ok, True, _False, Arg, State) ->
+ True(ok, Arg, State);
+if_callback_success(
+ {become, Module, Args} = Result, True, _False, Arg, State) ->
+ True(Result, Arg, State #state { module = Module,
+ callback_args = Args });
+if_callback_success({stop, _Reason} = Result, _True, False, Arg, State) ->
+ False(Result, Arg, State).
+
+maybe_confirm(_Self, _Id, Confirms, []) ->
+ Confirms;
+maybe_confirm(Self, Self, Confirms, [PubNum | PubNums]) ->
+ case queue:out(Confirms) of
+ {empty, _Confirms} ->
+ Confirms;
+ {{value, {PubNum, From}}, Confirms1} ->
+ gen_server2:reply(From, ok),
+ maybe_confirm(Self, Self, Confirms1, PubNums);
+ {{value, {PubNum1, _From}}, _Confirms} when PubNum1 > PubNum ->
+ maybe_confirm(Self, Self, Confirms, PubNums)
+ end;
+maybe_confirm(_Self, _Id, Confirms, _PubNums) ->
+ Confirms.
+
+purge_confirms(Confirms) ->
+ [gen_server2:reply(From, ok) || {_PubNum, From} <- queue:to_list(Confirms)],
+ queue:new().
+
+
+%% ---------------------------------------------------------------------------
+%% Msg transformation
+%% ---------------------------------------------------------------------------
+
+acks_from_queue(Q) ->
+ [PubNum || {PubNum, _Msg} <- queue:to_list(Q)].
+
+pubs_from_queue(Q) ->
+ queue:to_list(Q).
+
+queue_from_pubs(Pubs) ->
+ queue:from_list(Pubs).
+
+apply_acks([], Pubs) ->
+ Pubs;
+apply_acks(List, Pubs) ->
+ {_, Pubs1} = queue:split(length(List), Pubs),
+ Pubs1.
+
+join_pubs(Q, []) -> Q;
+join_pubs(Q, Pubs) -> queue:join(Q, queue_from_pubs(Pubs)).
+
+last_ack([], LA) ->
+ LA;
+last_ack(List, LA) ->
+ LA1 = lists:last(List),
+ true = LA1 > LA, %% ASSERTION
+ LA1.
+
+last_pub([], LP) ->
+ LP;
+last_pub(List, LP) ->
+ {PubNum, _Msg} = lists:last(List),
+ true = PubNum > LP, %% ASSERTION
+ PubNum.
diff --git a/src/gm_test.erl b/src/gm_test.erl
new file mode 100644
index 0000000000..e0a92a0c45
--- /dev/null
+++ b/src/gm_test.erl
@@ -0,0 +1,126 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%%
+
+-module(gm_test).
+
+-export([test/0]).
+-export([joined/2, members_changed/3, handle_msg/3, terminate/2]).
+
+-behaviour(gm).
+
+-include("gm_specs.hrl").
+
+get_state() ->
+ get(state).
+
+with_state(Fun) ->
+ put(state, Fun(get_state())).
+
+inc() ->
+ case 1 + get(count) of
+ 100000 -> Now = os:timestamp(),
+ Start = put(ts, Now),
+ Diff = timer:now_diff(Now, Start),
+ Rate = 100000 / (Diff / 1000000),
+ io:format("~p seeing ~p msgs/sec~n", [self(), Rate]),
+ put(count, 0);
+ N -> put(count, N)
+ end.
+
+joined([], Members) ->
+ io:format("Joined ~p (~p members)~n", [self(), length(Members)]),
+ put(state, dict:from_list([{Member, empty} || Member <- Members])),
+ put(count, 0),
+ put(ts, os:timestamp()),
+ ok.
+
+members_changed([], Births, Deaths) ->
+ with_state(
+ fun (State) ->
+ State1 =
+ lists:foldl(
+ fun (Born, StateN) ->
+ false = dict:is_key(Born, StateN),
+ dict:store(Born, empty, StateN)
+ end, State, Births),
+ lists:foldl(
+ fun (Died, StateN) ->
+ true = dict:is_key(Died, StateN),
+ dict:store(Died, died, StateN)
+ end, State1, Deaths)
+ end),
+ ok.
+
+handle_msg([], From, {test_msg, Num}) ->
+ inc(),
+ with_state(
+ fun (State) ->
+ ok = case dict:find(From, State) of
+ {ok, died} ->
+ exit({{from, From},
+ {received_posthumous_delivery, Num}});
+ {ok, empty} -> ok;
+ {ok, Num} -> ok;
+ {ok, Num1} when Num < Num1 ->
+ exit({{from, From},
+ {duplicate_delivery_of, Num1},
+ {expecting, Num}});
+ {ok, Num1} ->
+ exit({{from, From},
+ {missing_delivery_of, Num},
+ {received_early, Num1}});
+ error ->
+ exit({{from, From},
+ {received_premature_delivery, Num}})
+ end,
+ dict:store(From, Num + 1, State)
+ end),
+ ok.
+
+terminate([], Reason) ->
+ io:format("Left ~p (~p)~n", [self(), Reason]),
+ ok.
+
+spawn_member() ->
+ spawn_link(
+ fun () ->
+ random:seed(now()),
+ %% start up delay of no more than 10 seconds
+ timer:sleep(random:uniform(10000)),
+ {ok, Pid} = gm:start_link(?MODULE, ?MODULE, []),
+ Start = random:uniform(10000),
+ send_loop(Pid, Start, Start + random:uniform(10000)),
+ gm:leave(Pid),
+ spawn_more()
+ end).
+
+spawn_more() ->
+ [spawn_member() || _ <- lists:seq(1, 4 - random:uniform(4))].
+
+send_loop(_Pid, Target, Target) ->
+ ok;
+send_loop(Pid, Count, Target) when Target > Count ->
+ case random:uniform(3) of
+ 3 -> gm:confirmed_broadcast(Pid, {test_msg, Count});
+ _ -> gm:broadcast(Pid, {test_msg, Count})
+ end,
+ timer:sleep(random:uniform(5) - 1), %% sleep up to 4 ms
+ send_loop(Pid, Count + 1, Target).
+
+test() ->
+ ok = gm:create_tables(),
+ spawn_member(),
+ spawn_member().
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 1beed5c1a7..d967cfb9cd 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -36,6 +36,12 @@
[]}},
{enables, external_infrastructure}]}).
+-rabbit_boot_step({rabbit_registry,
+ [{description, "plugin registry"},
+ {mfa, {rabbit_sup, start_child,
+ [rabbit_registry]}},
+ {enables, external_infrastructure}]}).
+
-rabbit_boot_step({database,
[{mfa, {rabbit_mnesia, init, []}},
{enables, external_infrastructure}]}).
@@ -54,13 +60,6 @@
-rabbit_boot_step({external_infrastructure,
[{description, "external infrastructure ready"}]}).
--rabbit_boot_step({rabbit_registry,
- [{description, "plugin registry"},
- {mfa, {rabbit_sup, start_child,
- [rabbit_registry]}},
- {requires, external_infrastructure},
- {enables, kernel_ready}]}).
-
-rabbit_boot_step({rabbit_log,
[{description, "logging server"},
{mfa, {rabbit_sup, start_restartable_child,
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 6e5aae27c7..4ef9750c82 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -18,8 +18,8 @@
-export([start/0, stop/0, declare/5, delete_immediately/1, delete/3, purge/1]).
-export([internal_declare/2, internal_delete/1,
- maybe_run_queue_via_backing_queue/2,
- maybe_run_queue_via_backing_queue_async/2,
+ maybe_run_queue_via_backing_queue/3,
+ maybe_run_queue_via_backing_queue_async/3,
sync_timeout/1, update_ram_duration/1, set_ram_duration_target/2,
set_maximum_since_use/2, maybe_expire/1, drop_expired/1]).
-export([pseudo_queue/2]).
@@ -33,6 +33,7 @@
-export([notify_sent/2, unblock/2, flush_all/2]).
-export([commit_all/3, rollback_all/3, notify_down_all/2, limit_all/3]).
-export([on_node_down/1]).
+-export([store_queue/1]).
-include("rabbit.hrl").
-include_lib("stdlib/include/qlc.hrl").
@@ -140,10 +141,10 @@
rabbit_types:connection_exit() |
fun ((boolean()) -> rabbit_types:ok_or_error('not_found') |
rabbit_types:connection_exit())).
--spec(maybe_run_queue_via_backing_queue/2 ::
- (pid(), (fun ((A) -> {[rabbit_guid:guid()], A}))) -> 'ok').
--spec(maybe_run_queue_via_backing_queue_async/2 ::
- (pid(), (fun ((A) -> {[rabbit_guid:guid()], A}))) -> 'ok').
+-spec(maybe_run_queue_via_backing_queue/3 ::
+ (pid(), atom(), (fun ((A) -> {[rabbit_guid:guid()], A}))) -> 'ok').
+-spec(maybe_run_queue_via_backing_queue_async/3 ::
+ (pid(), atom(), (fun ((A) -> {[rabbit_guid:guid()], A}))) -> 'ok').
-spec(sync_timeout/1 :: (pid()) -> 'ok').
-spec(update_ram_duration/1 :: (pid()) -> 'ok').
-spec(set_ram_duration_target/2 :: (pid(), number() | 'infinity') -> 'ok').
@@ -191,12 +192,13 @@ recover_durable_queues(DurableQueues) ->
declare(QueueName, Durable, AutoDelete, Args, Owner) ->
ok = check_declare_arguments(QueueName, Args),
- Q = start_queue_process(#amqqueue{name = QueueName,
- durable = Durable,
- auto_delete = AutoDelete,
- arguments = Args,
+ Q = start_queue_process(#amqqueue{name = QueueName,
+ durable = Durable,
+ auto_delete = AutoDelete,
+ arguments = Args,
exclusive_owner = Owner,
- pid = none}),
+ pid = none,
+ mirror_pids = []}),
case gen_server2:call(Q#amqqueue.pid, {init, false}) of
not_found -> rabbit_misc:not_found(QueueName);
Q1 -> Q1
@@ -450,11 +452,13 @@ internal_delete(QueueName) ->
end
end).
-maybe_run_queue_via_backing_queue(QPid, Fun) ->
- gen_server2:call(QPid, {maybe_run_queue_via_backing_queue, Fun}, infinity).
-maybe_run_queue_via_backing_queue_async(QPid, Fun) ->
- gen_server2:cast(QPid, {maybe_run_queue_via_backing_queue, Fun}).
+maybe_run_queue_via_backing_queue(QPid, Mod, Fun) ->
+ gen_server2:call(QPid, {maybe_run_queue_via_backing_queue, Mod, Fun},
+ infinity).
+
+maybe_run_queue_via_backing_queue_async(QPid, Mod, Fun) ->
+ gen_server2:cast(QPid, {maybe_run_queue_via_backing_queue, Mod, Fun}).
sync_timeout(QPid) ->
gen_server2:cast(QPid, sync_timeout).
@@ -477,7 +481,8 @@ drop_expired(QPid) ->
on_node_down(Node) ->
rabbit_misc:execute_mnesia_transaction(
fun () -> qlc:e(qlc:q([delete_queue(QueueName) ||
- #amqqueue{name = QueueName, pid = Pid}
+ #amqqueue{name = QueueName, pid = Pid,
+ mirror_pids = []}
<- mnesia:table(rabbit_queue),
node(Pid) == Node]))
end,
@@ -494,11 +499,12 @@ delete_queue(QueueName) ->
rabbit_binding:remove_transient_for_destination(QueueName).
pseudo_queue(QueueName, Pid) ->
- #amqqueue{name = QueueName,
- durable = false,
+ #amqqueue{name = QueueName,
+ durable = false,
auto_delete = false,
- arguments = [],
- pid = Pid}.
+ arguments = [],
+ pid = Pid,
+ mirror_pids = []}.
safe_delegate_call_ok(F, Pids) ->
case delegate:invoke(Pids, fun (Pid) ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 496b206470..0b5ad05939 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -33,6 +33,8 @@
handle_info/2, handle_pre_hibernate/1, prioritise_call/3,
prioritise_cast/2, prioritise_info/2]).
+-export([init_with_backing_queue_state/6]).
+
% Queue's state
-record(q, {q,
exclusive_consumer,
@@ -72,7 +74,8 @@
messages,
consumers,
memory,
- backing_queue_status
+ backing_queue_status,
+ mirror_pids
]).
-define(CREATION_EVENT_KEYS,
@@ -97,12 +100,11 @@ info_keys() -> ?INFO_KEYS.
init(Q) ->
?LOGDEBUG("Queue starting - ~p~n", [Q]),
process_flag(trap_exit, true),
- {ok, BQ} = application:get_env(backing_queue_module),
{ok, #q{q = Q#amqqueue{pid = self()},
exclusive_consumer = none,
has_had_consumers = false,
- backing_queue = BQ,
+ backing_queue = backing_queue_module(Q),
backing_queue_state = undefined,
active_consumers = queue:new(),
blocked_consumers = queue:new(),
@@ -115,6 +117,36 @@ init(Q) ->
guid_to_channel = dict:new()}, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
+init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
+ RateTRef, AckTags, Deliveries) ->
+ ?LOGDEBUG("Queue starting - ~p~n", [Q]),
+ case Owner of
+ none -> ok;
+ _ -> erlang:monitor(process, Owner)
+ end,
+ State = requeue_and_run(
+ AckTags,
+ process_args(
+ #q{q = Q#amqqueue{pid = self()},
+ exclusive_consumer = none,
+ has_had_consumers = false,
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ active_consumers = queue:new(),
+ blocked_consumers = queue:new(),
+ expires = undefined,
+ sync_timer_ref = undefined,
+ rate_timer_ref = RateTRef,
+ expiry_timer_ref = undefined,
+ ttl = undefined,
+ stats_timer = rabbit_event:init_stats_timer(),
+ guid_to_channel = dict:new()})),
+ lists:foldl(
+ fun (Delivery, StateN) ->
+ {_Delivered, StateN1} = deliver_or_enqueue(Delivery, StateN),
+ StateN1
+ end, State, Deliveries).
+
terminate(shutdown, State = #q{backing_queue = BQ}) ->
terminate_shutdown(fun (BQS) -> BQ:terminate(BQS) end, State);
terminate({shutdown, _}, State = #q{backing_queue = BQ}) ->
@@ -135,8 +167,7 @@ code_change(_OldVsn, State, _Extra) ->
%%----------------------------------------------------------------------------
declare(Recover, From,
- State = #q{q = Q = #amqqueue{name = QName, durable = IsDurable},
- backing_queue = BQ, backing_queue_state = undefined,
+ State = #q{q = Q, backing_queue = BQ, backing_queue_state = undefined,
stats_timer = StatsTimer}) ->
case rabbit_amqqueue:internal_declare(Q, Recover) of
not_found -> {stop, normal, not_found, State};
@@ -147,7 +178,7 @@ declare(Recover, From,
ok = rabbit_memory_monitor:register(
self(), {rabbit_amqqueue,
set_ram_duration_target, [self()]}),
- BQS = BQ:init(QName, IsDurable, Recover),
+ BQS = BQ:init(Q, Recover),
State1 = process_args(State#q{backing_queue_state = BQS}),
rabbit_event:notify(queue_created,
infos(?CREATION_EVENT_KEYS, State1)),
@@ -209,6 +240,13 @@ next_state(State) ->
false -> {stop_sync_timer(State2), hibernate}
end.
+backing_queue_module(#amqqueue{arguments = Args}) ->
+ case rabbit_misc:table_lookup(Args, <<"x-mirror">>) of
+ undefined -> {ok, BQM} = application:get_env(backing_queue_module),
+ BQM;
+ _Nodes -> rabbit_mirror_queue_master
+ end.
+
ensure_sync_timer(State = #q{sync_timer_ref = undefined}) ->
{ok, TRef} = timer:apply_after(
?SYNC_INTERVAL, rabbit_amqqueue, sync_timeout, [self()]),
@@ -477,7 +515,7 @@ attempt_delivery(#delivery{txn = none,
AckRequired, Message,
(?BASE_MESSAGE_PROPERTIES)#message_properties{
needs_confirming = (NeedsConfirming =:= confirm)},
- BQS),
+ ChPid, BQS),
{{Message, false, AckTag}, true,
State1#q{backing_queue_state = BQS1}}
end,
@@ -494,7 +532,8 @@ attempt_delivery(#delivery{txn = Txn,
{true,
NeedsConfirming,
State#q{backing_queue_state =
- BQ:tx_publish(Txn, Message, ?BASE_MESSAGE_PROPERTIES, BQS)}}.
+ BQ:tx_publish(Txn, Message, ?BASE_MESSAGE_PROPERTIES, ChPid,
+ BQS)}}.
deliver_or_enqueue(Delivery, State) ->
case attempt_delivery(Delivery, record_confirm_message(Delivery, State)) of
@@ -507,15 +546,17 @@ deliver_or_enqueue(Delivery, State) ->
(message_properties(State)) #message_properties{
needs_confirming =
(NeedsConfirming =:= confirm)},
- BQS),
+ Delivery #delivery.sender, BQS),
{false, ensure_ttl_timer(State1#q{backing_queue_state = BQS1})}
end.
-requeue_and_run(AckTags, State = #q{backing_queue = BQ, ttl=TTL}) ->
+requeue_and_run(AckTags, State = #q{backing_queue = BQ, ttl = TTL}) ->
maybe_run_queue_via_backing_queue(
- fun (BQS) ->
- {[], BQ:requeue(AckTags, reset_msg_expiry_fun(TTL), BQS)}
- end, State).
+ BQ, fun (BQS) ->
+ {_Guids, BQS1} =
+ BQ:requeue(AckTags, reset_msg_expiry_fun(TTL), BQS),
+ {[], BQS1}
+ end, State).
fetch(AckRequired, State = #q{backing_queue_state = BQS,
backing_queue = BQ}) ->
@@ -619,10 +660,12 @@ qname(#q{q = #amqqueue{name = QName}}) -> QName.
backing_queue_idle_timeout(State = #q{backing_queue = BQ}) ->
maybe_run_queue_via_backing_queue(
- fun (BQS) -> {[], BQ:idle_timeout(BQS)} end, State).
+ BQ, fun (BQS) -> {[], BQ:idle_timeout(BQS)} end, State).
-maybe_run_queue_via_backing_queue(Fun, State = #q{backing_queue_state = BQS}) ->
- {Guids, BQS1} = Fun(BQS),
+maybe_run_queue_via_backing_queue(Mod, Fun,
+ State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ {Guids, BQS1} = BQ:invoke(Mod, Fun, BQS),
run_message_queue(
confirm_messages(Guids, State#q{backing_queue_state = BQS1})).
@@ -720,6 +763,9 @@ i(memory, _) ->
M;
i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) ->
BQ:status(BQS);
+i(mirror_pids, #q{q = #amqqueue{name = Name}}) ->
+ {ok, #amqqueue{mirror_pids = MPids}} = rabbit_amqqueue:lookup(Name),
+ MPids;
i(Item, _) ->
throw({bad_argument, Item}).
@@ -755,29 +801,29 @@ emit_consumer_deleted(ChPid, ConsumerTag) ->
prioritise_call(Msg, _From, _State) ->
case Msg of
- info -> 9;
- {info, _Items} -> 9;
- consumers -> 9;
- {maybe_run_queue_via_backing_queue, _Fun} -> 6;
- _ -> 0
+ info -> 9;
+ {info, _Items} -> 9;
+ consumers -> 9;
+ {maybe_run_queue_via_backing_queue, _Mod, _Fun} -> 6;
+ _ -> 0
end.
prioritise_cast(Msg, _State) ->
case Msg of
- update_ram_duration -> 8;
- delete_immediately -> 8;
- {set_ram_duration_target, _Duration} -> 8;
- {set_maximum_since_use, _Age} -> 8;
- maybe_expire -> 8;
- drop_expired -> 8;
- emit_stats -> 7;
- {ack, _Txn, _MsgIds, _ChPid} -> 7;
- {reject, _MsgIds, _Requeue, _ChPid} -> 7;
- {notify_sent, _ChPid} -> 7;
- {unblock, _ChPid} -> 7;
- {maybe_run_queue_via_backing_queue, _Fun} -> 6;
- sync_timeout -> 6;
- _ -> 0
+ update_ram_duration -> 8;
+ delete_immediately -> 8;
+ {set_ram_duration_target, _Duration} -> 8;
+ {set_maximum_since_use, _Age} -> 8;
+ maybe_expire -> 8;
+ drop_expired -> 8;
+ emit_stats -> 7;
+ {ack, _Txn, _MsgIds, _ChPid} -> 7;
+ {reject, _MsgIds, _Requeue, _ChPid} -> 7;
+ {notify_sent, _ChPid} -> 7;
+ {unblock, _ChPid} -> 7;
+ {maybe_run_queue_via_backing_queue, _Mod, _Fun} -> 6;
+ sync_timeout -> 6;
+ _ -> 0
end.
prioritise_info({'DOWN', _MonitorRef, process, DownPid, _Reason},
@@ -993,12 +1039,12 @@ handle_call({requeue, AckTags, ChPid}, From, State) ->
noreply(requeue_and_run(AckTags, State))
end;
-handle_call({maybe_run_queue_via_backing_queue, Fun}, _From, State) ->
- reply(ok, maybe_run_queue_via_backing_queue(Fun, State)).
+handle_call({maybe_run_queue_via_backing_queue, Mod, Fun}, _From, State) ->
+ reply(ok, maybe_run_queue_via_backing_queue(Mod, Fun, State)).
-handle_cast({maybe_run_queue_via_backing_queue, Fun}, State) ->
- noreply(maybe_run_queue_via_backing_queue(Fun, State));
+handle_cast({maybe_run_queue_via_backing_queue, Mod, Fun}, State) ->
+ noreply(maybe_run_queue_via_backing_queue(Mod, Fun, State));
handle_cast(sync_timeout, State) ->
noreply(backing_queue_idle_timeout(State#q{sync_timer_ref = undefined}));
@@ -1018,7 +1064,7 @@ handle_cast({ack, Txn, AckTags, ChPid},
case Txn of
none -> ChAckTags1 = subtract_acks(ChAckTags, AckTags),
NewC = C#cr{acktags = ChAckTags1},
- BQS1 = BQ:ack(AckTags, BQS),
+ {_Guids, BQS1} = BQ:ack(AckTags, BQS),
{NewC, State#q{backing_queue_state = BQS1}};
_ -> BQS1 = BQ:tx_ack(Txn, AckTags, BQS),
{C#cr{txn = Txn},
@@ -1039,7 +1085,7 @@ handle_cast({reject, AckTags, Requeue, ChPid},
maybe_store_ch_record(C#cr{acktags = ChAckTags1}),
noreply(case Requeue of
true -> requeue_and_run(AckTags, State);
- false -> BQS1 = BQ:ack(AckTags, BQS),
+ false -> {_Guids, BQS1} = BQ:ack(AckTags, BQS),
State#q{backing_queue_state = BQS1}
end)
end;
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 6a21e10fd3..1aa6ea6745 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -33,7 +33,7 @@ behaviour_info(callbacks) ->
{stop, 0},
%% Initialise the backing queue and its state.
- {init, 3},
+ {init, 2},
%% Called on queue shutdown when queue isn't being deleted.
{terminate, 1},
@@ -47,12 +47,12 @@ behaviour_info(callbacks) ->
{purge, 1},
%% Publish a message.
- {publish, 3},
+ {publish, 4},
%% Called for messages which have already been passed straight
%% out to a client. The queue will be empty for these calls
%% (i.e. saves the round trip through the backing queue).
- {publish_delivered, 4},
+ {publish_delivered, 5},
%% Drop messages from the head of the queue while the supplied
%% predicate returns true.
@@ -66,7 +66,7 @@ behaviour_info(callbacks) ->
{ack, 2},
%% A publish, but in the context of a transaction.
- {tx_publish, 4},
+ {tx_publish, 5},
%% Acks, but in the context of a transaction.
{tx_ack, 3},
@@ -122,7 +122,12 @@ behaviour_info(callbacks) ->
%% Exists for debugging purposes, to be able to expose state via
%% rabbitmqctl list_queues backing_queue_status
- {status, 1}
+ {status, 1},
+
+ %% Passed a function to be invoked with the relevant backing
+ %% queue's state. Useful for when the backing queue or other
+ %% components need to pass functions into the backing queue.
+ {invoke, 3}
];
behaviour_info(_Other) ->
undefined.
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 8048309714..15f1e77d77 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -342,6 +342,12 @@ format_info_item([{TableEntryKey, TableEntryType, _TableEntryValue} | _] =
Value) when is_binary(TableEntryKey) andalso
is_atom(TableEntryType) ->
io_lib:format("~1000000000000p", [prettify_amqp_table(Value)]);
+format_info_item([T | _] = Value)
+ when is_tuple(T) orelse is_pid(T) orelse is_binary(T) orelse is_atom(T) orelse
+ is_list(T) ->
+ "[" ++
+ lists:nthtail(2, lists:append(
+ [", " ++ format_info_item(E) || E <- Value])) ++ "]";
format_info_item(Value) ->
io_lib:format("~w", [Value]).
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl
new file mode 100644
index 0000000000..30fd6ed34d
--- /dev/null
+++ b/src/rabbit_mirror_queue_coordinator.erl
@@ -0,0 +1,136 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2010 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_mirror_queue_coordinator).
+
+-export([start_link/2, add_slave/2, get_gm/1]).
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+ code_change/3]).
+
+-export([joined/2, members_changed/3, handle_msg/3]).
+
+-behaviour(gen_server2).
+-behaviour(gm).
+
+-include("rabbit.hrl").
+-include("gm_specs.hrl").
+
+-record(state, { q,
+ gm
+ }).
+
+-define(ONE_SECOND, 1000).
+
+start_link(Queue, GM) ->
+ gen_server2:start_link(?MODULE, [Queue, GM], []).
+
+add_slave(CPid, SlaveNode) ->
+ gen_server2:cast(CPid, {add_slave, SlaveNode}).
+
+get_gm(CPid) ->
+ gen_server2:call(CPid, get_gm, infinity).
+
+%% ---------------------------------------------------------------------------
+%% gen_server
+%% ---------------------------------------------------------------------------
+
+init([#amqqueue { name = QueueName } = Q, GM]) ->
+ GM1 = case GM of
+ undefined ->
+ ok = gm:create_tables(),
+ {ok, GM2} = gm:start_link(QueueName, ?MODULE, [self()]),
+ receive {joined, GM2, _Members} ->
+ ok
+ end,
+ GM2;
+ _ ->
+ true = link(GM),
+ GM
+ end,
+ {ok, _TRef} =
+ timer:apply_interval(?ONE_SECOND, gm, broadcast, [GM1, heartbeat]),
+ {ok, #state { q = Q, gm = GM1 }, hibernate,
+ {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
+
+handle_call(get_gm, _From, State = #state { gm = GM }) ->
+ reply(GM, State).
+
+handle_cast({add_slave, Node}, State = #state { q = Q }) ->
+ Nodes = nodes(),
+ case lists:member(Node, Nodes) of
+ true ->
+ Result = rabbit_mirror_queue_slave_sup:start_child(Node, [Q]),
+ rabbit_log:info("Adding slave node for ~s: ~p~n",
+ [rabbit_misc:rs(Q #amqqueue.name), Result]);
+ false ->
+ rabbit_log:info(
+ "Ignoring request to add slave on node ~p for ~s~n",
+ [Node, rabbit_misc:rs(Q #amqqueue.name)])
+ end,
+ noreply(State);
+
+handle_cast({gm_deaths, Deaths},
+ State = #state { q = #amqqueue { name = QueueName } }) ->
+ rabbit_log:info("Master ~p saw deaths ~p for ~s~n",
+ [self(), Deaths, rabbit_misc:rs(QueueName)]),
+ case rabbit_mirror_queue_misc:remove_from_queue(QueueName, Deaths) of
+ {ok, Pid} when node(Pid) =:= node() ->
+ noreply(State);
+ {error, not_found} ->
+ {stop, normal, State}
+ end.
+
+handle_info(Msg, State) ->
+ {stop, {unexpected_info, Msg}, State}.
+
+terminate(_Reason, #state{}) ->
+ %% gen_server case
+ ok;
+terminate([_CPid], _Reason) ->
+ %% gm case
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%% ---------------------------------------------------------------------------
+%% GM
+%% ---------------------------------------------------------------------------
+
+joined([CPid], Members) ->
+ CPid ! {joined, self(), Members},
+ ok.
+
+members_changed([_CPid], _Births, []) ->
+ ok;
+members_changed([CPid], _Births, Deaths) ->
+ ok = gen_server2:cast(CPid, {gm_deaths, Deaths}).
+
+handle_msg([_CPid], _From, heartbeat) ->
+ ok;
+handle_msg([_CPid], _From, _Msg) ->
+ ok.
+
+%% ---------------------------------------------------------------------------
+%% Others
+%% ---------------------------------------------------------------------------
+
+noreply(State) ->
+ {noreply, State, hibernate}.
+
+reply(Reply, State) ->
+ {reply, Reply, State, hibernate}.
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
new file mode 100644
index 0000000000..11831a2998
--- /dev/null
+++ b/src/rabbit_mirror_queue_master.erl
@@ -0,0 +1,250 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2010 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_mirror_queue_master).
+
+-export([init/2, terminate/1, delete_and_terminate/1,
+ purge/1, publish/4, publish_delivered/5, fetch/2, ack/2,
+ tx_publish/5, tx_ack/3, tx_rollback/2, tx_commit/4,
+ requeue/3, len/1, is_empty/1, dropwhile/2,
+ set_ram_duration_target/2, ram_duration/1,
+ needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1,
+ status/1, invoke/3]).
+
+-export([start/1, stop/0]).
+
+-export([promote_backing_queue_state/5]).
+
+-behaviour(rabbit_backing_queue).
+
+-include("rabbit.hrl").
+
+-record(state, { gm,
+ coordinator,
+ backing_queue,
+ backing_queue_state,
+ set_delivered,
+ seen
+ }).
+
+%% ---------------------------------------------------------------------------
+%% Backing queue
+%% ---------------------------------------------------------------------------
+
+start(_DurableQueues) ->
+ %% This will never get called as this module will never be
+ %% installed as the default BQ implementation.
+ exit({not_valid_for_generic_backing_queue, ?MODULE}).
+
+stop() ->
+ %% Same as start/1.
+ exit({not_valid_for_generic_backing_queue, ?MODULE}).
+
+init(#amqqueue { arguments = Args } = Q, Recover) ->
+ {ok, CPid} = rabbit_mirror_queue_coordinator:start_link(Q, undefined),
+ GM = rabbit_mirror_queue_coordinator:get_gm(CPid),
+ {_Type, Nodes} = rabbit_misc:table_lookup(Args, <<"x-mirror">>),
+ Nodes1 = case Nodes of
+ [] -> nodes();
+ _ -> [list_to_atom(binary_to_list(Node)) ||
+ {longstr, Node} <- Nodes]
+ end,
+ [rabbit_mirror_queue_coordinator:add_slave(CPid, Node) || Node <- Nodes1],
+ {ok, BQ} = application:get_env(backing_queue_module),
+ BQS = BQ:init(Q, Recover),
+ #state { gm = GM,
+ coordinator = CPid,
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ set_delivered = 0,
+ seen = sets:new() }.
+
+promote_backing_queue_state(CPid, BQ, BQS, GM, Seen) ->
+ #state { gm = GM,
+ coordinator = CPid,
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ set_delivered = BQ:len(BQS),
+ seen = Seen }.
+
+terminate(State = #state { backing_queue = BQ, backing_queue_state = BQS }) ->
+ %% Backing queue termination. The queue is going down but
+ %% shouldn't be deleted. Most likely safe shutdown of this
+ %% node. Thus just let some other slave take over.
+ State #state { backing_queue_state = BQ:terminate(BQS) }.
+
+delete_and_terminate(State = #state { gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ ok = gm:broadcast(GM, delete_and_terminate),
+ State #state { backing_queue_state = BQ:delete_and_terminate(BQS),
+ set_delivered = 0 }.
+
+purge(State = #state { gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ ok = gm:broadcast(GM, {set_length, 0}),
+ {Count, BQS1} = BQ:purge(BQS),
+ {Count, State #state { backing_queue_state = BQS1,
+ set_delivered = 0 }}.
+
+publish(Msg = #basic_message { guid = Guid }, MsgProps, ChPid,
+ State = #state { gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ seen = Seen }) ->
+ case sets:is_element(Guid, Seen) of
+ true -> State #state { seen = sets:del_element(Guid, Seen) };
+ false -> ok = gm:broadcast(GM, {publish, false, ChPid, MsgProps, Msg}),
+ BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS),
+ State #state { backing_queue_state = BQS1 }
+ end.
+
+publish_delivered(AckRequired, Msg = #basic_message { guid = Guid }, MsgProps,
+ ChPid, State = #state { gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ seen = Seen }) ->
+ case sets:is_element(Guid, Seen) of
+ true -> State #state { seen = sets:del_element(Guid, Seen) };
+ false -> ok = gm:broadcast(GM, {publish, {true, AckRequired}, ChPid,
+ MsgProps, Msg}),
+ {AckTag, BQS1} = BQ:publish_delivered(AckRequired, Msg,
+ MsgProps, ChPid, BQS),
+ {AckTag, State #state { backing_queue_state = BQS1 }}
+ end.
+
+dropwhile(Fun, State = #state { gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ set_delivered = SetDelivered }) ->
+ Len = BQ:len(BQS),
+ BQS1 = BQ:dropwhile(Fun, BQS),
+ Dropped = Len - BQ:len(BQS1),
+ SetDelivered1 = lists:max([0, SetDelivered - Dropped]),
+ ok = gm:broadcast(GM, {set_length, BQ:len(BQS1)}),
+ State #state { backing_queue_state = BQS1,
+ set_delivered = SetDelivered1 }.
+
+fetch(AckRequired, State = #state { gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ set_delivered = SetDelivered,
+ seen = Seen }) ->
+ {Result, BQS1} = BQ:fetch(AckRequired, BQS),
+ State1 = State #state { backing_queue_state = BQS1 },
+ case Result of
+ empty ->
+ {Result, State1};
+ {#basic_message { guid = Guid } = Message, IsDelivered, AckTag,
+ Remaining} ->
+ ok = gm:broadcast(GM, {fetch, AckRequired, Guid, Remaining}),
+ IsDelivered1 = IsDelivered orelse SetDelivered > 0,
+ SetDelivered1 = lists:max([0, SetDelivered - 1]),
+ Seen1 = case SetDelivered + SetDelivered1 of
+ 1 -> sets:new(); %% transition to empty
+ _ -> Seen
+ end,
+ {{Message, IsDelivered1, AckTag, Remaining},
+ State1 #state { set_delivered = SetDelivered1,
+ seen = Seen1 }}
+ end.
+
+ack(AckTags, State = #state { gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ {Guids, BQS1} = BQ:ack(AckTags, BQS),
+ case Guids of
+ [] -> ok;
+ _ -> ok = gm:broadcast(GM, {ack, Guids})
+ end,
+ {Guids, State #state { backing_queue_state = BQS1 }}.
+
+tx_publish(Txn, Msg, MsgProps, ChPid, #state {} = State) ->
+ %% gm:broadcast(GM, {tx_publish, Txn, Guid, MsgProps, ChPid})
+ State.
+
+tx_ack(Txn, AckTags, #state {} = State) ->
+ %% gm:broadcast(GM, {tx_ack, Txn, Guids})
+ State.
+
+tx_rollback(Txn, #state {} = State) ->
+ %% gm:broadcast(GM, {tx_rollback, Txn})
+ {[], State}.
+
+tx_commit(Txn, PostCommitFun, MsgPropsFun, #state {} = State) ->
+ %% Maybe don't want to transmit the MsgPropsFun but what choice do
+ %% we have? OTOH, on the slaves, things won't be expiring on their
+ %% own (props are interpreted by amqqueue, not vq), so if the msg
+ %% props aren't quite the same, that doesn't matter.
+ %%
+ %% The PostCommitFun is actually worse - we need to prevent that
+ %% from being invoked until we have confirmation from all the
+ %% slaves that they've done everything up to there.
+ %%
+ %% In fact, transactions are going to need work seeing as it's at
+ %% this point that VQ mentions amqqueue, which will thus not work
+ %% on the slaves - we need to make sure that all the slaves do the
+ %% tx_commit_post_msg_store at the same point, and then when they
+ %% all confirm that (scatter/gather), we can finally invoke the
+ %% PostCommitFun.
+ %%
+ %% Another idea is that the slaves are actually driven with
+ %% pubacks and thus only the master needs to support txns
+ %% directly.
+ {[], State}.
+
+requeue(AckTags, MsgPropsFun, State = #state { gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ {Guids, BQS1} = BQ:requeue(AckTags, MsgPropsFun, BQS),
+ ok = gm:broadcast(GM, {requeue, MsgPropsFun, Guids}),
+ {Guids, State #state { backing_queue_state = BQS1 }}.
+
+len(#state { backing_queue = BQ, backing_queue_state = BQS}) ->
+ BQ:len(BQS).
+
+is_empty(#state { backing_queue = BQ, backing_queue_state = BQS}) ->
+ BQ:is_empty(BQS).
+
+set_ram_duration_target(Target, State = #state { backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ State #state { backing_queue_state =
+ BQ:set_ram_duration_target(Target, BQS) }.
+
+ram_duration(State = #state { backing_queue = BQ, backing_queue_state = BQS}) ->
+ {Result, BQS1} = BQ:ram_duration(BQS),
+ {Result, State #state { backing_queue_state = BQS1 }}.
+
+needs_idle_timeout(#state { backing_queue = BQ, backing_queue_state = BQS}) ->
+ BQ:needs_idle_timeout(BQS).
+
+idle_timeout(#state { backing_queue = BQ, backing_queue_state = BQS}) ->
+ BQ:idle_timeout(BQS).
+
+handle_pre_hibernate(State = #state { backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ State #state { backing_queue_state = BQ:handle_pre_hibernate(BQS) }.
+
+status(#state { backing_queue = BQ, backing_queue_state = BQS}) ->
+ BQ:status(BQS).
+
+invoke(?MODULE, Fun, State) ->
+ Fun(State);
+invoke(Mod, Fun, State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ {Guids, BQS1} = BQ:invoke(Mod, Fun, BQS),
+ {Guids, State #state { backing_queue_state = BQS1 }}.
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
new file mode 100644
index 0000000000..090cb81203
--- /dev/null
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -0,0 +1,46 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2010 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_mirror_queue_misc).
+
+-export([remove_from_queue/2]).
+
+-include("rabbit.hrl").
+
+remove_from_queue(QueueName, DeadPids) ->
+ DeadNodes = [node(DeadPid) || DeadPid <- DeadPids],
+ rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ %% Someone else could have deleted the queue before we
+ %% get here.
+ case mnesia:read({rabbit_queue, QueueName}) of
+ [] -> {error, not_found};
+ [Q = #amqqueue { pid = QPid,
+ mirror_pids = MPids }] ->
+ [QPid1 | MPids1] =
+ [Pid || Pid <- [QPid | MPids],
+ not lists:member(node(Pid), DeadNodes)],
+ case {{QPid, MPids}, {QPid1, MPids1}} of
+ {Same, Same} ->
+ {ok, QPid};
+ _ ->
+ Q1 = Q #amqqueue { pid = QPid1,
+ mirror_pids = MPids1 },
+ ok = rabbit_amqqueue:store_queue(Q1),
+ {ok, QPid1}
+ end
+ end
+ end).
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
new file mode 100644
index 0000000000..4f9d2066be
--- /dev/null
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -0,0 +1,529 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2010 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_mirror_queue_slave).
+
+%% We join the GM group before we add ourselves to the amqqueue
+%% record. As a result:
+%% 1. We can receive msgs from GM that correspond to messages we will
+%% never receive from publishers.
+%% 2. When we receive a message from publishers, we must receive a
+%% message from the GM group for it.
+%% 3. However, that instruction from the GM group can arrive either
+%% before or after the actual message. We need to be able to
+%% distinguish between GM instructions arriving early, and case (1)
+%% above.
+%%
+%% All instructions from the GM group must be processed in the order
+%% in which they're received.
+%%
+%% Thus, we need a queue per sender, and a queue for GM instructions.
+%%
+%% On receipt of a GM group instruction, three things are possible:
+%% 1. The queue of publisher messages is empty. Thus store the GM
+%% instruction to the instrQ.
+%% 2. The head of the queue of publisher messages has a message that
+%% matches the GUID of the GM instruction. Remove the message, and
+%% route appropriately.
+%% 3. The head of the queue of publisher messages has a message that
+%% does not match the GUID of the GM instruction. Throw away the GM
+%% instruction: the GM instruction must correspond to a message
+%% that we'll never receive. If it did not, then before the current
+%% instruction, we would have received an instruction for the
+%% message at the head of this queue, thus the head of the queue
+%% would have been removed and processed.
+%%
+%% On receipt of a publisher message, three things are possible:
+%% 1. The queue of GM group instructions is empty. Add the message to
+%% the relevant queue and await instructions from the GM.
+%% 2. The head of the queue of GM group instructions has an
+%% instruction matching the GUID of the message. Remove that
+%% instruction and act on it. Attempt to process the rest of the
+%% instrQ.
+%% 3. The head of the queue of GM group instructions has an
+%% instruction that does not match the GUID of the message. If the
+%% message is from the same publisher as is referred to by the
+%% instruction then throw away the GM group instruction and repeat
+%% - attempt to match against the next instruction if there is one:
+%% The instruction thrown away was for a message we'll never
+%% receive.
+%%
+%% In all cases, we are relying heavily on order preserving messaging
+%% both from the GM group and from the publishers.
+
+-export([start_link/1, set_maximum_since_use/2]).
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+ code_change/3, handle_pre_hibernate/1]).
+
+-export([joined/2, members_changed/3, handle_msg/3]).
+
+-behaviour(gen_server2).
+-behaviour(gm).
+
+-include("rabbit.hrl").
+-include("gm_specs.hrl").
+
+-record(state, { q,
+ gm,
+ master_node,
+ backing_queue,
+ backing_queue_state,
+ rate_timer_ref,
+
+ sender_queues, %% :: Pid -> MsgQ
+ guid_ack, %% :: Guid -> AckTag
+ seen, %% Set Guid
+
+ guid_to_channel %% for confirms
+ }).
+
+-define(RAM_DURATION_UPDATE_INTERVAL, 5000).
+
+start_link(Q) ->
+ gen_server2:start_link(?MODULE, [Q], []).
+
+set_maximum_since_use(QPid, Age) ->
+ gen_server2:cast(QPid, {set_maximum_since_use, Age}).
+
+init([#amqqueue { name = QueueName } = Q]) ->
+ process_flag(trap_exit, true), %% amqqueue_process traps exits too.
+ ok = gm:create_tables(),
+ {ok, GM} = gm:start_link(QueueName, ?MODULE, [self()]),
+ receive {joined, GM} ->
+ ok
+ end,
+ Self = self(),
+ Node = node(),
+ case rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ [Q1 = #amqqueue { pid = QPid, mirror_pids = MPids }] =
+ mnesia:read({rabbit_queue, QueueName}),
+ case [Pid || Pid <- [QPid | MPids], node(Pid) =:= Node] of
+ [] ->
+ MPids1 = MPids ++ [Self],
+ mnesia:write(rabbit_queue,
+ Q1 #amqqueue { mirror_pids = MPids1 },
+ write),
+ {ok, QPid};
+ _ ->
+ {error, node_already_present}
+ end
+ end) of
+ {ok, MPid} ->
+ ok = file_handle_cache:register_callback(
+ rabbit_amqqueue, set_maximum_since_use, [self()]),
+ ok = rabbit_memory_monitor:register(
+ self(), {rabbit_amqqueue, set_ram_duration_target,
+ [self()]}),
+ {ok, BQ} = application:get_env(backing_queue_module),
+ BQS = BQ:init(Q, false),
+ {ok, #state { q = Q,
+ gm = GM,
+ master_node = node(MPid),
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ rate_timer_ref = undefined,
+
+ sender_queues = dict:new(),
+ guid_ack = dict:new(),
+ seen = sets:new(),
+
+ guid_to_channel = dict:new()
+ }, hibernate,
+ {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN,
+ ?DESIRED_HIBERNATE}};
+ {error, Error} ->
+ {stop, Error}
+ end.
+
+handle_call({deliver_immediately, Delivery = #delivery {}}, From, State) ->
+ %% Synchronous, "immediate" delivery mode
+ gen_server2:reply(From, false), %% master may deliver it, not us
+ noreply(maybe_enqueue_message(Delivery, State));
+
+handle_call({deliver, Delivery = #delivery {}}, From, State) ->
+ %% Synchronous, "mandatory" delivery mode
+ gen_server2:reply(From, true), %% amqqueue throws away the result anyway
+ noreply(maybe_enqueue_message(Delivery, State));
+
+handle_call({gm_deaths, Deaths}, From,
+ State = #state { q = #amqqueue { name = QueueName },
+ gm = GM,
+ master_node = MNode }) ->
+ rabbit_log:info("Slave ~p saw deaths ~p for ~s~n",
+ [self(), Deaths, rabbit_misc:rs(QueueName)]),
+ case rabbit_mirror_queue_misc:remove_from_queue(QueueName, Deaths) of
+ {ok, Pid} when node(Pid) =:= MNode ->
+ reply(ok, State);
+ {ok, Pid} when node(Pid) =:= node() ->
+ promote_me(From, State);
+ {ok, Pid} ->
+ gen_server2:reply(From, ok),
+ ok = gm:broadcast(GM, heartbeat),
+ noreply(State #state { master_node = node(Pid) });
+ {error, not_found} ->
+ gen_server2:reply(From, ok),
+ {stop, normal, State}
+ end;
+
+handle_call({maybe_run_queue_via_backing_queue, Mod, Fun}, _From, State) ->
+ reply(ok, maybe_run_queue_via_backing_queue(Mod, Fun, State)).
+
+
+handle_cast({maybe_run_queue_via_backing_queue, Mod, Fun}, State) ->
+ noreply(maybe_run_queue_via_backing_queue(Mod, Fun, State));
+
+handle_cast({gm, Instruction}, State) ->
+ handle_process_result(process_instruction(Instruction, State));
+
+handle_cast({deliver, Delivery = #delivery {}}, State) ->
+ %% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
+ noreply(maybe_enqueue_message(Delivery, State));
+
+handle_cast({set_maximum_since_use, Age}, State) ->
+ ok = file_handle_cache:set_maximum_since_use(Age),
+ noreply(State);
+
+handle_cast({set_ram_duration_target, Duration},
+ State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ BQS1 = BQ:set_ram_duration_target(Duration, BQS),
+ noreply(State #state { backing_queue_state = BQS1 });
+
+handle_cast(update_ram_duration,
+ State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ {RamDuration, BQS1} = BQ:ram_duration(BQS),
+ DesiredDuration =
+ rabbit_memory_monitor:report_ram_duration(self(), RamDuration),
+ BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1),
+ noreply(State #state { rate_timer_ref = just_measured,
+ backing_queue_state = BQS2 }).
+
+handle_info(Msg, State) ->
+ {stop, {unexpected_info, Msg}, State}.
+
+%% If the Reason is shutdown, or {shutdown, _}, it is not the queue
+%% being deleted: it's just the node going down. Even though we're a
+%% slave, we have no idea whether or not we'll be the only copy coming
+%% back up. Thus we must assume we will be, and preserve anything we
+%% have on disk.
+terminate(_Reason, #state { backing_queue_state = undefined }) ->
+ %% We've received a delete_and_terminate from gm, thus nothing to
+ %% do here.
+ ok;
+terminate(Reason, #state { q = Q,
+ gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ rate_timer_ref = RateTRef }) ->
+ ok = gm:leave(GM),
+ QueueState = rabbit_amqqueue_process:init_with_backing_queue_state(
+ Q, BQ, BQS, RateTRef, [], []),
+ rabbit_amqqueue_process:terminate(Reason, QueueState);
+terminate([_SPid], _Reason) ->
+ %% gm case
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+handle_pre_hibernate(State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ %% mainly copied from amqqueue_process
+ BQS1 = BQ:handle_pre_hibernate(BQS),
+ %% no activity for a while == 0 egress and ingress rates
+ DesiredDuration =
+ rabbit_memory_monitor:report_ram_duration(self(), infinity),
+ BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1),
+ {hibernate, stop_rate_timer(State #state { backing_queue_state = BQS2 })}.
+
+%% ---------------------------------------------------------------------------
+%% GM
+%% ---------------------------------------------------------------------------
+
+joined([SPid], _Members) ->
+ SPid ! {joined, self()},
+ ok.
+
+members_changed([_SPid], _Births, []) ->
+ ok;
+members_changed([SPid], _Births, Deaths) ->
+ rabbit_misc:with_exit_handler(
+ fun () -> {stop, normal} end,
+ fun () ->
+ case gen_server2:call(SPid, {gm_deaths, Deaths}, infinity) of
+ ok ->
+ ok;
+ {promote, CPid} ->
+ {become, rabbit_mirror_queue_coordinator, [CPid]}
+ end
+ end).
+
+handle_msg([_SPid], _From, heartbeat) ->
+ ok;
+handle_msg([SPid], _From, Msg) ->
+ ok = gen_server2:cast(SPid, {gm, Msg}).
+
+%% ---------------------------------------------------------------------------
+%% Others
+%% ---------------------------------------------------------------------------
+
+maybe_run_queue_via_backing_queue(
+ Mod, Fun, State = #state { backing_queue = BQ,
+ backing_queue_state = BQS,
+ guid_to_channel = GTC }) ->
+ {Guids, BQS1} = BQ:invoke(Mod, Fun, BQS),
+ GTC1 = lists:foldl(fun maybe_confirm_message/2, GTC, Guids),
+ State #state { backing_queue_state = BQS1,
+ guid_to_channel = GTC1 }.
+
+record_confirm_or_confirm(#delivery { msg_seq_no = undefined }, _Q, GTC) ->
+ GTC;
+record_confirm_or_confirm(
+ #delivery { sender = ChPid,
+ message = #basic_message { is_persistent = true,
+ guid = Guid },
+ msg_seq_no = MsgSeqNo }, #amqqueue { durable = true }, GTC) ->
+ dict:store(Guid, {ChPid, MsgSeqNo}, GTC);
+record_confirm_or_confirm(#delivery { sender = ChPid, msg_seq_no = MsgSeqNo },
+ _Q, GTC) ->
+ ok = rabbit_channel:confirm(ChPid, MsgSeqNo),
+ GTC.
+
+maybe_confirm_message(Guid, GTC) ->
+ case dict:find(Guid, GTC) of
+ {ok, {ChPid, MsgSeqNo}} when MsgSeqNo =/= undefined ->
+ ok = rabbit_channel:confirm(ChPid, MsgSeqNo),
+ dict:erase(Guid, GTC);
+ error ->
+ GTC
+ end.
+
+handle_process_result({ok, State}) -> noreply(State);
+handle_process_result({stop, State}) -> {stop, normal, State}.
+
+promote_me(From, #state { q = Q,
+ gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ rate_timer_ref = RateTRef,
+ sender_queues = SQ,
+ seen = Seen,
+ guid_ack = GA }) ->
+ rabbit_log:info("Promoting slave ~p for ~s~n",
+ [self(), rabbit_misc:rs(Q #amqqueue.name)]),
+ {ok, CPid} = rabbit_mirror_queue_coordinator:start_link(Q, GM),
+ true = unlink(GM),
+ gen_server2:reply(From, {promote, CPid}),
+ ok = gm:confirmed_broadcast(GM, heartbeat),
+ MasterState = rabbit_mirror_queue_master:promote_backing_queue_state(
+ CPid, BQ, BQS, GM, Seen),
+ %% We have to do the requeue via this init because otherwise we
+ %% don't have access to the relevent MsgPropsFun. Also, we are
+ %% already in mnesia as the master queue pid. Thus we cannot just
+ %% publish stuff by sending it to ourself - we must pass it
+ %% through to this init, otherwise we can violate ordering
+ %% constraints.
+ AckTags = [AckTag || {_Guid, AckTag} <- dict:to_list(GA)],
+ Deliveries = lists:append([queue:to_list(PubQ)
+ || {_ChPid, PubQ} <- dict:to_list(SQ)]),
+ QueueState = rabbit_amqqueue_process:init_with_backing_queue_state(
+ Q, rabbit_mirror_queue_master, MasterState, RateTRef,
+ AckTags, Deliveries),
+ {become, rabbit_amqqueue_process, QueueState, hibernate}.
+
+noreply(State) ->
+ {noreply, next_state(State), hibernate}.
+
+reply(Reply, State) ->
+ {reply, Reply, next_state(State), hibernate}.
+
+next_state(State) ->
+ ensure_rate_timer(State).
+
+%% copied+pasted from amqqueue_process
+ensure_rate_timer(State = #state { rate_timer_ref = undefined }) ->
+ {ok, TRef} = timer:apply_after(
+ ?RAM_DURATION_UPDATE_INTERVAL,
+ rabbit_amqqueue, update_ram_duration,
+ [self()]),
+ State #state { rate_timer_ref = TRef };
+ensure_rate_timer(State = #state { rate_timer_ref = just_measured }) ->
+ State #state { rate_timer_ref = undefined };
+ensure_rate_timer(State) ->
+ State.
+
+stop_rate_timer(State = #state { rate_timer_ref = undefined }) ->
+ State;
+stop_rate_timer(State = #state { rate_timer_ref = just_measured }) ->
+ State #state { rate_timer_ref = undefined };
+stop_rate_timer(State = #state { rate_timer_ref = TRef }) ->
+ {ok, cancel} = timer:cancel(TRef),
+ State #state { rate_timer_ref = undefined }.
+
+maybe_enqueue_message(
+ Delivery = #delivery { message = #basic_message { guid = Guid },
+ sender = ChPid },
+ State = #state { q = Q,
+ sender_queues = SQ,
+ seen = Seen,
+ guid_to_channel = GTC }) ->
+ case sets:is_element(Guid, Seen) of
+ true ->
+ GTC1 = record_confirm_or_confirm(Delivery, Q, GTC),
+ State #state { guid_to_channel = GTC1,
+ seen = sets:del_element(Guid, Seen) };
+ false ->
+ MQ = case dict:find(ChPid, SQ) of
+ {ok, MQ1} -> MQ1;
+ error -> queue:new()
+ end,
+ SQ1 = dict:store(ChPid, queue:in(Delivery, MQ), SQ),
+ State #state { sender_queues = SQ1 }
+ end.
+
+process_instruction(
+ {publish, Deliver, ChPid, MsgProps, Msg = #basic_message { guid = Guid }},
+ State = #state { q = Q,
+ sender_queues = SQ,
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ guid_ack = GA,
+ seen = Seen,
+ guid_to_channel = GTC }) ->
+ {SQ1, Seen1, GTC1} =
+ case dict:find(ChPid, SQ) of
+ error ->
+ {SQ, sets:add_element(Guid, Seen), GTC};
+ {ok, MQ} ->
+ case queue:out(MQ) of
+ {empty, _MQ} ->
+ {SQ, sets:add_element(Guid, Seen), GTC};
+ {{value, Delivery = #delivery {
+ message = #basic_message { guid = Guid } }},
+ MQ1} ->
+ GTC2 = record_confirm_or_confirm(Delivery, Q, GTC),
+ {dict:store(ChPid, MQ1, SQ), Seen, GTC2};
+ {{value, #delivery {}}, _MQ1} ->
+ %% The instruction was sent to us before we
+ %% were within the mirror_pids within the
+ %% amqqueue record. We'll never receive the
+ %% message directly.
+ {SQ, Seen, GTC}
+ end
+ end,
+ State1 = State #state { sender_queues = SQ1,
+ seen = Seen1,
+ guid_to_channel = GTC1 },
+ {ok,
+ case Deliver of
+ false ->
+ BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS),
+ State1 #state { backing_queue_state = BQS1 };
+ {true, AckRequired} ->
+ {AckTag, BQS1} = BQ:publish_delivered(AckRequired, Msg, MsgProps,
+ ChPid, BQS),
+ {GA1, GTC3} = case AckRequired of
+ true -> {dict:store(Guid, AckTag, GA), GTC1};
+ false -> {GA, maybe_confirm_message(Guid, GTC1)}
+ end,
+ State1 #state { backing_queue_state = BQS1,
+ guid_ack = GA1,
+ guid_to_channel = GTC3 }
+ end};
+process_instruction({set_length, Length},
+ State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ QLen = BQ:len(BQS),
+ ToDrop = QLen - Length,
+ {ok, case ToDrop > 0 of
+ true -> BQS1 =
+ lists:foldl(
+ fun (const, BQSN) ->
+ {{_Msg, _IsDelivered, _AckTag, _Remaining},
+ BQSN1} = BQ:fetch(false, BQSN),
+ BQSN1
+ end, BQS, lists:duplicate(ToDrop, const)),
+ State #state { backing_queue_state = BQS1 };
+ false -> State
+ end};
+process_instruction({fetch, AckRequired, Guid, Remaining},
+ State = #state { backing_queue = BQ,
+ backing_queue_state = BQS,
+ guid_ack = GA }) ->
+ QLen = BQ:len(BQS),
+ {ok, case QLen - 1 of
+ Remaining ->
+ {{_Msg, _IsDelivered, AckTag, Remaining}, BQS1} =
+ BQ:fetch(AckRequired, BQS),
+ GA1 = case AckRequired of
+ true -> dict:store(Guid, AckTag, GA);
+ false -> GA
+ end,
+ State #state { backing_queue_state = BQS1,
+ guid_ack = GA1 };
+ Other when Other < Remaining ->
+ %% we must be shorter than the master
+ State
+ end};
+process_instruction({ack, Guids},
+ State = #state { backing_queue = BQ,
+ backing_queue_state = BQS,
+ guid_ack = GA }) ->
+ {AckTags, GA1} = guids_to_acktags(Guids, GA),
+ {Guids1, BQS1} = BQ:ack(AckTags, BQS),
+ [] = Guids1 -- Guids, %% ASSERTION
+ {ok, State #state { guid_ack = GA1,
+ backing_queue_state = BQS1 }};
+process_instruction({requeue, MsgPropsFun, Guids},
+ State = #state { backing_queue = BQ,
+ backing_queue_state = BQS,
+ guid_ack = GA }) ->
+ {AckTags, GA1} = guids_to_acktags(Guids, GA),
+ {ok, case length(AckTags) =:= length(Guids) of
+ true ->
+ {Guids, BQS1} = BQ:requeue(AckTags, MsgPropsFun, BQS),
+ State #state { guid_ack = GA1,
+ backing_queue_state = BQS1 };
+ false ->
+ %% the only thing we can safely do is nuke out our BQ
+ %% and GA
+ {_Count, BQS1} = BQ:purge(BQS),
+ {Guids, BQS2} = ack_all(BQ, GA, BQS1),
+ State #state { guid_ack = dict:new(),
+ backing_queue_state = BQS2 }
+ end};
+process_instruction(delete_and_terminate,
+ State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ BQ:delete_and_terminate(BQS),
+ {stop, State #state { backing_queue_state = undefined }}.
+
+guids_to_acktags(Guids, GA) ->
+ {AckTags, GA1} =
+ lists:foldl(fun (Guid, {AckTagsN, GAN}) ->
+ case dict:find(Guid, GA) of
+ error -> {AckTagsN, GAN};
+ {ok, AckTag} -> {[AckTag | AckTagsN],
+ dict:erase(Guid, GAN)}
+ end
+ end, {[], GA}, Guids),
+ {lists:reverse(AckTags), GA1}.
+
+ack_all(BQ, GA, BQS) ->
+ BQ:ack([AckTag || {_Guid, AckTag} <- dict:to_list(GA)], BQS).
diff --git a/src/rabbit_mirror_queue_slave_sup.erl b/src/rabbit_mirror_queue_slave_sup.erl
new file mode 100644
index 0000000000..80c0520c08
--- /dev/null
+++ b/src/rabbit_mirror_queue_slave_sup.erl
@@ -0,0 +1,54 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2010 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_mirror_queue_slave_sup).
+
+-rabbit_boot_step({mirror_queue_slave_sup,
+ [{description, "mirror queue slave sup"},
+ {mfa, {rabbit_mirror_queue_slave_sup, start, []}},
+ {requires, queue_sup_queue_recovery},
+ {enables, routing_ready}]}).
+
+-behaviour(supervisor2).
+
+-export([start/0, start_link/0, start_child/2]).
+
+-export([init/1]).
+
+-include_lib("rabbit.hrl").
+
+-define(SERVER, ?MODULE).
+
+start() ->
+ {ok, _} =
+ supervisor:start_child(
+ rabbit_sup,
+ {rabbit_mirror_queue_slave_sup,
+ {rabbit_mirror_queue_slave_sup, start_link, []},
+ transient, infinity, supervisor, [rabbit_mirror_queue_slave_sup]}),
+ ok.
+
+start_link() ->
+ supervisor2:start_link({local, ?SERVER}, ?MODULE, []).
+
+start_child(Node, Args) ->
+ supervisor2:start_child({?SERVER, Node}, Args).
+
+init([]) ->
+ {ok, {{simple_one_for_one_terminate, 10, 10},
+ [{rabbit_mirror_queue_slave,
+ {rabbit_mirror_queue_slave, start_link, []},
+ temporary, ?MAX_WAIT, worker, [rabbit_mirror_queue_slave]}]}}.
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index a9b4e17745..1ad65759d4 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -204,7 +204,8 @@ table_definitions() ->
{rabbit_queue,
[{record_name, amqqueue},
{attributes, record_info(fields, amqqueue)},
- {match, #amqqueue{name = queue_name_match(), _='_'}}]}].
+ {match, #amqqueue{name = queue_name_match(), _='_'}}]}]
+ ++ gm:table_definitions().
binding_match() ->
#binding{source = exchange_name_match(),
diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl
index 692d2473b8..309e0e6ed9 100644
--- a/src/rabbit_router.erl
+++ b/src/rabbit_router.erl
@@ -102,7 +102,9 @@ check_delivery(_ , _ , {_ , Qs}) -> {routed, Qs}.
lookup_qpids(QNames) ->
lists:foldl(fun (QName, QPids) ->
case mnesia:dirty_read({rabbit_queue, QName}) of
- [#amqqueue{pid = QPid}] -> [QPid | QPids];
- [] -> QPids
+ [#amqqueue{pid = QPid, mirror_pids = MPids}] ->
+ MPids ++ [QPid | QPids];
+ [] ->
+ QPids
end
end, [], QNames).
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 58c369b5a3..e895b0edb2 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1913,7 +1913,7 @@ variable_queue_publish(IsPersistent, Count, VQ) ->
true -> 2;
false -> 1
end}, <<>>),
- #message_properties{}, VQN)
+ #message_properties{}, self(), VQN)
end, VQ, lists:seq(1, Count)).
variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) ->
@@ -1931,9 +1931,13 @@ assert_prop(List, Prop, Value) ->
assert_props(List, PropVals) ->
[assert_prop(List, Prop, Value) || {Prop, Value} <- PropVals].
+test_amqqueue(Durable) ->
+ (rabbit_amqqueue:pseudo_queue(test_queue(), self()))
+ #amqqueue { durable = Durable }.
+
with_fresh_variable_queue(Fun) ->
ok = empty_test_queue(),
- VQ = rabbit_variable_queue:init(test_queue(), true, false,
+ VQ = rabbit_variable_queue:init(test_amqqueue(true), false,
fun nop/2, fun nop/1),
S0 = rabbit_variable_queue:status(VQ),
assert_props(S0, [{q1, 0}, {q2, 0},
@@ -1992,7 +1996,7 @@ test_dropwhile(VQ0) ->
rabbit_basic:message(
rabbit_misc:r(<<>>, exchange, <<>>),
<<>>, #'P_basic'{}, <<>>),
- #message_properties{expiry = N}, VQN)
+ #message_properties{expiry = N}, self(), VQN)
end, VQ0, lists:seq(1, Count)),
%% drop the first 5 messages
@@ -2036,7 +2040,7 @@ test_variable_queue_dynamic_duration_change(VQ0) ->
%% drain
{VQ8, AckTags} = variable_queue_fetch(Len, false, false, Len, VQ7),
- VQ9 = rabbit_variable_queue:ack(AckTags, VQ8),
+ {_Guids, VQ9} = rabbit_variable_queue:ack(AckTags, VQ8),
{empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9),
VQ10.
@@ -2046,7 +2050,7 @@ publish_fetch_and_ack(0, _Len, VQ0) ->
publish_fetch_and_ack(N, Len, VQ0) ->
VQ1 = variable_queue_publish(false, 1, VQ0),
{{_Msg, false, AckTag, Len}, VQ2} = rabbit_variable_queue:fetch(true, VQ1),
- VQ3 = rabbit_variable_queue:ack([AckTag], VQ2),
+ {_Guids, VQ3} = rabbit_variable_queue:ack([AckTag], VQ2),
publish_fetch_and_ack(N-1, Len, VQ3).
test_variable_queue_partial_segments_delta_thing(VQ0) ->
@@ -2080,7 +2084,7 @@ test_variable_queue_partial_segments_delta_thing(VQ0) ->
{len, HalfSegment + 1}]),
{VQ8, AckTags1} = variable_queue_fetch(HalfSegment + 1, true, false,
HalfSegment + 1, VQ7),
- VQ9 = rabbit_variable_queue:ack(AckTags ++ AckTags1, VQ8),
+ {_Guids, VQ9} = rabbit_variable_queue:ack(AckTags ++ AckTags1, VQ8),
%% should be empty now
{empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9),
VQ10.
@@ -2109,7 +2113,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere1(VQ0) ->
{VQ5, _AckTags1} = variable_queue_fetch(Count, false, false,
Count, VQ4),
_VQ6 = rabbit_variable_queue:terminate(VQ5),
- VQ7 = rabbit_variable_queue:init(test_queue(), true, true,
+ VQ7 = rabbit_variable_queue:init(test_amqqueue(true), true,
fun nop/2, fun nop/1),
{{_Msg1, true, _AckTag1, Count1}, VQ8} =
rabbit_variable_queue:fetch(true, VQ7),
@@ -2123,10 +2127,11 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) ->
VQ1 = rabbit_variable_queue:set_ram_duration_target(0, VQ0),
VQ2 = variable_queue_publish(false, 4, VQ1),
{VQ3, AckTags} = variable_queue_fetch(2, false, false, 4, VQ2),
- VQ4 = rabbit_variable_queue:requeue(AckTags, fun(X) -> X end, VQ3),
+ {_Guids, VQ4} =
+ rabbit_variable_queue:requeue(AckTags, fun(X) -> X end, VQ3),
VQ5 = rabbit_variable_queue:idle_timeout(VQ4),
_VQ6 = rabbit_variable_queue:terminate(VQ5),
- VQ7 = rabbit_variable_queue:init(test_queue(), true, true,
+ VQ7 = rabbit_variable_queue:init(test_amqqueue(true), true,
fun nop/2, fun nop/1),
{empty, VQ8} = rabbit_variable_queue:fetch(false, VQ7),
VQ8.
@@ -2134,7 +2139,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) ->
test_queue_recover() ->
Count = 2 * rabbit_queue_index:next_segment_boundary(0),
TxID = rabbit_guid:guid(),
- {new, #amqqueue { pid = QPid, name = QName }} =
+ {new, #amqqueue { pid = QPid, name = QName } = Q} =
rabbit_amqqueue:declare(test_queue(), true, false, [], none),
[begin
Msg = rabbit_basic:message(rabbit_misc:r(<<>>, exchange, <<>>),
@@ -2158,7 +2163,7 @@ test_queue_recover() ->
{ok, CountMinusOne, {QName, QPid1, _AckTag, true, _Msg}} =
rabbit_amqqueue:basic_get(Q1, self(), false),
exit(QPid1, shutdown),
- VQ1 = rabbit_variable_queue:init(QName, true, true,
+ VQ1 = rabbit_variable_queue:init(Q, true,
fun nop/2, fun nop/1),
{{_Msg1, true, _AckTag1, CountMinusOne}, VQ2} =
rabbit_variable_queue:fetch(true, VQ1),
diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl
index 3dbe740f27..bde336c000 100644
--- a/src/rabbit_types.erl
+++ b/src/rabbit_types.erl
@@ -123,7 +123,8 @@
auto_delete :: boolean(),
exclusive_owner :: rabbit_types:maybe(pid()),
arguments :: rabbit_framing:amqp_table(),
- pid :: rabbit_types:maybe(pid())}).
+ pid :: rabbit_types:maybe(pid()),
+ mirror_pids :: [pid()]}).
-type(exchange() ::
#exchange{name :: rabbit_exchange:name(),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 7142d56072..74487ade05 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -16,18 +16,18 @@
-module(rabbit_variable_queue).
--export([init/3, terminate/1, delete_and_terminate/1,
- purge/1, publish/3, publish_delivered/4, fetch/2, ack/2,
- tx_publish/4, tx_ack/3, tx_rollback/2, tx_commit/4,
+-export([init/2, terminate/1, delete_and_terminate/1,
+ purge/1, publish/4, publish_delivered/5, fetch/2, ack/2,
+ tx_publish/5, tx_ack/3, tx_rollback/2, tx_commit/4,
requeue/3, len/1, is_empty/1, dropwhile/2,
set_ram_duration_target/2, ram_duration/1,
needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1,
- status/1]).
+ status/1, invoke/3]).
-export([start/1, stop/0]).
%% exported for testing only
--export([start_msg_store/2, stop_msg_store/0, init/5]).
+-export([start_msg_store/2, stop_msg_store/0, init/4]).
%%----------------------------------------------------------------------------
%% Definitions:
@@ -393,15 +393,16 @@ stop_msg_store() ->
ok = rabbit_sup:stop_child(?PERSISTENT_MSG_STORE),
ok = rabbit_sup:stop_child(?TRANSIENT_MSG_STORE).
-init(QueueName, IsDurable, Recover) ->
+init(Queue, Recover) ->
Self = self(),
- init(QueueName, IsDurable, Recover,
+ init(Queue, Recover,
fun (Guids, ActionTaken) ->
msgs_written_to_disk(Self, Guids, ActionTaken)
end,
fun (Guids) -> msg_indices_written_to_disk(Self, Guids) end).
-init(QueueName, IsDurable, false, MsgOnDiskFun, MsgIdxOnDiskFun) ->
+init(#amqqueue { name = QueueName, durable = IsDurable }, false,
+ MsgOnDiskFun, MsgIdxOnDiskFun) ->
IndexState = rabbit_queue_index:init(QueueName, MsgIdxOnDiskFun),
init(IsDurable, IndexState, 0, [],
case IsDurable of
@@ -411,7 +412,8 @@ init(QueueName, IsDurable, false, MsgOnDiskFun, MsgIdxOnDiskFun) ->
end,
msg_store_client_init(?TRANSIENT_MSG_STORE, undefined));
-init(QueueName, true, true, MsgOnDiskFun, MsgIdxOnDiskFun) ->
+init(#amqqueue { name = QueueName }, true,
+ MsgOnDiskFun, MsgIdxOnDiskFun) ->
Terms = rabbit_queue_index:shutdown_terms(QueueName),
{PRef, TRef, Terms1} =
case [persistent_ref, transient_ref] -- proplists:get_keys(Terms) of
@@ -501,18 +503,19 @@ purge(State = #vqstate { q4 = Q4,
ram_index_count = 0,
persistent_count = PCount1 })}.
-publish(Msg, MsgProps, State) ->
+publish(Msg, MsgProps, _ChPid, State) ->
{_SeqId, State1} = publish(Msg, MsgProps, false, false, State),
a(reduce_memory_use(State1)).
-publish_delivered(false, #basic_message { guid = Guid },
- _MsgProps, State = #vqstate { len = 0 }) ->
+publish_delivered(false, #basic_message { guid = Guid }, _MsgProps, _ChPid,
+ State = #vqstate { len = 0 }) ->
blind_confirm(self(), gb_sets:singleton(Guid)),
{undefined, a(State)};
publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent,
guid = Guid },
MsgProps = #message_properties {
needs_confirming = NeedsConfirming },
+ _ChPid,
State = #vqstate { len = 0,
next_seq_id = SeqId,
out_counter = OutCount,
@@ -642,13 +645,14 @@ internal_fetch(AckRequired, MsgStatus = #msg_status {
persistent_count = PCount1 })}.
ack(AckTags, State) ->
- a(ack(fun msg_store_remove/3,
- fun (_, State0) -> State0 end,
- AckTags, State)).
+ {Guids, State1} = ack(fun msg_store_remove/3,
+ fun (_, State0) -> State0 end,
+ AckTags, State),
+ {Guids, a(State1)}.
tx_publish(Txn, Msg = #basic_message { is_persistent = IsPersistent }, MsgProps,
- State = #vqstate { durable = IsDurable,
- msg_store_clients = MSCState }) ->
+ _ChPid, State = #vqstate { durable = IsDurable,
+ msg_store_clients = MSCState }) ->
Tx = #tx { pending_messages = Pubs } = lookup_tx(Txn),
store_tx(Txn, Tx #tx { pending_messages = [{Msg, MsgProps} | Pubs] }),
case IsPersistent andalso IsDurable of
@@ -698,7 +702,7 @@ requeue(AckTags, MsgPropsFun, State) ->
(MsgPropsFun(MsgProps)) #message_properties {
needs_confirming = false }
end,
- a(reduce_memory_use(
+ {Guids, State1} =
ack(fun msg_store_release/3,
fun (#msg_status { msg = Msg, msg_props = MsgProps }, State1) ->
{_SeqId, State2} = publish(Msg, MsgPropsFun1(MsgProps),
@@ -713,7 +717,8 @@ requeue(AckTags, MsgPropsFun, State) ->
true, true, State2),
State3
end,
- AckTags, State))).
+ AckTags, State),
+ {Guids, a(reduce_memory_use(State1))}.
len(#vqstate { len = Len }) -> Len.
@@ -851,6 +856,9 @@ status(#vqstate {
{avg_ack_ingress_rate, AvgAckIngressRate},
{avg_ack_egress_rate , AvgAckEgressRate} ].
+invoke(?MODULE, Fun, State) ->
+ Fun(State).
+
%%----------------------------------------------------------------------------
%% Minor helpers
%%----------------------------------------------------------------------------
@@ -962,7 +970,7 @@ msg_store_close_fds_fun(IsPersistent) ->
Self = self(),
fun () ->
rabbit_amqqueue:maybe_run_queue_via_backing_queue_async(
- Self,
+ Self, ?MODULE,
fun (State = #vqstate { msg_store_clients = MSCState }) ->
{ok, MSCState1} =
msg_store_close_fds(MSCState, IsPersistent),
@@ -1108,10 +1116,11 @@ blank_rate(Timestamp, IngressLength) ->
msg_store_callback(PersistentGuids, Pubs, AckTags, Fun, MsgPropsFun) ->
Self = self(),
F = fun () -> rabbit_amqqueue:maybe_run_queue_via_backing_queue(
- Self, fun (StateN) -> {[], tx_commit_post_msg_store(
- true, Pubs, AckTags,
- Fun, MsgPropsFun, StateN)}
- end)
+ Self, ?MODULE,
+ fun (StateN) -> {[], tx_commit_post_msg_store(
+ true, Pubs, AckTags,
+ Fun, MsgPropsFun, StateN)}
+ end)
end,
fun () -> spawn(fun () -> ok = rabbit_misc:with_exit_handler(
fun () -> remove_persistent_messages(
@@ -1174,20 +1183,21 @@ tx_commit_index(State = #vqstate { on_sync = #sync {
Acks = lists:append(SAcks),
Pubs = [{Msg, Fun(MsgProps)} || {Fun, PubsN} <- lists:reverse(SPubs),
{Msg, MsgProps} <- lists:reverse(PubsN)],
- {SeqIds, State1 = #vqstate { index_state = IndexState }} =
+ {_Guids, State1} = ack(Acks, State),
+ {SeqIds, State2 = #vqstate { index_state = IndexState }} =
lists:foldl(
fun ({Msg = #basic_message { is_persistent = IsPersistent },
MsgProps},
- {SeqIdsAcc, State2}) ->
+ {SeqIdsAcc, State3}) ->
IsPersistent1 = IsDurable andalso IsPersistent,
- {SeqId, State3} =
- publish(Msg, MsgProps, false, IsPersistent1, State2),
- {cons_if(IsPersistent1, SeqId, SeqIdsAcc), State3}
- end, {PAcks, ack(Acks, State)}, Pubs),
+ {SeqId, State4} =
+ publish(Msg, MsgProps, false, IsPersistent1, State3),
+ {cons_if(IsPersistent1, SeqId, SeqIdsAcc), State4}
+ end, {PAcks, State1}, Pubs),
IndexState1 = rabbit_queue_index:sync(SeqIds, IndexState),
[ Fun() || Fun <- lists:reverse(SFuns) ],
reduce_memory_use(
- State1 #vqstate { index_state = IndexState1, on_sync = ?BLANK_SYNC }).
+ State2 #vqstate { index_state = IndexState1, on_sync = ?BLANK_SYNC }).
purge_betas_and_deltas(LensByStore,
State = #vqstate { q3 = Q3,
@@ -1334,7 +1344,7 @@ remove_pending_ack(KeepPersistent,
State = #vqstate { pending_ack = PA,
index_state = IndexState,
msg_store_clients = MSCState }) ->
- {PersistentSeqIds, GuidsByStore} =
+ {PersistentSeqIds, GuidsByStore, _AllGuids} =
dict:fold(fun accumulate_ack/3, accumulate_ack_init(), PA),
State1 = State #vqstate { pending_ack = dict:new(),
ram_ack_index = gb_trees:empty() },
@@ -1353,9 +1363,9 @@ remove_pending_ack(KeepPersistent,
end.
ack(_MsgStoreFun, _Fun, [], State) ->
- State;
+ {[], State};
ack(MsgStoreFun, Fun, AckTags, State) ->
- {{PersistentSeqIds, GuidsByStore},
+ {{PersistentSeqIds, GuidsByStore, AllGuids},
State1 = #vqstate { index_state = IndexState,
msg_store_clients = MSCState,
persistent_count = PCount,
@@ -1375,21 +1385,24 @@ ack(MsgStoreFun, Fun, AckTags, State) ->
|| {IsPersistent, Guids} <- orddict:to_list(GuidsByStore)],
PCount1 = PCount - find_persistent_count(sum_guids_by_store_to_len(
orddict:new(), GuidsByStore)),
- State1 #vqstate { index_state = IndexState1,
- persistent_count = PCount1,
- ack_out_counter = AckOutCount + length(AckTags) }.
+ {lists:reverse(AllGuids),
+ State1 #vqstate { index_state = IndexState1,
+ persistent_count = PCount1,
+ ack_out_counter = AckOutCount + length(AckTags) }}.
-accumulate_ack_init() -> {[], orddict:new()}.
+accumulate_ack_init() -> {[], orddict:new(), []}.
accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS
msg_on_disk = false,
- index_on_disk = false },
- {PersistentSeqIdsAcc, GuidsByStore}) ->
- {PersistentSeqIdsAcc, GuidsByStore};
+ index_on_disk = false,
+ guid = Guid },
+ {PersistentSeqIdsAcc, GuidsByStore, AllGuids}) ->
+ {PersistentSeqIdsAcc, GuidsByStore, [Guid | AllGuids]};
accumulate_ack(SeqId, {IsPersistent, Guid, _MsgProps},
- {PersistentSeqIdsAcc, GuidsByStore}) ->
+ {PersistentSeqIdsAcc, GuidsByStore, AllGuids}) ->
{cons_if(IsPersistent, SeqId, PersistentSeqIdsAcc),
- rabbit_misc:orddict_cons(IsPersistent, Guid, GuidsByStore)}.
+ rabbit_misc:orddict_cons(IsPersistent, Guid, GuidsByStore),
+ [Guid | AllGuids]}.
find_persistent_count(LensByStore) ->
case orddict:find(true, LensByStore) of
@@ -1435,33 +1448,35 @@ msgs_confirmed(GuidSet, State) ->
blind_confirm(QPid, GuidSet) ->
rabbit_amqqueue:maybe_run_queue_via_backing_queue_async(
- QPid, fun (State) -> msgs_confirmed(GuidSet, State) end).
+ QPid, ?MODULE, fun (State) -> msgs_confirmed(GuidSet, State) end).
msgs_written_to_disk(QPid, GuidSet, removed) ->
blind_confirm(QPid, GuidSet);
msgs_written_to_disk(QPid, GuidSet, written) ->
rabbit_amqqueue:maybe_run_queue_via_backing_queue_async(
- QPid, fun (State = #vqstate { msgs_on_disk = MOD,
- msg_indices_on_disk = MIOD,
- unconfirmed = UC }) ->
- msgs_confirmed(gb_sets:intersection(GuidSet, MIOD),
- State #vqstate {
- msgs_on_disk =
- gb_sets:intersection(
- gb_sets:union(MOD, GuidSet), UC) })
- end).
+ QPid, ?MODULE,
+ fun (State = #vqstate { msgs_on_disk = MOD,
+ msg_indices_on_disk = MIOD,
+ unconfirmed = UC }) ->
+ msgs_confirmed(gb_sets:intersection(GuidSet, MIOD),
+ State #vqstate {
+ msgs_on_disk =
+ gb_sets:intersection(
+ gb_sets:union(MOD, GuidSet), UC) })
+ end).
msg_indices_written_to_disk(QPid, GuidSet) ->
rabbit_amqqueue:maybe_run_queue_via_backing_queue_async(
- QPid, fun (State = #vqstate { msgs_on_disk = MOD,
- msg_indices_on_disk = MIOD,
- unconfirmed = UC }) ->
- msgs_confirmed(gb_sets:intersection(GuidSet, MOD),
- State #vqstate {
- msg_indices_on_disk =
- gb_sets:intersection(
- gb_sets:union(MIOD, GuidSet), UC) })
- end).
+ QPid, ?MODULE,
+ fun (State = #vqstate { msgs_on_disk = MOD,
+ msg_indices_on_disk = MIOD,
+ unconfirmed = UC }) ->
+ msgs_confirmed(gb_sets:intersection(GuidSet, MOD),
+ State #vqstate {
+ msg_indices_on_disk =
+ gb_sets:intersection(
+ gb_sets:union(MIOD, GuidSet), UC) })
+ end).
%%----------------------------------------------------------------------------
%% Phase changes