diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/gen_server2.erl | 16 | ||||
| -rw-r--r-- | src/gm.erl | 1308 | ||||
| -rw-r--r-- | src/gm_test.erl | 126 | ||||
| -rw-r--r-- | src/rabbit.erl | 13 | ||||
| -rw-r--r-- | src/rabbit_amqqueue.erl | 46 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 128 | ||||
| -rw-r--r-- | src/rabbit_backing_queue.erl | 15 | ||||
| -rw-r--r-- | src/rabbit_control.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_coordinator.erl | 136 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 250 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 46 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 529 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave_sup.erl | 54 | ||||
| -rw-r--r-- | src/rabbit_mnesia.erl | 3 | ||||
| -rw-r--r-- | src/rabbit_router.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 27 | ||||
| -rw-r--r-- | src/rabbit_types.erl | 3 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 139 |
18 files changed, 2701 insertions, 150 deletions
diff --git a/src/gen_server2.erl b/src/gen_server2.erl index a637ddddc8..75d7ee8c71 100644 --- a/src/gen_server2.erl +++ b/src/gen_server2.erl @@ -880,6 +880,22 @@ handle_common_reply(Reply, Msg, GS2State = #gs2_state { name = Name, loop(GS2State #gs2_state { state = NState, time = Time1, debug = Debug1 }); + {become, Mod, NState} -> + Debug1 = common_debug(Debug, fun print_event/3, Name, + {become, Mod, NState}), + loop(find_prioritisers( + GS2State #gs2_state { mod = Mod, + state = NState, + time = infinity, + debug = Debug1 })); + {become, Mod, NState, Time1} -> + Debug1 = common_debug(Debug, fun print_event/3, Name, + {become, Mod, NState}), + loop(find_prioritisers( + GS2State #gs2_state { mod = Mod, + state = NState, + time = Time1, + debug = Debug1 })); _ -> handle_common_termination(Reply, Msg, GS2State) end. diff --git a/src/gm.erl b/src/gm.erl new file mode 100644 index 0000000000..baf46471c6 --- /dev/null +++ b/src/gm.erl @@ -0,0 +1,1308 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2010 VMware, Inc. All rights reserved. +%% + +-module(gm). + +%% Guaranteed Multicast +%% ==================== +%% +%% This module provides the ability to create named groups of +%% processes to which members can be dynamically added and removed, +%% and for messages to be broadcast within the group that are +%% guaranteed to reach all members of the group during the lifetime of +%% the message. 
The lifetime of a message is defined as being, at a +%% minimum, the time from which the message is first sent to any +%% member of the group, up until the time at which it is known by the +%% member who published the message that the message has reached all +%% group members. +%% +%% The guarantee given is that provided a message, once sent, makes it +%% to members who do not all leave the group, the message will +%% continue to propagate to all group members. +%% +%% Another way of stating the guarantee is that if member P publishes +%% messages m and m', then for all members P', if P' is a member of +%% the group prior to the publication of m, and P' receives m', then +%% P' will receive m. +%% +%% Note that only local-ordering is enforced: i.e. if member P sends +%% message m and then message m', then for-all members P', if P' +%% receives m and m', then they will receive m' after m. Causality +%% ordering is _not_ enforced. I.e. if member P receives message m +%% and as a result publishes message m', there is no guarantee that +%% other members P' will receive m before m'. +%% +%% +%% API Use +%% ------- +%% +%% Mnesia must be started. Use the idempotent create_tables/0 function +%% to create the tables required. +%% +%% start_link/3 +%% Provide the group name, the callback module name, and a list of any +%% arguments you wish to be passed into the callback module's +%% functions. The joined/2 callback will be called when we have joined +%% the group, and will be passed the list of arguments together with a +%% list of the current members of the group. See the comments in +%% behaviour_info/1 below for further details of the callback +%% functions. +%% +%% leave/1 +%% Provide the Pid. Removes the Pid from the group. The callback +%% terminate/2 function will be called. +%% +%% broadcast/2 +%% Provide the Pid and a Message. The message will be sent to all +%% members of the group as per the guarantees given above. 
This is a +%% cast and the function call will return immediately. There is no +%% guarantee that the message will reach any member of the group. +%% +%% confirmed_broadcast/2 +%% Provide the Pid and a Message. As per broadcast/2 except that this +%% is a call, not a cast, and only returns 'ok' once the Message has +%% reached every member of the group. Do not call +%% confirmed_broadcast/2 directly from the callback module otherwise +%% you will deadlock the entire group. +%% +%% group_members/1 +%% Provide the Pid. Returns a list of the current group members. +%% +%% +%% Implementation Overview +%% ----------------------- +%% +%% One possible means of implementation would be a fan-out from the +%% sender to every member of the group. This would require that the +%% group is fully connected, and, in the event that the original +%% sender of the message disappears from the group before the message +%% has made it to every member of the group, raises questions as to +%% who is responsible for sending on the message to new group members. +%% In particular, the issue is with [ Pid ! Msg || Pid <- Members ] - +%% if the sender dies part way through, who is responsible for +%% ensuring that the remaining Members receive the Msg? In the event +%% that within the group, messages sent are broadcast from a subset of +%% the members, the fan-out arrangement has the potential to +%% substantially impact the CPU and network workload of such members, +%% as such members would have to accommodate the cost of sending each +%% message to every group member. +%% +%% Instead, if the members of the group are arranged in a chain, then +%% it becomes easier to reason about who within the group has received +%% each message and who has not. It eases issues of responsibility: in +%% the event of a group member disappearing, the nearest upstream +%% member of the chain is responsible for ensuring that messages +%% continue to propagate down the chain. 
It also results in equal +%% distribution of sending and receiving workload, even if all +%% messages are being sent from just a single group member. This +%% configuration has the further advantage that it is not necessary +%% for every group member to know of every other group member, and +%% even that a group member does not have to be accessible from all +%% other group members. +%% +%% Performance is kept high by permitting pipelining and all +%% communication between joined group members is asynchronous. In the +%% chain A -> B -> C -> D, if A sends a message to the group, it will +%% not directly contact C or D. However, it must know that D receives +%% the message (in addition to B and C) before it can consider the +%% message fully sent. A simplistic implementation would require that +%% D replies to C, C replies to B and B then replies to A. This would +%% result in a propagation delay of twice the length of the chain. It +%% would also require, in the event of the failure of C, that D knows +%% to directly contact B and issue the necessary replies. Instead, the +%% chain forms a ring: D sends the message on to A: D does not +%% distinguish A as the sender, merely as the next member (downstream) +%% within the chain (which has now become a ring). When A receives +%% from D messages that A sent, it knows that all members have +%% received the message. However, the message is not dead yet: if C +%% died as B was sending to C, then B would need to detect the death +%% of C and forward the message on to D instead: thus every node has +%% to remember every message published until it is told that it can +%% forget about the message. This is essential not just for dealing +%% with failure of members, but also for the addition of new members. +%% +%% Thus once A receives the message back again, it then sends to B an +%% acknowledgement for the message, indicating that B can now forget +%% about the message. B does so, and forwards the ack to C. 
C forgets +%% the message, and forwards the ack to D, which forgets the message +%% and finally forwards the ack back to A. At this point, A takes no +%% further action: the message and its acknowledgement have made it to +%% every member of the group. The message is now dead, and any new +%% member joining the group at this point will not receive the +%% message. +%% +%% We therefore have two roles: +%% +%% 1. The sender, who upon receiving their own messages back, must +%% then send out acknowledgements, and upon receiving their own +%% acknowledgements back perform no further action. +%% +%% 2. The other group members who upon receiving messages and +%% acknowledgements must update their own internal state accordingly +%% (the sending member must also do this in order to be able to +%% accommodate failures), and forward messages on to their downstream +%% neighbours. +%% +%% +%% Implementation: It gets trickier +%% -------------------------------- +%% +%% Chain A -> B -> C -> D +%% +%% A publishes a message which B receives. A now dies. B and D will +%% detect the death of A, and will link up, thus the chain is now B -> +%% C -> D. B forwards A's message on to C, who forwards it to D, who +%% forwards it to B. Thus B is now responsible for A's messages - both +%% publications and acknowledgements that were in flight at the point +%% at which A died. Even worse is that this is transitive: after B +%% forwards A's message to C, B dies as well. Now C is not only +%% responsible for B's in-flight messages, but is also responsible for +%% A's in-flight messages. +%% +%% Lemma 1: A member can only determine which dead members they have +%% inherited responsibility for if there is a total ordering on the +%% conflicting additions and subtractions of members from the group. +%% +%% Consider the simultaneous death of B and addition of B' that +%% transitions a chain from A -> B -> C to A -> B' -> C. Either B' or +%% C is responsible for in-flight messages from B. 
It is easy to +%% ensure that at least one of them thinks they have inherited B, but +%% if we do not ensure that exactly one of them inherits B, then we +%% could have B' converting publishes to acks, which then will crash C +%% as C does not believe it has issued acks for those messages. +%% +%% More complex scenarios are easy to concoct: A -> B -> C -> D -> E +%% becoming A -> C' -> E. Who has inherited which of B, C and D? +%% +%% However, for non-conflicting membership changes, only a partial +%% ordering is required. For example, A -> B -> C becoming A -> A' -> +%% B. The addition of A', between A and B can have no conflicts with +%% the death of C: it is clear that A has inherited C's messages. +%% +%% For ease of implementation, we adopt the simple solution, of +%% imposing a total order on all membership changes. +%% +%% On the death of a member, it is ensured the dead member's +%% neighbours become aware of the death, and the upstream neighbour +%% now sends to its new downstream neighbour its state, including the +%% messages pending acknowledgement. The downstream neighbour can then +%% use this to calculate which publishes and acknowledgements it has +%% missed out on, due to the death of its old upstream. Thus the +%% downstream can catch up, and continues the propagation of messages +%% through the group. +%% +%% Lemma 2: When a member is joining, it must synchronously +%% communicate with its upstream member in order to receive its +%% starting state atomically with its addition to the group. +%% +%% New members must start with the same state as their nearest +%% upstream neighbour. This ensures that they are not surprised by +%% acknowledgements they are sent, and that should their downstream +%% neighbour die, they are able to send the correct state to their new +%% downstream neighbour to ensure it can catch up. 
Thus in the +%% transition A -> B -> C becomes A -> A' -> B -> C becomes A -> A' -> +%% C, A' must start with the state of A, so that it can send C the +%% correct state when B dies, allowing C to detect any missed +%% messages. +%% +%% If A' starts by adding itself to the group membership, A could then +%% die, without A' having received the necessary state from A. This +%% would leave A' responsible for in-flight messages from A, but +%% having the least knowledge of all, of those messages. Thus A' must +%% start by synchronously calling A, which then immediately sends A' +%% back its state. A then adds A' to the group. If A dies at this +%% point then A' will be able to see this (as A' will fail to appear +%% in the group membership), and thus A' will ignore the state it +%% receives from A, and will simply repeat the process, trying to now +%% join downstream from some other member. This ensures that should +%% the upstream die as soon as the new member has been joined, the new +%% member is guaranteed to receive the correct state, allowing it to +%% correctly process messages inherited due to the death of its +%% upstream neighbour. +%% +%% The canonical definition of the group membership is held by a +%% distributed database. Whilst this allows the total ordering of +%% changes to be achieved, it is nevertheless undesirable to have to +%% query this database for the current view, upon receiving each +%% message. Instead, we wish for members to be able to cache a view of +%% the group membership, which then requires a cache invalidation +%% mechanism. Each member maintains its own view of the group +%% membership. Thus when the group's membership changes, members may +%% need to become aware of such changes in order to be able to +%% accurately process messages they receive. 
Because of the +%% requirement of a total ordering of conflicting membership changes, +%% it is not possible to use the guaranteed broadcast mechanism to +%% communicate these changes: to achieve the necessary ordering, it +%% would be necessary for such messages to be published by exactly one +%% member, which can not be guaranteed given that such a member could +%% die. +%% +%% The total ordering we enforce on membership changes gives rise to a +%% view version number: every change to the membership creates a +%% different view, and the total ordering permits a simple +%% monotonically increasing view version number. +%% +%% Lemma 3: If a message is sent from a member that holds view version +%% N, it can be correctly processed by any member receiving the +%% message with a view version >= N. +%% +%% Initially, let us suppose that each view contains the ordering of +%% every member that was ever part of the group. Dead members are +%% marked as such. Thus we have a ring of members, some of which are +%% dead, and are thus inherited by the nearest alive downstream +%% member. +%% +%% In the chain A -> B -> C, all three members initially have view +%% version 1, which reflects reality. B publishes a message, which is +%% forwarded by C to A. B now dies, which A notices very quickly. Thus A +%% updates the view, creating version 2. It now forwards B's +%% publication, sending that message to its new downstream neighbour, +%% C. This happens before C is aware of the death of B. C must become +%% aware of the view change before it interprets the message it has +%% received, otherwise it will fail to learn of the death of B, and +%% thus will not realise it has inherited B's messages (and will +%% likely crash). +%% +%% Thus very simply, we have that each subsequent view contains more +%% information than the preceding view. 
+%% +%% However, to avoid the views growing indefinitely, we need to be +%% able to delete members which have died _and_ for which no messages +%% are in-flight. This requires that upon inheriting a dead member, we +%% know the last publication sent by the dead member (this is easy: we +%% inherit a member because we are the nearest downstream member which +%% implies that we know at least as much as everyone else about the +%% publications of the dead member), and we know the earliest message +%% for which the acknowledgement is still in flight. +%% +%% In the chain A -> B -> C, when B dies, A will send to C its state +%% (as C is the new downstream from A), allowing C to calculate which +%% messages it has missed out on (described above). At this point, C +%% also inherits B's messages. If that state from A also includes the +%% last message published by B for which an acknowledgement has been +%% seen, then C knows exactly which further acknowledgements it must +%% receive (also including issuing acknowledgements for publications +%% still in-flight that it receives), after which it is known there +%% are no more messages in flight for B, thus all evidence that B was +%% ever part of the group can be safely removed from the canonical +%% group membership. +%% +%% Thus, for every message that a member sends, it includes with that +%% message its view version. When a member receives a message it will +%% update its view from the canonical copy, should its view be older +%% than the view version included in the message it has received. +%% +%% The state held by each member therefore includes the messages from +%% each publisher pending acknowledgement, the last publication seen +%% from that publisher, and the last acknowledgement from that +%% publisher. In the case of the member's own publications or +%% inherited members, this last acknowledgement seen state indicates +%% the last acknowledgement retired, rather than sent. 
+%% +%% +%% Proof sketch +%% ------------ +%% +%% We need to prove that with the provided operational semantics, we +%% can never reach a state that is not well formed from a well-formed +%% starting state. +%% +%% Operational semantics (small step): straight-forward message +%% sending, process monitoring, state updates. +%% +%% Well formed state: dead members inherited by exactly one non-dead +%% member; for every entry in anyone's pending-acks, either (the +%% publication of the message is in-flight downstream from the member +%% and upstream from the publisher) or (the acknowledgement of the +%% message is in-flight downstream from the publisher and upstream +%% from the member). +%% +%% Proof by induction on the applicable operational semantics. +%% +%% +%% Related work +%% ------------ +%% +%% The ring configuration and double traversal of messages around the +%% ring is similar (though developed independently) to the LCR +%% protocol by [Levy 2008]. However, LCR differs in several +%% ways. Firstly, by using vector clocks, it enforces a total order of +%% message delivery, which is unnecessary for our purposes. More +%% significantly, it is built on top of a "group communication system" +%% which performs the group management functions, taking +%% responsibility away from the protocol as to how to cope with safely +%% adding and removing members. When membership changes do occur, the +%% protocol stipulates that every member must perform communication +%% with every other member of the group, to ensure all outstanding +%% deliveries complete, before the entire group transitions to the new +%% view. This, in total, requires two sets of all-to-all synchronous +%% communications. +%% +%% This is not only rather inefficient, but also does not explain what +%% happens upon the failure of a member during this process. It does +%% though entirely avoid the need for inheritance of responsibility of +%% dead members that our protocol incorporates. 
+%% +%% In [Marandi et al 2010], a Paxos-based protocol is described. This +%% work explicitly focuses on the efficiency of communication. LCR +%% (and our protocol too) are more efficient, but at the cost of +%% higher latency. The Ring-Paxos protocol is itself built on top of +%% IP-multicast, which rules it out for many applications where +%% point-to-point communication is all that can be required. They also +%% have an excellent related work section which I really ought to +%% read... +%% +%% +%% [Levy 2008] The Complexity of Reliable Distributed Storage, 2008. +%% [Marandi et al 2010] Ring Paxos: A High-Throughput Atomic Broadcast +%% Protocol + + +-behaviour(gen_server2). + +-export([create_tables/0, start_link/3, leave/1, broadcast/2, + confirmed_broadcast/2, group_members/1]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, + code_change/3, prioritise_info/2]). + +-export([behaviour_info/1]). + +-export([table_definitions/0]). + +-define(GROUP_TABLE, gm_group). +-define(HIBERNATE_AFTER_MIN, 1000). +-define(DESIRED_HIBERNATE, 10000). +-define(SETS, ordsets). +-define(DICT, orddict). + +-record(state, + { self, + left, + right, + group_name, + module, + view, + pub_count, + members_state, + callback_args, + confirms + }). + +-record(gm_group, { name, version, members }). + +-record(view_member, { id, aliases, left, right }). + +-record(member, { pending_ack, last_pub, last_ack }). + +-define(TABLE, {?GROUP_TABLE, [{record_name, gm_group}, + {attributes, record_info(fields, gm_group)}]}). +-define(TABLE_MATCH, {match, #gm_group { _ = '_' }}). + +-define(TAG, '$gm'). + +-ifdef(use_specs). + +-export_type([group_name/0]). + +-type(group_name() :: any()). + +-spec(create_tables/0 :: () -> 'ok'). +-spec(start_link/3 :: (group_name(), atom(), [any()]) -> + {'ok', pid()} | {'error', any()}). +-spec(leave/1 :: (pid()) -> 'ok'). +-spec(broadcast/2 :: (pid(), any()) -> 'ok'). +-spec(confirmed_broadcast/2 :: (pid(), any()) -> 'ok'). 
+-spec(group_members/1 :: (pid()) -> [pid()]). + +-endif. + +behaviour_info(callbacks) -> + [ + %% Called when we've successfully joined the group. Supplied with + %% Args provided in start_link, plus current group members. + {joined, 2}, + + %% Supplied with Args provided in start_link, the list of new + %% members and the list of members previously known to us that + %% have since died. Note that if a member joins and dies very + %% quickly, it's possible that we will never see that member + %% appear in either births or deaths. However we are guaranteed + %% that (1) we will see a member joining either in the births + %% here, or in the members passed to joined/1 before receiving + %% any messages from it; and (2) we will not see members die that + %% we have not seen born (or supplied in the members to + %% joined/1). + {members_changed, 3}, + + %% Supplied with Args provided in start_link, the sender, and the + %% message. This does get called for messages injected by this + %% member, however, in such cases, there is no special + %% significance of this call: it does not indicate that the + %% message has made it to any other members, let alone all other + %% members. + {handle_msg, 3}, + + %% Called on gm member termination as per rules in gen_server, + %% with the Args provided in start_link plus the termination + %% Reason. + {terminate, 2} + ]; +behaviour_info(_Other) -> + undefined. + +create_tables() -> + create_tables([?TABLE]). + +create_tables([]) -> + ok; +create_tables([{Table, Attributes} | Tables]) -> + case mnesia:create_table(Table, Attributes) of + {atomic, ok} -> create_tables(Tables); + {aborted, {already_exists, gm_group}} -> create_tables(Tables); + Err -> Err + end. + +table_definitions() -> + {Name, Attributes} = ?TABLE, + [{Name, [?TABLE_MATCH | Attributes]}]. + +start_link(GroupName, Module, Args) -> + gen_server2:start_link(?MODULE, [GroupName, Module, Args], []). + +leave(Server) -> + gen_server2:cast(Server, leave). 
+ +broadcast(Server, Msg) -> + gen_server2:cast(Server, {broadcast, Msg}). + +confirmed_broadcast(Server, Msg) -> + gen_server2:call(Server, {confirmed_broadcast, Msg}, infinity). + +group_members(Server) -> + gen_server2:call(Server, group_members, infinity). + + +init([GroupName, Module, Args]) -> + random:seed(now()), + gen_server2:cast(self(), join), + Self = self(), + {ok, #state { self = Self, + left = {Self, undefined}, + right = {Self, undefined}, + group_name = GroupName, + module = Module, + view = undefined, + pub_count = 0, + members_state = undefined, + callback_args = Args, + confirms = queue:new() }, hibernate, + {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. + + +handle_call({confirmed_broadcast, _Msg}, _From, + State = #state { members_state = undefined }) -> + reply(not_joined, State); + +handle_call({confirmed_broadcast, Msg}, _From, + State = #state { self = Self, + right = {Self, undefined}, + module = Module, + callback_args = Args }) -> + handle_callback_result({Module:handle_msg(Args, Self, Msg), ok, State}); + +handle_call({confirmed_broadcast, Msg}, From, State) -> + internal_broadcast(Msg, From, State); + +handle_call(group_members, _From, + State = #state { members_state = undefined }) -> + reply(not_joined, State); + +handle_call(group_members, _From, State = #state { view = View }) -> + reply(alive_view_members(View), State); + +handle_call({add_on_right, _NewMember}, _From, + State = #state { members_state = undefined }) -> + reply(not_ready, State); + +handle_call({add_on_right, NewMember}, _From, + State = #state { self = Self, + group_name = GroupName, + view = View, + members_state = MembersState, + module = Module, + callback_args = Args }) -> + Group = record_new_member_in_group( + GroupName, Self, NewMember, + fun (Group1) -> + View1 = group_to_view(Group1), + ok = send_right(NewMember, View1, + {catchup, Self, prepare_members_state( + MembersState)}) + end), + View2 = group_to_view(Group), + State1 
= check_neighbours(State #state { view = View2 }), + Result = callback_view_changed(Args, Module, View, View2), + handle_callback_result({Result, {ok, Group}, State1}). + + +handle_cast({?TAG, ReqVer, Msg}, + State = #state { view = View, + group_name = GroupName, + module = Module, + callback_args = Args }) -> + {Result, State1} = + case needs_view_update(ReqVer, View) of + true -> + View1 = group_to_view(read_group(GroupName)), + {callback_view_changed(Args, Module, View, View1), + check_neighbours(State #state { view = View1 })}; + false -> + {ok, State} + end, + handle_callback_result( + if_callback_success( + Result, fun handle_msg_true/3, fun handle_msg_false/3, Msg, State1)); + +handle_cast({broadcast, _Msg}, State = #state { members_state = undefined }) -> + noreply(State); + +handle_cast({broadcast, Msg}, + State = #state { self = Self, + right = {Self, undefined}, + module = Module, + callback_args = Args }) -> + handle_callback_result({Module:handle_msg(Args, Self, Msg), State}); + +handle_cast({broadcast, Msg}, State) -> + internal_broadcast(Msg, none, State); + +handle_cast(join, State = #state { self = Self, + group_name = GroupName, + members_state = undefined, + module = Module, + callback_args = Args }) -> + View = join_group(Self, GroupName), + MembersState = + case alive_view_members(View) of + [Self] -> blank_member_state(); + _ -> undefined + end, + State1 = check_neighbours(State #state { view = View, + members_state = MembersState }), + handle_callback_result( + {Module:joined(Args, all_known_members(View)), State1}); + +handle_cast(leave, State) -> + {stop, normal, State}. 
+ + +handle_info({'DOWN', MRef, process, _Pid, _Reason}, + State = #state { self = Self, + left = Left, + right = Right, + group_name = GroupName, + view = View, + module = Module, + callback_args = Args, + confirms = Confirms }) -> + Member = case {Left, Right} of + {{Member1, MRef}, _} -> Member1; + {_, {Member1, MRef}} -> Member1; + _ -> undefined + end, + case Member of + undefined -> + noreply(State); + _ -> + View1 = + group_to_view(record_dead_member_in_group(Member, GroupName)), + State1 = State #state { view = View1 }, + {Result, State2} = + case alive_view_members(View1) of + [Self] -> + maybe_erase_aliases( + State1 #state { + members_state = blank_member_state(), + confirms = purge_confirms(Confirms) }); + _ -> + %% here we won't be pointing out any deaths: + %% the concern is that there maybe births + %% which we'd otherwise miss. + {callback_view_changed(Args, Module, View, View1), + State1} + end, + handle_callback_result({Result, check_neighbours(State2)}) + end. + + +terminate(Reason, #state { module = Module, + callback_args = Args }) -> + Module:terminate(Args, Reason). + + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + + +prioritise_info({'DOWN', _MRef, process, _Pid, _Reason}, _State) -> 1; +prioritise_info(_ , _State) -> 0. 
+ + +handle_msg(check_neighbours, State) -> + %% no-op - it's already been done by the calling handle_cast + {ok, State}; + +handle_msg({catchup, Left, MembersStateLeft}, + State = #state { self = Self, + left = {Left, _MRefL}, + right = {Right, _MRefR}, + view = View, + members_state = undefined }) -> + ok = send_right(Right, View, {catchup, Self, MembersStateLeft}), + MembersStateLeft1 = build_members_state(MembersStateLeft), + {ok, State #state { members_state = MembersStateLeft1 }}; + +handle_msg({catchup, Left, MembersStateLeft}, + State = #state { self = Self, + left = {Left, _MRefL}, + view = View, + members_state = MembersState }) + when MembersState =/= undefined -> + MembersStateLeft1 = build_members_state(MembersStateLeft), + AllMembers = lists:usort(?DICT:fetch_keys(MembersState) ++ + ?DICT:fetch_keys(MembersStateLeft1)), + {MembersState1, Activity} = + lists:foldl( + fun (Id, MembersStateActivity) -> + #member { pending_ack = PALeft, last_ack = LA } = + find_member_or_blank(Id, MembersStateLeft1), + with_member_acc( + fun (#member { pending_ack = PA } = Member, Activity1) -> + case is_member_alias(Id, Self, View) of + true -> + {_AcksInFlight, Pubs, _PA1} = + find_prefix_common_suffix(PALeft, PA), + {Member #member { last_ack = LA }, + activity_cons(Id, pubs_from_queue(Pubs), + [], Activity1)}; + false -> + {Acks, _Common, Pubs} = + find_prefix_common_suffix(PA, PALeft), + {Member, + activity_cons(Id, pubs_from_queue(Pubs), + acks_from_queue(Acks), + Activity1)} + end + end, Id, MembersStateActivity) + end, {MembersState, activity_nil()}, AllMembers), + handle_msg({activity, Left, activity_finalise(Activity)}, + State #state { members_state = MembersState1 }); + +handle_msg({catchup, _NotLeft, _MembersState}, State) -> + {ok, State}; + +handle_msg({activity, Left, Activity}, + State = #state { self = Self, + left = {Left, _MRefL}, + view = View, + members_state = MembersState, + confirms = Confirms }) + when MembersState =/= undefined -> + 
{MembersState1, {Confirms1, Activity1}} = + lists:foldl( + fun ({Id, Pubs, Acks}, MembersStateConfirmsActivity) -> + with_member_acc( + fun (Member = #member { pending_ack = PA, + last_pub = LP, + last_ack = LA }, + {Confirms2, Activity2}) -> + case is_member_alias(Id, Self, View) of + true -> + {ToAck, PA1} = + find_common(queue_from_pubs(Pubs), PA, + queue:new()), + LA1 = last_ack(Acks, LA), + AckNums = acks_from_queue(ToAck), + Confirms3 = maybe_confirm( + Self, Id, Confirms2, AckNums), + {Member #member { pending_ack = PA1, + last_ack = LA1 }, + {Confirms3, + activity_cons( + Id, [], AckNums, Activity2)}}; + false -> + PA1 = apply_acks(Acks, join_pubs(PA, Pubs)), + LA1 = last_ack(Acks, LA), + LP1 = last_pub(Pubs, LP), + {Member #member { pending_ack = PA1, + last_pub = LP1, + last_ack = LA1 }, + {Confirms2, + activity_cons(Id, Pubs, Acks, Activity2)}} + end + end, Id, MembersStateConfirmsActivity) + end, {MembersState, {Confirms, activity_nil()}}, Activity), + State1 = State #state { members_state = MembersState1, + confirms = Confirms1 }, + Activity3 = activity_finalise(Activity1), + {Result, State2} = maybe_erase_aliases(State1), + ok = maybe_send_activity(Activity3, State2), + if_callback_success( + Result, fun activity_true/3, fun activity_false/3, Activity3, State2); + +handle_msg({activity, _NotLeft, _Activity}, State) -> + {ok, State}. + + +noreply(State) -> + {noreply, State, hibernate}. + +reply(Reply, State) -> + {reply, Reply, State, hibernate}. 
%% Publish Msg on behalf of this member: send it to the right-hand
%% neighbour, queue it as pending acknowledgement under our own id,
%% remember From (unless it is 'none') in the confirms queue, and pass
%% the message to our own callback module.
internal_broadcast(Msg, From, State = #state{self          = Self,
                                             pub_count     = PubCount,
                                             members_state = Members,
                                             module        = Module,
                                             confirms      = Confirms,
                                             callback_args = Args}) ->
    Pub = {PubCount, Msg},
    Outbound =
        activity_finalise(activity_cons(Self, [Pub], [], activity_nil())),
    ok = maybe_send_activity(Outbound, State),
    Enqueue = fun (Member = #member{pending_ack = PA}) ->
                      Member#member{pending_ack = queue:in(Pub, PA)}
              end,
    Members1 = with_member(Enqueue, Self, Members),
    Confirms1 = case From of
                    none -> Confirms;
                    _    -> queue:in({PubCount, From}, Confirms)
                end,
    State1 = State#state{pub_count     = PubCount + 1,
                         members_state = Members1,
                         confirms      = Confirms1},
    handle_callback_result({Module:handle_msg(Args, Self, Msg), State1}).


%% ---------------------------------------------------------------------------
%% View construction and inspection
%% ---------------------------------------------------------------------------

%% A view is a {Version, Dict} pair; the dict maps a member id to its
%% #view_member record.

%% True when the view is older than the requested version.
needs_view_update(ReqVer, {Ver, _Members}) ->
    Ver < ReqVer.

view_version({Ver, _Members}) ->
    Ver.

%% Dead members are recorded in a group as {dead, Member}.
is_member_alive({dead, _Member}) -> false;
is_member_alive(_)               -> true.

%% A member is an alias of Self when it is Self, or when it is in the
%% alias set stored against Self in the view.
is_member_alias(Self, Self, _View) ->
    true;
is_member_alias(Member, Self, View) ->
    SelfEntry = fetch_view_member(Self, View),
    ?SETS:is_element(Member, SelfEntry#view_member.aliases).

dead_member_id({dead, Member}) -> Member.

%% Store a #view_member under its own id.
store_view_member(Entry = #view_member{id = Id}, {Ver, Members}) ->
    {Ver, ?DICT:store(Id, Entry, Members)}.

%% Apply Fun to the entry for Id and store the result back in the view.
with_view_member(Fun, View, Id) ->
    Updated = Fun(fetch_view_member(Id, View)),
    store_view_member(Updated, View).

%% Crashes when Id is not in the view.
fetch_view_member(Id, {_Ver, Members}) ->
    ?DICT:fetch(Id, Members).

%% Returns {ok, Entry} or 'error'.
find_view_member(Id, {_Ver, Members}) ->
    ?DICT:find(Id, Members).

blank_view(Ver) ->
    {Ver, ?DICT:new()}.

%% The ids that have an entry of their own in the view.
alive_view_members({_Ver, Members}) ->
    ?DICT:fetch_keys(Members).

%% Every id the view knows about: each entry's id plus all its aliases.
all_known_members({_Ver, Members}) ->
    Collect = fun (Id, #view_member{aliases = Aliases}, Known) ->
                      ?SETS:to_list(Aliases) ++ [Id | Known]
              end,
    ?DICT:fold(Collect, [], Members).
%% Build a view from a group record: link the live members into a ring
%% and attach each dead member as an alias of a live one.
group_to_view(#gm_group{members = Members, version = Ver}) ->
    Alive = [M || M <- Members, is_member_alive(M)],
    [_ | _] = Alive, %% ASSERTION - can't have all dead members
    Ring = link_view(Alive ++ Alive ++ Alive, blank_view(Ver)),
    add_aliases(Ring, Members).

%% Slide a three-element window along the list, storing the middle
%% element with its left and right neighbours. Stops as soon as the
%% middle element is already in the view, or fewer than three elements
%% remain.
link_view([Left, Middle, Right | Rest], View) ->
    case find_view_member(Middle, View) of
        {ok, _Existing} ->
            View;
        error ->
            Entry = #view_member{id      = Middle,
                                 aliases = ?SETS:new(),
                                 left    = Left,
                                 right   = Right},
            link_view([Middle, Right | Rest], store_view_member(Entry, View))
    end;
link_view(_TooShort, View) ->
    View.

%% Walk the member list (rotated so that it ends with a live member),
%% collecting dead ids until a live member is met; that live member
%% takes the collected ids as aliases and the collection starts afresh.
add_aliases(View, Members) ->
    Step =
        fun (Member, {Dead, ViewAcc}) ->
                case is_member_alive(Member) of
                    true ->
                        Absorb =
                            fun (Entry = #view_member{aliases = Aliases}) ->
                                    Entry#view_member{
                                      aliases = ?SETS:union(Aliases, Dead)}
                            end,
                        {?SETS:new(),
                         with_view_member(Absorb, ViewAcc, Member)};
                    false ->
                        {?SETS:add_element(dead_member_id(Member), Dead),
                         ViewAcc}
                end
        end,
    {Leftover, View1} =
        lists:foldl(Step, {?SETS:new(), View}, ensure_alive_suffix(Members)),
    0 = ?SETS:size(Leftover), %% ASSERTION
    View1.

%% Rotate the list until its last element is a live member.
ensure_alive_suffix(Members) ->
    queue:to_list(ensure_alive_suffix1(queue:from_list(Members))).

ensure_alive_suffix1(Q) ->
    {{value, Last}, Front} = queue:out_r(Q),
    case is_member_alive(Last) of
        true  -> Q;
        false -> ensure_alive_suffix1(queue:in_r(Last, Front))
    end.


%% ---------------------------------------------------------------------------
%% View modification
%% ---------------------------------------------------------------------------

%% Join the named group, starting from whatever is currently recorded
%% for it.
join_group(Self, GroupName) ->
    Recorded = read_group(GroupName),
    join_group(Self, GroupName, Recorded).
%% Join the group given what read_group/1 (or an earlier attempt)
%% returned, and return the resulting view.
join_group(Self, GroupName, {error, not_found}) ->
    join_group(Self, GroupName, prune_or_create_group(Self, GroupName));
join_group(Self, _GroupName, #gm_group{members = [Self]} = Group) ->
    %% We are the only member.
    group_to_view(Group);
join_group(Self, GroupName, #gm_group{members = Members} = Group) ->
    case lists:member(Self, Members) of
        true ->
            group_to_view(Group);
        false ->
            join_via_alive_member(
              Self, GroupName, lists:filter(fun is_member_alive/1, Members))
    end.

%% Not yet a member. With no live members the group is pruned or
%% recreated; otherwise a randomly chosen live member is asked to add
%% us on its right. If that member turns out to be gone it is recorded
%% as dead and the join is retried.
join_via_alive_member(Self, GroupName, []) ->
    join_group(Self, GroupName, prune_or_create_group(Self, GroupName));
join_via_alive_member(Self, GroupName, Alive) ->
    Left = lists:nth(random:uniform(length(Alive)), Alive),
    try
        case gen_server2:call(Left, {add_on_right, Self}, infinity) of
            {ok, Group1} -> group_to_view(Group1);
            not_ready    -> join_group(Self, GroupName)
        end
    catch
        exit:{R, _}
          when R =:= noproc; R =:= normal; R =:= shutdown ->
            join_group(
              Self, GroupName,
              record_dead_member_in_group(Left, GroupName))
    end.

%% Dirty read of the group record; {error, not_found} when absent.
read_group(GroupName) ->
    case mnesia:dirty_read(?GROUP_TABLE, GroupName) of
        [Group] -> Group;
        []      -> {error, not_found}
    end.

%% In one transaction: create the group with Self as sole member when
%% it does not exist or when none of its members is alive; otherwise
%% return the existing record untouched.
prune_or_create_group(Self, GroupName) ->
    Txn =
        fun () ->
                Fresh = #gm_group{name    = GroupName,
                                  members = [Self],
                                  version = 0},
                case mnesia:read(?GROUP_TABLE, GroupName) of
                    [] ->
                        mnesia:write(Fresh),
                        Fresh;
                    [Existing = #gm_group{members = Members}] ->
                        case lists:any(fun is_member_alive/1, Members) of
                            true  -> Existing;
                            false -> mnesia:write(Fresh),
                                     Fresh
                        end
                end
        end,
    {atomic, Group} = mnesia:sync_transaction(Txn),
    Group.
%% Replace Member by {dead, Member} in the stored group and bump the
%% version. When Member is not listed, the record is returned as it is.
record_dead_member_in_group(Member, GroupName) ->
    Txn =
        fun () ->
                [Stored = #gm_group{members = Members, version = Ver}] =
                    mnesia:read(?GROUP_TABLE, GroupName),
                IsOther = fun (Candidate) -> Candidate =/= Member end,
                case lists:splitwith(IsOther, Members) of
                    {_All, []} -> %% not found - already recorded dead
                        Stored;
                    {Before, [Member | After]} ->
                        Marked = Before ++ [{dead, Member} | After],
                        Updated = Stored#gm_group{members = Marked,
                                                  version = Ver + 1},
                        mnesia:write(Updated),
                        Updated
                end
        end,
    {atomic, Group} = mnesia:sync_transaction(Txn),
    Group.

%% Insert NewMember immediately after Left in the stored group and
%% bump the version. Fun is called with the new record inside the
%% transaction, before the write, and must return ok. Crashes the
%% transaction when Left is not in the member list.
record_new_member_in_group(GroupName, Left, NewMember, Fun) ->
    Txn =
        fun () ->
                [#gm_group{members = Members, version = Ver} = Stored] =
                    mnesia:read(?GROUP_TABLE, GroupName),
                {Before, [Left | After]} =
                    lists:splitwith(fun (M) -> M =/= Left end, Members),
                Extended = Before ++ [Left, NewMember | After],
                Updated = Stored#gm_group{members = Extended,
                                          version = Ver + 1},
                ok = Fun(Updated),
                mnesia:write(Updated),
                Updated
        end,
    {atomic, Group} = mnesia:sync_transaction(Txn),
    Group.

%% Drop the {dead, Id} entries for the given ids from the stored
%% group. The version is bumped only when something was removed.
erase_members_in_group(Members, GroupName) ->
    ToDrop = [{dead, Id} || Id <- Members],
    Txn =
        fun () ->
                [Stored = #gm_group{members = [_ | _] = Current,
                                    version = Ver}] =
                    mnesia:read(?GROUP_TABLE, GroupName),
                case Current -- ToDrop of
                    Current ->
                        Stored;
                    Remaining ->
                        Updated = Stored#gm_group{members = Remaining,
                                                  version = Ver + 1},
                        mnesia:write(Updated),
                        Updated
                end
        end,
    {atomic, Group} = mnesia:sync_transaction(Txn),
    Group.
%% Forget every alias of ours whose last publish equals its last ack.
%% Such aliases are dropped from the members state and from the stored
%% group; when any were dropped, the view is rebuilt and the callback
%% module is told about the change. Returns {Result, State}.
maybe_erase_aliases(State = #state{self          = Self,
                                   group_name    = GroupName,
                                   view          = View,
                                   members_state = Members,
                                   module        = Module,
                                   callback_args = Args}) ->
    #view_member{aliases = Aliases} = fetch_view_member(Self, View),
    Sift =
        fun (Id, {Ids, MembersAcc} = Unchanged) ->
                #member{last_pub = LP, last_ack = LA} =
                    find_member_or_blank(Id, Members),
                case can_erase_view_member(Self, Id, LA, LP) of
                    true  -> {[Id | Ids], erase_member(Id, MembersAcc)};
                    false -> Unchanged
                end
        end,
    {Erasable, Members1} = ?SETS:fold(Sift, {[], Members}, Aliases),
    State1 = State#state{members_state = Members1},
    case Erasable of
        [] ->
            {ok, State1};
        _ ->
            View1 = group_to_view(
                      erase_members_in_group(Erasable, GroupName)),
            {callback_view_changed(Args, Module, View, View1),
             State1#state{view = View1}}
    end.

%% Never ourselves; otherwise only when last ack and last pub agree.
can_erase_view_member(Self, Self, _LA, _LP) -> false;
can_erase_view_member(_Self, _Id,   N,   N) -> true;
can_erase_view_member(_Self, _Id, _LA, _LP) -> false.


%% ---------------------------------------------------------------------------
%% View monitoring and maintenance
%% ---------------------------------------------------------------------------

%% Bring a recorded {Neighbour, MonitorRef} pair in line with the
%% neighbour the view says we should have (the last argument), and
%% return the new pair. Whenever the neighbour changes, the processes
%% involved are sent a check_neighbours cast.
ensure_neighbour(_Ver, Self, {Self, undefined}, Self) ->
    %% We were, and still are, our own neighbour.
    {Self, undefined};
ensure_neighbour(Ver, Self, {Self, undefined}, Wanted) ->
    %% We were our own neighbour; now there is a real one.
    ok = gen_server2:cast(Wanted, {?TAG, Ver, check_neighbours}),
    {Wanted, maybe_monitor(Wanted, Self)};
ensure_neighbour(_Ver, _Self, {Current, MRef}, Current) ->
    %% No change.
    {Current, MRef};
ensure_neighbour(Ver, Self, {Current, MRef}, Wanted) ->
    %% Different neighbour: stop monitoring the old one, notify both.
    true = erlang:demonitor(MRef),
    Note = {?TAG, Ver, check_neighbours},
    ok = gen_server2:cast(Current, Note),
    ok = case Wanted of
             Self -> ok;
             _    -> gen_server2:cast(Wanted, Note)
         end,
    {Wanted, maybe_monitor(Wanted, Self)}.
%% Monitor a neighbour, unless the neighbour is ourselves, in which
%% case the reference is 'undefined'.
maybe_monitor(Self, Self) ->
    undefined;
maybe_monitor(Neighbour, _Self) ->
    erlang:monitor(process, Neighbour).

%% Re-derive both neighbours from the current view and, if the right
%% neighbour changed, send it a catchup. Returns the updated state.
check_neighbours(State = #state{self  = Self,
                                left  = OldLeft,
                                right = OldRight,
                                view  = View}) ->
    #view_member{left = ViewLeft, right = ViewRight} =
        fetch_view_member(Self, View),
    Ver = view_version(View),
    NewLeft = ensure_neighbour(Ver, Self, OldLeft, ViewLeft),
    NewRight = ensure_neighbour(Ver, Self, OldRight, ViewRight),
    State1 = State#state{left = NewLeft, right = NewRight},
    ok = maybe_send_catchup(OldRight, State1),
    State1.

%% The first argument is the right neighbour pair held before the
%% update. Nothing is sent when it is unchanged, when we are our own
%% right neighbour, or when we hold no members state yet.
maybe_send_catchup(Unchanged, #state{right = Unchanged}) ->
    ok;
maybe_send_catchup(_OldRight, #state{self  = Self,
                                     right = {Self, undefined}}) ->
    ok;
maybe_send_catchup(_OldRight, #state{members_state = undefined}) ->
    ok;
maybe_send_catchup(_OldRight, #state{self          = Self,
                                     right         = {NewRight, _MRef},
                                     view          = View,
                                     members_state = Members}) ->
    send_right(NewRight, View,
               {catchup, Self, prepare_members_state(Members)}).


%% ---------------------------------------------------------------------------
%% Catch_up delta detection
%% ---------------------------------------------------------------------------

%% Split two queues of {Num, Msg} into: the part of A that precedes B,
%% the part they share, and what is left of B after the shared part.
find_prefix_common_suffix(A, B) ->
    {Prefix, RestOfA} = find_prefix(A, B, queue:new()),
    {Common, Suffix} = find_common(RestOfA, B, queue:new()),
    {Prefix, Common, Suffix}.

%% Returns the elements of A that occur before the first element of B,
%% plus the remainder of A.
find_prefix(A, B, Prefix) ->
    case {queue:out(A), queue:out(B)} of
        {{{value, Head}, _ATail}, {{value, Head}, _BTail}} ->
            %% Heads agree: the prefix is complete.
            {Prefix, A};
        {{empty, Empty}, {{value, _BHead}, _BTail}} ->
            %% A ran out before reaching B's head.
            {Prefix, Empty};
        {{{value, {NumA, _MsgA} = Head}, ATail},
         {{value, {NumB, _MsgB}}, _BTail}} when NumA < NumB ->
            find_prefix(ATail, B, queue:in(Head, Prefix));
        {_, {empty, _BTail}} ->
            {A, Prefix} %% Prefix will be empty here
    end.

%% A should be a prefix of B. Returns the commonality plus the
%% remainder of B.
find_common(A, B, Common) ->
    case {queue:out(A), queue:out(B)} of
        {{empty, _A}, _} ->
            %% A is used up: whatever remains of B is the suffix.
            {Common, B};
        {{{value, Shared}, ATail}, {{value, Shared}, BTail}} ->
            find_common(ATail, BTail, queue:in(Shared, Common))
    end.


%% ---------------------------------------------------------------------------
%% Members helpers
%% ---------------------------------------------------------------------------

%% Apply Fun to the #member stored for Id (a blank one if there is
%% none) and store the result.
with_member(Fun, Id, Members) ->
    Updated = Fun(find_member_or_blank(Id, Members)),
    store_member(Id, Updated, Members).

%% As with_member/3, but Fun also threads an accumulator:
%% Fun(Member, Acc) -> {Member1, Acc1}.
with_member_acc(Fun, Id, {Members, Acc}) ->
    {Updated, Acc1} = Fun(find_member_or_blank(Id, Members), Acc),
    {store_member(Id, Updated, Members), Acc1}.

find_member_or_blank(Id, Members) ->
    case ?DICT:find(Id, Members) of
        error        -> blank_member();
        {ok, Member} -> Member
    end.

erase_member(Id, Members) ->
    ?DICT:erase(Id, Members).

%% A member with nothing pending and nothing published or acked yet.
blank_member() ->
    #member{pending_ack = queue:new(), last_pub = -1, last_ack = -1}.

blank_member_state() ->
    ?DICT:new().

store_member(Id, Member, Members) ->
    ?DICT:store(Id, Member, Members).

%% Members state is turned into a list before it is sent and rebuilt
%% into a dict on receipt.
prepare_members_state(Members) ->
    ?DICT:to_list(Members).

build_members_state(MembersList) ->
    ?DICT:from_list(MembersList).


%% ---------------------------------------------------------------------------
%% Activity assembly
%% ---------------------------------------------------------------------------

%% Activity is accumulated in a queue of {Sender, Pubs, Acks} and
%% turned into a list once complete.
activity_nil() ->
    queue:new().

%% Entries with neither pubs nor acks are not recorded.
activity_cons(_Sender, [], [], Rest) ->
    Rest;
activity_cons(Sender, Pubs, Acks, Rest) ->
    queue:in({Sender, Pubs, Acks}, Rest).

activity_finalise(Activity) ->
    queue:to_list(Activity).

%% Forward a finalised activity list to the right; empty lists are
%% not sent.
maybe_send_activity([], _State) ->
    ok;
maybe_send_activity(Activity, #state{self  = Self,
                                     right = {Right, _RightMRef},
                                     view  = View}) ->
    send_right(Right, View, {activity, Self, Activity}).
%% Cast Msg to the right-hand neighbour, tagged with the view version.
send_right(Right, View, Msg) ->
    Ver = view_version(View),
    ok = gen_server2:cast(Right, {?TAG, Ver, Msg}).

%% Deliver every publish in a finalised activity list to the callback
%% module, in order. Delivery stops at the first result that is not
%% 'ok', and that result is returned.
callback(Args, Module, Activity) ->
    DeliverEntry =
        fun ({Id, Pubs, _Acks}, ok) ->
                DeliverPub =
                    fun ({_PubNum, Pub}, ok) ->
                            Module:handle_msg(Args, Id, Pub);
                        (_Pub, NotOk) ->
                            NotOk
                    end,
                lists:foldl(DeliverPub, ok, Pubs);
            (_Entry, NotOk) ->
                NotOk
        end,
    lists:foldl(DeliverEntry, ok, Activity).

%% Tell the callback module which ids appeared and which disappeared
%% between two views; it is not called when nothing changed.
callback_view_changed(Args, Module, OldView, NewView) ->
    Before = all_known_members(OldView),
    After = all_known_members(NewView),
    Births = After -- Before,
    Deaths = Before -- After,
    case {Births, Deaths} of
        {[], []} -> ok;
        _        -> Module:members_changed(Args, Births, Deaths)
    end.

%% Turn a callback result, with or without a pending reply, into a
%% gen_server2 return value.
handle_callback_result({Result, State}) ->
    if_callback_success(
      Result, fun no_reply_true/3, fun no_reply_false/3, undefined, State);
handle_callback_result({Result, Reply, State}) ->
    if_callback_success(
      Result, fun reply_true/3, fun reply_false/3, Reply, State).

%% Continuations for if_callback_success/5: each takes
%% (Result, Arg, State).
no_reply_true(_Result, _Undefined, State) ->
    noreply(State).
no_reply_false({stop, Reason}, _Undefined, State) ->
    {stop, Reason, State}.

reply_true(_Result, Reply, State) ->
    reply(Reply, State).
reply_false({stop, Reason}, Reply, State) ->
    {stop, Reason, Reply, State}.

handle_msg_true(_Result, Msg, State) ->
    handle_msg(Msg, State).
handle_msg_false(Result, _Msg, State) ->
    {Result, State}.

activity_true(_Result, Activity, State = #state{module        = Module,
                                                callback_args = Args}) ->
    {callback(Args, Module, Activity), State}.
activity_false(Result, _Activity, State) ->
    {Result, State}.

%% Dispatch on a callback result. 'ok' and {become, Module, Args} run
%% the True continuation (become first swaps the callback module and
%% its args in the state); {stop, Reason} runs the False continuation.
if_callback_success(ok, True, _False, Arg, State) ->
    True(ok, Arg, State);
if_callback_success(
  {become, Module, Args} = Result, True, _False, Arg, State) ->
    Switched = State#state{module        = Module,
                           callback_args = Args},
    True(Result, Arg, Switched);
if_callback_success({stop, _Reason} = Result, _True, False, Arg, State) ->
    False(Result, Arg, State).
%% Answer callers waiting on confirmed broadcasts. Only acks for our
%% own publishes count (Id =:= Self). The confirms queue holds
%% {PubNum, From} in publish order; an acked number older than the
%% head of the queue had no caller waiting and is skipped.
maybe_confirm(_Self, _Id, Confirms, []) ->
    Confirms;
maybe_confirm(Self, Self, Confirms, [Acked | Rest]) ->
    case queue:out(Confirms) of
        {empty, _Confirms} ->
            Confirms;
        {{value, {Acked, From}}, Remaining} ->
            gen_server2:reply(From, ok),
            maybe_confirm(Self, Self, Remaining, Rest);
        {{value, {Waiting, _From}}, _Confirms} when Waiting > Acked ->
            maybe_confirm(Self, Self, Confirms, Rest)
    end;
maybe_confirm(_Self, _Id, Confirms, _PubNums) ->
    Confirms.

%% Reply ok to every waiting caller and return an empty queue.
purge_confirms(Confirms) ->
    [gen_server2:reply(From, ok) || {_PubNum, From} <- queue:to_list(Confirms)],
    queue:new().


%% ---------------------------------------------------------------------------
%% Msg transformation
%% ---------------------------------------------------------------------------

%% The publish numbers of a queue of {PubNum, Msg}.
acks_from_queue(Q) ->
    [Num || {Num, _Msg} <- queue:to_list(Q)].

pubs_from_queue(Q) ->
    queue:to_list(Q).

queue_from_pubs(Pubs) ->
    queue:from_list(Pubs).

%% Drop as many entries from the front of the queue as there are acks.
apply_acks([], Pending) ->
    Pending;
apply_acks(Acks, Pending) ->
    {_Acked, StillPending} = queue:split(length(Acks), Pending),
    StillPending.

%% Append a list of pubs to a queue.
join_pubs(Q, []) ->
    Q;
join_pubs(Q, Pubs) ->
    queue:join(Q, queue_from_pubs(Pubs)).

%% The newest ack number; it must be ahead of the previous one.
last_ack([], Previous) ->
    Previous;
last_ack(Acks, Previous) ->
    Newest = lists:last(Acks),
    true = Newest > Previous, %% ASSERTION
    Newest.

%% The newest publish number; it must be ahead of the previous one.
last_pub([], Previous) ->
    Previous;
last_pub(Pubs, Previous) ->
    {Newest, _Msg} = lists:last(Pubs),
    true = Newest > Previous, %% ASSERTION
    Newest.
diff --git a/src/gm_test.erl b/src/gm_test.erl
new file mode 100644
index 0000000000..e8f2859832
--- /dev/null
+++ b/src/gm_test.erl
@@ -0,0 +1,126 @@
%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License at
%% http://www.mozilla.org/MPL/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied.
%% See the
%% License for the specific language governing rights and limitations
%% under the License.
%%
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
%% Copyright (c) 2007-2010 VMware, Inc. All rights reserved.
%%

-module(gm_test).

-export([test/0]).
-export([joined/2, members_changed/3, handle_msg/3, terminate/2]).

-behaviour(gm).

-include("gm_specs.hrl").

%% Test state lives in the process dictionary:
%%   state - dict from member to 'empty' | 'died' | next expected number
%%   count - messages seen since the last rate report
%%   ts    - timestamp of the last rate report

get_state() ->
    get(state).

with_state(Fun) ->
    put(state, Fun(get_state())).

%% Count one received message; every 100000 messages print the
%% observed rate and start counting again.
inc() ->
    case 1 + get(count) of
        100000 -> Now = os:timestamp(),
                  Start = put(ts, Now),
                  Diff = timer:now_diff(Now, Start),
                  Rate = 100000 / (Diff / 1000000),
                  io:format("~p seeing ~p msgs/sec~n", [self(), Rate]),
                  put(count, 0);
        N      -> put(count, N)
    end.

%% gm callback: we have joined; nothing has been received from any
%% member yet.
joined([], Members) ->
    io:format("Joined ~p (~p members)~n", [self(), length(Members)]),
    put(state, dict:from_list([{Member, empty} || Member <- Members])),
    put(count, 0),
    put(ts, os:timestamp()),
    ok.

%% gm callback: a birth must be of a member we have never seen, a
%% death must be of a member we know.
members_changed([], Births, Deaths) ->
    with_state(
      fun (State) ->
              State1 =
                  lists:foldl(
                    fun (Born, StateN) ->
                            false = dict:is_key(Born, StateN),
                            dict:store(Born, empty, StateN)
                    end, State, Births),
              lists:foldl(
                fun (Died, StateN) ->
                        true = dict:is_key(Died, StateN),
                        dict:store(Died, died, StateN)
                end, State1, Deaths)
      end),
    ok.

%% gm callback: check that messages from each sender arrive exactly
%% once and in order. The value stored per sender is the number we
%% expect next; any violation exits with a reason naming the received
%% number (Num) and the expected one.
handle_msg([], From, {test_msg, Num}) ->
    inc(),
    with_state(
      fun (State) ->
              ok = case dict:find(From, State) of
                       {ok, died} ->
                           exit({{from, From},
                                 {received_posthumous_delivery, Num}});
                       {ok, empty} -> ok;
                       {ok, Num}   -> ok;
                       {ok, Expected} when Num < Expected ->
                           %% Num was already seen; Expected is what
                           %% should have come next.
                           exit({{from, From},
                                 {duplicate_delivery_of, Num},
                                 {expecting, Expected}});
                       {ok, Expected} ->
                           %% Num is ahead of Expected, so Expected
                           %% never arrived.
                           exit({{from, From},
                                 {missing_delivery_of, Expected},
                                 {received_early, Num}});
                       error ->
                           exit({{from, From},
                                 {received_premature_delivery, Num}})
                   end,
              dict:store(From, Num + 1, State)
      end),
    ok.
%% gm callback: report that this member has left the group.
terminate([], Reason) ->
    io:format("Left ~p (~p)~n", [self(), Reason]),
    ok.

%% Start a linked process that runs one member through its whole life.
spawn_member() ->
    spawn_link(fun run_member/0).

%% Wait a random time, join the group, broadcast a random-length run
%% of numbered messages, leave, then start between 0 and 3 further
%% members.
run_member() ->
    random:seed(now()),
    %% start up delay of no more than 10 seconds
    timer:sleep(random:uniform(10000)),
    {ok, Pid} = gm:start_link(?MODULE, ?MODULE, []),
    First = random:uniform(10000),
    Last = First + random:uniform(10000),
    send_loop(Pid, First, Last),
    gm:leave(Pid),
    spawn_more().

spawn_more() ->
    HowMany = 4 - random:uniform(4),
    [spawn_member() || _ <- lists:seq(1, HowMany)].

%% Broadcast {test_msg, Count} for Count up to, but not including,
%% Target; roughly one in three as a confirmed broadcast.
send_loop(_Pid, Target, Target) ->
    ok;
send_loop(Pid, Count, Target) when Target > Count ->
    Msg = {test_msg, Count},
    case random:uniform(3) of
        3 -> gm:confirmed_broadcast(Pid, Msg);
        _ -> gm:broadcast(Pid, Msg)
    end,
    timer:sleep(random:uniform(5) - 1), %% sleep up to 4 ms
    send_loop(Pid, Count + 1, Target).

%% Entry point: create the tables and start two members.
test() ->
    ok = gm:create_tables(),
    spawn_member(),
    spawn_member().
diff --git a/src/rabbit.erl b/src/rabbit.erl
index b041a6372c..57bec3317f 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -36,6 +36,12 @@
                     []}},
                    {enables, external_infrastructure}]}).
 
+-rabbit_boot_step({rabbit_registry,
+                   [{description, "plugin registry"},
+                    {mfa, {rabbit_sup, start_child,
+                           [rabbit_registry]}},
+                    {enables, external_infrastructure}]}).
+
 -rabbit_boot_step({database,
                    [{mfa, {rabbit_mnesia, init, []}},
                     {enables, external_infrastructure}]}).
@@ -54,13 +60,6 @@
 -rabbit_boot_step({external_infrastructure,
                    [{description, "external infrastructure ready"}]}).
 
--rabbit_boot_step({rabbit_registry,
-                   [{description, "plugin registry"},
-                    {mfa, {rabbit_sup, start_child,
-                           [rabbit_registry]}},
-                    {requires, external_infrastructure},
-                    {enables, kernel_ready}]}).
- -rabbit_boot_step({rabbit_log, [{description, "logging server"}, {mfa, {rabbit_sup, start_restartable_child, diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index ad9e3ce609..9586bdbef8 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -18,8 +18,8 @@ -export([start/0, stop/0, declare/5, delete_immediately/1, delete/3, purge/1]). -export([internal_declare/2, internal_delete/1, - maybe_run_queue_via_backing_queue/2, - maybe_run_queue_via_backing_queue_async/2, + maybe_run_queue_via_backing_queue/3, + maybe_run_queue_via_backing_queue_async/3, sync_timeout/1, update_ram_duration/1, set_ram_duration_target/2, set_maximum_since_use/2, maybe_expire/1, drop_expired/1]). -export([pseudo_queue/2]). @@ -33,6 +33,7 @@ -export([notify_sent/2, unblock/2, flush_all/2]). -export([commit_all/3, rollback_all/3, notify_down_all/2, limit_all/3]). -export([on_node_down/1]). +-export([store_queue/1]). -include("rabbit.hrl"). -include_lib("stdlib/include/qlc.hrl"). @@ -138,10 +139,10 @@ -spec(internal_delete/1 :: (name()) -> rabbit_types:ok_or_error('not_found') | rabbit_types:connection_exit()). --spec(maybe_run_queue_via_backing_queue/2 :: - (pid(), (fun ((A) -> {[rabbit_guid:guid()], A}))) -> 'ok'). --spec(maybe_run_queue_via_backing_queue_async/2 :: - (pid(), (fun ((A) -> {[rabbit_guid:guid()], A}))) -> 'ok'). +-spec(maybe_run_queue_via_backing_queue/3 :: + (pid(), atom(), (fun ((A) -> {[rabbit_guid:guid()], A}))) -> 'ok'). +-spec(maybe_run_queue_via_backing_queue_async/3 :: + (pid(), atom(), (fun ((A) -> {[rabbit_guid:guid()], A}))) -> 'ok'). -spec(sync_timeout/1 :: (pid()) -> 'ok'). -spec(update_ram_duration/1 :: (pid()) -> 'ok'). -spec(set_ram_duration_target/2 :: (pid(), number() | 'infinity') -> 'ok'). 
@@ -189,12 +190,13 @@ recover_durable_queues(DurableQueues) -> declare(QueueName, Durable, AutoDelete, Args, Owner) -> ok = check_declare_arguments(QueueName, Args), - Q = start_queue_process(#amqqueue{name = QueueName, - durable = Durable, - auto_delete = AutoDelete, - arguments = Args, + Q = start_queue_process(#amqqueue{name = QueueName, + durable = Durable, + auto_delete = AutoDelete, + arguments = Args, exclusive_owner = Owner, - pid = none}), + pid = none, + mirror_pids = []}), case gen_server2:call(Q#amqqueue.pid, {init, false}) of not_found -> rabbit_misc:not_found(QueueName); Q1 -> Q1 @@ -445,11 +447,13 @@ internal_delete(QueueName) -> ok = rabbit_binding:process_deletions(Deletions, Tx) end). -maybe_run_queue_via_backing_queue(QPid, Fun) -> - gen_server2:call(QPid, {maybe_run_queue_via_backing_queue, Fun}, infinity). -maybe_run_queue_via_backing_queue_async(QPid, Fun) -> - gen_server2:cast(QPid, {maybe_run_queue_via_backing_queue, Fun}). +maybe_run_queue_via_backing_queue(QPid, Mod, Fun) -> + gen_server2:call(QPid, {maybe_run_queue_via_backing_queue, Mod, Fun}, + infinity). + +maybe_run_queue_via_backing_queue_async(QPid, Mod, Fun) -> + gen_server2:cast(QPid, {maybe_run_queue_via_backing_queue, Mod, Fun}). sync_timeout(QPid) -> gen_server2:cast(QPid, sync_timeout). @@ -472,7 +476,8 @@ drop_expired(QPid) -> on_node_down(Node) -> rabbit_misc:execute_mnesia_transaction( fun () -> qlc:e(qlc:q([delete_queue(QueueName) || - #amqqueue{name = QueueName, pid = Pid} + #amqqueue{name = QueueName, pid = Pid, + mirror_pids = []} <- mnesia:table(rabbit_queue), node(Pid) == Node])) end, @@ -489,11 +494,12 @@ delete_queue(QueueName) -> rabbit_binding:remove_transient_for_destination(QueueName). pseudo_queue(QueueName, Pid) -> - #amqqueue{name = QueueName, - durable = false, + #amqqueue{name = QueueName, + durable = false, auto_delete = false, - arguments = [], - pid = Pid}. + arguments = [], + pid = Pid, + mirror_pids = []}. 
safe_delegate_call_ok(F, Pids) -> case delegate:invoke(Pids, fun (Pid) -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 663977ba87..279c267e00 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -33,6 +33,8 @@ handle_info/2, handle_pre_hibernate/1, prioritise_call/3, prioritise_cast/2, prioritise_info/2]). +-export([init_with_backing_queue_state/6]). + % Queue's state -record(q, {q, exclusive_consumer, @@ -72,7 +74,8 @@ messages, consumers, memory, - backing_queue_status + backing_queue_status, + mirror_pids ]). -define(CREATION_EVENT_KEYS, @@ -97,12 +100,11 @@ info_keys() -> ?INFO_KEYS. init(Q) -> ?LOGDEBUG("Queue starting - ~p~n", [Q]), process_flag(trap_exit, true), - {ok, BQ} = application:get_env(backing_queue_module), {ok, #q{q = Q#amqqueue{pid = self()}, exclusive_consumer = none, has_had_consumers = false, - backing_queue = BQ, + backing_queue = backing_queue_module(Q), backing_queue_state = undefined, active_consumers = queue:new(), blocked_consumers = queue:new(), @@ -115,6 +117,36 @@ init(Q) -> guid_to_channel = dict:new()}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. 
+init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, + RateTRef, AckTags, Deliveries) -> + ?LOGDEBUG("Queue starting - ~p~n", [Q]), + case Owner of + none -> ok; + _ -> erlang:monitor(process, Owner) + end, + State = requeue_and_run( + AckTags, + process_args( + #q{q = Q#amqqueue{pid = self()}, + exclusive_consumer = none, + has_had_consumers = false, + backing_queue = BQ, + backing_queue_state = BQS, + active_consumers = queue:new(), + blocked_consumers = queue:new(), + expires = undefined, + sync_timer_ref = undefined, + rate_timer_ref = RateTRef, + expiry_timer_ref = undefined, + ttl = undefined, + stats_timer = rabbit_event:init_stats_timer(), + guid_to_channel = dict:new()})), + lists:foldl( + fun (Delivery, StateN) -> + {_Delivered, StateN1} = deliver_or_enqueue(Delivery, StateN), + StateN1 + end, State, Deliveries). + terminate(shutdown, State = #q{backing_queue = BQ}) -> terminate_shutdown(fun (BQS) -> BQ:terminate(BQS) end, State); terminate({shutdown, _}, State = #q{backing_queue = BQ}) -> @@ -135,8 +167,7 @@ code_change(_OldVsn, State, _Extra) -> %%---------------------------------------------------------------------------- declare(Recover, From, - State = #q{q = Q = #amqqueue{name = QName, durable = IsDurable}, - backing_queue = BQ, backing_queue_state = undefined, + State = #q{q = Q, backing_queue = BQ, backing_queue_state = undefined, stats_timer = StatsTimer}) -> case rabbit_amqqueue:internal_declare(Q, Recover) of not_found -> {stop, normal, not_found, State}; @@ -147,7 +178,7 @@ declare(Recover, From, ok = rabbit_memory_monitor:register( self(), {rabbit_amqqueue, set_ram_duration_target, [self()]}), - BQS = BQ:init(QName, IsDurable, Recover), + BQS = BQ:init(Q, Recover), State1 = process_args(State#q{backing_queue_state = BQS}), rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State1)), @@ -209,6 +240,13 @@ next_state(State) -> false -> {stop_sync_timer(State2), hibernate} end. 
+backing_queue_module(#amqqueue{arguments = Args}) -> + case rabbit_misc:table_lookup(Args, <<"x-mirror">>) of + undefined -> {ok, BQM} = application:get_env(backing_queue_module), + BQM; + _Nodes -> rabbit_mirror_queue_master + end. + ensure_sync_timer(State = #q{sync_timer_ref = undefined}) -> {ok, TRef} = timer:apply_after( ?SYNC_INTERVAL, rabbit_amqqueue, sync_timeout, [self()]), @@ -482,7 +520,7 @@ attempt_delivery(#delivery{txn = none, AckRequired, Message, (?BASE_MESSAGE_PROPERTIES)#message_properties{ needs_confirming = (NeedsConfirming =:= confirm)}, - BQS), + ChPid, BQS), {{Message, false, AckTag}, true, State1#q{backing_queue_state = BQS1}} end, @@ -499,7 +537,8 @@ attempt_delivery(#delivery{txn = Txn, {true, NeedsConfirming, State#q{backing_queue_state = - BQ:tx_publish(Txn, Message, ?BASE_MESSAGE_PROPERTIES, BQS)}}. + BQ:tx_publish(Txn, Message, ?BASE_MESSAGE_PROPERTIES, ChPid, + BQS)}}. deliver_or_enqueue(Delivery, State) -> case attempt_delivery(Delivery, record_confirm_message(Delivery, State)) of @@ -512,15 +551,17 @@ deliver_or_enqueue(Delivery, State) -> (message_properties(State)) #message_properties{ needs_confirming = (NeedsConfirming =:= confirm)}, - BQS), + Delivery #delivery.sender, BQS), {false, ensure_ttl_timer(State1#q{backing_queue_state = BQS1})} end. -requeue_and_run(AckTags, State = #q{backing_queue = BQ, ttl=TTL}) -> +requeue_and_run(AckTags, State = #q{backing_queue = BQ, ttl = TTL}) -> maybe_run_queue_via_backing_queue( - fun (BQS) -> - {[], BQ:requeue(AckTags, reset_msg_expiry_fun(TTL), BQS)} - end, State). + BQ, fun (BQS) -> + {_Guids, BQS1} = + BQ:requeue(AckTags, reset_msg_expiry_fun(TTL), BQS), + {[], BQS1} + end, State). fetch(AckRequired, State = #q{backing_queue_state = BQS, backing_queue = BQ}) -> @@ -622,8 +663,10 @@ maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg). qname(#q{q = #amqqueue{name = QName}}) -> QName. 
-maybe_run_queue_via_backing_queue(Fun, State = #q{backing_queue_state = BQS}) -> - {Guids, BQS1} = Fun(BQS), +maybe_run_queue_via_backing_queue(Mod, Fun, + State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + {Guids, BQS1} = BQ:invoke(Mod, Fun, BQS), run_message_queue( confirm_messages(Guids, State#q{backing_queue_state = BQS1})). @@ -724,6 +767,9 @@ i(memory, _) -> M; i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) -> BQ:status(BQS); +i(mirror_pids, #q{q = #amqqueue{name = Name}}) -> + {ok, #amqqueue{mirror_pids = MPids}} = rabbit_amqqueue:lookup(Name), + MPids; i(Item, _) -> throw({bad_argument, Item}). @@ -759,29 +805,29 @@ emit_consumer_deleted(ChPid, ConsumerTag) -> prioritise_call(Msg, _From, _State) -> case Msg of - info -> 9; - {info, _Items} -> 9; - consumers -> 9; - {maybe_run_queue_via_backing_queue, _Fun} -> 6; - _ -> 0 + info -> 9; + {info, _Items} -> 9; + consumers -> 9; + {maybe_run_queue_via_backing_queue, _Mod, _Fun} -> 6; + _ -> 0 end. prioritise_cast(Msg, _State) -> case Msg of - update_ram_duration -> 8; - delete_immediately -> 8; - {set_ram_duration_target, _Duration} -> 8; - {set_maximum_since_use, _Age} -> 8; - maybe_expire -> 8; - drop_expired -> 8; - emit_stats -> 7; - {ack, _Txn, _MsgIds, _ChPid} -> 7; - {reject, _MsgIds, _Requeue, _ChPid} -> 7; - {notify_sent, _ChPid} -> 7; - {unblock, _ChPid} -> 7; - {maybe_run_queue_via_backing_queue, _Fun} -> 6; - sync_timeout -> 6; - _ -> 0 + update_ram_duration -> 8; + delete_immediately -> 8; + {set_ram_duration_target, _Duration} -> 8; + {set_maximum_since_use, _Age} -> 8; + maybe_expire -> 8; + drop_expired -> 8; + emit_stats -> 7; + {ack, _Txn, _MsgIds, _ChPid} -> 7; + {reject, _MsgIds, _Requeue, _ChPid} -> 7; + {notify_sent, _ChPid} -> 7; + {unblock, _ChPid} -> 7; + {maybe_run_queue_via_backing_queue, _Mod, _Fun} -> 6; + sync_timeout -> 6; + _ -> 0 end. 
prioritise_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, @@ -994,12 +1040,12 @@ handle_call({requeue, AckTags, ChPid}, From, State) -> noreply(requeue_and_run(AckTags, State)) end; -handle_call({maybe_run_queue_via_backing_queue, Fun}, _From, State) -> - reply(ok, maybe_run_queue_via_backing_queue(Fun, State)). +handle_call({maybe_run_queue_via_backing_queue, Mod, Fun}, _From, State) -> + reply(ok, maybe_run_queue_via_backing_queue(Mod, Fun, State)). -handle_cast({maybe_run_queue_via_backing_queue, Fun}, State) -> - noreply(maybe_run_queue_via_backing_queue(Fun, State)); +handle_cast({maybe_run_queue_via_backing_queue, Mod, Fun}, State) -> + noreply(maybe_run_queue_via_backing_queue(Mod, Fun, State)); handle_cast(sync_timeout, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> @@ -1021,7 +1067,7 @@ handle_cast({ack, Txn, AckTags, ChPid}, case Txn of none -> ChAckTags1 = subtract_acks(ChAckTags, AckTags), NewC = C#cr{acktags = ChAckTags1}, - BQS1 = BQ:ack(AckTags, BQS), + {_Guids, BQS1} = BQ:ack(AckTags, BQS), {NewC, State#q{backing_queue_state = BQS1}}; _ -> BQS1 = BQ:tx_ack(Txn, AckTags, BQS), {C#cr{txn = Txn}, @@ -1042,7 +1088,7 @@ handle_cast({reject, AckTags, Requeue, ChPid}, maybe_store_ch_record(C#cr{acktags = ChAckTags1}), noreply(case Requeue of true -> requeue_and_run(AckTags, State); - false -> BQS1 = BQ:ack(AckTags, BQS), + false -> {_Guids, BQS1} = BQ:ack(AckTags, BQS), State#q{backing_queue_state = BQS1} end) end; @@ -1137,7 +1183,7 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) -> handle_info(timeout, State = #q{backing_queue = BQ}) -> noreply(maybe_run_queue_via_backing_queue( - fun (BQS) -> {[], BQ:idle_timeout(BQS)} end, State)); + BQ, fun (BQS) -> {[], BQ:idle_timeout(BQS)} end, State)); handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}; diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 6a21e10fd3..1aa6ea6745 100644 --- a/src/rabbit_backing_queue.erl +++ 
b/src/rabbit_backing_queue.erl @@ -33,7 +33,7 @@ behaviour_info(callbacks) -> {stop, 0}, %% Initialise the backing queue and its state. - {init, 3}, + {init, 2}, %% Called on queue shutdown when queue isn't being deleted. {terminate, 1}, @@ -47,12 +47,12 @@ behaviour_info(callbacks) -> {purge, 1}, %% Publish a message. - {publish, 3}, + {publish, 4}, %% Called for messages which have already been passed straight %% out to a client. The queue will be empty for these calls %% (i.e. saves the round trip through the backing queue). - {publish_delivered, 4}, + {publish_delivered, 5}, %% Drop messages from the head of the queue while the supplied %% predicate returns true. @@ -66,7 +66,7 @@ behaviour_info(callbacks) -> {ack, 2}, %% A publish, but in the context of a transaction. - {tx_publish, 4}, + {tx_publish, 5}, %% Acks, but in the context of a transaction. {tx_ack, 3}, @@ -122,7 +122,12 @@ behaviour_info(callbacks) -> %% Exists for debugging purposes, to be able to expose state via %% rabbitmqctl list_queues backing_queue_status - {status, 1} + {status, 1}, + + %% Passed a function to be invoked with the relevant backing + %% queue's state. Useful for when the backing queue or other + %% components need to pass functions into the backing queue. + {invoke, 3} ]; behaviour_info(_Other) -> undefined. diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 78391be2ef..4dfc34b57c 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -339,6 +339,12 @@ format_info_item([{TableEntryKey, TableEntryType, _TableEntryValue} | _] = Value) when is_binary(TableEntryKey) andalso is_atom(TableEntryType) -> io_lib:format("~1000000000000p", [prettify_amqp_table(Value)]); +format_info_item([T | _] = Value) + when is_tuple(T) orelse is_pid(T) orelse is_binary(T) orelse is_atom(T) orelse + is_list(T) -> + "[" ++ + lists:nthtail(2, lists:append( + [", " ++ format_info_item(E) || E <- Value])) ++ "]"; format_info_item(Value) -> io_lib:format("~w", [Value]). 
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl
new file mode 100644
index 0000000000..608148b5f4
--- /dev/null
+++ b/src/rabbit_mirror_queue_coordinator.erl
@@ -0,0 +1,136 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2010 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_mirror_queue_coordinator).
+
+-export([start_link/2, add_slave/2, get_gm/1]).
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+         code_change/3]).
+
+-export([joined/2, members_changed/3, handle_msg/3]).
+
+-behaviour(gen_server2).
+-behaviour(gm).
+
+-include("rabbit.hrl").
+-include("gm_specs.hrl").
+
+-record(state, { q,
+                 gm
+               }).
+
+-define(ONE_SECOND, 1000).
+
+%% Starts a coordinator linked to the caller. Both arguments are handed
+%% to init/1 unchanged; init/1 treats 'undefined' as "start a new gm
+%% member" and anything else as an existing gm process to link to.
+start_link(Q, GMOrUndefined) ->
+    InitArgs = [Q, GMOrUndefined],
+    gen_server2:start_link(?MODULE, InitArgs, []).
+
+%% Asynchronously asks the coordinator to start a slave on Node.
+add_slave(Coordinator, Node) ->
+    Request = {add_slave, Node},
+    gen_server2:cast(Coordinator, Request).
+
+%% Synchronously fetches the gm held in the coordinator's state,
+%% waiting without a timeout.
+get_gm(Coordinator) ->
+    gen_server2:call(Coordinator, get_gm, infinity).
+
+%% ---------------------------------------------------------------------------
+%% gen_server
+%% ---------------------------------------------------------------------------
+
+%% When GM is 'undefined' a new gm member is started for the queue and
+%% we block until its joined/2 callback has messaged us; otherwise we
+%% link to the gm process we were handed. In both cases a heartbeat is
+%% then broadcast through that gm every ?ONE_SECOND milliseconds.
+init([#amqqueue { name = QueueName } = Q, GM]) ->
+    GM1 = case GM of
+              undefined ->
+                  ok = gm:create_tables(),
+                  {ok, GM2} = gm:start_link(QueueName, ?MODULE, [self()]),
+                  receive {joined, GM2, _Members} ->
+                          ok
+                  end,
+                  GM2;
+              _ ->
+                  true = link(GM),
+                  GM
+          end,
+    {ok, _TRef} =
+        timer:apply_interval(?ONE_SECOND, gm, broadcast, [GM1, heartbeat]),
+    {ok, #state { q = Q, gm = GM1 }, hibernate,
+     {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
+
+%% get_gm: reply with the gm stored in our state.
+handle_call(get_gm, _From, State = #state { gm = GM }) ->
+    reply(GM, State).
+
+%% {add_slave, Node}: start a slave for our queue on Node, provided
+%% Node is currently in nodes(); the outcome is only logged.
+handle_cast({add_slave, Node}, State = #state { q = Q }) ->
+    Nodes = nodes(),
+    case lists:member(Node, Nodes) of
+        true ->
+            Result = rabbit_mirror_queue_slave_sup:start_child(Node, [Q]),
+            rabbit_log:info("Adding slave node for queue ~p: ~p~n",
+                            [Q #amqqueue.name, Result]);
+        false ->
+            %% Arguments follow the order of the format string: node
+            %% first, then queue name.
+            rabbit_log:info(
+              "Ignoring request to add slave on node ~p for queue ~p~n",
+              [Node, Q #amqqueue.name])
+    end,
+    noreply(State);
+
+%% {gm_deaths, Deaths}: remove the dead members from the queue's
+%% record. We carry on only when the resulting master pid lives on
+%% this node, and stop normally when the queue no longer exists. Any
+%% other result has no clause here and crashes the coordinator.
+handle_cast({gm_deaths, Deaths},
+            State = #state { q = #amqqueue { name = QueueName } }) ->
+    rabbit_log:info("Master ~p saw deaths ~p for queue ~p~n",
+                    [self(), Deaths, QueueName]),
+    case rabbit_mirror_queue_misc:remove_from_queue(QueueName, Deaths) of
+        {ok, Pid} when node(Pid) =:= node() ->
+            noreply(State);
+        {error, not_found} ->
+            {stop, normal, State}
+    end.
+
+%% No plain messages are expected; any that arrive stop the process.
+handle_info(Msg, State) ->
+    {stop, {unexpected_info, Msg}, State}.
+
+%% terminate/2 is a callback of both behaviours this module implements,
+%% with the arguments in opposite positions: gen_server2 passes
+%% (Reason, State) whereas gm passes (Args, Reason).
+terminate(_Reason, #state{}) ->
+    %% gen_server case
+    ok;
+terminate([_CPid], _Reason) ->
+    %% gm case
+    ok.
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+%% ---------------------------------------------------------------------------
+%% GM
+%% ---------------------------------------------------------------------------
+
+%% Tells the coordinator (blocked in init/1) that the gm has joined.
+joined([CPid], Members) ->
+    CPid ! {joined, self(), Members},
+    ok.
+
+%% Only deaths are of interest; they are forwarded to the coordinator.
+members_changed([_CPid], _Births, []) ->
+    ok;
+members_changed([CPid], _Births, Deaths) ->
+    ok = gen_server2:cast(CPid, {gm_deaths, Deaths}).
+
+%% The coordinator ignores everything broadcast on the group.
+handle_msg([_CPid], _From, heartbeat) ->
+    ok;
+handle_msg([_CPid], _From, _Msg) ->
+    ok.
+
+%% ---------------------------------------------------------------------------
+%% Others
+%% ---------------------------------------------------------------------------
+
+noreply(State) ->
+    {noreply, State, hibernate}.
+
+reply(Reply, State) ->
+    {reply, Reply, State, hibernate}.
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
new file mode 100644
index 0000000000..11831a2998
--- /dev/null
+++ b/src/rabbit_mirror_queue_master.erl
@@ -0,0 +1,250 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2010 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_mirror_queue_master).
+
+-export([init/2, terminate/1, delete_and_terminate/1,
+         purge/1, publish/4, publish_delivered/5, fetch/2, ack/2,
+         tx_publish/5, tx_ack/3, tx_rollback/2, tx_commit/4,
+         requeue/3, len/1, is_empty/1, dropwhile/2,
+         set_ram_duration_target/2, ram_duration/1,
+         needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1,
+         status/1, invoke/3]).
+
+-export([start/1, stop/0]).
+
+-export([promote_backing_queue_state/5]).
+
+-behaviour(rabbit_backing_queue).
+
+-include("rabbit.hrl").
+
+-record(state, { gm,
+                 coordinator,
+                 backing_queue,
+                 backing_queue_state,
+                 set_delivered,
+                 seen
+               }).
+
+%% ---------------------------------------------------------------------------
+%% Backing queue
+%% ---------------------------------------------------------------------------
+
+%% start/1 and stop/0 exist only to satisfy the behaviour: this module
+%% is never installed as the default backing queue implementation, so
+%% both simply exit.
+start(_DurableQueues) ->
+    exit({not_valid_for_generic_backing_queue, ?MODULE}).
+
+stop() ->
+    exit({not_valid_for_generic_backing_queue, ?MODULE}).
+
+%% Starts a coordinator (which creates the gm), asks it to add a slave
+%% on every node selected by the queue's "x-mirror" argument, and then
+%% initialises the configured backing queue underneath us.
+init(Q = #amqqueue { arguments = QArgs }, Recover) ->
+    {ok, Coord} = rabbit_mirror_queue_coordinator:start_link(Q, undefined),
+    GM = rabbit_mirror_queue_coordinator:get_gm(Coord),
+    {_Type, Requested} = rabbit_misc:table_lookup(QArgs, <<"x-mirror">>),
+    lists:foreach(
+      fun (Node) ->
+              rabbit_mirror_queue_coordinator:add_slave(Coord, Node)
+      end, mirror_nodes(Requested)),
+    {ok, BQ} = application:get_env(backing_queue_module),
+    BQS = BQ:init(Q, Recover),
+    #state { gm                  = GM,
+             coordinator         = Coord,
+             backing_queue       = BQ,
+             backing_queue_state = BQS,
+             set_delivered       = 0,
+             seen                = sets:new() }.
+
+%% An empty "x-mirror" list means every node in nodes(); otherwise the
+%% longstr entries are converted to node-name atoms.
+mirror_nodes([]) ->
+    nodes();
+mirror_nodes(Requested) ->
+    [list_to_atom(binary_to_list(Name)) || {longstr, Name} <- Requested].
+
+%% Builds the master state for a slave that is being promoted. Every
+%% message already in the backing queue counts towards set_delivered,
+%% so fetch/2 will report each of them as delivered.
+promote_backing_queue_state(Coord, BQ, BQS, GM, Seen) ->
+    Len = BQ:len(BQS),
+    #state { gm                  = GM,
+             coordinator         = Coord,
+             backing_queue       = BQ,
+             backing_queue_state = BQS,
+             set_delivered       = Len,
+             seen                = Seen }.
+
+%% Backing queue termination. The queue is going down but shouldn't be
+%% deleted. Most likely safe shutdown of this node. Thus just let some
+%% other slave take over.
+terminate(State = #state { backing_queue = BQ, backing_queue_state = BQS }) ->
+    Terminated = BQ:terminate(BQS),
+    State #state { backing_queue_state = Terminated }.
+
+%% Tells the group to delete the queue, then deletes the local backing
+%% queue and resets set_delivered.
+delete_and_terminate(State = #state { gm                  = GM,
+                                      backing_queue       = BQ,
+                                      backing_queue_state = BQS }) ->
+    ok = gm:broadcast(GM, delete_and_terminate),
+    State #state { backing_queue_state = BQ:delete_and_terminate(BQS),
+                   set_delivered       = 0 }.
+
+%% Broadcasts a target length of 0 to the group and purges the local
+%% backing queue. Returns {PurgedCount, State}.
+purge(State = #state { gm                  = GM,
+                       backing_queue       = BQ,
+                       backing_queue_state = BQS }) ->
+    ok = gm:broadcast(GM, {set_length, 0}),
+    {Count, BQS1} = BQ:purge(BQS),
+    {Count, State #state { backing_queue_state = BQS1,
+                           set_delivered       = 0 }}.
+
+%% Publishes Msg unless its guid is in 'seen', in which case the guid
+%% is merely removed from the set and nothing is broadcast or enqueued.
+%% Otherwise the publish is broadcast to the group before the local
+%% backing queue is updated.
+publish(Msg = #basic_message { guid = Guid }, MsgProps, ChPid,
+        State = #state { gm                  = GM,
+                         backing_queue       = BQ,
+                         backing_queue_state = BQS,
+                         seen                = Seen }) ->
+    case sets:is_element(Guid, Seen) of
+        true  -> State #state { seen = sets:del_element(Guid, Seen) };
+        false -> ok = gm:broadcast(GM, {publish, false, ChPid, MsgProps, Msg}),
+                 BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS),
+                 State #state { backing_queue_state = BQS1 }
+    end.
+
+%% As publish/4, but for a message that has already been handed to a
+%% consumer; the broadcast carries {true, AckRequired} so the rest of
+%% the group can tell the two cases apart.
+%%
+%% NOTE(review): the 'true' branch returns a bare State whereas the
+%% 'false' branch returns {AckTag, State}; the two shapes disagree and
+%% no ack tag is available on the 'true' branch - confirm what callers
+%% expect when the guid has already been seen.
+publish_delivered(AckRequired, Msg = #basic_message { guid = Guid }, MsgProps,
+                  ChPid, State = #state { gm                  = GM,
+                                          backing_queue       = BQ,
+                                          backing_queue_state = BQS,
+                                          seen                = Seen }) ->
+    case sets:is_element(Guid, Seen) of
+        true  -> State #state { seen = sets:del_element(Guid, Seen) };
+        false -> ok = gm:broadcast(GM, {publish, {true, AckRequired}, ChPid,
+                                        MsgProps, Msg}),
+                 {AckTag, BQS1} = BQ:publish_delivered(AckRequired, Msg,
+                                                       MsgProps, ChPid, BQS),
+                 {AckTag, State #state { backing_queue_state = BQS1 }}
+    end.
+
+%% Drops messages from the local backing queue via BQ:dropwhile/2,
+%% lowers set_delivered by the number dropped (never below zero) and
+%% broadcasts the resulting queue length to the group.
+dropwhile(Fun, State = #state { gm                  = GM,
+                                backing_queue       = BQ,
+                                backing_queue_state = BQS,
+                                set_delivered       = SetDelivered }) ->
+    Len = BQ:len(BQS),
+    BQS1 = BQ:dropwhile(Fun, BQS),
+    Dropped = Len - BQ:len(BQS1),
+    SetDelivered1 = lists:max([0, SetDelivered - Dropped]),
+    ok = gm:broadcast(GM, {set_length, BQ:len(BQS1)}),
+    State #state { backing_queue_state = BQS1,
+                   set_delivered       = SetDelivered1 }.
+
+%% Fetches from the local backing queue and, when a message comes
+%% back, broadcasts the fetch (with the remaining length) to the group.
+%% While set_delivered is positive every fetched message is reported
+%% as already delivered, and the counter is decremented per fetch.
+fetch(AckRequired, State = #state { gm                  = GM,
+                                    backing_queue       = BQ,
+                                    backing_queue_state = BQS,
+                                    set_delivered       = SetDelivered,
+                                    seen                = Seen }) ->
+    {Result, BQS1} = BQ:fetch(AckRequired, BQS),
+    State1 = State #state { backing_queue_state = BQS1 },
+    case Result of
+        empty ->
+            {Result, State1};
+        {#basic_message { guid = Guid } = Message, IsDelivered, AckTag,
+         Remaining} ->
+            ok = gm:broadcast(GM, {fetch, AckRequired, Guid, Remaining}),
+            IsDelivered1 = IsDelivered orelse SetDelivered > 0,
+            SetDelivered1 = lists:max([0, SetDelivered - 1]),
+            %% The sum is 1 only when the counter has just gone from 1
+            %% to 0; at that point 'seen' is reset.
+            Seen1 = case SetDelivered + SetDelivered1 of
+                        1 -> sets:new(); %% transition to empty
+                        _ -> Seen
+                    end,
+            {{Message, IsDelivered1, AckTag, Remaining},
+             State1 #state { set_delivered = SetDelivered1,
+                             seen          = Seen1 }}
+    end.
+
+%% Acks locally, then tells the group which guids were acked (nothing
+%% is broadcast when the backing queue reports no guids).
+ack(AckTags, State = #state { gm                  = GM,
+                              backing_queue       = BQ,
+                              backing_queue_state = BQS }) ->
+    {Guids, BQS1} = BQ:ack(AckTags, BQS),
+    case Guids of
+        [] -> ok;
+        _  -> ok = gm:broadcast(GM, {ack, Guids})
+    end,
+    {Guids, State #state { backing_queue_state = BQS1 }}.
+
+%% The tx_* callbacks are placeholders: none of them touches the
+%% backing queue or the group yet. Their unused arguments are
+%% underscore-prefixed so the module compiles without warnings.
+tx_publish(_Txn, _Msg, _MsgProps, _ChPid, #state {} = State) ->
+    %% gm:broadcast(GM, {tx_publish, Txn, Guid, MsgProps, ChPid})
+    State.
+
+tx_ack(_Txn, _AckTags, #state {} = State) ->
+    %% gm:broadcast(GM, {tx_ack, Txn, Guids})
+    State.
+
+tx_rollback(_Txn, #state {} = State) ->
+    %% gm:broadcast(GM, {tx_rollback, Txn})
+    {[], State}.
+
+tx_commit(_Txn, _PostCommitFun, _MsgPropsFun, #state {} = State) ->
+    %% Maybe don't want to transmit the MsgPropsFun but what choice do
+    %% we have? OTOH, on the slaves, things won't be expiring on their
+    %% own (props are interpreted by amqqueue, not vq), so if the msg
+    %% props aren't quite the same, that doesn't matter.
+    %%
+    %% The PostCommitFun is actually worse - we need to prevent that
+    %% from being invoked until we have confirmation from all the
+    %% slaves that they've done everything up to there.
+    %%
+    %% In fact, transactions are going to need work seeing as it's at
+    %% this point that VQ mentions amqqueue, which will thus not work
+    %% on the slaves - we need to make sure that all the slaves do the
+    %% tx_commit_post_msg_store at the same point, and then when they
+    %% all confirm that (scatter/gather), we can finally invoke the
+    %% PostCommitFun.
+    %%
+    %% Another idea is that the slaves are actually driven with
+    %% pubacks and thus only the master needs to support txns
+    %% directly.
+    {[], State}.
+
+%% Requeues locally, then broadcasts the requeue (including the
+%% MsgPropsFun) together with the guids the backing queue reported.
+requeue(AckTags, MsgPropsFun, State = #state { gm                  = GM,
+                                               backing_queue       = BQ,
+                                               backing_queue_state = BQS }) ->
+    {Guids, BQS1} = BQ:requeue(AckTags, MsgPropsFun, BQS),
+    ok = gm:broadcast(GM, {requeue, MsgPropsFun, Guids}),
+    {Guids, State #state { backing_queue_state = BQS1 }}.
+
+%% The remaining callbacks delegate to the wrapped backing queue; the
+%% ones that yield a new backing queue state re-wrap it in our #state{}.
+len(#state { backing_queue = BQ, backing_queue_state = BQS}) ->
+    BQ:len(BQS).
+
+is_empty(#state { backing_queue = BQ, backing_queue_state = BQS}) ->
+    BQ:is_empty(BQS).
+
+set_ram_duration_target(Target, State = #state { backing_queue       = BQ,
+                                                 backing_queue_state = BQS}) ->
+    State #state { backing_queue_state =
+                       BQ:set_ram_duration_target(Target, BQS) }.
+
+ram_duration(State = #state { backing_queue = BQ, backing_queue_state = BQS}) ->
+    {Result, BQS1} = BQ:ram_duration(BQS),
+    {Result, State #state { backing_queue_state = BQS1 }}.
+
+needs_idle_timeout(#state { backing_queue = BQ, backing_queue_state = BQS}) ->
+    BQ:needs_idle_timeout(BQS).
+
+%% Must hand back our own #state{}, not the inner backing queue state:
+%% the result replaces the caller's backing queue state for this module.
+idle_timeout(State = #state { backing_queue       = BQ,
+                              backing_queue_state = BQS}) ->
+    State #state { backing_queue_state = BQ:idle_timeout(BQS) }.
+
+handle_pre_hibernate(State = #state { backing_queue       = BQ,
+                                      backing_queue_state = BQS}) ->
+    State #state { backing_queue_state = BQ:handle_pre_hibernate(BQS) }.
+
+status(#state { backing_queue = BQ, backing_queue_state = BQS}) ->
+    BQ:status(BQS).
+
+%% A fun addressed to this module is applied to our own state; a fun
+%% addressed to any other module is passed down to the wrapped backing
+%% queue, whose new state is then stored back into ours.
+invoke(?MODULE, Fun, MasterState) ->
+    Fun(MasterState);
+invoke(Target, Fun, MasterState = #state { backing_queue       = Inner,
+                                           backing_queue_state = InnerState }) ->
+    {Guids, InnerState1} = Inner:invoke(Target, Fun, InnerState),
+    {Guids, MasterState #state { backing_queue_state = InnerState1 }}.
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
new file mode 100644
index 0000000000..090cb81203
--- /dev/null
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -0,0 +1,46 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2010 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_mirror_queue_misc).
+
+-export([remove_from_queue/2]).
+
+-include("rabbit.hrl").
+
+%% Within one mnesia transaction, removes from the queue's record every
+%% pid (master or mirror) that lives on the node of one of DeadPids.
+%% The first surviving pid becomes the record's master pid. Returns
+%% {ok, MasterPid}, or {error, not_found} when the queue record has
+%% already gone. The record is only rewritten when something changed.
+remove_from_queue(QueueName, DeadPids) ->
+    DeadNodes = [node(DeadPid) || DeadPid <- DeadPids],
+    Survives = fun (Pid) -> not lists:member(node(Pid), DeadNodes) end,
+    rabbit_misc:execute_mnesia_transaction(
+      fun () ->
+              %% Someone else could have deleted the queue before we
+              %% get here.
+              case mnesia:read({rabbit_queue, QueueName}) of
+                  [] ->
+                      {error, not_found};
+                  [Q = #amqqueue { pid = QPid, mirror_pids = MPids }] ->
+                      [QPid1 | MPids1] =
+                          lists:filter(Survives, [QPid | MPids]),
+                      case {QPid1, MPids1} of
+                          {QPid, MPids} ->
+                              {ok, QPid};
+                          _ ->
+                              Q1 = Q #amqqueue { pid         = QPid1,
+                                                 mirror_pids = MPids1 },
+                              ok = rabbit_amqqueue:store_queue(Q1),
+                              {ok, QPid1}
+                      end
+              end
+      end).
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl new file mode 100644 index 0000000000..a61cea0d14 --- /dev/null +++ b/src/rabbit_mirror_queue_slave.erl @@ -0,0 +1,529 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2010 VMware, Inc. All rights reserved. +%% + +-module(rabbit_mirror_queue_slave). + +%% We join the GM group before we add ourselves to the amqqueue +%% record. As a result: +%% 1. We can receive msgs from GM that correspond to messages we will +%% never receive from publishers. +%% 2. When we receive a message from publishers, we must receive a +%% message from the GM group for it. +%% 3. However, that instruction from the GM group can arrive either +%% before or after the actual message. We need to be able to +%% distinguish between GM instructions arriving early, and case (1) +%% above. +%% +%% All instructions from the GM group must be processed in the order +%% in which they're received. +%% +%% Thus, we need a queue per sender, and a queue for GM instructions. +%% +%% On receipt of a GM group instruction, three things are possible: +%% 1. The queue of publisher messages is empty. Thus store the GM +%% instruction to the instrQ. +%% 2. The head of the queue of publisher messages has a message that +%% matches the GUID of the GM instruction. Remove the message, and +%% route appropriately. +%% 3. 
The head of the queue of publisher messages has a message that +%% does not match the GUID of the GM instruction. Throw away the GM +%% instruction: the GM instruction must correspond to a message +%% that we'll never receive. If it did not, then before the current +%% instruction, we would have received an instruction for the +%% message at the head of this queue, thus the head of the queue +%% would have been removed and processed. +%% +%% On receipt of a publisher message, three things are possible: +%% 1. The queue of GM group instructions is empty. Add the message to +%% the relevant queue and await instructions from the GM. +%% 2. The head of the queue of GM group instructions has an +%% instruction matching the GUID of the message. Remove that +%% instruction and act on it. Attempt to process the rest of the +%% instrQ. +%% 3. The head of the queue of GM group instructions has an +%% instruction that does not match the GUID of the message. If the +%% message is from the same publisher as is referred to by the +%% instruction then throw away the GM group instruction and repeat +%% - attempt to match against the next instruction if there is one: +%% The instruction thrown away was for a message we'll never +%% receive. +%% +%% In all cases, we are relying heavily on order preserving messaging +%% both from the GM group and from the publishers. + +-export([start_link/1, set_maximum_since_use/2]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, + code_change/3, handle_pre_hibernate/1]). + +-export([joined/2, members_changed/3, handle_msg/3]). + +-behaviour(gen_server2). +-behaviour(gm). + +-include("rabbit.hrl"). +-include("gm_specs.hrl"). + +-record(state, { q, + gm, + master_node, + backing_queue, + backing_queue_state, + rate_timer_ref, + + sender_queues, %% :: Pid -> MsgQ + guid_ack, %% :: Guid -> AckTag + seen, %% Set Guid + + guid_to_channel %% for confirms + }). + +-define(RAM_DURATION_UPDATE_INTERVAL, 5000). 
+
+%% Starts a slave for queue Q, linked to the caller.
+start_link(Q) ->
+    InitArgs = [Q],
+    gen_server2:start_link(?MODULE, InitArgs, []).
+
+%% Asynchronously forwards a maximum-since-use age to the slave.
+set_maximum_since_use(SlavePid, Age) ->
+    gen_server2:cast(SlavePid, {set_maximum_since_use, Age}).
+
+%% Joins the queue's gm group first and only then appends itself to
+%% the queue's mirror_pids in mnesia. Initialisation stops with
+%% node_already_present if the master or another mirror of this queue
+%% already runs on this node.
+init([Q = #amqqueue { name = QName }]) ->
+    process_flag(trap_exit, true), %% amqqueue_process traps exits too.
+    ok = gm:create_tables(),
+    Self = self(),
+    {ok, GM} = gm:start_link(QName, ?MODULE, [Self]),
+    receive
+        {joined, GM} -> ok
+    end,
+    Node = node(),
+    OnThisNode = fun (Pid) -> node(Pid) =:= Node end,
+    Register =
+        fun () ->
+                [Q1 = #amqqueue { pid = QPid, mirror_pids = MPids }] =
+                    mnesia:read({rabbit_queue, QName}),
+                case lists:any(OnThisNode, [QPid | MPids]) of
+                    false ->
+                        Q2 = Q1 #amqqueue { mirror_pids = MPids ++ [Self] },
+                        mnesia:write(rabbit_queue, Q2, write),
+                        {ok, QPid};
+                    true ->
+                        {error, node_already_present}
+                end
+        end,
+    case rabbit_misc:execute_mnesia_transaction(Register) of
+        {error, Error} ->
+            {stop, Error};
+        {ok, MPid} ->
+            ok = file_handle_cache:register_callback(
+                   rabbit_amqqueue, set_maximum_since_use, [Self]),
+            ok = rabbit_memory_monitor:register(
+                   Self, {rabbit_amqqueue, set_ram_duration_target, [Self]}),
+            {ok, BQ} = application:get_env(backing_queue_module),
+            BQS = BQ:init(Q, false),
+            State = #state { q                   = Q,
+                             gm                  = GM,
+                             master_node         = node(MPid),
+                             backing_queue       = BQ,
+                             backing_queue_state = BQS,
+                             rate_timer_ref      = undefined,
+
+                             sender_queues       = dict:new(),
+                             guid_ack            = dict:new(),
+                             seen                = sets:new(),
+
+                             guid_to_channel     = dict:new() },
+            {ok, State, hibernate,
+             {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN,
+              ?DESIRED_HIBERNATE}}
+    end.
+
+%% Deliveries from publishers. A slave never delivers to consumers
+%% itself: it answers the publisher straight away and buffers the
+%% delivery via maybe_enqueue_message/2.
+handle_call({deliver_immediately, Delivery = #delivery {}}, From, State) ->
+    %% Synchronous, "immediate" delivery mode
+    gen_server2:reply(From, false), %% master may deliver it, not us
+    noreply(maybe_enqueue_message(Delivery, State));
+
+handle_call({deliver, Delivery = #delivery {}}, From, State) ->
+    %% Synchronous, "mandatory" delivery mode
+    gen_server2:reply(From, true), %% amqqueue throws away the result anyway
+    noreply(maybe_enqueue_message(Delivery, State));
+
+%% Sent by our gm callback (members_changed/3) when group members have
+%% died. After pruning the queue record there are four outcomes: the
+%% master is still on the node we knew (nothing to do); the master is
+%% now on our node (we promote ourselves); the master is on some other
+%% node (remember it and broadcast a heartbeat); or the queue is gone
+%% (stop).
+handle_call({gm_deaths, Deaths}, From,
+            State = #state { q           = #amqqueue { name = QueueName },
+                             gm          = GM,
+                             master_node = MNode }) ->
+    rabbit_log:info("Slave ~p saw deaths ~p for queue ~p~n",
+                    [self(), Deaths, QueueName]),
+    case rabbit_mirror_queue_misc:remove_from_queue(QueueName, Deaths) of
+        {ok, Pid} when node(Pid) =:= MNode ->
+            reply(ok, State);
+        {ok, Pid} when node(Pid) =:= node() ->
+            promote_me(From, State);
+        {ok, Pid} ->
+            %% Reply before broadcasting: the caller is our own gm
+            %% process, which is blocked in this call.
+            gen_server2:reply(From, ok),
+            ok = gm:broadcast(GM, heartbeat),
+            noreply(State #state { master_node = node(Pid) });
+        {error, not_found} ->
+            gen_server2:reply(From, ok),
+            {stop, normal, State}
+    end;
+
+handle_call({maybe_run_queue_via_backing_queue, Mod, Fun}, _From, State) ->
+    reply(ok, maybe_run_queue_via_backing_queue(Mod, Fun, State)).
+
+
+handle_cast({maybe_run_queue_via_backing_queue, Mod, Fun}, State) ->
+    noreply(maybe_run_queue_via_backing_queue(Mod, Fun, State));
+
+%% An instruction from the master, relayed by our gm callback
+%% (handle_msg/3).
+handle_cast({gm, Instruction}, State) ->
+    handle_process_result(process_instruction(Instruction, State));
+
+handle_cast({deliver, Delivery = #delivery {}}, State) ->
+    %% Asynchronous, non-"mandatory", non-"immediate" delivery mode.
+    noreply(maybe_enqueue_message(Delivery, State));
+
+handle_cast({set_maximum_since_use, Age}, State) ->
+    ok = file_handle_cache:set_maximum_since_use(Age),
+    noreply(State);
+
+handle_cast({set_ram_duration_target, Duration},
+            State = #state { backing_queue       = BQ,
+                             backing_queue_state = BQS }) ->
+    BQS1 = BQ:set_ram_duration_target(Duration, BQS),
+    noreply(State #state { backing_queue_state = BQS1 });
+
+%% Reports the measured ram duration to the memory monitor and applies
+%% the target it hands back. Marking the timer ref 'just_measured'
+%% makes ensure_rate_timer/1 reset it to 'undefined' on the next state
+%% transition.
+handle_cast(update_ram_duration,
+            State = #state { backing_queue       = BQ,
+                             backing_queue_state = BQS }) ->
+    {RamDuration, BQS1} = BQ:ram_duration(BQS),
+    DesiredDuration =
+        rabbit_memory_monitor:report_ram_duration(self(), RamDuration),
+    BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1),
+    noreply(State #state { rate_timer_ref      = just_measured,
+                           backing_queue_state = BQS2 }).
+
+%% No plain messages are handled; any that arrive stop the process.
+handle_info(Msg, State) ->
+    {stop, {unexpected_info, Msg}, State}.
+
+%% If the Reason is shutdown, or {shutdown, _}, it is not the queue
+%% being deleted: it's just the node going down. Even though we're a
+%% slave, we have no idea whether or not we'll be the only copy coming
+%% back up. Thus we must assume we will be, and preserve anything we
+%% have on disk.
+%%
+%% terminate/2 is a callback of both gen_server2 (Reason, State) and
+%% gm (Args, Reason); the last clause serves gm.
+terminate(_Reason, #state { backing_queue_state = undefined }) ->
+    %% We've received a delete_and_terminate from gm, thus nothing to
+    %% do here.
+    ok;
+terminate(Reason, #state { q                   = Q,
+                           gm                  = GM,
+                           backing_queue       = BQ,
+                           backing_queue_state = BQS,
+                           rate_timer_ref      = RateTRef }) ->
+    ok = gm:leave(GM),
+    %% Build an amqqueue_process state around our backing queue so that
+    %% its terminate/2 can do the actual shutdown work.
+    QueueState = rabbit_amqqueue_process:init_with_backing_queue_state(
+                   Q, BQ, BQS, RateTRef, [], []),
+    rabbit_amqqueue_process:terminate(Reason, QueueState);
+terminate([_SPid], _Reason) ->
+    %% gm case
+    ok.
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+%% Mainly copied from amqqueue_process: let the backing queue prepare
+%% for hibernation, report an infinite ram duration (no activity for a
+%% while == 0 egress and ingress rates), apply the target handed back
+%% and stop the rate timer.
+handle_pre_hibernate(State = #state { backing_queue       = BQ,
+                                      backing_queue_state = BQS0 }) ->
+    Prepared = BQ:handle_pre_hibernate(BQS0),
+    Target = rabbit_memory_monitor:report_ram_duration(self(), infinity),
+    Retargeted = BQ:set_ram_duration_target(Target, Prepared),
+    Hibernating = State #state { backing_queue_state = Retargeted },
+    {hibernate, stop_rate_timer(Hibernating)}.
+
+%% ---------------------------------------------------------------------------
+%% GM
+%% ---------------------------------------------------------------------------
+
+%% Tells the slave (blocked in init/1) that its gm has joined.
+joined([Slave], _Members) ->
+    Slave ! {joined, self()},
+    ok.
+
+%% Births are ignored. Deaths are reported to the slave with a blocking
+%% call; if the slave promotes itself this gm switches to the
+%% coordinator callback module. Should the call exit, the gm stops
+%% normally.
+members_changed([_Slave], _Births, []) ->
+    ok;
+members_changed([Slave], _Births, Deaths) ->
+    OnExit = fun () -> {stop, normal} end,
+    Report =
+        fun () ->
+                case gen_server2:call(Slave, {gm_deaths, Deaths}, infinity) of
+                    {promote, Coordinator} ->
+                        {become, rabbit_mirror_queue_coordinator,
+                         [Coordinator]};
+                    ok ->
+                        ok
+                end
+        end,
+    rabbit_misc:with_exit_handler(OnExit, Report).
+
+%% Heartbeats are dropped; every other broadcast is relayed to the
+%% slave as a {gm, Msg} cast.
+handle_msg([_Slave], _From, heartbeat) ->
+    ok;
+handle_msg([Slave], _From, Instruction) ->
+    ok = gen_server2:cast(Slave, {gm, Instruction}).
+
+%% ---------------------------------------------------------------------------
+%% Others
+%% ---------------------------------------------------------------------------
+
+%% Runs Fun against the backing queue via BQ:invoke/3 and issues any
+%% pending confirms for the guids it reports.
+maybe_run_queue_via_backing_queue(
+  Mod, Fun, State = #state { backing_queue       = BQ,
+                             backing_queue_state = BQS0,
+                             guid_to_channel     = Pending0 }) ->
+    {Guids, BQS1} = BQ:invoke(Mod, Fun, BQS0),
+    Pending1 = lists:foldl(fun maybe_confirm_message/2, Pending0, Guids),
+    State #state { backing_queue_state = BQS1,
+                   guid_to_channel     = Pending1 }.
+
+%% Decides what to do about a publisher confirm for Delivery:
+%%  - no msg_seq_no: nothing to do;
+%%  - persistent message on a durable queue: remember {ChPid, MsgSeqNo}
+%%    under the guid so that it can be confirmed later;
+%%  - anything else: confirm to the channel straight away.
+%% Returns the (possibly updated) guid -> channel dict.
+record_confirm_or_confirm(#delivery { msg_seq_no = undefined }, _Q, GTC) ->
+    GTC;
+record_confirm_or_confirm(
+  #delivery { sender     = ChPid,
+              message    = #basic_message { is_persistent = true,
+                                            guid          = Guid },
+              msg_seq_no = MsgSeqNo }, #amqqueue { durable = true }, GTC) ->
+    dict:store(Guid, {ChPid, MsgSeqNo}, GTC);
+record_confirm_or_confirm(#delivery { sender = ChPid, msg_seq_no = MsgSeqNo },
+                          _Q, GTC) ->
+    ok = rabbit_channel:confirm(ChPid, MsgSeqNo),
+    GTC.
+
+%% Issues the confirm recorded for Guid, if there is one, and forgets
+%% it. Shaped for use with lists:foldl/3 over a list of guids.
+maybe_confirm_message(Guid, GTC) ->
+    case dict:find(Guid, GTC) of
+        {ok, {ChPid, MsgSeqNo}} when MsgSeqNo =/= undefined ->
+            ok = rabbit_channel:confirm(ChPid, MsgSeqNo),
+            dict:erase(Guid, GTC);
+        error ->
+            GTC
+    end.
+
+%% Maps the result of process_instruction/2 onto a gen_server2 return.
+handle_process_result({ok,   State}) -> noreply(State);
+handle_process_result({stop, State}) -> {stop, normal, State}.
+
+%% Turns this slave into the queue's master: starts a coordinator
+%% around our existing gm, answers the pending gm_deaths call with
+%% {promote, CPid}, and then becomes a rabbit_amqqueue_process whose
+%% backing queue is rabbit_mirror_queue_master wrapped around our
+%% current backing queue state.
+promote_me(From, #state { q                   = Q,
+                          gm                  = GM,
+                          backing_queue       = BQ,
+                          backing_queue_state = BQS,
+                          rate_timer_ref      = RateTRef,
+                          sender_queues       = SQ,
+                          seen                = Seen,
+                          guid_ack            = GA }) ->
+    rabbit_log:info("Promoting slave ~p for queue ~p~n",
+                    [self(), Q #amqqueue.name]),
+    {ok, CPid} = rabbit_mirror_queue_coordinator:start_link(Q, GM),
+    %% The coordinator links to GM in its init/1, so we drop our own
+    %% link.
+    true = unlink(GM),
+    gen_server2:reply(From, {promote, CPid}),
+    ok = gm:confirmed_broadcast(GM, heartbeat),
+    MasterState = rabbit_mirror_queue_master:promote_backing_queue_state(
+                    CPid, BQ, BQS, GM, Seen),
+    %% We have to do the requeue via this init because otherwise we
+    %% don't have access to the relevant MsgPropsFun. Also, we are
+    %% already in mnesia as the master queue pid. Thus we cannot just
+    %% publish stuff by sending it to ourself - we must pass it
+    %% through to this init, otherwise we can violate ordering
+    %% constraints.
+    AckTags = [AckTag || {_Guid, AckTag} <- dict:to_list(GA)],
+    Deliveries = lists:append([queue:to_list(PubQ)
+                               || {_ChPid, PubQ} <- dict:to_list(SQ)]),
+    QueueState = rabbit_amqqueue_process:init_with_backing_queue_state(
+                   Q, rabbit_mirror_queue_master, MasterState, RateTRef,
+                   AckTags, Deliveries),
+    {become, rabbit_amqqueue_process, QueueState, hibernate}.
+
+%% gen_server2 return helpers; both make sure the rate timer is set
+%% up and always ask to hibernate.
+noreply(State) ->
+    {noreply, next_state(State), hibernate}.
+
+reply(Reply, State) ->
+    {reply, Reply, next_state(State), hibernate}.
+
+next_state(State) ->
+    ensure_rate_timer(State).
+
+%% copied+pasted from amqqueue_process
+%%
+%% rate_timer_ref is 'undefined' (no timer: start one), 'just_measured'
+%% (a measurement has just been taken: reset to 'undefined' without
+%% starting a timer) or a timer reference (leave as is).
+ensure_rate_timer(State = #state { rate_timer_ref = undefined }) ->
+    {ok, TRef} = timer:apply_after(
+                   ?RAM_DURATION_UPDATE_INTERVAL,
+                   rabbit_amqqueue, update_ram_duration,
+                   [self()]),
+    State #state { rate_timer_ref = TRef };
+ensure_rate_timer(State = #state { rate_timer_ref = just_measured }) ->
+    State #state { rate_timer_ref = undefined };
+ensure_rate_timer(State) ->
+    State.
+
+%% Cancels a running rate timer, if any, and resets the ref.
+stop_rate_timer(State = #state { rate_timer_ref = undefined }) ->
+    State;
+stop_rate_timer(State = #state { rate_timer_ref = just_measured }) ->
+    State #state { rate_timer_ref = undefined };
+stop_rate_timer(State = #state { rate_timer_ref = TRef }) ->
+    {ok, cancel} = timer:cancel(TRef),
+    State #state { rate_timer_ref = undefined }.
+
+%% Handles a delivery arriving directly from a publisher. If the
+%% matching gm publish instruction has already been processed (guid in
+%% 'seen') only the confirm is dealt with; otherwise the delivery is
+%% appended to that sender's queue to await the instruction.
+maybe_enqueue_message(
+  Delivery = #delivery { message = #basic_message { guid = Guid },
+                         sender  = ChPid },
+  State = #state { q               = Q,
+                   sender_queues   = SQ,
+                   seen            = Seen,
+                   guid_to_channel = GTC }) ->
+    case sets:is_element(Guid, Seen) of
+        true ->
+            GTC1 = record_confirm_or_confirm(Delivery, Q, GTC),
+            State #state { guid_to_channel = GTC1,
+                           seen            = sets:del_element(Guid, Seen) };
+        false ->
+            MQ = case dict:find(ChPid, SQ) of
+                     {ok, MQ1} -> MQ1;
+                     error     -> queue:new()
+                 end,
+            SQ1 = dict:store(ChPid, queue:in(Delivery, MQ), SQ),
+            State #state { sender_queues = SQ1 }
+    end.
+
+%% Applies one instruction broadcast by the master to our copy of the
+%% queue. Returns {ok, State} to carry on or {stop, State} when the
+%% queue has been deleted.
+%%
+%% {publish, Deliver, ChPid, MsgProps, Msg}: reconcile the instruction
+%% with the deliveries buffered for ChPid, then publish to the backing
+%% queue. Deliver is 'false' for a plain publish or {true, AckRequired}
+%% for a message the master has already handed to a consumer.
+process_instruction(
+  {publish, Deliver, ChPid, MsgProps, Msg = #basic_message { guid = Guid }},
+  State = #state { q                   = Q,
+                   sender_queues       = SQ,
+                   backing_queue       = BQ,
+                   backing_queue_state = BQS,
+                   guid_ack            = GA,
+                   seen                = Seen,
+                   guid_to_channel     = GTC }) ->
+    {SQ1, Seen1, GTC1} =
+        case dict:find(ChPid, SQ) of
+            error ->
+                %% Nothing buffered from this sender: note the guid so
+                %% the publisher's copy is recognised when it arrives.
+                {SQ, sets:add_element(Guid, Seen), GTC};
+            {ok, MQ} ->
+                case queue:out(MQ) of
+                    {empty, _MQ} ->
+                        {SQ, sets:add_element(Guid, Seen), GTC};
+                    {{value, Delivery = #delivery {
+                               message = #basic_message { guid = Guid } }},
+                     MQ1} ->
+                        %% Head of the sender's queue is this very
+                        %% message: consume it and sort out its confirm.
+                        GTC2 = record_confirm_or_confirm(Delivery, Q, GTC),
+                        {dict:store(ChPid, MQ1, SQ), Seen, GTC2};
+                    {{value, #delivery {}}, _MQ1} ->
+                        %% The instruction was sent to us before we
+                        %% were within the mirror_pids within the
+                        %% amqqueue record. We'll never receive the
+                        %% message directly.
+                        {SQ, Seen, GTC}
+                end
+        end,
+    State1 = State #state { sender_queues   = SQ1,
+                            seen            = Seen1,
+                            guid_to_channel = GTC1 },
+    {ok,
+     case Deliver of
+         false ->
+             BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS),
+             State1 #state { backing_queue_state = BQS1 };
+         {true, AckRequired} ->
+             {AckTag, BQS1} = BQ:publish_delivered(AckRequired, Msg, MsgProps,
+                                                   ChPid, BQS),
+             {GA1, GTC3} = case AckRequired of
+                               true  -> {dict:store(Guid, AckTag, GA), GTC1};
+                               false -> {GA, maybe_confirm_message(Guid, GTC1)}
+                           end,
+             State1 #state { backing_queue_state = BQS1,
+                             guid_ack            = GA1,
+                             guid_to_channel     = GTC3 }
+     end};
+%% {set_length, Length}: if we hold more than Length messages, fetch
+%% (without requiring acks) from the backing queue until we hold
+%% exactly Length.
+process_instruction({set_length, Length},
+                    State = #state { backing_queue       = BQ,
+                                     backing_queue_state = BQS }) ->
+    QLen = BQ:len(BQS),
+    ToDrop = QLen - Length,
+    {ok, case ToDrop > 0 of
+             true  -> BQS1 =
+                          lists:foldl(
+                            fun (const, BQSN) ->
+                                    {{_Msg, _IsDelivered, _AckTag, _Remaining},
+                                     BQSN1} = BQ:fetch(false, BQSN),
+                                    BQSN1
+                            end, BQS, lists:duplicate(ToDrop, const)),
+                      State #state { backing_queue_state = BQS1 };
+             false -> State
+         end};
+%% {fetch, AckRequired, Guid, Remaining}: mirror a fetch performed by
+%% the master, but only when our length agrees with the master's. If
+%% we are shorter the instruction is ignored; being longer has no
+%% clause and crashes.
+process_instruction({fetch, AckRequired, Guid, Remaining},
+                    State = #state { backing_queue       = BQ,
+                                     backing_queue_state = BQS,
+                                     guid_ack            = GA }) ->
+    QLen = BQ:len(BQS),
+    {ok, case QLen - 1 of
+             Remaining ->
+                 {{_Msg, _IsDelivered, AckTag, Remaining}, BQS1} =
+                     BQ:fetch(AckRequired, BQS),
+                 GA1 = case AckRequired of
+                           true  -> dict:store(Guid, AckTag, GA);
+                           false -> GA
+                       end,
+                 State #state { backing_queue_state = BQS1,
+                                guid_ack            = GA1 };
+             Other when Other < Remaining ->
+                 %% we must be shorter than the master
+                 State
+         end};
+%% {ack, Guids}: ack whichever of Guids we hold ack tags for.
+process_instruction({ack, Guids},
+                    State = #state { backing_queue       = BQ,
+                                     backing_queue_state = BQS,
+                                     guid_ack            = GA }) ->
+    {AckTags, GA1} = guids_to_acktags(Guids, GA),
+    {Guids1, BQS1} = BQ:ack(AckTags, BQS),
+    [] = Guids1 -- Guids, %% ASSERTION
+    {ok, State #state { guid_ack            = GA1,
+                        backing_queue_state = BQS1 }};
+%% {requeue, MsgPropsFun, Guids}: requeue Guids if we hold an ack tag
+%% for every one of them; otherwise fall back to emptying the queue.
+process_instruction({requeue, MsgPropsFun, Guids},
+                    State = #state { backing_queue       = BQ,
+                                     backing_queue_state = BQS,
+                                     guid_ack            = GA }) ->
+    {AckTags, GA1} = guids_to_acktags(Guids, GA),
+    {ok, case length(AckTags) =:= length(Guids) of
+             true ->
+                 {Guids, BQS1} = BQ:requeue(AckTags, MsgPropsFun, BQS),
+                 State #state { guid_ack            = GA1,
+                                backing_queue_state = BQS1 };
+             false ->
+                 %% the only thing we can safely do is nuke out our BQ
+                 %% and GA
+                 {_Count, BQS1} = BQ:purge(BQS),
+                 %% The guids acked here are everything pending in GA,
+                 %% which need not be the instruction's Guids, so they
+                 %% must not be matched against the bound Guids.
+                 {_AckedGuids, BQS2} = ack_all(BQ, GA, BQS1),
+                 State #state { guid_ack            = dict:new(),
+                                backing_queue_state = BQS2 }
+         end};
+%% delete_and_terminate: delete the backing queue and stop. The state
+%% is marked 'undefined' so terminate/2 knows there is nothing left to
+%% clean up.
+process_instruction(delete_and_terminate,
+                    State = #state { backing_queue       = BQ,
+                                     backing_queue_state = BQS }) ->
+    BQ:delete_and_terminate(BQS),
+    {stop, State #state { backing_queue_state = undefined }}.
+
+%% Looks up the ack tag of each guid in GA. Returns the tags found, in
+%% the order of Guids, together with GA minus those guids; guids
+%% without an entry are skipped.
+guids_to_acktags(Guids, GA) ->
+    {AckTags, GA1} =
+        lists:foldl(fun (Guid, {AckTagsN, GAN}) ->
+                            case dict:find(Guid, GA) of
+                                error        -> {AckTagsN, GAN};
+                                {ok, AckTag} -> {[AckTag | AckTagsN],
+                                                 dict:erase(Guid, GAN)}
+                            end
+                    end, {[], GA}, Guids),
+    {lists:reverse(AckTags), GA1}.
+
+%% Acks every ack tag recorded in GA; returns whatever BQ:ack/2 returns.
+ack_all(BQ, GA, BQS) ->
+    BQ:ack([AckTag || {_Guid, AckTag} <- dict:to_list(GA)], BQS).
diff --git a/src/rabbit_mirror_queue_slave_sup.erl b/src/rabbit_mirror_queue_slave_sup.erl
new file mode 100644
index 0000000000..80c0520c08
--- /dev/null
+++ b/src/rabbit_mirror_queue_slave_sup.erl
@@ -0,0 +1,54 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2010 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_mirror_queue_slave_sup).
+
+%% Boot step: runs start/0 of this module; ordering is expressed via
+%% the requires/enables entries below.
+-rabbit_boot_step({mirror_queue_slave_sup,
+                   [{description, "mirror queue slave sup"},
+                    {mfa,         {rabbit_mirror_queue_slave_sup, start, []}},
+                    {requires,    queue_sup_queue_recovery},
+                    {enables,     routing_ready}]}).
+
+-behaviour(supervisor2).
+
+-export([start/0, start_link/0, start_child/2]).
+
+-export([init/1]).
+
+-include_lib("rabbit.hrl").
+
+-define(SERVER, ?MODULE).
+
+%% Registers this supervisor as a transient child of rabbit_sup.
+%% Crashes with badmatch unless the child starts with {ok, _}.
+start() ->
+    SupSpec = {?MODULE,
+               {?MODULE, start_link, []},
+               transient, infinity, supervisor, [?MODULE]},
+    {ok, _} = supervisor:start_child(rabbit_sup, SupSpec),
+    ok.
+
+%% Starts the supervisor, locally registered under ?SERVER.
+start_link() ->
+    supervisor2:start_link({local, ?SERVER}, ?MODULE, []).
+
+%% Asks the supervisor registered as ?SERVER on Node to start a child
+%% with the given extra Args.
+start_child(Node, Args) ->
+    SupRef = {?SERVER, Node},
+    supervisor2:start_child(SupRef, Args).
+
+%% supervisor2 callback: children are temporary rabbit_mirror_queue_slave
+%% workers. ?MAX_WAIT is the shutdown value; it is not defined in this
+%% file (presumably it comes from rabbit.hrl -- confirm).
+init([]) ->
+    RestartStrategy = {simple_one_for_one_terminate, 10, 10},
+    SlaveSpec = {rabbit_mirror_queue_slave,
+                 {rabbit_mirror_queue_slave, start_link, []},
+                 temporary, ?MAX_WAIT, worker, [rabbit_mirror_queue_slave]},
+    {ok, {RestartStrategy, [SlaveSpec]}}.
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index a9b4e17745..1ad65759d4 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -204,7 +204,8 @@ table_definitions() -> {rabbit_queue, [{record_name, amqqueue}, {attributes, record_info(fields, amqqueue)}, - {match, #amqqueue{name = queue_name_match(), _='_'}}]}]. + {match, #amqqueue{name = queue_name_match(), _='_'}}]}] + ++ gm:table_definitions(). binding_match() -> #binding{source = exchange_name_match(), diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index 692d2473b8..309e0e6ed9 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -102,7 +102,9 @@ check_delivery(_ , _ , {_ , Qs}) -> {routed, Qs}. lookup_qpids(QNames) -> lists:foldl(fun (QName, QPids) -> case mnesia:dirty_read({rabbit_queue, QName}) of - [#amqqueue{pid = QPid}] -> [QPid | QPids]; - [] -> QPids + [#amqqueue{pid = QPid, mirror_pids = MPids}] -> + MPids ++ [QPid | QPids]; + [] -> + QPids end end, [], QNames). diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 49b0950832..c2c85082ba 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1853,7 +1853,7 @@ variable_queue_publish(IsPersistent, Count, VQ) -> true -> 2; false -> 1 end}, <<>>), - #message_properties{}, VQN) + #message_properties{}, self(), VQN) end, VQ, lists:seq(1, Count)). variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) -> @@ -1871,9 +1871,13 @@ assert_prop(List, Prop, Value) -> assert_props(List, PropVals) -> [assert_prop(List, Prop, Value) || {Prop, Value} <- PropVals]. +test_amqqueue(Durable) -> + (rabbit_amqqueue:pseudo_queue(test_queue(), self())) + #amqqueue { durable = Durable }. 
+ with_fresh_variable_queue(Fun) -> ok = empty_test_queue(), - VQ = rabbit_variable_queue:init(test_queue(), true, false, + VQ = rabbit_variable_queue:init(test_amqqueue(true), false, fun nop/2, fun nop/1), S0 = rabbit_variable_queue:status(VQ), assert_props(S0, [{q1, 0}, {q2, 0}, @@ -1932,7 +1936,7 @@ test_dropwhile(VQ0) -> rabbit_basic:message( rabbit_misc:r(<<>>, exchange, <<>>), <<>>, #'P_basic'{}, <<>>), - #message_properties{expiry = N}, VQN) + #message_properties{expiry = N}, self(), VQN) end, VQ0, lists:seq(1, Count)), %% drop the first 5 messages @@ -1976,7 +1980,7 @@ test_variable_queue_dynamic_duration_change(VQ0) -> %% drain {VQ8, AckTags} = variable_queue_fetch(Len, false, false, Len, VQ7), - VQ9 = rabbit_variable_queue:ack(AckTags, VQ8), + {_Guids, VQ9} = rabbit_variable_queue:ack(AckTags, VQ8), {empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9), VQ10. @@ -1986,7 +1990,7 @@ publish_fetch_and_ack(0, _Len, VQ0) -> publish_fetch_and_ack(N, Len, VQ0) -> VQ1 = variable_queue_publish(false, 1, VQ0), {{_Msg, false, AckTag, Len}, VQ2} = rabbit_variable_queue:fetch(true, VQ1), - VQ3 = rabbit_variable_queue:ack([AckTag], VQ2), + {_Guids, VQ3} = rabbit_variable_queue:ack([AckTag], VQ2), publish_fetch_and_ack(N-1, Len, VQ3). test_variable_queue_partial_segments_delta_thing(VQ0) -> @@ -2020,7 +2024,7 @@ test_variable_queue_partial_segments_delta_thing(VQ0) -> {len, HalfSegment + 1}]), {VQ8, AckTags1} = variable_queue_fetch(HalfSegment + 1, true, false, HalfSegment + 1, VQ7), - VQ9 = rabbit_variable_queue:ack(AckTags ++ AckTags1, VQ8), + {_Guids, VQ9} = rabbit_variable_queue:ack(AckTags ++ AckTags1, VQ8), %% should be empty now {empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9), VQ10. 
@@ -2049,7 +2053,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere1(VQ0) -> {VQ5, _AckTags1} = variable_queue_fetch(Count, false, false, Count, VQ4), _VQ6 = rabbit_variable_queue:terminate(VQ5), - VQ7 = rabbit_variable_queue:init(test_queue(), true, true, + VQ7 = rabbit_variable_queue:init(test_amqqueue(true), true, fun nop/2, fun nop/1), {{_Msg1, true, _AckTag1, Count1}, VQ8} = rabbit_variable_queue:fetch(true, VQ7), @@ -2063,10 +2067,11 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) -> VQ1 = rabbit_variable_queue:set_ram_duration_target(0, VQ0), VQ2 = variable_queue_publish(false, 4, VQ1), {VQ3, AckTags} = variable_queue_fetch(2, false, false, 4, VQ2), - VQ4 = rabbit_variable_queue:requeue(AckTags, fun(X) -> X end, VQ3), + {_Guids, VQ4} = + rabbit_variable_queue:requeue(AckTags, fun(X) -> X end, VQ3), VQ5 = rabbit_variable_queue:idle_timeout(VQ4), _VQ6 = rabbit_variable_queue:terminate(VQ5), - VQ7 = rabbit_variable_queue:init(test_queue(), true, true, + VQ7 = rabbit_variable_queue:init(test_amqqueue(true), true, fun nop/2, fun nop/1), {empty, VQ8} = rabbit_variable_queue:fetch(false, VQ7), VQ8. 
@@ -2074,7 +2079,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) -> test_queue_recover() -> Count = 2 * rabbit_queue_index:next_segment_boundary(0), TxID = rabbit_guid:guid(), - {new, #amqqueue { pid = QPid, name = QName }} = + {new, #amqqueue { pid = QPid, name = QName } = Q} = rabbit_amqqueue:declare(test_queue(), true, false, [], none), [begin Msg = rabbit_basic:message(rabbit_misc:r(<<>>, exchange, <<>>), @@ -2098,7 +2103,7 @@ test_queue_recover() -> {ok, CountMinusOne, {QName, QPid1, _AckTag, true, _Msg}} = rabbit_amqqueue:basic_get(Q1, self(), false), exit(QPid1, shutdown), - VQ1 = rabbit_variable_queue:init(QName, true, true, + VQ1 = rabbit_variable_queue:init(Q, true, fun nop/2, fun nop/1), {{_Msg1, true, _AckTag1, CountMinusOne}, VQ2} = rabbit_variable_queue:fetch(true, VQ1), diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl index 3dbe740f27..bde336c000 100644 --- a/src/rabbit_types.erl +++ b/src/rabbit_types.erl @@ -123,7 +123,8 @@ auto_delete :: boolean(), exclusive_owner :: rabbit_types:maybe(pid()), arguments :: rabbit_framing:amqp_table(), - pid :: rabbit_types:maybe(pid())}). + pid :: rabbit_types:maybe(pid()), + mirror_pids :: [pid()]}). -type(exchange() :: #exchange{name :: rabbit_exchange:name(), diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index f39bc96426..366044795c 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -16,18 +16,18 @@ -module(rabbit_variable_queue). --export([init/3, terminate/1, delete_and_terminate/1, - purge/1, publish/3, publish_delivered/4, fetch/2, ack/2, - tx_publish/4, tx_ack/3, tx_rollback/2, tx_commit/4, +-export([init/2, terminate/1, delete_and_terminate/1, + purge/1, publish/4, publish_delivered/5, fetch/2, ack/2, + tx_publish/5, tx_ack/3, tx_rollback/2, tx_commit/4, requeue/3, len/1, is_empty/1, dropwhile/2, set_ram_duration_target/2, ram_duration/1, needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1, - status/1]). 
+ status/1, invoke/3]). -export([start/1, stop/0]). %% exported for testing only --export([start_msg_store/2, stop_msg_store/0, init/5]). +-export([start_msg_store/2, stop_msg_store/0, init/4]). %%---------------------------------------------------------------------------- %% Definitions: @@ -394,15 +394,16 @@ stop_msg_store() -> ok = rabbit_sup:stop_child(?PERSISTENT_MSG_STORE), ok = rabbit_sup:stop_child(?TRANSIENT_MSG_STORE). -init(QueueName, IsDurable, Recover) -> +init(Queue, Recover) -> Self = self(), - init(QueueName, IsDurable, Recover, + init(Queue, Recover, fun (Guids, ActionTaken) -> msgs_written_to_disk(Self, Guids, ActionTaken) end, fun (Guids) -> msg_indices_written_to_disk(Self, Guids) end). -init(QueueName, IsDurable, false, MsgOnDiskFun, MsgIdxOnDiskFun) -> +init(#amqqueue { name = QueueName, durable = IsDurable }, false, + MsgOnDiskFun, MsgIdxOnDiskFun) -> IndexState = rabbit_queue_index:init(QueueName, MsgIdxOnDiskFun), init(IsDurable, IndexState, 0, [], case IsDurable of @@ -412,7 +413,8 @@ init(QueueName, IsDurable, false, MsgOnDiskFun, MsgIdxOnDiskFun) -> end, msg_store_client_init(?TRANSIENT_MSG_STORE, undefined)); -init(QueueName, true, true, MsgOnDiskFun, MsgIdxOnDiskFun) -> +init(#amqqueue { name = QueueName }, true, + MsgOnDiskFun, MsgIdxOnDiskFun) -> Terms = rabbit_queue_index:shutdown_terms(QueueName), {PRef, TRef, Terms1} = case [persistent_ref, transient_ref] -- proplists:get_keys(Terms) of @@ -502,18 +504,19 @@ purge(State = #vqstate { q4 = Q4, ram_index_count = 0, persistent_count = PCount1 })}. -publish(Msg, MsgProps, State) -> +publish(Msg, MsgProps, _ChPid, State) -> {_SeqId, State1} = publish(Msg, MsgProps, false, false, State), a(reduce_memory_use(State1)). 
-publish_delivered(false, #basic_message { guid = Guid }, - _MsgProps, State = #vqstate { len = 0 }) -> +publish_delivered(false, #basic_message { guid = Guid }, _MsgProps, _ChPid, + State = #vqstate { len = 0 }) -> blind_confirm(self(), gb_sets:singleton(Guid)), {blank_ack, a(State)}; publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent, guid = Guid }, MsgProps = #message_properties { needs_confirming = NeedsConfirming }, + _ChPid, State = #vqstate { len = 0, next_seq_id = SeqId, out_counter = OutCount, @@ -643,13 +646,14 @@ internal_fetch(AckRequired, MsgStatus = #msg_status { persistent_count = PCount1 })}. ack(AckTags, State) -> - a(ack(fun msg_store_remove/3, - fun (_, State0) -> State0 end, - AckTags, State)). + {Guids, State1} = ack(fun msg_store_remove/3, + fun (_, State0) -> State0 end, + AckTags, State), + {Guids, a(State1)}. tx_publish(Txn, Msg = #basic_message { is_persistent = IsPersistent }, MsgProps, - State = #vqstate { durable = IsDurable, - msg_store_clients = MSCState }) -> + _ChPid, State = #vqstate { durable = IsDurable, + msg_store_clients = MSCState }) -> Tx = #tx { pending_messages = Pubs } = lookup_tx(Txn), store_tx(Txn, Tx #tx { pending_messages = [{Msg, MsgProps} | Pubs] }), case IsPersistent andalso IsDurable of @@ -699,7 +703,7 @@ requeue(AckTags, MsgPropsFun, State) -> (MsgPropsFun(MsgProps)) #message_properties { needs_confirming = false } end, - a(reduce_memory_use( + {Guids, State1} = ack(fun msg_store_release/3, fun (#msg_status { msg = Msg, msg_props = MsgProps }, State1) -> {_SeqId, State2} = publish(Msg, MsgPropsFun1(MsgProps), @@ -714,7 +718,8 @@ requeue(AckTags, MsgPropsFun, State) -> true, true, State2), State3 end, - AckTags, State))). + AckTags, State), + {Guids, a(reduce_memory_use(State1))}. len(#vqstate { len = Len }) -> Len. @@ -852,6 +857,9 @@ status(#vqstate { {avg_ack_ingress_rate, AvgAckIngressRate}, {avg_ack_egress_rate , AvgAckEgressRate} ]. +invoke(?MODULE, Fun, State) -> + Fun(State). 
+ %%---------------------------------------------------------------------------- %% Minor helpers %%---------------------------------------------------------------------------- @@ -963,7 +971,7 @@ msg_store_close_fds_fun(IsPersistent) -> Self = self(), fun () -> rabbit_amqqueue:maybe_run_queue_via_backing_queue_async( - Self, + Self, ?MODULE, fun (State = #vqstate { msg_store_clients = MSCState }) -> {ok, MSCState1} = msg_store_close_fds(MSCState, IsPersistent), @@ -1109,10 +1117,11 @@ blank_rate(Timestamp, IngressLength) -> msg_store_callback(PersistentGuids, Pubs, AckTags, Fun, MsgPropsFun) -> Self = self(), F = fun () -> rabbit_amqqueue:maybe_run_queue_via_backing_queue( - Self, fun (StateN) -> {[], tx_commit_post_msg_store( - true, Pubs, AckTags, - Fun, MsgPropsFun, StateN)} - end) + Self, ?MODULE, + fun (StateN) -> {[], tx_commit_post_msg_store( + true, Pubs, AckTags, + Fun, MsgPropsFun, StateN)} + end) end, fun () -> spawn(fun () -> ok = rabbit_misc:with_exit_handler( fun () -> remove_persistent_messages( @@ -1175,20 +1184,21 @@ tx_commit_index(State = #vqstate { on_sync = #sync { Acks = lists:append(SAcks), Pubs = [{Msg, Fun(MsgProps)} || {Fun, PubsN} <- lists:reverse(SPubs), {Msg, MsgProps} <- lists:reverse(PubsN)], - {SeqIds, State1 = #vqstate { index_state = IndexState }} = + {_Guids, State1} = ack(Acks, State), + {SeqIds, State2 = #vqstate { index_state = IndexState }} = lists:foldl( fun ({Msg = #basic_message { is_persistent = IsPersistent }, MsgProps}, - {SeqIdsAcc, State2}) -> + {SeqIdsAcc, State3}) -> IsPersistent1 = IsDurable andalso IsPersistent, - {SeqId, State3} = - publish(Msg, MsgProps, false, IsPersistent1, State2), - {cons_if(IsPersistent1, SeqId, SeqIdsAcc), State3} - end, {PAcks, ack(Acks, State)}, Pubs), + {SeqId, State4} = + publish(Msg, MsgProps, false, IsPersistent1, State3), + {cons_if(IsPersistent1, SeqId, SeqIdsAcc), State4} + end, {PAcks, State1}, Pubs), IndexState1 = rabbit_queue_index:sync(SeqIds, IndexState), [ Fun() || Fun <- 
lists:reverse(SFuns) ], reduce_memory_use( - State1 #vqstate { index_state = IndexState1, on_sync = ?BLANK_SYNC }). + State2 #vqstate { index_state = IndexState1, on_sync = ?BLANK_SYNC }). purge_betas_and_deltas(LensByStore, State = #vqstate { q3 = Q3, @@ -1335,7 +1345,7 @@ remove_pending_ack(KeepPersistent, State = #vqstate { pending_ack = PA, index_state = IndexState, msg_store_clients = MSCState }) -> - {PersistentSeqIds, GuidsByStore} = + {PersistentSeqIds, GuidsByStore, _AllGuids} = dict:fold(fun accumulate_ack/3, accumulate_ack_init(), PA), State1 = State #vqstate { pending_ack = dict:new(), ram_ack_index = gb_trees:empty() }, @@ -1354,9 +1364,9 @@ remove_pending_ack(KeepPersistent, end. ack(_MsgStoreFun, _Fun, [], State) -> - State; + {[], State}; ack(MsgStoreFun, Fun, AckTags, State) -> - {{PersistentSeqIds, GuidsByStore}, + {{PersistentSeqIds, GuidsByStore, AllGuids}, State1 = #vqstate { index_state = IndexState, msg_store_clients = MSCState, persistent_count = PCount, @@ -1376,21 +1386,24 @@ ack(MsgStoreFun, Fun, AckTags, State) -> || {IsPersistent, Guids} <- orddict:to_list(GuidsByStore)], PCount1 = PCount - find_persistent_count(sum_guids_by_store_to_len( orddict:new(), GuidsByStore)), - State1 #vqstate { index_state = IndexState1, - persistent_count = PCount1, - ack_out_counter = AckOutCount + length(AckTags) }. + {lists:reverse(AllGuids), + State1 #vqstate { index_state = IndexState1, + persistent_count = PCount1, + ack_out_counter = AckOutCount + length(AckTags) }}. -accumulate_ack_init() -> {[], orddict:new()}. +accumulate_ack_init() -> {[], orddict:new(), []}. 
accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS msg_on_disk = false, - index_on_disk = false }, - {PersistentSeqIdsAcc, GuidsByStore}) -> - {PersistentSeqIdsAcc, GuidsByStore}; + index_on_disk = false, + guid = Guid }, + {PersistentSeqIdsAcc, GuidsByStore, AllGuids}) -> + {PersistentSeqIdsAcc, GuidsByStore, [Guid | AllGuids]}; accumulate_ack(SeqId, {IsPersistent, Guid, _MsgProps}, - {PersistentSeqIdsAcc, GuidsByStore}) -> + {PersistentSeqIdsAcc, GuidsByStore, AllGuids}) -> {cons_if(IsPersistent, SeqId, PersistentSeqIdsAcc), - rabbit_misc:orddict_cons(IsPersistent, Guid, GuidsByStore)}. + rabbit_misc:orddict_cons(IsPersistent, Guid, GuidsByStore), + [Guid | AllGuids]}. find_persistent_count(LensByStore) -> case orddict:find(true, LensByStore) of @@ -1436,33 +1449,35 @@ msgs_confirmed(GuidSet, State) -> blind_confirm(QPid, GuidSet) -> rabbit_amqqueue:maybe_run_queue_via_backing_queue_async( - QPid, fun (State) -> msgs_confirmed(GuidSet, State) end). + QPid, ?MODULE, fun (State) -> msgs_confirmed(GuidSet, State) end). msgs_written_to_disk(QPid, GuidSet, removed) -> blind_confirm(QPid, GuidSet); msgs_written_to_disk(QPid, GuidSet, written) -> rabbit_amqqueue:maybe_run_queue_via_backing_queue_async( - QPid, fun (State = #vqstate { msgs_on_disk = MOD, - msg_indices_on_disk = MIOD, - unconfirmed = UC }) -> - msgs_confirmed(gb_sets:intersection(GuidSet, MIOD), - State #vqstate { - msgs_on_disk = - gb_sets:intersection( - gb_sets:union(MOD, GuidSet), UC) }) - end). + QPid, ?MODULE, + fun (State = #vqstate { msgs_on_disk = MOD, + msg_indices_on_disk = MIOD, + unconfirmed = UC }) -> + msgs_confirmed(gb_sets:intersection(GuidSet, MIOD), + State #vqstate { + msgs_on_disk = + gb_sets:intersection( + gb_sets:union(MOD, GuidSet), UC) }) + end). 
msg_indices_written_to_disk(QPid, GuidSet) -> rabbit_amqqueue:maybe_run_queue_via_backing_queue_async( - QPid, fun (State = #vqstate { msgs_on_disk = MOD, - msg_indices_on_disk = MIOD, - unconfirmed = UC }) -> - msgs_confirmed(gb_sets:intersection(GuidSet, MOD), - State #vqstate { - msg_indices_on_disk = - gb_sets:intersection( - gb_sets:union(MIOD, GuidSet), UC) }) - end). + QPid, ?MODULE, + fun (State = #vqstate { msgs_on_disk = MOD, + msg_indices_on_disk = MIOD, + unconfirmed = UC }) -> + msgs_confirmed(gb_sets:intersection(GuidSet, MOD), + State #vqstate { + msg_indices_on_disk = + gb_sets:intersection( + gb_sets:union(MIOD, GuidSet), UC) }) + end). %%---------------------------------------------------------------------------- %% Phase changes |
