diff options
| author | Matthew Sackman <matthew@rabbitmq.com> | 2011-03-07 14:00:08 +0000 |
|---|---|---|
| committer | Matthew Sackman <matthew@rabbitmq.com> | 2011-03-07 14:00:08 +0000 |
| commit | e90ea3baaac8070b0efd1e470a03174aad9f010e (patch) | |
| tree | 0c65ebae31f851551dfef6d9fd5d6aa1854fffc2 /src | |
| parent | 0c54e71511a3d31d6626f6e1617f8598ea67c8f9 (diff) | |
| parent | 807081deffb9b03cab8723ba0aa354dedbc27e8f (diff) | |
| download | rabbitmq-server-git-e90ea3baaac8070b0efd1e470a03174aad9f010e.tar.gz | |
Merging new head of bug23810 to default, which results in a tiny and insignificant change
Diffstat (limited to 'src')
51 files changed, 2877 insertions, 1069 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index 1e1f37cb3d..6f8241b3be 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -146,7 +146,8 @@ -export([open/3, close/1, read/2, append/2, sync/1, position/2, truncate/1, last_sync_offset/1, current_virtual_offset/1, current_raw_offset/1, flush/1, copy/3, set_maximum_since_use/1, delete/1, clear/1]). --export([obtain/0, transfer/1, set_limit/1, get_limit/0]). +-export([obtain/0, transfer/1, set_limit/1, get_limit/0, info_keys/0, info/0, + info/1]). -export([ulimit/0]). -export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2, @@ -241,7 +242,7 @@ -> val_or_error(ref())). -spec(close/1 :: (ref()) -> ok_or_error()). -spec(read/2 :: (ref(), non_neg_integer()) -> - val_or_error([char()] | binary()) | 'eof'). + val_or_error([char()] | binary()) | 'eof'). -spec(append/2 :: (ref(), iodata()) -> ok_or_error()). -spec(sync/1 :: (ref()) -> ok_or_error()). -spec(position/2 :: (ref(), position()) -> val_or_error(offset())). @@ -251,7 +252,7 @@ -spec(current_raw_offset/1 :: (ref()) -> val_or_error(offset())). -spec(flush/1 :: (ref()) -> ok_or_error()). -spec(copy/3 :: (ref(), ref(), non_neg_integer()) -> - val_or_error(non_neg_integer())). + val_or_error(non_neg_integer())). -spec(set_maximum_since_use/1 :: (non_neg_integer()) -> 'ok'). -spec(delete/1 :: (ref()) -> ok_or_error()). -spec(clear/1 :: (ref()) -> ok_or_error()). @@ -259,11 +260,17 @@ -spec(transfer/1 :: (pid()) -> 'ok'). -spec(set_limit/1 :: (non_neg_integer()) -> 'ok'). -spec(get_limit/0 :: () -> non_neg_integer()). +-spec(info_keys/0 :: () -> [atom()]). +-spec(info/0 :: () -> [{atom(), any()}]). +-spec(info/1 :: ([atom()]) -> [{atom(), any()}]). -spec(ulimit/0 :: () -> 'infinity' | 'unknown' | non_neg_integer()). -endif. %%---------------------------------------------------------------------------- +-define(INFO_KEYS, [obtain_count, obtain_limit]). + +%%---------------------------------------------------------------------------- %% Public API %%---------------------------------------------------------------------------- @@ -494,6 +501,11 @@ set_limit(Limit) -> get_limit() -> gen_server:call(?SERVER, get_limit, infinity). +info_keys() -> ?INFO_KEYS. + +info() -> info(?INFO_KEYS). +info(Items) -> gen_server:call(?SERVER, {info, Items}, infinity). + %%---------------------------------------------------------------------------- %% Internal functions %%---------------------------------------------------------------------------- @@ -789,6 +801,12 @@ write_buffer(Handle = #handle { hdl = Hdl, offset = Offset, {Error, Handle} end. +infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. + +i(obtain_count, #fhc_state{obtain_count = Count}) -> Count; +i(obtain_limit, #fhc_state{obtain_limit = Limit}) -> Limit; +i(Item, _) -> throw({bad_argument, Item}). + %%---------------------------------------------------------------------------- %% gen_server callbacks %%---------------------------------------------------------------------------- @@ -849,35 +867,41 @@ handle_call({open, Pid, Requested, EldestUnusedSince}, From, false -> {noreply, run_pending_item(Item, State1)} end; -handle_call({obtain, Pid}, From, State = #fhc_state { obtain_limit = Limit, - obtain_count = Count, - obtain_pending = Pending, - clients = Clients }) - when Limit =/= infinity andalso Count >= Limit -> - ok = track_client(Pid, Clients), - true = ets:update_element(Clients, Pid, {#cstate.blocked, true}), - Item = #pending { kind = obtain, pid = Pid, requested = 1, from = From }, - {noreply, State #fhc_state { obtain_pending = pending_in(Item, Pending) }}; handle_call({obtain, Pid}, From, State = #fhc_state { obtain_count = Count, obtain_pending = Pending, clients = Clients }) -> - Item = #pending { kind = obtain, pid = Pid, requested = 1, from = From }, ok = track_client(Pid, Clients), - case needs_reduce(State #fhc_state { obtain_count = Count + 1 }) of - true -> - true = ets:update_element(Clients, Pid, {#cstate.blocked, true}), - {noreply, reduce(State #fhc_state { - obtain_pending = pending_in(Item, Pending) })}; - false -> - {noreply, run_pending_item(Item, State)} - end; + Item = #pending { kind = obtain, pid = Pid, requested = 1, from = From }, + Enqueue = fun () -> + true = ets:update_element(Clients, Pid, + {#cstate.blocked, true}), + State #fhc_state { + obtain_pending = pending_in(Item, Pending) } + end, + {noreply, + case obtain_limit_reached(State) of + true -> Enqueue(); + false -> case needs_reduce(State #fhc_state { + obtain_count = Count + 1 }) of + true -> reduce(Enqueue()); + false -> adjust_alarm( + State, run_pending_item(Item, State)) + end + end}; + handle_call({set_limit, Limit}, _From, State) -> - {reply, ok, maybe_reduce( - process_pending(State #fhc_state { - limit = Limit, - obtain_limit = obtain_limit(Limit) }))}; + {reply, ok, adjust_alarm( + State, maybe_reduce( + process_pending( + State #fhc_state { + limit = Limit, + obtain_limit = obtain_limit(Limit) })))}; + handle_call(get_limit, _From, State = #fhc_state { limit = Limit }) -> - {reply, Limit, State}. + {reply, Limit, State}; + +handle_call({info, Items}, _From, State) -> + {reply, infos(Items, State), State}. handle_cast({register_callback, Pid, MFA}, State = #fhc_state { clients = Clients }) -> @@ -900,9 +924,9 @@ handle_cast({close, Pid, EldestUnusedSince}, _ -> dict:store(Pid, EldestUnusedSince, Elders) end, ets:update_counter(Clients, Pid, {#cstate.pending_closes, -1, 0, 0}), - {noreply, process_pending( + {noreply, adjust_alarm(State, process_pending( update_counts(open, Pid, -1, - State #fhc_state { elders = Elders1 }))}; + State #fhc_state { elders = Elders1 })))}; handle_cast({transfer, FromPid, ToPid}, State) -> ok = track_client(ToPid, State#fhc_state.clients), @@ -924,13 +948,15 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason}, ets:lookup(Clients, Pid), true = ets:delete(Clients, Pid), FilterFun = fun (#pending { pid = Pid1 }) -> Pid1 =/= Pid end, - {noreply, process_pending( - State #fhc_state { - open_count = OpenCount - Opened, - open_pending = filter_pending(FilterFun, OpenPending), - obtain_count = ObtainCount - Obtained, - obtain_pending = filter_pending(FilterFun, ObtainPending), - elders = dict:erase(Pid, Elders) })}. + {noreply, adjust_alarm( + State, + process_pending( + State #fhc_state { + open_count = OpenCount - Opened, + open_pending = filter_pending(FilterFun, OpenPending), + obtain_count = ObtainCount - Obtained, + obtain_pending = filter_pending(FilterFun, ObtainPending), + elders = dict:erase(Pid, Elders) }))}. terminate(_Reason, State = #fhc_state { clients = Clients }) -> ets:delete(Clients), @@ -990,6 +1016,18 @@ obtain_limit(Limit) -> case ?OBTAIN_LIMIT(Limit) of OLimit -> OLimit end. +obtain_limit_reached(#fhc_state { obtain_limit = Limit, + obtain_count = Count}) -> + Limit =/= infinity andalso Count >= Limit. + +adjust_alarm(OldState, NewState) -> + case {obtain_limit_reached(OldState), obtain_limit_reached(NewState)} of + {false, true} -> alarm_handler:set_alarm({file_descriptor_limit, []}); + {true, false} -> alarm_handler:clear_alarm(file_descriptor_limit); + _ -> ok + end, + NewState. + requested({_Kind, _Pid, Requested, _From}) -> Requested. @@ -1094,7 +1132,7 @@ reduce(State = #fhc_state { open_pending = OpenPending, case CStates of [] -> ok; _ -> case (Sum / ClientCount) - - (1000 * ?FILE_HANDLES_CHECK_INTERVAL) of + (1000 * ?FILE_HANDLES_CHECK_INTERVAL) of AverageAge when AverageAge > 0 -> notify_age(CStates, AverageAge); _ -> diff --git a/src/gen_server2.erl b/src/gen_server2.erl index 94296f9751..43e0a8f5df 100644 --- a/src/gen_server2.erl +++ b/src/gen_server2.erl @@ -453,8 +453,8 @@ unregister_name({global,Name}) -> _ = global:unregister_name(Name); unregister_name(Pid) when is_pid(Pid) -> Pid; -% Under R12 let's just ignore it, as we have a single term as Name. -% On R13 it will never get here, as we get tuple with 'local/global' atom. +%% Under R12 let's just ignore it, as we have a single term as Name. +%% On R13 it will never get here, as we get tuple with 'local/global' atom. unregister_name(_Name) -> ok. extend_backoff(undefined) -> diff --git a/src/gm.erl b/src/gm.erl new file mode 100644 index 0000000000..fd8d9b7792 --- /dev/null +++ b/src/gm.erl @@ -0,0 +1,1321 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% + +-module(gm). + +%% Guaranteed Multicast +%% ==================== +%% +%% This module provides the ability to create named groups of +%% processes to which members can be dynamically added and removed, +%% and for messages to be broadcast within the group that are +%% guaranteed to reach all members of the group during the lifetime of +%% the message. The lifetime of a message is defined as being, at a +%% minimum, the time from which the message is first sent to any +%% member of the group, up until the time at which it is known by the +%% member who published the message that the message has reached all +%% group members. +%% +%% The guarantee given is that provided a message, once sent, makes it +%% to members who do not all leave the group, the message will +%% continue to propagate to all group members. +%% +%% Another way of stating the guarantee is that if member P publishes +%% messages m and m', then for all members P', if P' is a member of +%% the group prior to the publication of m, and P' receives m', then +%% P' will receive m. +%% +%% Note that only local-ordering is enforced: i.e. if member P sends +%% message m and then message m', then for-all members P', if P' +%% receives m and m', then they will receive m' after m. Causality +%% ordering is _not_ enforced. I.e. if member P receives message m +%% and as a result publishes message m', there is no guarantee that +%% other members P' will receive m before m'. +%% +%% +%% API Use +%% ------- +%% +%% Mnesia must be started. Use the idempotent create_tables/0 function +%% to create the tables required. +%% +%% start_link/3 +%% Provide the group name, the callback module name, and any arguments +%% you wish to be passed into the callback module's functions. The +%% joined/2 function will be called when we have joined the group, +%% with the arguments passed to start_link and a list of the current +%% members of the group. See the comments in behaviour_info/1 below +%% for further details of the callback functions. +%% +%% leave/1 +%% Provide the Pid. Removes the Pid from the group. The callback +%% terminate/2 function will be called. +%% +%% broadcast/2 +%% Provide the Pid and a Message. The message will be sent to all +%% members of the group as per the guarantees given above. This is a +%% cast and the function call will return immediately. There is no +%% guarantee that the message will reach any member of the group. +%% +%% confirmed_broadcast/2 +%% Provide the Pid and a Message. As per broadcast/2 except that this +%% is a call, not a cast, and only returns 'ok' once the Message has +%% reached every member of the group. Do not call +%% confirmed_broadcast/2 directly from the callback module otherwise +%% you will deadlock the entire group. +%% +%% group_members/1 +%% Provide the Pid. Returns a list of the current group members. +%% +%% +%% Implementation Overview +%% ----------------------- +%% +%% One possible means of implementation would be a fan-out from the +%% sender to every member of the group. This would require that the +%% group is fully connected, and, in the event that the original +%% sender of the message disappears from the group before the message +%% has made it to every member of the group, raises questions as to +%% who is responsible for sending on the message to new group members. +%% In particular, the issue is with [ Pid ! Msg || Pid <- Members ] - +%% if the sender dies part way through, who is responsible for +%% ensuring that the remaining Members receive the Msg? In the event +%% that within the group, messages sent are broadcast from a subset of +%% the members, the fan-out arrangement has the potential to +%% substantially impact the CPU and network workload of such members, +%% as such members would have to accommodate the cost of sending each +%% message to every group member. +%% +%% Instead, if the members of the group are arranged in a chain, then +%% it becomes easier to reason about who within the group has received +%% each message and who has not. It eases issues of responsibility: in +%% the event of a group member disappearing, the nearest upstream +%% member of the chain is responsible for ensuring that messages +%% continue to propagate down the chain. It also results in equal +%% distribution of sending and receiving workload, even if all +%% messages are being sent from just a single group member. This +%% configuration has the further advantage that it is not necessary +%% for every group member to know of every other group member, and +%% even that a group member does not have to be accessible from all +%% other group members. +%% +%% Performance is kept high by permitting pipelining and all +%% communication between joined group members is asynchronous. In the +%% chain A -> B -> C -> D, if A sends a message to the group, it will +%% not directly contact C or D. However, it must know that D receives +%% the message (in addition to B and C) before it can consider the +%% message fully sent. A simplistic implementation would require that +%% D replies to C, C replies to B and B then replies to A. This would +%% result in a propagation delay of twice the length of the chain. It +%% would also require, in the event of the failure of C, that D knows +%% to directly contact B and issue the necessary replies. Instead, the +%% chain forms a ring: D sends the message on to A: D does not +%% distinguish A as the sender, merely as the next member (downstream) +%% within the chain (which has now become a ring). When A receives +%% from D messages that A sent, it knows that all members have +%% received the message. However, the message is not dead yet: if C +%% died as B was sending to C, then B would need to detect the death +%% of C and forward the message on to D instead: thus every node has +%% to remember every message published until it is told that it can +%% forget about the message. This is essential not just for dealing +%% with failure of members, but also for the addition of new members. +%% +%% Thus once A receives the message back again, it then sends to B an +%% acknowledgement for the message, indicating that B can now forget +%% about the message. B does so, and forwards the ack to C. C forgets +%% the message, and forwards the ack to D, which forgets the message +%% and finally forwards the ack back to A. At this point, A takes no +%% further action: the message and its acknowledgement have made it to +%% every member of the group. The message is now dead, and any new +%% member joining the group at this point will not receive the +%% message. +%% +%% We therefore have two roles: +%% +%% 1. The sender, who upon receiving their own messages back, must +%% then send out acknowledgements, and upon receiving their own +%% acknowledgements back perform no further action. +%% +%% 2. The other group members who upon receiving messages and +%% acknowledgements must update their own internal state accordingly +%% (the sending member must also do this in order to be able to +%% accommodate failures), and forwards messages on to their downstream +%% neighbours. +%% +%% +%% Implementation: It gets trickier +%% -------------------------------- +%% +%% Chain A -> B -> C -> D +%% +%% A publishes a message which B receives. A now dies. B and D will +%% detect the death of A, and will link up, thus the chain is now B -> +%% C -> D. B forwards A's message on to C, who forwards it to D, who +%% forwards it to B. Thus B is now responsible for A's messages - both +%% publications and acknowledgements that were in flight at the point +%% at which A died. Even worse is that this is transitive: after B +%% forwards A's message to C, B dies as well. Now C is not only +%% responsible for B's in-flight messages, but is also responsible for +%% A's in-flight messages. +%% +%% Lemma 1: A member can only determine which dead members they have +%% inherited responsibility for if there is a total ordering on the +%% conflicting additions and subtractions of members from the group. +%% +%% Consider the simultaneous death of B and addition of B' that +%% transitions a chain from A -> B -> C to A -> B' -> C. Either B' or +%% C is responsible for in-flight messages from B. It is easy to +%% ensure that at least one of them thinks they have inherited B, but +%% if we do not ensure that exactly one of them inherits B, then we +%% could have B' converting publishes to acks, which then will crash C +%% as C does not believe it has issued acks for those messages. +%% +%% More complex scenarios are easy to concoct: A -> B -> C -> D -> E +%% becoming A -> C' -> E. Who has inherited which of B, C and D? +%% +%% However, for non-conflicting membership changes, only a partial +%% ordering is required. For example, A -> B -> C becoming A -> A' -> +%% B. The addition of A', between A and B can have no conflicts with +%% the death of C: it is clear that A has inherited C's messages. +%% +%% For ease of implementation, we adopt the simple solution, of +%% imposing a total order on all membership changes. +%% +%% On the death of a member, it is ensured the dead member's +%% neighbours become aware of the death, and the upstream neighbour +%% now sends to its new downstream neighbour its state, including the +%% messages pending acknowledgement. The downstream neighbour can then +%% use this to calculate which publishes and acknowledgements it has +%% missed out on, due to the death of its old upstream. Thus the +%% downstream can catch up, and continues the propagation of messages +%% through the group. +%% +%% Lemma 2: When a member is joining, it must synchronously +%% communicate with its upstream member in order to receive its +%% starting state atomically with its addition to the group. +%% +%% New members must start with the same state as their nearest +%% upstream neighbour. This ensures that it is not surprised by +%% acknowledgements they are sent, and that should their downstream +%% neighbour die, they are able to send the correct state to their new +%% downstream neighbour to ensure it can catch up. Thus in the +%% transition A -> B -> C becomes A -> A' -> B -> C becomes A -> A' -> +%% C, A' must start with the state of A, so that it can send C the +%% correct state when B dies, allowing C to detect any missed +%% messages. +%% +%% If A' starts by adding itself to the group membership, A could then +%% die, without A' having received the necessary state from A. This +%% would leave A' responsible for in-flight messages from A, but +%% having the least knowledge of all, of those messages. Thus A' must +%% start by synchronously calling A, which then immediately sends A' +%% back its state. A then adds A' to the group. If A dies at this +%% point then A' will be able to see this (as A' will fail to appear +%% in the group membership), and thus A' will ignore the state it +%% receives from A, and will simply repeat the process, trying to now +%% join downstream from some other member. This ensures that should +%% the upstream die as soon as the new member has been joined, the new +%% member is guaranteed to receive the correct state, allowing it to +%% correctly process messages inherited due to the death of its +%% upstream neighbour. +%% +%% The canonical definition of the group membership is held by a +%% distributed database. Whilst this allows the total ordering of +%% changes to be achieved, it is nevertheless undesirable to have to +%% query this database for the current view, upon receiving each +%% message. Instead, we wish for members to be able to cache a view of +%% the group membership, which then requires a cache invalidation +%% mechanism. Each member maintains its own view of the group +%% membership. Thus when the group's membership changes, members may +%% need to become aware of such changes in order to be able to +%% accurately process messages they receive. Because of the +%% requirement of a total ordering of conflicting membership changes, +%% it is not possible to use the guaranteed broadcast mechanism to +%% communicate these changes: to achieve the necessary ordering, it +%% would be necessary for such messages to be published by exactly one +%% member, which can not be guaranteed given that such a member could +%% die. +%% +%% The total ordering we enforce on membership changes gives rise to a +%% view version number: every change to the membership creates a +%% different view, and the total ordering permits a simple +%% monotonically increasing view version number. +%% +%% Lemma 3: If a message is sent from a member that holds view version +%% N, it can be correctly processed by any member receiving the +%% message with a view version >= N. +%% +%% Initially, let us suppose that each view contains the ordering of +%% every member that was ever part of the group. Dead members are +%% marked as such. Thus we have a ring of members, some of which are +%% dead, and are thus inherited by the nearest alive downstream +%% member. +%% +%% In the chain A -> B -> C, all three members initially have view +%% version 1, which reflects reality. B publishes a message, which is +%% forward by C to A. B now dies, which A notices very quickly. Thus A +%% updates the view, creating version 2. It now forwards B's +%% publication, sending that message to its new downstream neighbour, +%% C. This happens before C is aware of the death of B. C must become +%% aware of the view change before it interprets the message its +%% received, otherwise it will fail to learn of the death of B, and +%% thus will not realise it has inherited B's messages (and will +%% likely crash). +%% +%% Thus very simply, we have that each subsequent view contains more +%% information than the preceding view. +%% +%% However, to avoid the views growing indefinitely, we need to be +%% able to delete members which have died _and_ for which no messages +%% are in-flight. This requires that upon inheriting a dead member, we +%% know the last publication sent by the dead member (this is easy: we +%% inherit a member because we are the nearest downstream member which +%% implies that we know at least as much than everyone else about the +%% publications of the dead member), and we know the earliest message +%% for which the acknowledgement is still in flight. +%% +%% In the chain A -> B -> C, when B dies, A will send to C its state +%% (as C is the new downstream from A), allowing C to calculate which +%% messages it has missed out on (described above). At this point, C +%% also inherits B's messages. If that state from A also includes the +%% last message published by B for which an acknowledgement has been +%% seen, then C knows exactly which further acknowledgements it must +%% receive (also including issuing acknowledgements for publications +%% still in-flight that it receives), after which it is known there +%% are no more messages in flight for B, thus all evidence that B was +%% ever part of the group can be safely removed from the canonical +%% group membership. +%% +%% Thus, for every message that a member sends, it includes with that +%% message its view version. When a member receives a message it will +%% update its view from the canonical copy, should its view be older +%% than the view version included in the message it has received. +%% +%% The state held by each member therefore includes the messages from +%% each publisher pending acknowledgement, the last publication seen +%% from that publisher, and the last acknowledgement from that +%% publisher. In the case of the member's own publications or +%% inherited members, this last acknowledgement seen state indicates +%% the last acknowledgement retired, rather than sent. +%% +%% +%% Proof sketch +%% ------------ +%% +%% We need to prove that with the provided operational semantics, we +%% can never reach a state that is not well formed from a well-formed +%% starting state. +%% +%% Operational semantics (small step): straight-forward message +%% sending, process monitoring, state updates. +%% +%% Well formed state: dead members inherited by exactly one non-dead +%% member; for every entry in anyone's pending-acks, either (the +%% publication of the message is in-flight downstream from the member +%% and upstream from the publisher) or (the acknowledgement of the +%% message is in-flight downstream from the publisher and upstream +%% from the member). +%% +%% Proof by induction on the applicable operational semantics. +%% +%% +%% Related work +%% ------------ +%% +%% The ring configuration and double traversal of messages around the +%% ring is similar (though developed independently) to the LCR +%% protocol by [Levy 2008]. However, LCR differs in several +%% ways. Firstly, by using vector clocks, it enforces a total order of +%% message delivery, which is unnecessary for our purposes. More +%% significantly, it is built on top of a "group communication system" +%% which performs the group management functions, taking +%% responsibility away from the protocol as to how to cope with safely +%% adding and removing members. When membership changes do occur, the +%% protocol stipulates that every member must perform communication +%% with every other member of the group, to ensure all outstanding +%% deliveries complete, before the entire group transitions to the new +%% view. This, in total, requires two sets of all-to-all synchronous +%% communications. +%% +%% This is not only rather inefficient, but also does not explain what +%% happens upon the failure of a member during this process. It does +%% though entirely avoid the need for inheritance of responsibility of +%% dead members that our protocol incorporates. +%% +%% In [Marandi et al 2010], a Paxos-based protocol is described. This +%% work explicitly focuses on the efficiency of communication. LCR +%% (and our protocol too) are more efficient, but at the cost of +%% higher latency. The Ring-Paxos protocol is itself built on top of +%% IP-multicast, which rules it out for many applications where +%% point-to-point communication is all that can be required. They also +%% have an excellent related work section which I really ought to +%% read... +%% +%% +%% [Levy 2008] The Complexity of Reliable Distributed Storage, 2008. +%% [Marandi et al 2010] Ring Paxos: A High-Throughput Atomic Broadcast +%% Protocol + + +-behaviour(gen_server2). + +-export([create_tables/0, start_link/3, leave/1, broadcast/2, + confirmed_broadcast/2, group_members/1]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, + code_change/3, prioritise_info/2]). + +-export([behaviour_info/1]). + +-export([table_definitions/0]). + +-define(GROUP_TABLE, gm_group). +-define(HIBERNATE_AFTER_MIN, 1000). +-define(DESIRED_HIBERNATE, 10000). +-define(SETS, ordsets). +-define(DICT, orddict). + +-record(state, + { self, + left, + right, + group_name, + module, + view, + pub_count, + members_state, + callback_args, + confirms + }). + +-record(gm_group, { name, version, members }). + +-record(view_member, { id, aliases, left, right }). + +-record(member, { pending_ack, last_pub, last_ack }). + +-define(TABLE, {?GROUP_TABLE, [{record_name, gm_group}, + {attributes, record_info(fields, gm_group)}]}). +-define(TABLE_MATCH, {match, #gm_group { _ = '_' }}). + +-define(TAG, '$gm'). + +-ifdef(use_specs). + +-export_type([group_name/0]). + +-type(group_name() :: any()). + +-spec(create_tables/0 :: () -> 'ok'). +-spec(start_link/3 :: (group_name(), atom(), any()) -> + {'ok', pid()} | {'error', any()}). +-spec(leave/1 :: (pid()) -> 'ok'). +-spec(broadcast/2 :: (pid(), any()) -> 'ok'). +-spec(confirmed_broadcast/2 :: (pid(), any()) -> 'ok'). +-spec(group_members/1 :: (pid()) -> [pid()]). + +-endif. + +behaviour_info(callbacks) -> + [ + %% The joined, members_changed and handle_msg callbacks can all + %% return any of the following terms: + %% + %% 'ok' - the callback function returns normally + %% + %% {'stop', Reason} - the callback indicates the member should + %% stop with reason Reason and should leave the group. + %% + %% {'become', Module, Args} - the callback indicates that the + %% callback module should be changed to Module and that the + %% callback functions should now be passed the arguments + %% Args. This allows the callback module to be dynamically + %% changed. + + %% Called when we've successfully joined the group. Supplied with + %% Args provided in start_link, plus current group members. + {joined, 2}, + + %% Supplied with Args provided in start_link, the list of new + %% members and the list of members previously known to us that + %% have since died. Note that if a member joins and dies very + %% quickly, it's possible that we will never see that member + %% appear in either births or deaths. However we are guaranteed + %% that (1) we will see a member joining either in the births + %% here, or in the members passed to joined/2 before receiving + %% any messages from it; and (2) we will not see members die that + %% we have not seen born (or supplied in the members to + %% joined/2). + {members_changed, 3}, + + %% Supplied with Args provided in start_link, the sender, and the + %% message. This does get called for messages injected by this + %% member, however, in such cases, there is no special + %% significance of this invocation: it does not indicate that the + %% message has made it to any other members, let alone all other + %% members. + {handle_msg, 3}, + + %% Called on gm member termination as per rules in gen_server, + %% with the Args provided in start_link plus the termination + %% Reason. + {terminate, 2} + ]; +behaviour_info(_Other) -> + undefined. + +create_tables() -> + create_tables([?TABLE]). + +create_tables([]) -> + ok; +create_tables([{Table, Attributes} | Tables]) -> + case mnesia:create_table(Table, Attributes) of + {atomic, ok} -> create_tables(Tables); + {aborted, {already_exists, gm_group}} -> create_tables(Tables); + Err -> Err + end. + +table_definitions() -> + {Name, Attributes} = ?TABLE, + [{Name, [?TABLE_MATCH | Attributes]}]. + +start_link(GroupName, Module, Args) -> + gen_server2:start_link(?MODULE, [GroupName, Module, Args], []). + +leave(Server) -> + gen_server2:cast(Server, leave). + +broadcast(Server, Msg) -> + gen_server2:cast(Server, {broadcast, Msg}). + +confirmed_broadcast(Server, Msg) -> + gen_server2:call(Server, {confirmed_broadcast, Msg}, infinity). + +group_members(Server) -> + gen_server2:call(Server, group_members, infinity). + + +init([GroupName, Module, Args]) -> + random:seed(now()), + gen_server2:cast(self(), join), + Self = self(), + {ok, #state { self = Self, + left = {Self, undefined}, + right = {Self, undefined}, + group_name = GroupName, + module = Module, + view = undefined, + pub_count = 0, + members_state = undefined, + callback_args = Args, + confirms = queue:new() }, hibernate, + {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. + + +handle_call({confirmed_broadcast, _Msg}, _From, + State = #state { members_state = undefined }) -> + reply(not_joined, State); + +handle_call({confirmed_broadcast, Msg}, _From, + State = #state { self = Self, + right = {Self, undefined}, + module = Module, + callback_args = Args }) -> + handle_callback_result({Module:handle_msg(Args, Self, Msg), ok, State}); + +handle_call({confirmed_broadcast, Msg}, From, State) -> + internal_broadcast(Msg, From, State); + +handle_call(group_members, _From, + State = #state { members_state = undefined }) -> + reply(not_joined, State); + +handle_call(group_members, _From, State = #state { view = View }) -> + reply(alive_view_members(View), State); + +handle_call({add_on_right, _NewMember}, _From, + State = #state { members_state = undefined }) -> + reply(not_ready, State); + +handle_call({add_on_right, NewMember}, _From, + State = #state { self = Self, + group_name = GroupName, + view = View, + members_state = MembersState, + module = Module, + callback_args = Args }) -> + Group = record_new_member_in_group( + GroupName, Self, NewMember, + fun (Group1) -> + View1 = group_to_view(Group1), + ok = send_right(NewMember, View1, + {catchup, Self, prepare_members_state( + MembersState)}) + end), + View2 = group_to_view(Group), + State1 = check_neighbours(State #state { view = View2 }), + Result = callback_view_changed(Args, Module, View, View2), + handle_callback_result({Result, {ok, Group}, State1}). + + +handle_cast({?TAG, ReqVer, Msg}, + State = #state { view = View, + group_name = GroupName, + module = Module, + callback_args = Args }) -> + {Result, State1} = + case needs_view_update(ReqVer, View) of + true -> + View1 = group_to_view(read_group(GroupName)), + {callback_view_changed(Args, Module, View, View1), + check_neighbours(State #state { view = View1 })}; + false -> + {ok, State} + end, + handle_callback_result( + if_callback_success( + Result, fun handle_msg_true/3, fun handle_msg_false/3, Msg, State1)); + +handle_cast({broadcast, _Msg}, State = #state { members_state = undefined }) -> + noreply(State); + +handle_cast({broadcast, Msg}, + State = #state { self = Self, + right = {Self, undefined}, + module = Module, + callback_args = Args }) -> + handle_callback_result({Module:handle_msg(Args, Self, Msg), State}); + +handle_cast({broadcast, Msg}, State) -> + internal_broadcast(Msg, none, State); + +handle_cast(join, State = #state { self = Self, + group_name = GroupName, + members_state = undefined, + module = Module, + callback_args = Args }) -> + View = join_group(Self, GroupName), + MembersState = + case alive_view_members(View) of + [Self] -> blank_member_state(); + _ -> undefined + end, + State1 = check_neighbours(State #state { view = View, + members_state = MembersState }), + handle_callback_result( + {Module:joined(Args, all_known_members(View)), State1}); + +handle_cast(leave, State) -> + {stop, normal, State}. + + +handle_info({'DOWN', MRef, process, _Pid, _Reason}, + State = #state { self = Self, + left = Left, + right = Right, + group_name = GroupName, + view = View, + module = Module, + callback_args = Args, + confirms = Confirms }) -> + Member = case {Left, Right} of + {{Member1, MRef}, _} -> Member1; + {_, {Member1, MRef}} -> Member1; + _ -> undefined + end, + case Member of + undefined -> + noreply(State); + _ -> + View1 = + group_to_view(record_dead_member_in_group(Member, GroupName)), + State1 = State #state { view = View1 }, + {Result, State2} = + case alive_view_members(View1) of + [Self] -> + maybe_erase_aliases( + State1 #state { + members_state = blank_member_state(), + confirms = purge_confirms(Confirms) }); + _ -> + %% here we won't be pointing out any deaths: + %% the concern is that there maybe births + %% which we'd otherwise miss. + {callback_view_changed(Args, Module, View, View1), + State1} + end, + handle_callback_result({Result, check_neighbours(State2)}) + end. + + +terminate(Reason, #state { module = Module, + callback_args = Args }) -> + Module:terminate(Args, Reason). + + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + + +prioritise_info({'DOWN', _MRef, process, _Pid, _Reason}, _State) -> 1; +prioritise_info(_ , _State) -> 0. + + +handle_msg(check_neighbours, State) -> + %% no-op - it's already been done by the calling handle_cast + {ok, State}; + +handle_msg({catchup, Left, MembersStateLeft}, + State = #state { self = Self, + left = {Left, _MRefL}, + right = {Right, _MRefR}, + view = View, + members_state = undefined }) -> + ok = send_right(Right, View, {catchup, Self, MembersStateLeft}), + MembersStateLeft1 = build_members_state(MembersStateLeft), + {ok, State #state { members_state = MembersStateLeft1 }}; + +handle_msg({catchup, Left, MembersStateLeft}, + State = #state { self = Self, + left = {Left, _MRefL}, + view = View, + members_state = MembersState }) + when MembersState =/= undefined -> + MembersStateLeft1 = build_members_state(MembersStateLeft), + AllMembers = lists:usort(?DICT:fetch_keys(MembersState) ++ + ?DICT:fetch_keys(MembersStateLeft1)), + {MembersState1, Activity} = + lists:foldl( + fun (Id, MembersStateActivity) -> + #member { pending_ack = PALeft, last_ack = LA } = + find_member_or_blank(Id, MembersStateLeft1), + with_member_acc( + fun (#member { pending_ack = PA } = Member, Activity1) -> + case is_member_alias(Id, Self, View) of + true -> + {_AcksInFlight, Pubs, _PA1} = + find_prefix_common_suffix(PALeft, PA), + {Member #member { last_ack = LA }, + activity_cons(Id, pubs_from_queue(Pubs), + [], Activity1)}; + false -> + {Acks, _Common, Pubs} = + find_prefix_common_suffix(PA, PALeft), + {Member, + activity_cons(Id, pubs_from_queue(Pubs), + acks_from_queue(Acks), + Activity1)} + end + end, Id, MembersStateActivity) + end, {MembersState, activity_nil()}, AllMembers), + handle_msg({activity, Left, activity_finalise(Activity)}, + State #state { members_state = MembersState1 }); + +handle_msg({catchup, _NotLeft, _MembersState}, State) -> + {ok, State}; + +handle_msg({activity, Left, Activity}, + State = #state { self = Self, + left = {Left, _MRefL}, + view = View, + members_state = MembersState, + confirms = Confirms }) + when MembersState =/= undefined -> + {MembersState1, {Confirms1, Activity1}} = + lists:foldl( + fun ({Id, Pubs, Acks}, MembersStateConfirmsActivity) -> + with_member_acc( + fun (Member = #member { pending_ack = PA, + last_pub = LP, + last_ack = LA }, + {Confirms2, Activity2}) -> + case is_member_alias(Id, Self, View) of + true -> + {ToAck, PA1} = + find_common(queue_from_pubs(Pubs), PA, + queue:new()), + LA1 = last_ack(Acks, LA), + AckNums = acks_from_queue(ToAck), + Confirms3 = maybe_confirm( + Self, Id, Confirms2, AckNums), + {Member #member { pending_ack = PA1, + last_ack = LA1 }, + {Confirms3, + activity_cons( + Id, [], AckNums, Activity2)}}; + false -> + PA1 = apply_acks(Acks, join_pubs(PA, Pubs)), + LA1 = last_ack(Acks, LA), + LP1 = last_pub(Pubs, LP), + {Member #member { pending_ack = PA1, + last_pub = LP1, + last_ack = LA1 }, + {Confirms2, + activity_cons(Id, Pubs, Acks, Activity2)}} + end + end, Id, MembersStateConfirmsActivity) + end, {MembersState, {Confirms, activity_nil()}}, Activity), + State1 = State #state { members_state = MembersState1, + confirms = Confirms1 }, + Activity3 = activity_finalise(Activity1), + {Result, State2} = maybe_erase_aliases(State1), + ok = maybe_send_activity(Activity3, State2), + if_callback_success( + Result, fun activity_true/3, fun activity_false/3, Activity3, State2); + +handle_msg({activity, _NotLeft, _Activity}, State) -> + {ok, State}. + + +noreply(State) -> + {noreply, State, hibernate}. + +reply(Reply, State) -> + {reply, Reply, State, hibernate}. + +internal_broadcast(Msg, From, State = #state { self = Self, + pub_count = PubCount, + members_state = MembersState, + module = Module, + confirms = Confirms, + callback_args = Args }) -> + PubMsg = {PubCount, Msg}, + Activity = activity_cons(Self, [PubMsg], [], activity_nil()), + ok = maybe_send_activity(activity_finalise(Activity), State), + MembersState1 = + with_member( + fun (Member = #member { pending_ack = PA }) -> + Member #member { pending_ack = queue:in(PubMsg, PA) } + end, Self, MembersState), + Confirms1 = case From of + none -> Confirms; + _ -> queue:in({PubCount, From}, Confirms) + end, + handle_callback_result({Module:handle_msg(Args, Self, Msg), + State #state { pub_count = PubCount + 1, + members_state = MembersState1, + confirms = Confirms1 }}). + + +%% --------------------------------------------------------------------------- +%% View construction and inspection +%% --------------------------------------------------------------------------- + +needs_view_update(ReqVer, {Ver, _View}) -> + Ver < ReqVer. + +view_version({Ver, _View}) -> + Ver. + +is_member_alive({dead, _Member}) -> false; +is_member_alive(_) -> true. + +is_member_alias(Self, Self, _View) -> + true; +is_member_alias(Member, Self, View) -> + ?SETS:is_element(Member, + ((fetch_view_member(Self, View)) #view_member.aliases)). + +dead_member_id({dead, Member}) -> Member. + +store_view_member(VMember = #view_member { id = Id }, {Ver, View}) -> + {Ver, ?DICT:store(Id, VMember, View)}. + +with_view_member(Fun, View, Id) -> + store_view_member(Fun(fetch_view_member(Id, View)), View). + +fetch_view_member(Id, {_Ver, View}) -> + ?DICT:fetch(Id, View). + +find_view_member(Id, {_Ver, View}) -> + ?DICT:find(Id, View). + +blank_view(Ver) -> + {Ver, ?DICT:new()}. + +alive_view_members({_Ver, View}) -> + ?DICT:fetch_keys(View). + +all_known_members({_Ver, View}) -> + ?DICT:fold( + fun (Member, #view_member { aliases = Aliases }, Acc) -> + ?SETS:to_list(Aliases) ++ [Member | Acc] + end, [], View). + +group_to_view(#gm_group { members = Members, version = Ver }) -> + Alive = lists:filter(fun is_member_alive/1, Members), + [_|_] = Alive, %% ASSERTION - can't have all dead members + add_aliases(link_view(Alive ++ Alive ++ Alive, blank_view(Ver)), Members). + +link_view([Left, Middle, Right | Rest], View) -> + case find_view_member(Middle, View) of + error -> + link_view( + [Middle, Right | Rest], + store_view_member(#view_member { id = Middle, + aliases = ?SETS:new(), + left = Left, + right = Right }, View)); + {ok, _} -> + View + end; +link_view(_, View) -> + View. + +add_aliases(View, Members) -> + Members1 = ensure_alive_suffix(Members), + {EmptyDeadSet, View1} = + lists:foldl( + fun (Member, {DeadAcc, ViewAcc}) -> + case is_member_alive(Member) of + true -> + {?SETS:new(), + with_view_member( + fun (VMember = + #view_member { aliases = Aliases }) -> + VMember #view_member { + aliases = ?SETS:union(Aliases, DeadAcc) } + end, ViewAcc, Member)}; + false -> + {?SETS:add_element(dead_member_id(Member), DeadAcc), + ViewAcc} + end + end, {?SETS:new(), View}, Members1), + 0 = ?SETS:size(EmptyDeadSet), %% ASSERTION + View1. + +ensure_alive_suffix(Members) -> + queue:to_list(ensure_alive_suffix1(queue:from_list(Members))). + +ensure_alive_suffix1(MembersQ) -> + {{value, Member}, MembersQ1} = queue:out_r(MembersQ), + case is_member_alive(Member) of + true -> MembersQ; + false -> ensure_alive_suffix1(queue:in_r(Member, MembersQ1)) + end. + + +%% --------------------------------------------------------------------------- +%% View modification +%% --------------------------------------------------------------------------- + +join_group(Self, GroupName) -> + join_group(Self, GroupName, read_group(GroupName)). + +join_group(Self, GroupName, {error, not_found}) -> + join_group(Self, GroupName, prune_or_create_group(Self, GroupName)); +join_group(Self, _GroupName, #gm_group { members = [Self] } = Group) -> + group_to_view(Group); +join_group(Self, GroupName, #gm_group { members = Members } = Group) -> + case lists:member(Self, Members) of + true -> + group_to_view(Group); + false -> + case lists:filter(fun is_member_alive/1, Members) of + [] -> + join_group(Self, GroupName, + prune_or_create_group(Self, GroupName)); + Alive -> + Left = lists:nth(random:uniform(length(Alive)), Alive), + try + case gen_server2:call( + Left, {add_on_right, Self}, infinity) of + {ok, Group1} -> group_to_view(Group1); + not_ready -> join_group(Self, GroupName) + end + catch + exit:{R, _} + when R =:= noproc; R =:= normal; R =:= shutdown -> + join_group( + Self, GroupName, + record_dead_member_in_group(Left, GroupName)) + end + end + end. + +read_group(GroupName) -> + case mnesia:dirty_read(?GROUP_TABLE, GroupName) of + [] -> {error, not_found}; + [Group] -> Group + end. + +prune_or_create_group(Self, GroupName) -> + {atomic, Group} = + mnesia:sync_transaction( + fun () -> GroupNew = #gm_group { name = GroupName, + members = [Self], + version = 0 }, + case mnesia:read(?GROUP_TABLE, GroupName) of + [] -> + mnesia:write(GroupNew), + GroupNew; + [Group1 = #gm_group { members = Members }] -> + case lists:any(fun is_member_alive/1, Members) of + true -> Group1; + false -> mnesia:write(GroupNew), + GroupNew + end + end + end), + Group. + +record_dead_member_in_group(Member, GroupName) -> + {atomic, Group} = + mnesia:sync_transaction( + fun () -> [Group1 = #gm_group { members = Members, version = Ver }] = + mnesia:read(?GROUP_TABLE, GroupName), + case lists:splitwith( + fun (Member1) -> Member1 =/= Member end, Members) of + {_Members1, []} -> %% not found - already recorded dead + Group1; + {Members1, [Member | Members2]} -> + Members3 = Members1 ++ [{dead, Member} | Members2], + Group2 = Group1 #gm_group { members = Members3, + version = Ver + 1 }, + mnesia:write(Group2), + Group2 + end + end), + Group. + +record_new_member_in_group(GroupName, Left, NewMember, Fun) -> + {atomic, Group} = + mnesia:sync_transaction( + fun () -> + [#gm_group { members = Members, version = Ver } = Group1] = + mnesia:read(?GROUP_TABLE, GroupName), + {Prefix, [Left | Suffix]} = + lists:splitwith(fun (M) -> M =/= Left end, Members), + Members1 = Prefix ++ [Left, NewMember | Suffix], + Group2 = Group1 #gm_group { members = Members1, + version = Ver + 1 }, + ok = Fun(Group2), + mnesia:write(Group2), + Group2 + end), + Group. + +erase_members_in_group(Members, GroupName) -> + DeadMembers = [{dead, Id} || Id <- Members], + {atomic, Group} = + mnesia:sync_transaction( + fun () -> + [Group1 = #gm_group { members = [_|_] = Members1, + version = Ver }] = + mnesia:read(?GROUP_TABLE, GroupName), + case Members1 -- DeadMembers of + Members1 -> Group1; + Members2 -> Group2 = + Group1 #gm_group { members = Members2, + version = Ver + 1 }, + mnesia:write(Group2), + Group2 + end + end), + Group. + +maybe_erase_aliases(State = #state { self = Self, + group_name = GroupName, + view = View, + members_state = MembersState, + module = Module, + callback_args = Args }) -> + #view_member { aliases = Aliases } = fetch_view_member(Self, View), + {Erasable, MembersState1} + = ?SETS:fold( + fun (Id, {ErasableAcc, MembersStateAcc} = Acc) -> + #member { last_pub = LP, last_ack = LA } = + find_member_or_blank(Id, MembersState), + case can_erase_view_member(Self, Id, LA, LP) of + true -> {[Id | ErasableAcc], + erase_member(Id, MembersStateAcc)}; + false -> Acc + end + end, {[], MembersState}, Aliases), + State1 = State #state { members_state = MembersState1 }, + case Erasable of + [] -> {ok, State1}; + _ -> View1 = group_to_view( + erase_members_in_group(Erasable, GroupName)), + {callback_view_changed(Args, Module, View, View1), + State1 #state { view = View1 }} + end. + +can_erase_view_member(Self, Self, _LA, _LP) -> false; +can_erase_view_member(_Self, _Id, N, N) -> true; +can_erase_view_member(_Self, _Id, _LA, _LP) -> false. + + +%% --------------------------------------------------------------------------- +%% View monitoring and maintanence +%% --------------------------------------------------------------------------- + +ensure_neighbour(_Ver, Self, {Self, undefined}, Self) -> + {Self, undefined}; +ensure_neighbour(Ver, Self, {Self, undefined}, RealNeighbour) -> + ok = gen_server2:cast(RealNeighbour, {?TAG, Ver, check_neighbours}), + {RealNeighbour, maybe_monitor(RealNeighbour, Self)}; +ensure_neighbour(_Ver, _Self, {RealNeighbour, MRef}, RealNeighbour) -> + {RealNeighbour, MRef}; +ensure_neighbour(Ver, Self, {RealNeighbour, MRef}, Neighbour) -> + true = erlang:demonitor(MRef), + Msg = {?TAG, Ver, check_neighbours}, + ok = gen_server2:cast(RealNeighbour, Msg), + ok = case Neighbour of + Self -> ok; + _ -> gen_server2:cast(Neighbour, Msg) + end, + {Neighbour, maybe_monitor(Neighbour, Self)}. + +maybe_monitor(Self, Self) -> + undefined; +maybe_monitor(Other, _Self) -> + erlang:monitor(process, Other). + +check_neighbours(State = #state { self = Self, + left = Left, + right = Right, + view = View }) -> + #view_member { left = VLeft, right = VRight } + = fetch_view_member(Self, View), + Ver = view_version(View), + Left1 = ensure_neighbour(Ver, Self, Left, VLeft), + Right1 = ensure_neighbour(Ver, Self, Right, VRight), + State1 = State #state { left = Left1, right = Right1 }, + ok = maybe_send_catchup(Right, State1), + State1. + +maybe_send_catchup(Right, #state { right = Right }) -> + ok; +maybe_send_catchup(_Right, #state { self = Self, + right = {Self, undefined} }) -> + ok; +maybe_send_catchup(_Right, #state { members_state = undefined }) -> + ok; +maybe_send_catchup(_Right, #state { self = Self, + right = {Right, _MRef}, + view = View, + members_state = MembersState }) -> + send_right(Right, View, + {catchup, Self, prepare_members_state(MembersState)}). + + +%% --------------------------------------------------------------------------- +%% Catch_up delta detection +%% --------------------------------------------------------------------------- + +find_prefix_common_suffix(A, B) -> + {Prefix, A1} = find_prefix(A, B, queue:new()), + {Common, Suffix} = find_common(A1, B, queue:new()), + {Prefix, Common, Suffix}. + +%% Returns the elements of A that occur before the first element of B, +%% plus the remainder of A. +find_prefix(A, B, Prefix) -> + case {queue:out(A), queue:out(B)} of + {{{value, Val}, _A1}, {{value, Val}, _B1}} -> + {Prefix, A}; + {{empty, A1}, {{value, _A}, _B1}} -> + {Prefix, A1}; + {{{value, {NumA, _MsgA} = Val}, A1}, + {{value, {NumB, _MsgB}}, _B1}} when NumA < NumB -> + find_prefix(A1, B, queue:in(Val, Prefix)); + {_, {empty, _B1}} -> + {A, Prefix} %% Prefix well be empty here + end. + +%% A should be a prefix of B. Returns the commonality plus the +%% remainder of B. +find_common(A, B, Common) -> + case {queue:out(A), queue:out(B)} of + {{{value, Val}, A1}, {{value, Val}, B1}} -> + find_common(A1, B1, queue:in(Val, Common)); + {{empty, _A}, _} -> + {Common, B} + end. + + +%% --------------------------------------------------------------------------- +%% Members helpers +%% --------------------------------------------------------------------------- + +with_member(Fun, Id, MembersState) -> + store_member( + Id, Fun(find_member_or_blank(Id, MembersState)), MembersState). + +with_member_acc(Fun, Id, {MembersState, Acc}) -> + {MemberState, Acc1} = Fun(find_member_or_blank(Id, MembersState), Acc), + {store_member(Id, MemberState, MembersState), Acc1}. + +find_member_or_blank(Id, MembersState) -> + case ?DICT:find(Id, MembersState) of + {ok, Result} -> Result; + error -> blank_member() + end. + +erase_member(Id, MembersState) -> + ?DICT:erase(Id, MembersState). + +blank_member() -> + #member { pending_ack = queue:new(), last_pub = -1, last_ack = -1 }. + +blank_member_state() -> + ?DICT:new(). + +store_member(Id, MemberState, MembersState) -> + ?DICT:store(Id, MemberState, MembersState). + +prepare_members_state(MembersState) -> + ?DICT:to_list(MembersState). + +build_members_state(MembersStateList) -> + ?DICT:from_list(MembersStateList). + + +%% --------------------------------------------------------------------------- +%% Activity assembly +%% --------------------------------------------------------------------------- + +activity_nil() -> + queue:new(). + +activity_cons(_Id, [], [], Tail) -> + Tail; +activity_cons(Sender, Pubs, Acks, Tail) -> + queue:in({Sender, Pubs, Acks}, Tail). + +activity_finalise(Activity) -> + queue:to_list(Activity). + +maybe_send_activity([], _State) -> + ok; +maybe_send_activity(Activity, #state { self = Self, + right = {Right, _MRefR}, + view = View }) -> + send_right(Right, View, {activity, Self, Activity}). + +send_right(Right, View, Msg) -> + ok = gen_server2:cast(Right, {?TAG, view_version(View), Msg}). + +callback(Args, Module, Activity) -> + lists:foldl( + fun ({Id, Pubs, _Acks}, ok) -> + lists:foldl(fun ({_PubNum, Pub}, ok) -> + Module:handle_msg(Args, Id, Pub); + (_, Error) -> + Error + end, ok, Pubs); + (_, Error) -> + Error + end, ok, Activity). + +callback_view_changed(Args, Module, OldView, NewView) -> + OldMembers = all_known_members(OldView), + NewMembers = all_known_members(NewView), + Births = NewMembers -- OldMembers, + Deaths = OldMembers -- NewMembers, + case {Births, Deaths} of + {[], []} -> ok; + _ -> Module:members_changed(Args, Births, Deaths) + end. + +handle_callback_result({Result, State}) -> + if_callback_success( + Result, fun no_reply_true/3, fun no_reply_false/3, undefined, State); +handle_callback_result({Result, Reply, State}) -> + if_callback_success( + Result, fun reply_true/3, fun reply_false/3, Reply, State). + +no_reply_true (_Result, _Undefined, State) -> noreply(State). +no_reply_false({stop, Reason}, _Undefined, State) -> {stop, Reason, State}. + +reply_true (_Result, Reply, State) -> reply(Reply, State). +reply_false({stop, Reason}, Reply, State) -> {stop, Reason, Reply, State}. + +handle_msg_true (_Result, Msg, State) -> handle_msg(Msg, State). +handle_msg_false(Result, _Msg, State) -> {Result, State}. + +activity_true(_Result, Activity, State = #state { module = Module, + callback_args = Args }) -> + {callback(Args, Module, Activity), State}. +activity_false(Result, _Activity, State) -> + {Result, State}. + +if_callback_success(ok, True, _False, Arg, State) -> + True(ok, Arg, State); +if_callback_success( + {become, Module, Args} = Result, True, _False, Arg, State) -> + True(Result, Arg, State #state { module = Module, + callback_args = Args }); +if_callback_success({stop, _Reason} = Result, _True, False, Arg, State) -> + False(Result, Arg, State). + +maybe_confirm(_Self, _Id, Confirms, []) -> + Confirms; +maybe_confirm(Self, Self, Confirms, [PubNum | PubNums]) -> + case queue:out(Confirms) of + {empty, _Confirms} -> + Confirms; + {{value, {PubNum, From}}, Confirms1} -> + gen_server2:reply(From, ok), + maybe_confirm(Self, Self, Confirms1, PubNums); + {{value, {PubNum1, _From}}, _Confirms} when PubNum1 > PubNum -> + maybe_confirm(Self, Self, Confirms, PubNums) + end; +maybe_confirm(_Self, _Id, Confirms, _PubNums) -> + Confirms. + +purge_confirms(Confirms) -> + [gen_server2:reply(From, ok) || {_PubNum, From} <- queue:to_list(Confirms)], + queue:new(). + + +%% --------------------------------------------------------------------------- +%% Msg transformation +%% --------------------------------------------------------------------------- + +acks_from_queue(Q) -> + [PubNum || {PubNum, _Msg} <- queue:to_list(Q)]. + +pubs_from_queue(Q) -> + queue:to_list(Q). + +queue_from_pubs(Pubs) -> + queue:from_list(Pubs). + +apply_acks([], Pubs) -> + Pubs; +apply_acks(List, Pubs) -> + {_, Pubs1} = queue:split(length(List), Pubs), + Pubs1. + +join_pubs(Q, []) -> Q; +join_pubs(Q, Pubs) -> queue:join(Q, queue_from_pubs(Pubs)). + +last_ack([], LA) -> + LA; +last_ack(List, LA) -> + LA1 = lists:last(List), + true = LA1 > LA, %% ASSERTION + LA1. + +last_pub([], LP) -> + LP; +last_pub(List, LP) -> + {PubNum, _Msg} = lists:last(List), + true = PubNum > LP, %% ASSERTION + PubNum. diff --git a/src/gm_soak_test.erl b/src/gm_soak_test.erl new file mode 100644 index 0000000000..1f8832a6b2 --- /dev/null +++ b/src/gm_soak_test.erl @@ -0,0 +1,130 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% + +-module(gm_soak_test). + +-export([test/0]). +-export([joined/2, members_changed/3, handle_msg/3, terminate/2]). + +-behaviour(gm). + +-include("gm_specs.hrl"). + +%% --------------------------------------------------------------------------- +%% Soak test +%% --------------------------------------------------------------------------- + +get_state() -> + get(state). + +with_state(Fun) -> + put(state, Fun(get_state())). + +inc() -> + case 1 + get(count) of + 100000 -> Now = os:timestamp(), + Start = put(ts, Now), + Diff = timer:now_diff(Now, Start), + Rate = 100000 / (Diff / 1000000), + io:format("~p seeing ~p msgs/sec~n", [self(), Rate]), + put(count, 0); + N -> put(count, N) + end. + +joined([], Members) -> + io:format("Joined ~p (~p members)~n", [self(), length(Members)]), + put(state, dict:from_list([{Member, empty} || Member <- Members])), + put(count, 0), + put(ts, os:timestamp()), + ok. + +members_changed([], Births, Deaths) -> + with_state( + fun (State) -> + State1 = + lists:foldl( + fun (Born, StateN) -> + false = dict:is_key(Born, StateN), + dict:store(Born, empty, StateN) + end, State, Births), + lists:foldl( + fun (Died, StateN) -> + true = dict:is_key(Died, StateN), + dict:store(Died, died, StateN) + end, State1, Deaths) + end), + ok. + +handle_msg([], From, {test_msg, Num}) -> + inc(), + with_state( + fun (State) -> + ok = case dict:find(From, State) of + {ok, died} -> + exit({{from, From}, + {received_posthumous_delivery, Num}}); + {ok, empty} -> ok; + {ok, Num} -> ok; + {ok, Num1} when Num < Num1 -> + exit({{from, From}, + {duplicate_delivery_of, Num1}, + {expecting, Num}}); + {ok, Num1} -> + exit({{from, From}, + {missing_delivery_of, Num}, + {received_early, Num1}}); + error -> + exit({{from, From}, + {received_premature_delivery, Num}}) + end, + dict:store(From, Num + 1, State) + end), + ok. + +terminate([], Reason) -> + io:format("Left ~p (~p)~n", [self(), Reason]), + ok. + +spawn_member() -> + spawn_link( + fun () -> + random:seed(now()), + %% start up delay of no more than 10 seconds + timer:sleep(random:uniform(10000)), + {ok, Pid} = gm:start_link(?MODULE, ?MODULE, []), + Start = random:uniform(10000), + send_loop(Pid, Start, Start + random:uniform(10000)), + gm:leave(Pid), + spawn_more() + end). + +spawn_more() -> + [spawn_member() || _ <- lists:seq(1, 4 - random:uniform(4))]. + +send_loop(_Pid, Target, Target) -> + ok; +send_loop(Pid, Count, Target) when Target > Count -> + case random:uniform(3) of + 3 -> gm:confirmed_broadcast(Pid, {test_msg, Count}); + _ -> gm:broadcast(Pid, {test_msg, Count}) + end, + timer:sleep(random:uniform(5) - 1), %% sleep up to 4 ms + send_loop(Pid, Count + 1, Target). + +test() -> + ok = gm:create_tables(), + spawn_member(), + spawn_member(). diff --git a/src/gm_tests.erl b/src/gm_tests.erl new file mode 100644 index 0000000000..ca0ffd6483 --- /dev/null +++ b/src/gm_tests.erl @@ -0,0 +1,182 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% + +-module(gm_tests). + +-export([test_join_leave/0, + test_broadcast/0, + test_confirmed_broadcast/0, + test_member_death/0, + test_receive_in_order/0, + all_tests/0]). +-export([joined/2, members_changed/3, handle_msg/3, terminate/2]). + +-behaviour(gm). + +-include("gm_specs.hrl"). + +-define(RECEIVE_OR_THROW(Body, Bool, Error), + receive Body -> + true = Bool, + passed + after 1000 -> + throw(Error) + end). + +joined(Pid, Members) -> + Pid ! {joined, self(), Members}, + ok. + +members_changed(Pid, Births, Deaths) -> + Pid ! {members_changed, self(), Births, Deaths}, + ok. + +handle_msg(Pid, From, Msg) -> + Pid ! {msg, self(), From, Msg}, + ok. + +terminate(Pid, Reason) -> + Pid ! {termination, self(), Reason}, + ok. + +%% --------------------------------------------------------------------------- +%% Functional tests +%% --------------------------------------------------------------------------- + +all_tests() -> + passed = test_join_leave(), + passed = test_broadcast(), + passed = test_confirmed_broadcast(), + passed = test_member_death(), + passed = test_receive_in_order(), + passed. + +test_join_leave() -> + with_two_members(fun (_Pid, _Pid2) -> passed end). + +test_broadcast() -> + test_broadcast(fun gm:broadcast/2). + +test_confirmed_broadcast() -> + test_broadcast(fun gm:confirmed_broadcast/2). + +test_member_death() -> + with_two_members( + fun (Pid, Pid2) -> + {ok, Pid3} = gm:start_link(?MODULE, ?MODULE, self()), + passed = receive_joined(Pid3, [Pid, Pid2, Pid3], + timeout_joining_gm_group_3), + passed = receive_birth(Pid, Pid3, timeout_waiting_for_birth_3_1), + passed = receive_birth(Pid2, Pid3, timeout_waiting_for_birth_3_2), + + unlink(Pid3), + exit(Pid3, kill), + + %% Have to do some broadcasts to ensure that all members + %% find out about the death. + passed = (test_broadcast_fun(fun gm:confirmed_broadcast/2))( + Pid, Pid2), + + passed = receive_death(Pid, Pid3, timeout_waiting_for_death_3_1), + passed = receive_death(Pid2, Pid3, timeout_waiting_for_death_3_2), + + passed + end). + +test_receive_in_order() -> + with_two_members( + fun (Pid, Pid2) -> + Numbers = lists:seq(1,1000), + [begin ok = gm:broadcast(Pid, N), ok = gm:broadcast(Pid2, N) end + || N <- Numbers], + passed = receive_numbers( + Pid, Pid, {timeout_for_msgs, Pid, Pid}, Numbers), + passed = receive_numbers( + Pid, Pid2, {timeout_for_msgs, Pid, Pid2}, Numbers), + passed = receive_numbers( + Pid2, Pid, {timeout_for_msgs, Pid2, Pid}, Numbers), + passed = receive_numbers( + Pid2, Pid2, {timeout_for_msgs, Pid2, Pid2}, Numbers), + passed + end). + +test_broadcast(Fun) -> + with_two_members(test_broadcast_fun(Fun)). + +test_broadcast_fun(Fun) -> + fun (Pid, Pid2) -> + ok = Fun(Pid, magic_message), + passed = receive_or_throw({msg, Pid, Pid, magic_message}, + timeout_waiting_for_msg), + passed = receive_or_throw({msg, Pid2, Pid, magic_message}, + timeout_waiting_for_msg) + end. + +with_two_members(Fun) -> + ok = gm:create_tables(), + + {ok, Pid} = gm:start_link(?MODULE, ?MODULE, self()), + passed = receive_joined(Pid, [Pid], timeout_joining_gm_group_1), + + {ok, Pid2} = gm:start_link(?MODULE, ?MODULE, self()), + passed = receive_joined(Pid2, [Pid, Pid2], timeout_joining_gm_group_2), + passed = receive_birth(Pid, Pid2, timeout_waiting_for_birth_2), + + passed = Fun(Pid, Pid2), + + ok = gm:leave(Pid), + passed = receive_death(Pid2, Pid, timeout_waiting_for_death_1), + passed = + receive_termination(Pid, normal, timeout_waiting_for_termination_1), + + ok = gm:leave(Pid2), + passed = + receive_termination(Pid2, normal, timeout_waiting_for_termination_2), + + receive X -> throw({unexpected_message, X}) + after 0 -> passed + end. + +receive_or_throw(Pattern, Error) -> + ?RECEIVE_OR_THROW(Pattern, true, Error). + +receive_birth(From, Born, Error) -> + ?RECEIVE_OR_THROW({members_changed, From, Birth, Death}, + ([Born] == Birth) andalso ([] == Death), + Error). + +receive_death(From, Died, Error) -> + ?RECEIVE_OR_THROW({members_changed, From, Birth, Death}, + ([] == Birth) andalso ([Died] == Death), + Error). + +receive_joined(From, Members, Error) -> + ?RECEIVE_OR_THROW({joined, From, Members1}, + lists:usort(Members) == lists:usort(Members1), + Error). + +receive_termination(From, Reason, Error) -> + ?RECEIVE_OR_THROW({termination, From, Reason1}, + Reason == Reason1, + Error). + +receive_numbers(_Pid, _Sender, _Error, []) -> + passed; +receive_numbers(Pid, Sender, Error, [N | Numbers]) -> + ?RECEIVE_OR_THROW({msg, Pid, Sender, M}, + M == N, + Error), + receive_numbers(Pid, Sender, Error, Numbers). diff --git a/src/pg_local.erl b/src/pg_local.erl index fd515747e5..c9c3a3a715 100644 --- a/src/pg_local.erl +++ b/src/pg_local.erl @@ -83,7 +83,7 @@ get_members(Name) -> sync() -> ensure_started(), - gen_server:call(?MODULE, sync). + gen_server:call(?MODULE, sync, infinity). %%% %%% Callback functions from gen_server diff --git a/src/rabbit.erl b/src/rabbit.erl index 1beed5c1a7..c9a929ae00 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -38,6 +38,7 @@ -rabbit_boot_step({database, [{mfa, {rabbit_mnesia, init, []}}, + {requires, file_handle_cache}, {enables, external_infrastructure}]}). -rabbit_boot_step({file_handle_cache, @@ -214,7 +215,8 @@ stop_and_halt() -> ok. status() -> - [{running_applications, application:which_applications()}] ++ + [{pid, list_to_integer(os:getpid())}, + {running_applications, application:which_applications()}] ++ rabbit_mnesia:status(). rotate_logs(BinarySuffix) -> @@ -373,7 +375,7 @@ config_files() -> error -> [] end. -%--------------------------------------------------------------------------- +%%--------------------------------------------------------------------------- print_banner() -> {ok, Product} = application:get_key(id), diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl index 37e40981a6..d38ecb91fe 100644 --- a/src/rabbit_alarm.erl +++ b/src/rabbit_alarm.erl @@ -18,12 +18,14 @@ -behaviour(gen_event). --export([start/0, stop/0, register/2]). +-export([start/0, stop/0, register/2, on_node_up/1, on_node_down/1]). -export([init/1, handle_call/2, handle_event/2, handle_info/2, terminate/2, code_change/3]). --record(alarms, {alertees, vm_memory_high_watermark = false}). +-export([remote_conserve_memory/2]). %% Internal use only + +-record(alarms, {alertees, alarmed_nodes}). %%---------------------------------------------------------------------------- @@ -33,6 +35,8 @@ -spec(start/0 :: () -> 'ok'). -spec(stop/0 :: () -> 'ok'). -spec(register/2 :: (pid(), mfa_tuple()) -> boolean()). +-spec(on_node_up/1 :: (node()) -> 'ok'). +-spec(on_node_down/1 :: (node()) -> 'ok'). -endif. @@ -56,39 +60,57 @@ register(Pid, HighMemMFA) -> {register, Pid, HighMemMFA}, infinity). +on_node_up(Node) -> gen_event:notify(alarm_handler, {node_up, Node}). + +on_node_down(Node) -> gen_event:notify(alarm_handler, {node_down, Node}). + +%% Can't use alarm_handler:{set,clear}_alarm because that doesn't +%% permit notifying a remote node. +remote_conserve_memory(Pid, true) -> + gen_event:notify({alarm_handler, node(Pid)}, + {set_alarm, {{vm_memory_high_watermark, node()}, []}}); +remote_conserve_memory(Pid, false) -> + gen_event:notify({alarm_handler, node(Pid)}, + {clear_alarm, {vm_memory_high_watermark, node()}}). + %%---------------------------------------------------------------------------- init([]) -> - {ok, #alarms{alertees = dict:new()}}. + {ok, #alarms{alertees = dict:new(), + alarmed_nodes = sets:new()}}. -handle_call({register, Pid, {M, F, A} = HighMemMFA}, - State = #alarms{alertees = Alertess}) -> - _MRef = erlang:monitor(process, Pid), - ok = case State#alarms.vm_memory_high_watermark of - true -> apply(M, F, A ++ [Pid, true]); - false -> ok - end, - NewAlertees = dict:store(Pid, HighMemMFA, Alertess), - {ok, State#alarms.vm_memory_high_watermark, - State#alarms{alertees = NewAlertees}}; +handle_call({register, Pid, HighMemMFA}, State) -> + {ok, 0 < sets:size(State#alarms.alarmed_nodes), + internal_register(Pid, HighMemMFA, State)}; handle_call(_Request, State) -> {ok, not_understood, State}. -handle_event({set_alarm, {vm_memory_high_watermark, []}}, State) -> - ok = alert(true, State#alarms.alertees), - {ok, State#alarms{vm_memory_high_watermark = true}}; +handle_event({set_alarm, {{vm_memory_high_watermark, Node}, []}}, State) -> + {ok, maybe_alert(fun sets:add_element/2, Node, State)}; -handle_event({clear_alarm, vm_memory_high_watermark}, State) -> - ok = alert(false, State#alarms.alertees), - {ok, State#alarms{vm_memory_high_watermark = false}}; +handle_event({clear_alarm, {vm_memory_high_watermark, Node}}, State) -> + {ok, maybe_alert(fun sets:del_element/2, Node, State)}; + +handle_event({node_up, Node}, State) -> + %% Must do this via notify and not call to avoid possible deadlock. + ok = gen_event:notify( + {alarm_handler, Node}, + {register, self(), {?MODULE, remote_conserve_memory, []}}), + {ok, State}; + +handle_event({node_down, Node}, State) -> + {ok, maybe_alert(fun sets:del_element/2, Node, State)}; + +handle_event({register, Pid, HighMemMFA}, State) -> + {ok, internal_register(Pid, HighMemMFA, State)}; handle_event(_Event, State) -> {ok, State}. handle_info({'DOWN', _MRef, process, Pid, _Reason}, - State = #alarms{alertees = Alertess}) -> - {ok, State#alarms{alertees = dict:erase(Pid, Alertess)}}; + State = #alarms{alertees = Alertees}) -> + {ok, State#alarms{alertees = dict:erase(Pid, Alertees)}}; handle_info(_Info, State) -> {ok, State}. @@ -100,10 +122,45 @@ code_change(_OldVsn, State, _Extra) -> {ok, State}. %%---------------------------------------------------------------------------- -alert(_Alert, undefined) -> - ok; -alert(Alert, Alertees) -> - dict:fold(fun (Pid, {M, F, A}, Acc) -> - ok = erlang:apply(M, F, A ++ [Pid, Alert]), - Acc + +maybe_alert(SetFun, Node, State = #alarms{alarmed_nodes = AN, + alertees = Alertees}) -> + AN1 = SetFun(Node, AN), + BeforeSz = sets:size(AN), + AfterSz = sets:size(AN1), + %% If we have changed our alarm state, inform the remotes. + IsLocal = Node =:= node(), + if IsLocal andalso BeforeSz < AfterSz -> ok = alert_remote(true, Alertees); + IsLocal andalso BeforeSz > AfterSz -> ok = alert_remote(false, Alertees); + true -> ok + end, + %% If the overall alarm state has changed, inform the locals. + case {BeforeSz, AfterSz} of + {0, 1} -> ok = alert_local(true, Alertees); + {1, 0} -> ok = alert_local(false, Alertees); + {_, _} -> ok + end, + State#alarms{alarmed_nodes = AN1}. + +alert_local(Alert, Alertees) -> alert(Alert, Alertees, fun erlang:'=:='/2). + +alert_remote(Alert, Alertees) -> alert(Alert, Alertees, fun erlang:'=/='/2). + +alert(Alert, Alertees, NodeComparator) -> + Node = node(), + dict:fold(fun (Pid, {M, F, A}, ok) -> + case NodeComparator(Node, node(Pid)) of + true -> apply(M, F, A ++ [Pid, Alert]); + false -> ok + end end, ok, Alertees). + +internal_register(Pid, {M, F, A} = HighMemMFA, + State = #alarms{alertees = Alertees}) -> + _MRef = erlang:monitor(process, Pid), + case sets:is_element(node(), State#alarms.alarmed_nodes) of + true -> ok = apply(M, F, A ++ [Pid, true]); + false -> ok + end, + NewAlertees = dict:store(Pid, HighMemMFA, Alertees), + State#alarms{alertees = NewAlertees}. diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 1c89539fc0..8e4ca8e3c5 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -52,7 +52,7 @@ -type(qmsg() :: {name(), pid(), msg_id(), boolean(), rabbit_types:message()}). -type(msg_id() :: non_neg_integer()). -type(ok_or_errors() :: - 'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}). + 'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}). -type(queue_or_not_found() :: rabbit_types:amqqueue() | 'not_found'). @@ -100,13 +100,13 @@ -spec(emit_stats/1 :: (rabbit_types:amqqueue()) -> 'ok'). -spec(delete_immediately/1 :: (rabbit_types:amqqueue()) -> 'ok'). -spec(delete/3 :: - (rabbit_types:amqqueue(), 'false', 'false') + (rabbit_types:amqqueue(), 'false', 'false') -> qlen(); - (rabbit_types:amqqueue(), 'true' , 'false') + (rabbit_types:amqqueue(), 'true' , 'false') -> qlen() | rabbit_types:error('in_use'); - (rabbit_types:amqqueue(), 'false', 'true' ) + (rabbit_types:amqqueue(), 'false', 'true' ) -> qlen() | rabbit_types:error('not_empty'); - (rabbit_types:amqqueue(), 'true' , 'true' ) + (rabbit_types:amqqueue(), 'true' , 'true' ) -> qlen() | rabbit_types:error('in_use') | rabbit_types:error('not_empty')). @@ -122,10 +122,10 @@ -spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()). -spec(limit_all/3 :: ([pid()], pid(), pid() | 'undefined') -> ok_or_errors()). -spec(basic_get/3 :: (rabbit_types:amqqueue(), pid(), boolean()) -> - {'ok', non_neg_integer(), qmsg()} | 'empty'). + {'ok', non_neg_integer(), qmsg()} | 'empty'). -spec(basic_consume/7 :: - (rabbit_types:amqqueue(), boolean(), pid(), pid() | 'undefined', - rabbit_types:ctag(), boolean(), any()) + (rabbit_types:amqqueue(), boolean(), pid(), pid() | 'undefined', + rabbit_types:ctag(), boolean(), any()) -> rabbit_types:ok_or_error('exclusive_consume_unavailable')). -spec(basic_cancel/4 :: (rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), any()) -> 'ok'). @@ -197,7 +197,7 @@ declare(QueueName, Durable, AutoDelete, Args, Owner) -> arguments = Args, exclusive_owner = Owner, pid = none}), - case gen_server2:call(Q#amqqueue.pid, {init, false}) of + case gen_server2:call(Q#amqqueue.pid, {init, false}, infinity) of not_found -> rabbit_misc:not_found(QueueName); Q1 -> Q1 end. @@ -214,8 +214,8 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) -> [] -> ok = store_queue(Q), B = add_default_binding(Q), fun (Tx) -> B(Tx), Q end; - [_] -> %% Q exists on stopped node - rabbit_misc:const(not_found) + %% Q exists on stopped node + [_] -> rabbit_misc:const(not_found) end; [ExistingQ = #amqqueue{pid = QPid}] -> case rabbit_misc:is_process_alive(QPid) of @@ -288,7 +288,7 @@ with_exclusive_access_or_die(Name, ReaderPid, F) -> fun (Q) -> check_exclusive_access(Q, ReaderPid), F(Q) end). assert_args_equivalence(#amqqueue{name = QueueName, arguments = Args}, - RequiredArgs) -> + RequiredArgs) -> rabbit_misc:assert_args_equivalence(Args, RequiredArgs, QueueName, [<<"x-expires">>]). @@ -324,10 +324,10 @@ info_keys() -> rabbit_amqqueue_process:info_keys(). map(VHostPath, F) -> rabbit_misc:filter_exit_map(F, list(VHostPath)). info(#amqqueue{ pid = QPid }) -> - delegate_call(QPid, info, infinity). + delegate_call(QPid, info). info(#amqqueue{ pid = QPid }, Items) -> - case delegate_call(QPid, {info, Items}, infinity) of + case delegate_call(QPid, {info, Items}) of {ok, Res} -> Res; {error, Error} -> throw(Error) end. @@ -337,7 +337,7 @@ info_all(VHostPath) -> map(VHostPath, fun (Q) -> info(Q) end). info_all(VHostPath, Items) -> map(VHostPath, fun (Q) -> info(Q, Items) end). consumers(#amqqueue{ pid = QPid }) -> - delegate_call(QPid, consumers, infinity). + delegate_call(QPid, consumers). consumers_all(VHostPath) -> lists:append( @@ -347,7 +347,7 @@ consumers_all(VHostPath) -> end)). stat(#amqqueue{pid = QPid}) -> - delegate_call(QPid, stat, infinity). + delegate_call(QPid, stat). emit_stats(#amqqueue{pid = QPid}) -> delegate_cast(QPid, emit_stats). @@ -356,9 +356,9 @@ delete_immediately(#amqqueue{ pid = QPid }) -> gen_server2:cast(QPid, delete_immediately). delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) -> - delegate_call(QPid, {delete, IfUnused, IfEmpty}, infinity). + delegate_call(QPid, {delete, IfUnused, IfEmpty}). -purge(#amqqueue{ pid = QPid }) -> delegate_call(QPid, purge, infinity). +purge(#amqqueue{ pid = QPid }) -> delegate_call(QPid, purge). deliver(QPid, Delivery = #delivery{immediate = true}) -> gen_server2:call(QPid, {deliver_immediately, Delivery}, infinity); @@ -370,7 +370,7 @@ deliver(QPid, Delivery) -> true. requeue(QPid, MsgIds, ChPid) -> - delegate_call(QPid, {requeue, MsgIds, ChPid}, infinity). + delegate_call(QPid, {requeue, MsgIds, ChPid}). ack(QPid, Txn, MsgIds, ChPid) -> delegate_cast(QPid, {ack, Txn, MsgIds, ChPid}). @@ -399,17 +399,15 @@ limit_all(QPids, ChPid, LimiterPid) -> end). basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) -> - delegate_call(QPid, {basic_get, ChPid, NoAck}, infinity). + delegate_call(QPid, {basic_get, ChPid, NoAck}). basic_consume(#amqqueue{pid = QPid}, NoAck, ChPid, LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg) -> delegate_call(QPid, {basic_consume, NoAck, ChPid, - LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg}, - infinity). + LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg}). basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> - ok = delegate_call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}, - infinity). + ok = delegate_call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}). notify_sent(QPid, ChPid) -> gen_server2:cast(QPid, {notify_sent, ChPid}). @@ -500,8 +498,8 @@ safe_delegate_call_ok(F, Pids) -> {_, Bad} -> {error, Bad} end. -delegate_call(Pid, Msg, Timeout) -> - delegate:invoke(Pid, fun (P) -> gen_server2:call(P, Msg, Timeout) end). +delegate_call(Pid, Msg) -> + delegate:invoke(Pid, fun (P) -> gen_server2:call(P, Msg, infinity) end). delegate_cast(Pid, Msg) -> delegate:invoke_no_result(Pid, fun (P) -> gen_server2:cast(P, Msg) end). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index e794b4aa1e..24de941595 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -33,7 +33,7 @@ handle_info/2, handle_pre_hibernate/1, prioritise_call/3, prioritise_cast/2, prioritise_info/2]). -% Queue's state +%% Queue's state -record(q, {q, exclusive_consumer, has_had_consumers, @@ -283,17 +283,16 @@ lookup_ch(ChPid) -> ch_record(ChPid) -> Key = {ch, ChPid}, case get(Key) of - undefined -> - MonitorRef = erlang:monitor(process, ChPid), - C = #cr{consumer_count = 0, - ch_pid = ChPid, - monitor_ref = MonitorRef, - acktags = sets:new(), - is_limit_active = false, - txn = none, - unsent_message_count = 0}, - put(Key, C), - C; + undefined -> MonitorRef = erlang:monitor(process, ChPid), + C = #cr{consumer_count = 0, + ch_pid = ChPid, + monitor_ref = MonitorRef, + acktags = sets:new(), + is_limit_active = false, + txn = none, + unsent_message_count = 0}, + put(Key, C), + C; C = #cr{} -> C end. @@ -319,18 +318,16 @@ erase_ch_record(#cr{ch_pid = ChPid, erase({ch, ChPid}), ok. -all_ch_record() -> - [C || {{ch, _}, C} <- get()]. +all_ch_record() -> [C || {{ch, _}, C} <- get()]. is_ch_blocked(#cr{unsent_message_count = Count, is_limit_active = Limited}) -> Limited orelse Count >= ?UNSENT_MESSAGE_LIMIT. ch_record_state_transition(OldCR, NewCR) -> - BlockedOld = is_ch_blocked(OldCR), - BlockedNew = is_ch_blocked(NewCR), - if BlockedOld andalso not(BlockedNew) -> unblock; - BlockedNew andalso not(BlockedOld) -> block; - true -> ok + case {is_ch_blocked(OldCR), is_ch_blocked(NewCR)} of + {true, false} -> unblock; + {false, true} -> block; + {_, _} -> ok end. deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, @@ -365,13 +362,12 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, case ch_record_state_transition(C, NewC) of ok -> {queue:in(QEntry, ActiveConsumersTail), BlockedConsumers}; - block -> - {ActiveConsumers1, BlockedConsumers1} = - move_consumers(ChPid, - ActiveConsumersTail, - BlockedConsumers), - {ActiveConsumers1, - queue:in(QEntry, BlockedConsumers1)} + block -> {ActiveConsumers1, BlockedConsumers1} = + move_consumers(ChPid, + ActiveConsumersTail, + BlockedConsumers), + {ActiveConsumers1, + queue:in(QEntry, BlockedConsumers1)} end, State2 = State1#q{ active_consumers = NewActiveConsumers, @@ -396,8 +392,7 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, {FunAcc, State} end. -deliver_from_queue_pred(IsEmpty, _State) -> - not IsEmpty. +deliver_from_queue_pred(IsEmpty, _State) -> not IsEmpty. deliver_from_queue_deliver(AckRequired, false, State) -> {{Message, IsDelivered, AckTag, Remaining}, State1} = @@ -405,32 +400,26 @@ deliver_from_queue_deliver(AckRequired, false, State) -> {{Message, IsDelivered, AckTag}, 0 == Remaining, State1}. confirm_messages(Guids, State = #q{guid_to_channel = GTC}) -> - {CMs, GTC1} = - lists:foldl( - fun(Guid, {CMs, GTC0}) -> - case dict:find(Guid, GTC0) of - {ok, {ChPid, MsgSeqNo}} -> - {[{ChPid, MsgSeqNo} | CMs], dict:erase(Guid, GTC0)}; - _ -> - {CMs, GTC0} - end - end, {[], GTC}, Guids), - case lists:usort(CMs) of - [{Ch, MsgSeqNo} | CMs1] -> - [rabbit_channel:confirm(ChPid, MsgSeqNos) || - {ChPid, MsgSeqNos} <- group_confirms_by_channel( - CMs1, [{Ch, [MsgSeqNo]}])]; - [] -> - ok - end, + {CMs, GTC1} = lists:foldl( + fun(Guid, {CMs, GTC0}) -> + case dict:find(Guid, GTC0) of + {ok, {ChPid, MsgSeqNo}} -> + {gb_trees_cons(ChPid, MsgSeqNo, CMs), + dict:erase(Guid, GTC0)}; + _ -> + {CMs, GTC0} + end + end, {gb_trees:empty(), GTC}, Guids), + gb_trees:map(fun(ChPid, MsgSeqNos) -> + rabbit_channel:confirm(ChPid, MsgSeqNos) + end, CMs), State#q{guid_to_channel = GTC1}. -group_confirms_by_channel([], Acc) -> - Acc; -group_confirms_by_channel([{Ch, Msg1} | CMs], [{Ch, Msgs} | Acc]) -> - group_confirms_by_channel(CMs, [{Ch, [Msg1 | Msgs]} | Acc]); -group_confirms_by_channel([{Ch, Msg1} | CMs], Acc) -> - group_confirms_by_channel(CMs, [{Ch, [Msg1]} | Acc]). +gb_trees_cons(Key, Value, Tree) -> + case gb_trees:lookup(Key, Tree) of + {value, Values} -> gb_trees:update(Key, [Value | Values], Tree); + none -> gb_trees:insert(Key, [Value], Tree) + end. record_confirm_message(#delivery{msg_seq_no = undefined}, State) -> {no_confirm, State}; @@ -485,17 +474,14 @@ attempt_delivery(#delivery{txn = none, {Delivered, State1} = deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State), {Delivered, NeedsConfirming, State1}; -attempt_delivery(#delivery{txn = Txn, +attempt_delivery(#delivery{txn = Txn, sender = ChPid, message = Message}, - {NeedsConfirming, - State = #q{backing_queue = BQ, - backing_queue_state = BQS}}) -> + {NeedsConfirming, State = #q{backing_queue = BQ, + backing_queue_state = BQS}}) -> store_ch_record((ch_record(ChPid))#cr{txn = Txn}), - {true, - NeedsConfirming, - State#q{backing_queue_state = - BQ:tx_publish(Txn, Message, ?BASE_MESSAGE_PROPERTIES, BQS)}}. + BQS1 = BQ:tx_publish(Txn, Message, ?BASE_MESSAGE_PROPERTIES, BQS), + {true, NeedsConfirming, State#q{backing_queue_state = BQS1}}. deliver_or_enqueue(Delivery, State) -> case attempt_delivery(Delivery, record_confirm_message(Delivery, State)) of @@ -666,9 +652,8 @@ drop_expired_messages(State = #q{backing_queue_state = BQS, backing_queue = BQ}) -> Now = now_micros(), BQS1 = BQ:dropwhile( - fun (#message_properties{expiry = Expiry}) -> - Now > Expiry - end, BQS), + fun (#message_properties{expiry = Expiry}) -> Now > Expiry end, + BQS), ensure_ttl_timer(State#q{backing_queue_state = BQS1}). ensure_ttl_timer(State = #q{backing_queue = BQ, @@ -727,10 +712,10 @@ i(Item, _) -> consumers(#q{active_consumers = ActiveConsumers, blocked_consumers = BlockedConsumers}) -> rabbit_misc:queue_fold( - fun ({ChPid, #consumer{tag = ConsumerTag, - ack_required = AckRequired}}, Acc) -> - [{ChPid, ConsumerTag, AckRequired} | Acc] - end, [], queue:join(ActiveConsumers, BlockedConsumers)). + fun ({ChPid, #consumer{tag = ConsumerTag, + ack_required = AckRequired}}, Acc) -> + [{ChPid, ConsumerTag, AckRequired} | Acc] + end, [], queue:join(ActiveConsumers, BlockedConsumers)). emit_stats(State) -> emit_stats(State, []). @@ -752,7 +737,7 @@ emit_consumer_deleted(ChPid, ConsumerTag) -> {channel, ChPid}, {queue, self()}]). -%--------------------------------------------------------------------------- +%%---------------------------------------------------------------------------- prioritise_call(Msg, _From, _State) -> case Msg of @@ -819,8 +804,7 @@ handle_call({info, Items}, _From, State) -> handle_call(consumers, _From, State) -> reply(consumers(State), State); -handle_call({deliver_immediately, Delivery}, - _From, State) -> +handle_call({deliver_immediately, Delivery}, _From, State) -> %% Synchronous, "immediate" delivery mode %% %% FIXME: Is this correct semantics? @@ -911,15 +895,13 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, case is_ch_blocked(C) of true -> State1#q{ blocked_consumers = - add_consumer( - ChPid, Consumer, - State1#q.blocked_consumers)}; + add_consumer(ChPid, Consumer, + State1#q.blocked_consumers)}; false -> run_message_queue( State1#q{ active_consumers = - add_consumer( - ChPid, Consumer, - State1#q.active_consumers)}) + add_consumer(ChPid, Consumer, + State1#q.active_consumers)}) end, emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume, not NoAck), diff --git a/src/rabbit_auth_backend_internal.erl b/src/rabbit_auth_backend_internal.erl index a564480b0e..3d00584551 100644 --- a/src/rabbit_auth_backend_internal.erl +++ b/src/rabbit_auth_backend_internal.erl @@ -52,8 +52,8 @@ -spec(clear_admin/1 :: (rabbit_types:username()) -> 'ok'). -spec(list_users/0 :: () -> [{rabbit_types:username(), boolean()}]). -spec(lookup_user/1 :: (rabbit_types:username()) - -> rabbit_types:ok(rabbit_types:internal_user()) - | rabbit_types:error('not_found')). + -> rabbit_types:ok(rabbit_types:internal_user()) + | rabbit_types:error('not_found')). -spec(set_permissions/5 ::(rabbit_types:username(), rabbit_types:vhost(), regexp(), regexp(), regexp()) -> 'ok'). -spec(clear_permissions/2 :: (rabbit_types:username(), rabbit_types:vhost()) diff --git a/src/rabbit_auth_mechanism.erl b/src/rabbit_auth_mechanism.erl index 1d14f9f0b8..897199ee78 100644 --- a/src/rabbit_auth_mechanism.erl +++ b/src/rabbit_auth_mechanism.erl @@ -23,6 +23,10 @@ behaviour_info(callbacks) -> %% A description. {description, 0}, + %% If this mechanism is enabled, should it be offered for a given socket? + %% (primarily so EXTERNAL can be SSL-only) + {should_offer, 1}, + %% Called before authentication starts. Should create a state %% object to be passed through all the stages of authentication. {init, 1}, diff --git a/src/rabbit_auth_mechanism_amqplain.erl b/src/rabbit_auth_mechanism_amqplain.erl index 5e422eee89..b8682a465e 100644 --- a/src/rabbit_auth_mechanism_amqplain.erl +++ b/src/rabbit_auth_mechanism_amqplain.erl @@ -19,7 +19,7 @@ -behaviour(rabbit_auth_mechanism). --export([description/0, init/1, handle_response/2]). +-export([description/0, should_offer/1, init/1, handle_response/2]). -include("rabbit_auth_mechanism_spec.hrl"). @@ -38,6 +38,9 @@ description() -> [{name, <<"AMQPLAIN">>}, {description, <<"QPid AMQPLAIN mechanism">>}]. +should_offer(_Sock) -> + true. + init(_Sock) -> []. @@ -51,5 +54,5 @@ handle_response(Response, _State) -> _ -> {protocol_error, "AMQPLAIN auth info ~w is missing LOGIN or PASSWORD field", - [LoginTable]} + [LoginTable]} end. diff --git a/src/rabbit_auth_mechanism_cr_demo.erl b/src/rabbit_auth_mechanism_cr_demo.erl index 7fd20f8b32..77aa34ea0a 100644 --- a/src/rabbit_auth_mechanism_cr_demo.erl +++ b/src/rabbit_auth_mechanism_cr_demo.erl @@ -19,7 +19,7 @@ -behaviour(rabbit_auth_mechanism). --export([description/0, init/1, handle_response/2]). +-export([description/0, should_offer/1, init/1, handle_response/2]). -include("rabbit_auth_mechanism_spec.hrl"). @@ -43,6 +43,9 @@ description() -> {description, <<"RabbitMQ Demo challenge-response authentication " "mechanism">>}]. +should_offer(_Sock) -> + true. + init(_Sock) -> #state{}. diff --git a/src/rabbit_auth_mechanism_plain.erl b/src/rabbit_auth_mechanism_plain.erl index 1ca07018e4..e2f9bff9c5 100644 --- a/src/rabbit_auth_mechanism_plain.erl +++ b/src/rabbit_auth_mechanism_plain.erl @@ -19,7 +19,7 @@ -behaviour(rabbit_auth_mechanism). --export([description/0, init/1, handle_response/2]). +-export([description/0, should_offer/1, init/1, handle_response/2]). -include("rabbit_auth_mechanism_spec.hrl"). @@ -41,6 +41,9 @@ description() -> [{name, <<"PLAIN">>}, {description, <<"SASL PLAIN authentication mechanism">>}]. +should_offer(_Sock) -> + true. + init(_Sock) -> []. diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index c5bd9575e3..f9a8ee1d50 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -18,10 +18,9 @@ -include("rabbit.hrl"). -include("rabbit_framing.hrl"). --export([publish/1, message/4, properties/1, delivery/5]). +-export([publish/1, message/3, message/4, properties/1, delivery/5]). -export([publish/4, publish/7]). -export([build_content/2, from_content/1]). --export([is_message_persistent/1]). %%---------------------------------------------------------------------------- @@ -41,8 +40,11 @@ rabbit_types:delivery()). -spec(message/4 :: (rabbit_exchange:name(), rabbit_router:routing_key(), - properties_input(), binary()) -> - (rabbit_types:message() | rabbit_types:error(any()))). + properties_input(), binary()) -> rabbit_types:message()). +-spec(message/3 :: + (rabbit_exchange:name(), rabbit_router:routing_key(), + rabbit_types:decoded_content()) -> + rabbit_types:ok_or_error2(rabbit_types:message(), any())). -spec(properties/1 :: (properties_input()) -> rabbit_framing:amqp_property_record()). -spec(publish/4 :: @@ -56,9 +58,6 @@ rabbit_types:content()). -spec(from_content/1 :: (rabbit_types:content()) -> {rabbit_framing:amqp_property_record(), binary()}). --spec(is_message_persistent/1 :: (rabbit_types:decoded_content()) -> - (boolean() | - {'invalid', non_neg_integer()})). -endif. @@ -98,19 +97,40 @@ from_content(Content) -> rabbit_framing_amqp_0_9_1:method_id('basic.publish'), {Props, list_to_binary(lists:reverse(FragmentsRev))}. -message(ExchangeName, RoutingKeyBin, RawProperties, BodyBin) -> +%% This breaks the spec rule forbidding message modification +strip_header(#content{properties = #'P_basic'{headers = undefined}} + = DecodedContent, _Key) -> + DecodedContent; +strip_header(#content{properties = Props = #'P_basic'{headers = Headers}} + = DecodedContent, Key) -> + case lists:keysearch(Key, 1, Headers) of + false -> DecodedContent; + {value, Found} -> Headers0 = lists:delete(Found, Headers), + rabbit_binary_generator:clear_encoded_content( + DecodedContent#content{ + properties = Props#'P_basic'{ + headers = Headers0}}) + end. + +message(ExchangeName, RoutingKey, + #content{properties = Props} = DecodedContent) -> + try + {ok, #basic_message{ + exchange_name = ExchangeName, + content = strip_header(DecodedContent, ?DELETED_HEADER), + guid = rabbit_guid:guid(), + is_persistent = is_message_persistent(DecodedContent), + routing_keys = [RoutingKey | + header_routes(Props#'P_basic'.headers)]}} + catch + {error, _Reason} = Error -> Error + end. + +message(ExchangeName, RoutingKey, RawProperties, BodyBin) -> Properties = properties(RawProperties), Content = build_content(Properties, BodyBin), - case is_message_persistent(Content) of - {invalid, Other} -> - {error, {invalid_delivery_mode, Other}}; - IsPersistent when is_boolean(IsPersistent) -> - #basic_message{exchange_name = ExchangeName, - routing_key = RoutingKeyBin, - content = Content, - guid = rabbit_guid:guid(), - is_persistent = IsPersistent} - end. + {ok, Msg} = message(ExchangeName, RoutingKey, Content), + Msg. properties(P = #'P_basic'{}) -> P; @@ -152,5 +172,18 @@ is_message_persistent(#content{properties = #'P_basic'{ 1 -> false; 2 -> true; undefined -> false; - Other -> {invalid, Other} + Other -> throw({error, {delivery_mode_unknown, Other}}) end. + +%% Extract CC routes from headers +header_routes(undefined) -> + []; +header_routes(HeadersTable) -> + lists:append( + [case rabbit_misc:table_lookup(HeadersTable, HeaderKey) of + {array, Routes} -> [Route || {longstr, Route} <- Routes]; + undefined -> []; + {Type, _Val} -> throw({error, {unacceptable_type_in_header, + Type, + binary_to_list(HeaderKey)}}) + end || HeaderKey <- ?ROUTING_HEADERS]). diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl index dc81ace6bf..68511a326c 100644 --- a/src/rabbit_binary_generator.erl +++ b/src/rabbit_binary_generator.erl @@ -18,12 +18,13 @@ -include("rabbit_framing.hrl"). -include("rabbit.hrl"). -% EMPTY_CONTENT_BODY_FRAME_SIZE, 8 = 1 + 2 + 4 + 1 -% - 1 byte of frame type -% - 2 bytes of channel number -% - 4 bytes of frame payload length -% - 1 byte of payload trailer FRAME_END byte -% See definition of check_empty_content_body_frame_size/0, an assertion called at startup. +%% EMPTY_CONTENT_BODY_FRAME_SIZE, 8 = 1 + 2 + 4 + 1 +%% - 1 byte of frame type +%% - 2 bytes of channel number +%% - 4 bytes of frame payload length +%% - 1 byte of payload trailer FRAME_END byte +%% See definition of check_empty_content_body_frame_size/0, +%% an assertion called at startup. -define(EMPTY_CONTENT_BODY_FRAME_SIZE, 8). -export([build_simple_method_frame/3, diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index 96a22dcaf1..7ddb781412 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -70,7 +70,7 @@ rabbit_types:infos()). -spec(info_all/1 :: (rabbit_types:vhost()) -> [rabbit_types:infos()]). -spec(info_all/2 ::(rabbit_types:vhost(), rabbit_types:info_keys()) - -> [rabbit_types:infos()]). + -> [rabbit_types:infos()]). -spec(has_for_source/1 :: (rabbit_types:binding_source()) -> boolean()). -spec(remove_for_source/1 :: (rabbit_types:binding_source()) -> bindings()). -spec(remove_for_destination/1 :: diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index a4ffd7822e..526fb42881 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -68,9 +68,9 @@ -type(channel_number() :: non_neg_integer()). -spec(start_link/9 :: - (channel_number(), pid(), pid(), rabbit_types:protocol(), - rabbit_types:user(), rabbit_types:vhost(), rabbit_framing:amqp_table(), - pid(), fun ((non_neg_integer()) -> rabbit_types:ok(pid()))) -> + (channel_number(), pid(), pid(), rabbit_types:protocol(), + rabbit_types:user(), rabbit_types:vhost(), rabbit_framing:amqp_table(), + pid(), fun ((non_neg_integer()) -> rabbit_types:ok(pid()))) -> rabbit_types:ok_pid_or_error()). -spec(do/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok'). -spec(do/3 :: (pid(), rabbit_framing:amqp_method_record(), @@ -109,7 +109,7 @@ do(Pid, Method, Content) -> gen_server2:cast(Pid, {method, Method, Content}). flush(Pid) -> - gen_server2:call(Pid, flush). + gen_server2:call(Pid, flush, infinity). shutdown(Pid) -> gen_server2:cast(Pid, terminate). @@ -254,7 +254,7 @@ handle_cast({command, Msg}, State = #ch{writer_pid = WriterPid}) -> handle_cast({deliver, ConsumerTag, AckRequired, Msg = {_QName, QPid, _MsgId, Redelivered, #basic_message{exchange_name = ExchangeName, - routing_key = RoutingKey, + routing_keys = [RoutingKey | _CcRoutes], content = Content}}}, State = #ch{writer_pid = WriterPid, next_tag = DeliveryTag}) -> @@ -301,8 +301,8 @@ handle_info({'DOWN', _MRef, process, QPid, Reason}, {MXs, State2} = process_confirms(MsgSeqNos, QPid, State1), erase_queue_stats(QPid), State3 = (case Reason of - normal -> fun record_confirms/2; - _ -> fun send_nacks/2 + normal -> fun record_confirms/2; + _ -> fun send_nacks/2 end)(MXs, State2), noreply(queue_blocked(QPid, State3)). @@ -593,32 +593,33 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, %% certain to want to look at delivery-mode and priority. DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content), check_user_id_header(DecodedContent#content.properties, State), - IsPersistent = is_message_persistent(DecodedContent), {MsgSeqNo, State1} = case ConfirmEnabled of false -> {undefined, State}; true -> SeqNo = State#ch.publish_seqno, {SeqNo, State#ch{publish_seqno = SeqNo + 1}} end, - Message = #basic_message{exchange_name = ExchangeName, - routing_key = RoutingKey, - content = DecodedContent, - guid = rabbit_guid:guid(), - is_persistent = IsPersistent}, - {RoutingRes, DeliveredQPids} = - rabbit_exchange:publish( - Exchange, - rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message, - MsgSeqNo)), - State2 = process_routing_result(RoutingRes, DeliveredQPids, ExchangeName, - MsgSeqNo, Message, State1), - maybe_incr_stats([{ExchangeName, 1} | - [{{QPid, ExchangeName}, 1} || - QPid <- DeliveredQPids]], publish, State2), - {noreply, case TxnKey of - none -> State2; - _ -> add_tx_participants(DeliveredQPids, State2) - end}; + case rabbit_basic:message(ExchangeName, RoutingKey, DecodedContent) of + {ok, Message} -> + {RoutingRes, DeliveredQPids} = + rabbit_exchange:publish( + Exchange, + rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message, + MsgSeqNo)), + State2 = process_routing_result(RoutingRes, DeliveredQPids, + ExchangeName, MsgSeqNo, Message, + State1), + maybe_incr_stats([{ExchangeName, 1} | + [{{QPid, ExchangeName}, 1} || + QPid <- DeliveredQPids]], publish, State2), + {noreply, case TxnKey of + none -> State2; + _ -> add_tx_participants(DeliveredQPids, State2) + end}; + {error, Reason} -> + rabbit_misc:protocol_error(precondition_failed, + "invalid message: ~p", [Reason]) + end; handle_method(#'basic.nack'{delivery_tag = DeliveryTag, multiple = Multiple, @@ -658,7 +659,7 @@ handle_method(#'basic.get'{queue = QueueNameBin, {ok, MessageCount, Msg = {_QName, QPid, _MsgId, Redelivered, #basic_message{exchange_name = ExchangeName, - routing_key = RoutingKey, + routing_keys = [RoutingKey | _CcRoutes], content = Content}}} -> State1 = lock_message(not(NoAck), ack_record(DeliveryTag, none, Msg), @@ -714,9 +715,9 @@ handle_method(#'basic.consume'{queue = QueueNameBin, end) of ok -> {noreply, State#ch{consumer_mapping = - dict:store(ActualConsumerTag, - QueueName, - ConsumerMapping)}}; + dict:store(ActualConsumerTag, + QueueName, + ConsumerMapping)}}; {error, exclusive_consume_unavailable} -> rabbit_misc:protocol_error( access_refused, "~s in exclusive use", @@ -738,8 +739,8 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, return_ok(State, NoWait, OkMsg); {ok, QueueName} -> NewState = State#ch{consumer_mapping = - dict:erase(ConsumerTag, - ConsumerMapping)}, + dict:erase(ConsumerTag, + ConsumerMapping)}, case rabbit_amqqueue:with( QueueName, fun (Q) -> @@ -1123,7 +1124,7 @@ binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin, end. basic_return(#basic_message{exchange_name = ExchangeName, - routing_key = RoutingKey, + routing_keys = [RoutingKey | _CcRoutes], content = Content}, #ch{protocol = Protocol, writer_pid = WriterPid}, Reason) -> {_Close, ReplyCode, ReplyText} = Protocol:lookup_amqp_exception(Reason), @@ -1274,22 +1275,15 @@ notify_limiter(LimiterPid, Acked) -> Count -> rabbit_limiter:ack(LimiterPid, Count) end. -is_message_persistent(Content) -> - case rabbit_basic:is_message_persistent(Content) of - {invalid, Other} -> - rabbit_log:warning("Unknown delivery mode ~p - " - "treating as 1, non-persistent~n", - [Other]), - false; - IsPersistent when is_boolean(IsPersistent) -> - IsPersistent - end. - process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) -> ok = basic_return(Msg, State, no_route), + maybe_incr_stats([{Msg#basic_message.exchange_name, 1}], + return_unroutable, State), record_confirm(MsgSeqNo, XName, State); process_routing_result(not_delivered, _, XName, MsgSeqNo, Msg, State) -> ok = basic_return(Msg, State, no_consumers), + maybe_incr_stats([{Msg#basic_message.exchange_name, 1}], + return_not_delivered, State), record_confirm(MsgSeqNo, XName, State); process_routing_result(routed, [], XName, MsgSeqNo, _, State) -> record_confirm(MsgSeqNo, XName, State); diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl index 9cc407bc34..8175ad806c 100644 --- a/src/rabbit_channel_sup.erl +++ b/src/rabbit_channel_sup.erl @@ -68,12 +68,12 @@ start_link({direct, Channel, ClientChannelPid, Protocol, User, VHost, {ok, SupPid} = supervisor2:start_link(?MODULE, []), {ok, ChannelPid} = supervisor2:start_child( - SupPid, - {channel, {rabbit_channel, start_link, - [Channel, ClientChannelPid, ClientChannelPid, Protocol, - User, VHost, Capabilities, Collector, - start_limiter_fun(SupPid)]}, - intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}), + SupPid, + {channel, {rabbit_channel, start_link, + [Channel, ClientChannelPid, ClientChannelPid, Protocol, + User, VHost, Capabilities, Collector, + start_limiter_fun(SupPid)]}, + intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}), {ok, SupPid, {ChannelPid, none}}. %%---------------------------------------------------------------------------- diff --git a/src/rabbit_client_sup.erl b/src/rabbit_client_sup.erl index dbdc6cd429..15e92542a2 100644 --- a/src/rabbit_client_sup.erl +++ b/src/rabbit_client_sup.erl @@ -29,9 +29,9 @@ -ifdef(use_specs). -spec(start_link/1 :: (mfa()) -> - rabbit_types:ok_pid_or_error()). + rabbit_types:ok_pid_or_error()). -spec(start_link/2 :: ({'local', atom()}, mfa()) -> - rabbit_types:ok_pid_or_error()). + rabbit_types:ok_pid_or_error()). -endif. diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 3a18950f84..8364ecd8d7 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -20,6 +20,7 @@ -export([start/0, stop/0, action/5, diagnostics/1]). -define(RPC_TIMEOUT, infinity). +-define(WAIT_FOR_VM_ATTEMPTS, 5). -define(QUIET_OPT, "-q"). -define(NODE_OPT, "-n"). @@ -102,24 +103,22 @@ print_badrpc_diagnostics(Node) -> diagnostics(Node) -> {_NodeName, NodeHost} = rabbit_misc:nodeparts(Node), - [ - {"diagnostics:", []}, - case net_adm:names(NodeHost) of - {error, EpmdReason} -> - {"- unable to connect to epmd on ~s: ~w", - [NodeHost, EpmdReason]}; - {ok, NamePorts} -> - {"- nodes and their ports on ~s: ~p", - [NodeHost, [{list_to_atom(Name), Port} || - {Name, Port} <- NamePorts]]} - end, - {"- current node: ~w", [node()]}, - case init:get_argument(home) of - {ok, [[Home]]} -> {"- current node home dir: ~s", [Home]}; - Other -> {"- no current node home dir: ~p", [Other]} - end, - {"- current node cookie hash: ~s", [rabbit_misc:cookie_hash()]} - ]. + [{"diagnostics:", []}, + case net_adm:names(NodeHost) of + {error, EpmdReason} -> + {"- unable to connect to epmd on ~s: ~w", + [NodeHost, EpmdReason]}; + {ok, NamePorts} -> + {"- nodes and their ports on ~s: ~p", + [NodeHost, [{list_to_atom(Name), Port} || + {Name, Port} <- NamePorts]]} + end, + {"- current node: ~w", [node()]}, + case init:get_argument(home) of + {ok, [[Home]]} -> {"- current node home dir: ~s", [Home]}; + Other -> {"- no current node home dir: ~p", [Other]} + end, + {"- current node cookie hash: ~s", [rabbit_misc:cookie_hash()]}]. stop() -> ok. @@ -151,13 +150,13 @@ action(force_reset, Node, [], _Opts, Inform) -> action(cluster, Node, ClusterNodeSs, _Opts, Inform) -> ClusterNodes = lists:map(fun list_to_atom/1, ClusterNodeSs), Inform("Clustering node ~p with ~p", - [Node, ClusterNodes]), + [Node, ClusterNodes]), rpc_call(Node, rabbit_mnesia, cluster, [ClusterNodes]); action(force_cluster, Node, ClusterNodeSs, _Opts, Inform) -> ClusterNodes = lists:map(fun list_to_atom/1, ClusterNodeSs), Inform("Forcefully clustering node ~p with ~p (ignoring offline nodes)", - [Node, ClusterNodes]), + [Node, ClusterNodes]), rpc_call(Node, rabbit_mnesia, force_cluster, [ClusterNodes]); action(status, Node, [], _Opts, Inform) -> @@ -293,13 +292,34 @@ action(list_permissions, Node, [], Opts, Inform) -> VHost = proplists:get_value(?VHOST_OPT, Opts), Inform("Listing permissions in vhost ~p", [VHost]), display_list(call(Node, {rabbit_auth_backend_internal, - list_vhost_permissions, [VHost]})). + list_vhost_permissions, [VHost]})); + +action(wait, Node, [], _Opts, Inform) -> + Inform("Waiting for ~p", [Node]), + wait_for_application(Node, ?WAIT_FOR_VM_ATTEMPTS). + +wait_for_application(Node, Attempts) -> + case rpc_call(Node, application, which_applications, [infinity]) of + {badrpc, _} = E -> NewAttempts = Attempts - 1, + case NewAttempts of + 0 -> E; + _ -> wait_for_application0(Node, NewAttempts) + end; + Apps -> case proplists:is_defined(rabbit, Apps) of + %% We've seen the node up; if it goes down + %% die immediately. + true -> ok; + false -> wait_for_application0(Node, 0) + end + end. + +wait_for_application0(Node, Attempts) -> + timer:sleep(1000), + wait_for_application(Node, Attempts). default_if_empty(List, Default) when is_list(List) -> - if List == [] -> - Default; - true -> - [list_to_atom(X) || X <- List] + if List == [] -> Default; + true -> [list_to_atom(X) || X <- List] end. display_info_list(Results, InfoItemKeys) when is_list(Results) -> @@ -390,7 +410,7 @@ prettify_typed_amqp_value(Type, Value) -> _ -> Value end. -% the slower shutdown on windows required to flush stdout +%% the slower shutdown on windows required to flush stdout quit(Status) -> case os:type() of {unix, _} -> diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl index 586563f61e..a2693c6919 100644 --- a/src/rabbit_direct.erl +++ b/src/rabbit_direct.erl @@ -26,8 +26,8 @@ -spec(boot/0 :: () -> 'ok'). -spec(connect/4 :: (binary(), binary(), binary(), rabbit_types:protocol()) -> - {'ok', {rabbit_types:user(), - rabbit_framing:amqp_table()}}). + {'ok', {rabbit_types:user(), + rabbit_framing:amqp_table()}}). -spec(start_channel/7 :: (rabbit_channel:channel_number(), pid(), rabbit_types:protocol(), rabbit_types:user(), rabbit_types:vhost(), rabbit_framing:amqp_table(), @@ -40,12 +40,12 @@ boot() -> {ok, _} = supervisor2:start_child( - rabbit_sup, - {rabbit_direct_client_sup, - {rabbit_client_sup, start_link, - [{local, rabbit_direct_client_sup}, - {rabbit_channel_sup, start_link, []}]}, - transient, infinity, supervisor, [rabbit_client_sup]}), + rabbit_sup, + {rabbit_direct_client_sup, + {rabbit_client_sup, start_link, + [{local, rabbit_direct_client_sup}, + {rabbit_channel_sup, start_link, []}]}, + transient, infinity, supervisor, [rabbit_client_sup]}), ok. %%---------------------------------------------------------------------------- @@ -73,7 +73,7 @@ start_channel(Number, ClientChannelPid, Protocol, User, VHost, Capabilities, Collector) -> {ok, _, {ChannelPid, _}} = supervisor2:start_child( - rabbit_direct_client_sup, - [{direct, Number, ClientChannelPid, Protocol, User, VHost, - Capabilities, Collector}]), + rabbit_direct_client_sup, + [{direct, Number, ClientChannelPid, Protocol, User, VHost, + Capabilities, Collector}]), {ok, ChannelPid}. diff --git a/src/rabbit_event.erl b/src/rabbit_event.erl index 40ade4b742..9ed532db2a 100644 --- a/src/rabbit_event.erl +++ b/src/rabbit_event.erl @@ -101,7 +101,7 @@ ensure_stats_timer(State = #state{level = none}, _Fun) -> State; ensure_stats_timer(State = #state{timer = undefined}, Fun) -> {ok, TRef} = timer:apply_after(?STATS_INTERVAL, - erlang, apply, [Fun, []]), + erlang, apply, [Fun, []]), State#state{timer = TRef}; ensure_stats_timer(State, _Fun) -> State. @@ -130,15 +130,8 @@ notify_if(true, Type, Props) -> notify(Type, Props); notify_if(false, _Type, _Props) -> ok. notify(Type, Props) -> - try - %% TODO: switch to os:timestamp() when we drop support for - %% Erlang/OTP < R13B01 - gen_event:notify(rabbit_event, #event{type = Type, - props = Props, - timestamp = now()}) - catch error:badarg -> - %% badarg means rabbit_event is no longer registered. We never - %% unregister it so the great likelihood is that we're shutting - %% down the broker but some events were backed up. Ignore it. - ok - end. + %% TODO: switch to os:timestamp() when we drop support for + %% Erlang/OTP < R13B01 + gen_event:notify(rabbit_event, #event{type = Type, + props = Props, + timestamp = now()}). diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 92259195c3..a463e57067 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -62,7 +62,7 @@ -> rabbit_types:infos()). -spec(info_all/1 :: (rabbit_types:vhost()) -> [rabbit_types:infos()]). -spec(info_all/2 ::(rabbit_types:vhost(), rabbit_types:info_keys()) - -> [rabbit_types:infos()]). + -> [rabbit_types:infos()]). -spec(publish/2 :: (rabbit_types:exchange(), rabbit_types:delivery()) -> {rabbit_router:routing_result(), [pid()]}). -spec(delete/2 :: @@ -266,9 +266,9 @@ process_route(#resource{kind = queue} = QName, call_with_exchange(XName, Fun, PrePostCommitFun) -> rabbit_misc:execute_mnesia_transaction( fun () -> case mnesia:read({rabbit_exchange, XName}) of - [] -> {error, not_found}; - [X] -> Fun(X) - end + [] -> {error, not_found}; + [X] -> Fun(X) + end end, PrePostCommitFun). delete(XName, IfUnused) -> diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl index c51b0913a0..349c2f6ee4 100644 --- a/src/rabbit_exchange_type_direct.erl +++ b/src/rabbit_exchange_type_direct.erl @@ -36,8 +36,8 @@ description() -> {description, <<"AMQP direct exchange, as per the AMQP specification">>}]. route(#exchange{name = Name}, - #delivery{message = #basic_message{routing_key = RoutingKey}}) -> - rabbit_router:match_routing_key(Name, RoutingKey). + #delivery{message = #basic_message{routing_keys = Routes}}) -> + rabbit_router:match_routing_key(Name, Routes). validate(_X) -> ok. create(_Tx, _X) -> ok. diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl index 382fb6270d..bc5293c81d 100644 --- a/src/rabbit_exchange_type_fanout.erl +++ b/src/rabbit_exchange_type_fanout.erl @@ -36,7 +36,7 @@ description() -> {description, <<"AMQP fanout exchange, as per the AMQP specification">>}]. route(#exchange{name = Name}, _Delivery) -> - rabbit_router:match_routing_key(Name, '_'). + rabbit_router:match_routing_key(Name, ['_']). validate(_X) -> ok. create(_Tx, _X) -> ok. diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl index 9cbf8100e2..f12661d48f 100644 --- a/src/rabbit_exchange_type_topic.erl +++ b/src/rabbit_exchange_type_topic.erl @@ -15,6 +15,7 @@ %% -module(rabbit_exchange_type_topic). + -include("rabbit.hrl"). -behaviour(rabbit_exchange_type). @@ -31,58 +32,225 @@ {requires, rabbit_registry}, {enables, kernel_ready}]}). --export([topic_matches/2]). - --ifdef(use_specs). - --spec(topic_matches/2 :: (binary(), binary()) -> boolean()). - --endif. +%%---------------------------------------------------------------------------- description() -> [{name, <<"topic">>}, {description, <<"AMQP topic exchange, as per the AMQP specification">>}]. -route(#exchange{name = Name}, - #delivery{message = #basic_message{routing_key = RoutingKey}}) -> - rabbit_router:match_bindings(Name, - fun (#binding{key = BindingKey}) -> - topic_matches(BindingKey, RoutingKey) - end). - -split_topic_key(Key) -> - string:tokens(binary_to_list(Key), "."). - -topic_matches(PatternKey, RoutingKey) -> - P = split_topic_key(PatternKey), - R = split_topic_key(RoutingKey), - topic_matches1(P, R). - -topic_matches1(["#"], _R) -> - true; -topic_matches1(["#" | PTail], R) -> - last_topic_match(PTail, [], lists:reverse(R)); -topic_matches1([], []) -> - true; -topic_matches1(["*" | PatRest], [_ | ValRest]) -> - topic_matches1(PatRest, ValRest); -topic_matches1([PatElement | PatRest], [ValElement | ValRest]) - when PatElement == ValElement -> - topic_matches1(PatRest, ValRest); -topic_matches1(_, _) -> - false. - -last_topic_match(P, R, []) -> - topic_matches1(P, R); -last_topic_match(P, R, [BacktrackNext | BacktrackList]) -> - topic_matches1(P, R) or - last_topic_match(P, [BacktrackNext | R], BacktrackList). +%% NB: This may return duplicate results in some situations (that's ok) +route(#exchange{name = X}, + #delivery{message = #basic_message{routing_keys = Routes}}) -> + lists:append([begin + Words = split_topic_key(RKey), + mnesia:async_dirty(fun trie_match/2, [X, Words]) + end || RKey <- Routes]). validate(_X) -> ok. create(_Tx, _X) -> ok. -recover(_X, _Bs) -> ok. -delete(_Tx, _X, _Bs) -> ok. -add_binding(_Tx, _X, _B) -> ok. -remove_bindings(_Tx, _X, _Bs) -> ok. + +recover(_Exchange, Bs) -> + rabbit_misc:execute_mnesia_transaction( + fun () -> + lists:foreach(fun (B) -> internal_add_binding(B) end, Bs) + end). + +delete(true, #exchange{name = X}, _Bs) -> + trie_remove_all_edges(X), + trie_remove_all_bindings(X), + ok; +delete(false, _Exchange, _Bs) -> + ok. + +add_binding(true, _Exchange, Binding) -> + internal_add_binding(Binding); +add_binding(false, _Exchange, _Binding) -> + ok. + +remove_bindings(true, _X, Bs) -> + lists:foreach(fun remove_binding/1, Bs), + ok; +remove_bindings(false, _X, _Bs) -> + ok. + +remove_binding(#binding{source = X, key = K, destination = D}) -> + Path = [{FinalNode, _} | _] = follow_down_get_path(X, split_topic_key(K)), + trie_remove_binding(X, FinalNode, D), + remove_path_if_empty(X, Path), + ok. + assert_args_equivalence(X, Args) -> rabbit_exchange:assert_args_equivalence(X, Args). + +%%---------------------------------------------------------------------------- + +internal_add_binding(#binding{source = X, key = K, destination = D}) -> + FinalNode = follow_down_create(X, split_topic_key(K)), + trie_add_binding(X, FinalNode, D), + ok. + +trie_match(X, Words) -> + trie_match(X, root, Words, []). + +trie_match(X, Node, [], ResAcc) -> + trie_match_part(X, Node, "#", fun trie_match_skip_any/4, [], + trie_bindings(X, Node) ++ ResAcc); +trie_match(X, Node, [W | RestW] = Words, ResAcc) -> + lists:foldl(fun ({WArg, MatchFun, RestWArg}, Acc) -> + trie_match_part(X, Node, WArg, MatchFun, RestWArg, Acc) + end, ResAcc, [{W, fun trie_match/4, RestW}, + {"*", fun trie_match/4, RestW}, + {"#", fun trie_match_skip_any/4, Words}]). + +trie_match_part(X, Node, Search, MatchFun, RestW, ResAcc) -> + case trie_child(X, Node, Search) of + {ok, NextNode} -> MatchFun(X, NextNode, RestW, ResAcc); + error -> ResAcc + end. + +trie_match_skip_any(X, Node, [], ResAcc) -> + trie_match(X, Node, [], ResAcc); +trie_match_skip_any(X, Node, [_ | RestW] = Words, ResAcc) -> + trie_match_skip_any(X, Node, RestW, + trie_match(X, Node, Words, ResAcc)). + +follow_down_create(X, Words) -> + case follow_down_last_node(X, Words) of + {ok, FinalNode} -> FinalNode; + {error, Node, RestW} -> lists:foldl( + fun (W, CurNode) -> + NewNode = new_node_id(), + trie_add_edge(X, CurNode, NewNode, W), + NewNode + end, Node, RestW) + end. + +follow_down_last_node(X, Words) -> + follow_down(X, fun (_, Node, _) -> Node end, root, Words). + +follow_down_get_path(X, Words) -> + {ok, Path} = + follow_down(X, fun (W, Node, PathAcc) -> [{Node, W} | PathAcc] end, + [{root, none}], Words), + Path. + +follow_down(X, AccFun, Acc0, Words) -> + follow_down(X, root, AccFun, Acc0, Words). + +follow_down(_X, _CurNode, _AccFun, Acc, []) -> + {ok, Acc}; +follow_down(X, CurNode, AccFun, Acc, Words = [W | RestW]) -> + case trie_child(X, CurNode, W) of + {ok, NextNode} -> follow_down(X, NextNode, AccFun, + AccFun(W, NextNode, Acc), RestW); + error -> {error, Acc, Words} + end. + +remove_path_if_empty(_, [{root, none}]) -> + ok; +remove_path_if_empty(X, [{Node, W} | [{Parent, _} | _] = RestPath]) -> + case trie_has_any_bindings(X, Node) orelse trie_has_any_children(X, Node) of + true -> ok; + false -> trie_remove_edge(X, Parent, Node, W), + remove_path_if_empty(X, RestPath) + end. + +trie_child(X, Node, Word) -> + case mnesia:read(rabbit_topic_trie_edge, + #trie_edge{exchange_name = X, + node_id = Node, + word = Word}) of + [#topic_trie_edge{node_id = NextNode}] -> {ok, NextNode}; + [] -> error + end. + +trie_bindings(X, Node) -> + MatchHead = #topic_trie_binding{ + trie_binding = #trie_binding{exchange_name = X, + node_id = Node, + destination = '$1'}}, + mnesia:select(rabbit_topic_trie_binding, [{MatchHead, [], ['$1']}]). + +trie_add_edge(X, FromNode, ToNode, W) -> + trie_edge_op(X, FromNode, ToNode, W, fun mnesia:write/3). + +trie_remove_edge(X, FromNode, ToNode, W) -> + trie_edge_op(X, FromNode, ToNode, W, fun mnesia:delete_object/3). + +trie_edge_op(X, FromNode, ToNode, W, Op) -> + ok = Op(rabbit_topic_trie_edge, + #topic_trie_edge{trie_edge = #trie_edge{exchange_name = X, + node_id = FromNode, + word = W}, + node_id = ToNode}, + write). + +trie_add_binding(X, Node, D) -> + trie_binding_op(X, Node, D, fun mnesia:write/3). + +trie_remove_binding(X, Node, D) -> + trie_binding_op(X, Node, D, fun mnesia:delete_object/3). + +trie_binding_op(X, Node, D, Op) -> + ok = Op(rabbit_topic_trie_binding, + #topic_trie_binding{ + trie_binding = #trie_binding{exchange_name = X, + node_id = Node, + destination = D}}, + write). + +trie_has_any_children(X, Node) -> + has_any(rabbit_topic_trie_edge, + #topic_trie_edge{trie_edge = #trie_edge{exchange_name = X, + node_id = Node, + _ = '_'}, + _ = '_'}). + +trie_has_any_bindings(X, Node) -> + has_any(rabbit_topic_trie_binding, + #topic_trie_binding{ + trie_binding = #trie_binding{exchange_name = X, + node_id = Node, + _ = '_'}, + _ = '_'}). + +trie_remove_all_edges(X) -> + remove_all(rabbit_topic_trie_edge, + #topic_trie_edge{trie_edge = #trie_edge{exchange_name = X, + _ = '_'}, + _ = '_'}). + +trie_remove_all_bindings(X) -> + remove_all(rabbit_topic_trie_binding, + #topic_trie_binding{ + trie_binding = #trie_binding{exchange_name = X, _ = '_'}, + _ = '_'}). + +has_any(Table, MatchHead) -> + Select = mnesia:select(Table, [{MatchHead, [], ['$_']}], 1, read), + select_while_no_result(Select) /= '$end_of_table'. + +select_while_no_result({[], Cont}) -> + select_while_no_result(mnesia:select(Cont)); +select_while_no_result(Other) -> + Other. + +remove_all(Table, Pattern) -> + lists:foreach(fun (R) -> mnesia:delete_object(Table, R, write) end, + mnesia:match_object(Table, Pattern, write)). + +new_node_id() -> + rabbit_guid:guid(). + +split_topic_key(Key) -> + split_topic_key(Key, [], []). + +split_topic_key(<<>>, [], []) -> + []; +split_topic_key(<<>>, RevWordAcc, RevResAcc) -> + lists:reverse([lists:reverse(RevWordAcc) | RevResAcc]); +split_topic_key(<<$., Rest/binary>>, RevWordAcc, RevResAcc) -> + split_topic_key(Rest, [], [lists:reverse(RevWordAcc) | RevResAcc]); +split_topic_key(<<C:8, Rest/binary>>, RevWordAcc, RevResAcc) -> + split_topic_key(Rest, [C | RevWordAcc], RevResAcc). + diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 86ea7282d9..1b72dd761a 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -65,7 +65,7 @@ start_link(ChPid, UnackedMsgCount) -> limit(undefined, 0) -> ok; limit(LimiterPid, PrefetchCount) -> - gen_server2:call(LimiterPid, {limit, PrefetchCount}). + gen_server2:call(LimiterPid, {limit, PrefetchCount}, infinity). %% Ask the limiter whether the queue can deliver a message without %% breaching a limit diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl index 2f8c940b75..996b0a980f 100644 --- a/src/rabbit_memory_monitor.erl +++ b/src/rabbit_memory_monitor.erl @@ -111,11 +111,11 @@ stop() -> init([]) -> MemoryLimit = trunc(?MEMORY_LIMIT_SCALING * - (try - vm_memory_monitor:get_memory_limit() - catch - exit:{noproc, _} -> ?MEMORY_SIZE_FOR_DISABLED_VMM - end)), + (try + vm_memory_monitor:get_memory_limit() + catch + exit:{noproc, _} -> ?MEMORY_SIZE_FOR_DISABLED_VMM + end)), {ok, TRef} = timer:apply_interval(?DEFAULT_UPDATE_INTERVAL, ?SERVER, update, []), diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index abc27c5f7d..e79a58a1b6 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -105,7 +105,7 @@ ({atom(), any()}) -> rabbit_types:ok_or_error2(any(), 'not_found')). -spec(table_lookup/2 :: (rabbit_framing:amqp_table(), binary()) - -> 'undefined' | {rabbit_framing:amqp_field_type(), any()}). + -> 'undefined' | {rabbit_framing:amqp_field_type(), any()}). -spec(r/2 :: (rabbit_types:vhost(), K) -> rabbit_types:r3(rabbit_types:vhost(), K, '_') when is_subtype(K, atom())). @@ -469,11 +469,11 @@ map_in_order(F, L) -> table_fold(F, Acc0, TableName) -> lists:foldl( fun (E, Acc) -> execute_mnesia_transaction( - fun () -> case mnesia:match_object(TableName, E, read) of - [] -> Acc; - _ -> F(E, Acc) - end - end) + fun () -> case mnesia:match_object(TableName, E, read) of + [] -> Acc; + _ -> F(E, Acc) + end + end) end, Acc0, dirty_read_all(TableName)). dirty_read_all(TableName) -> @@ -755,12 +755,12 @@ unlink_and_capture_exit(Pid) -> after 0 -> ok end. -% Separate flags and options from arguments. -% get_options([{flag, "-q"}, {option, "-p", "/"}], -% ["set_permissions","-p","/","guest", -% "-q",".*",".*",".*"]) -% == {["set_permissions","guest",".*",".*",".*"], -% [{"-q",true},{"-p","/"}]} +%% Separate flags and options from arguments. +%% get_options([{flag, "-q"}, {option, "-p", "/"}], +%% ["set_permissions","-p","/","guest", +%% "-q",".*",".*",".*"]) +%% == {["set_permissions","guest",".*",".*",".*"], +%% [{"-q",true},{"-p","/"}]} get_options(Defs, As) -> lists:foldl(fun(Def, {AsIn, RsIn}) -> {AsOut, Value} = case Def of diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index a9b4e17745..66436920d4 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -20,7 +20,7 @@ -export([ensure_mnesia_dir/0, dir/0, status/0, init/0, is_db_empty/0, cluster/1, force_cluster/1, reset/0, force_reset/0, is_clustered/0, running_clustered_nodes/0, all_clustered_nodes/0, - empty_ram_only_tables/0, copy_db/1]). + empty_ram_only_tables/0, copy_db/1, wait_for_tables/1]). -export([table_names/0]). @@ -54,6 +54,7 @@ -spec(empty_ram_only_tables/0 :: () -> 'ok'). -spec(create_tables/0 :: () -> 'ok'). -spec(copy_db/1 :: (file:filename()) -> rabbit_types:ok_or_error(any())). +-spec(wait_for_tables/1 :: ([atom()]) -> 'ok'). -endif. @@ -128,10 +129,10 @@ empty_ram_only_tables() -> Node = node(), lists:foreach( fun (TabName) -> - case lists:member(Node, mnesia:table_info(TabName, ram_copies)) of - true -> {atomic, ok} = mnesia:clear_table(TabName); - false -> ok - end + case lists:member(Node, mnesia:table_info(TabName, ram_copies)) of + true -> {atomic, ok} = mnesia:clear_table(TabName); + false -> ok + end end, table_names()), ok. @@ -185,6 +186,17 @@ table_definitions() -> {type, ordered_set}, {match, #reverse_route{reverse_binding = reverse_binding_match(), _='_'}}]}, + {rabbit_topic_trie_edge, + [{record_name, topic_trie_edge}, + {attributes, record_info(fields, topic_trie_edge)}, + {type, ordered_set}, + {match, #topic_trie_edge{trie_edge = trie_edge_match(), _='_'}}]}, + {rabbit_topic_trie_binding, + [{record_name, topic_trie_binding}, + {attributes, record_info(fields, topic_trie_binding)}, + {type, ordered_set}, + {match, #topic_trie_binding{trie_binding = trie_binding_match(), + _='_'}}]}, %% Consider the implications to nodes_of_type/1 before altering %% the next entry. {rabbit_durable_exchange, @@ -216,6 +228,12 @@ reverse_binding_match() -> _='_'}. binding_destination_match() -> resource_match('_'). +trie_edge_match() -> + #trie_edge{exchange_name = exchange_name_match(), + _='_'}. +trie_binding_match() -> + #trie_binding{exchange_name = exchange_name_match(), + _='_'}. exchange_name_match() -> resource_match(exchange). queue_name_match() -> @@ -264,45 +282,48 @@ ensure_schema_integrity() -> check_schema_integrity() -> Tables = mnesia:system_info(tables), - case [Error || {Tab, TabDef} <- table_definitions(), - case lists:member(Tab, Tables) of - false -> - Error = {table_missing, Tab}, - true; - true -> - {_, ExpAttrs} = proplists:lookup(attributes, TabDef), - Attrs = mnesia:table_info(Tab, attributes), - Error = {table_attributes_mismatch, Tab, - ExpAttrs, Attrs}, - Attrs /= ExpAttrs - end] of - [] -> check_table_integrity(); - Errors -> {error, Errors} + case check_tables(fun (Tab, TabDef) -> + case lists:member(Tab, Tables) of + false -> {error, {table_missing, Tab}}; + true -> check_table_attributes(Tab, TabDef) + end + end) of + ok -> ok = wait_for_tables(), + check_tables(fun check_table_content/2); + Other -> Other end. -check_table_integrity() -> - ok = wait_for_tables(), - case lists:all(fun ({Tab, TabDef}) -> - {_, Match} = proplists:lookup(match, TabDef), - read_test_table(Tab, Match) - end, table_definitions()) of - true -> ok; - false -> {error, invalid_table_content} +check_table_attributes(Tab, TabDef) -> + {_, ExpAttrs} = proplists:lookup(attributes, TabDef), + case mnesia:table_info(Tab, attributes) of + ExpAttrs -> ok; + Attrs -> {error, {table_attributes_mismatch, Tab, ExpAttrs, Attrs}} end. -read_test_table(Tab, Match) -> +check_table_content(Tab, TabDef) -> + {_, Match} = proplists:lookup(match, TabDef), case mnesia:dirty_first(Tab) of '$end_of_table' -> - true; + ok; Key -> ObjList = mnesia:dirty_read(Tab, Key), MatchComp = ets:match_spec_compile([{Match, [], ['$_']}]), case ets:match_spec_run(ObjList, MatchComp) of - ObjList -> true; - _ -> false + ObjList -> ok; + _ -> {error, {table_content_invalid, Tab, Match, ObjList}} end end. +check_tables(Fun) -> + case [Error || {Tab, TabDef} <- table_definitions(), + case Fun(Tab, TabDef) of + ok -> Error = none, false; + {error, Error} -> true + end] of + [] -> ok; + Errors -> {error, Errors} + end. + %% The cluster node config file contains some or all of the disk nodes %% that are members of the cluster this node is / should be a part of. %% @@ -369,17 +390,15 @@ init_db(ClusterNodes, Force) -> case {Nodes, mnesia:system_info(use_dir), all_clustered_nodes()} of {[], true, [_]} -> %% True single disc node, attempt upgrade - ok = wait_for_tables(), case rabbit_upgrade:maybe_upgrade() of - ok -> ensure_schema_ok(); + ok -> ensure_schema_integrity(); version_not_available -> schema_ok_or_move() end; {[], true, _} -> %% "Master" (i.e. without config) disc node in cluster, %% verify schema - ok = wait_for_tables(), ensure_version_ok(rabbit_upgrade:read_version()), - ensure_schema_ok(); + ensure_schema_integrity(); {[], false, _} -> %% Nothing there at all, start from scratch ok = create_schema(); @@ -396,7 +415,7 @@ init_db(ClusterNodes, Force) -> true -> disc; false -> ram end), - ensure_schema_ok() + ensure_schema_integrity() end; {error, Reason} -> %% one reason we may end up here is if we try to join @@ -429,12 +448,6 @@ ensure_version_ok({ok, DiscVersion}) -> ensure_version_ok({error, _}) -> ok = rabbit_upgrade:write_version(). -ensure_schema_ok() -> - case check_schema_integrity() of - ok -> ok; - {error, Reason} -> throw({error, {schema_invalid, Reason}}) - end. - create_schema() -> mnesia:stop(), rabbit_misc:ensure_ok(mnesia:create_schema([node()]), @@ -443,7 +456,6 @@ create_schema() -> cannot_start_mnesia), ok = create_tables(), ok = ensure_schema_integrity(), - ok = wait_for_tables(), ok = rabbit_upgrade:write_version(). move_db() -> @@ -472,8 +484,7 @@ copy_db(Destination) -> mnesia:stop(), case rabbit_misc:recursive_copy(dir(), Destination) of ok -> - rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), - ok = wait_for_tables(); + rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia); {error, E} -> {error, E} end. @@ -508,13 +519,13 @@ create_local_table_copies(Type) -> HasDiscOnlyCopies -> disc_only_copies; true -> ram_copies end; -%% unused code - commented out to keep dialyzer happy -%% Type =:= disc_only -> -%% if -%% HasDiscCopies or HasDiscOnlyCopies -> -%% disc_only_copies; -%% true -> ram_copies -%% end; +%%% unused code - commented out to keep dialyzer happy +%%% Type =:= disc_only -> +%%% if +%%% HasDiscCopies or HasDiscOnlyCopies -> +%%% disc_only_copies; +%%% true -> ram_copies +%%% end; Type =:= ram -> ram_copies end, @@ -541,7 +552,8 @@ wait_for_tables() -> wait_for_tables(table_names()). wait_for_tables(TableNames) -> case mnesia:wait_for_tables(TableNames, 30000) of - ok -> ok; + ok -> + ok; {timeout, BadTabs} -> throw({error, {timeout_waiting_for_tables, BadTabs}}); {error, Reason} -> diff --git a/src/rabbit_msg_file.erl b/src/rabbit_msg_file.erl index cfea4982b4..ea7cf80cc5 100644 --- a/src/rabbit_msg_file.erl +++ b/src/rabbit_msg_file.erl @@ -16,7 +16,7 @@ -module(rabbit_msg_file). --export([append/3, read/2, scan/2]). +-export([append/3, read/2, scan/4]). %%---------------------------------------------------------------------------- @@ -45,9 +45,9 @@ -spec(read/2 :: (io_device(), msg_size()) -> rabbit_types:ok_or_error2({rabbit_guid:guid(), msg()}, any())). --spec(scan/2 :: (io_device(), file_size()) -> - {'ok', [{rabbit_guid:guid(), msg_size(), position()}], - position()}). +-spec(scan/4 :: (io_device(), file_size(), + fun (({rabbit_guid:guid(), msg_size(), position(), binary()}, A) -> A), + A) -> {'ok', A, position()}). -endif. @@ -60,9 +60,9 @@ append(FileHdl, Guid, MsgBody) Size = MsgBodyBinSize + ?GUID_SIZE_BYTES, case file_handle_cache:append(FileHdl, <<Size:?INTEGER_SIZE_BITS, - Guid:?GUID_SIZE_BYTES/binary, - MsgBodyBin:MsgBodyBinSize/binary, - ?WRITE_OK_MARKER:?WRITE_OK_SIZE_BITS>>) of + Guid:?GUID_SIZE_BYTES/binary, + MsgBodyBin:MsgBodyBinSize/binary, + ?WRITE_OK_MARKER:?WRITE_OK_SIZE_BITS>>) of ok -> {ok, Size + ?FILE_PACKING_ADJUSTMENT}; KO -> KO end. @@ -72,36 +72,36 @@ read(FileHdl, TotalSize) -> BodyBinSize = Size - ?GUID_SIZE_BYTES, case file_handle_cache:read(FileHdl, TotalSize) of {ok, <<Size:?INTEGER_SIZE_BITS, - Guid:?GUID_SIZE_BYTES/binary, - MsgBodyBin:BodyBinSize/binary, - ?WRITE_OK_MARKER:?WRITE_OK_SIZE_BITS>>} -> + Guid:?GUID_SIZE_BYTES/binary, + MsgBodyBin:BodyBinSize/binary, + ?WRITE_OK_MARKER:?WRITE_OK_SIZE_BITS>>} -> {ok, {Guid, binary_to_term(MsgBodyBin)}}; KO -> KO end. -scan(FileHdl, FileSize) when FileSize >= 0 -> - scan(FileHdl, FileSize, <<>>, 0, [], 0). +scan(FileHdl, FileSize, Fun, Acc) when FileSize >= 0 -> + scan(FileHdl, FileSize, <<>>, 0, 0, Fun, Acc). -scan(_FileHdl, FileSize, _Data, FileSize, Acc, ScanOffset) -> +scan(_FileHdl, FileSize, _Data, FileSize, ScanOffset, _Fun, Acc) -> {ok, Acc, ScanOffset}; -scan(FileHdl, FileSize, Data, ReadOffset, Acc, ScanOffset) -> +scan(FileHdl, FileSize, Data, ReadOffset, ScanOffset, Fun, Acc) -> Read = lists:min([?SCAN_BLOCK_SIZE, (FileSize - ReadOffset)]), case file_handle_cache:read(FileHdl, Read) of {ok, Data1} -> {Data2, Acc1, ScanOffset1} = - scan(<<Data/binary, Data1/binary>>, Acc, ScanOffset), + scanner(<<Data/binary, Data1/binary>>, ScanOffset, Fun, Acc), ReadOffset1 = ReadOffset + size(Data1), - scan(FileHdl, FileSize, Data2, ReadOffset1, Acc1, ScanOffset1); + scan(FileHdl, FileSize, Data2, ReadOffset1, ScanOffset1, Fun, Acc1); _KO -> {ok, Acc, ScanOffset} end. -scan(<<>>, Acc, Offset) -> +scanner(<<>>, Offset, _Fun, Acc) -> {<<>>, Acc, Offset}; -scan(<<0:?INTEGER_SIZE_BITS, _Rest/binary>>, Acc, Offset) -> +scanner(<<0:?INTEGER_SIZE_BITS, _Rest/binary>>, Offset, _Fun, Acc) -> {<<>>, Acc, Offset}; %% Nothing to do other than stop. -scan(<<Size:?INTEGER_SIZE_BITS, GuidAndMsg:Size/binary, - WriteMarker:?WRITE_OK_SIZE_BITS, Rest/binary>>, Acc, Offset) -> +scanner(<<Size:?INTEGER_SIZE_BITS, GuidAndMsg:Size/binary, + WriteMarker:?WRITE_OK_SIZE_BITS, Rest/binary>>, Offset, Fun, Acc) -> TotalSize = Size + ?FILE_PACKING_ADJUSTMENT, case WriteMarker of ?WRITE_OK_MARKER -> @@ -110,12 +110,13 @@ scan(<<Size:?INTEGER_SIZE_BITS, GuidAndMsg:Size/binary, %% which we read the Guid as a number, and then convert it %% back to a binary in order to work around bugs in %% Erlang's GC. - <<GuidNum:?GUID_SIZE_BITS, _Msg/binary>> = + <<GuidNum:?GUID_SIZE_BITS, Msg/binary>> = <<GuidAndMsg:Size/binary>>, <<Guid:?GUID_SIZE_BYTES/binary>> = <<GuidNum:?GUID_SIZE_BITS>>, - scan(Rest, [{Guid, TotalSize, Offset} | Acc], Offset + TotalSize); + scanner(Rest, Offset + TotalSize, Fun, + Fun({Guid, TotalSize, Offset, Msg}, Acc)); _ -> - scan(Rest, Acc, Offset + TotalSize) + scanner(Rest, Offset + TotalSize, Fun, Acc) end; -scan(Data, Acc, Offset) -> +scanner(Data, Offset, _Fun, Acc) -> {Data, Acc, Offset}. diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 7f3cf35faa..8e1b2ac482 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -26,6 +26,8 @@ -export([sync/1, set_maximum_since_use/2, has_readers/2, combine_files/3, delete_file/2]). %% internal +-export([transform_dir/3, force_recovery/2]). %% upgrade + -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3, prioritise_call/3, prioritise_cast/2]). @@ -33,9 +35,10 @@ -include("rabbit_msg_store.hrl"). --define(SYNC_INTERVAL, 25). %% milliseconds +-define(SYNC_INTERVAL, 5). %% milliseconds -define(CLEAN_FILENAME, "clean.dot"). -define(FILE_SUMMARY_FILENAME, "file_summary.ets"). +-define(TRANSFORM_TMP, "transform_tmp"). -define(BINARY_MODE, [raw, binary]). -define(READ_MODE, [read]). @@ -72,7 +75,7 @@ successfully_recovered, %% boolean: did we recover state? file_size_limit, %% how big are our files allowed to get? cref_to_guids %% client ref to synced messages mapping - }). + }). -record(client_msstate, { server, @@ -86,7 +89,7 @@ file_summary_ets, dedup_cache_ets, cur_file_cache_ets - }). + }). -record(file_summary, {file, valid_total_size, left, right, file_size, locked, readers}). @@ -160,6 +163,9 @@ -spec(combine_files/3 :: (non_neg_integer(), non_neg_integer(), gc_state()) -> deletion_thunk()). -spec(delete_file/2 :: (non_neg_integer(), gc_state()) -> deletion_thunk()). +-spec(force_recovery/2 :: (file:filename(), server()) -> 'ok'). +-spec(transform_dir/3 :: (file:filename(), server(), + fun ((any()) -> (rabbit_types:ok_or_error2(msg(), any())))) -> 'ok'). -endif. @@ -543,7 +549,7 @@ client_read3(#msg_location { guid = Guid, file = File }, Defer, %% GC ends, we +1 readers, msg_store ets:deletes (and %% unlocks the dest) try Release(), - Defer() + Defer() catch error:badarg -> read(Guid, CState) end; [#file_summary { locked = false }] -> @@ -661,7 +667,7 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> successfully_recovered = CleanShutdown, file_size_limit = FileSizeLimit, cref_to_guids = dict:new() - }, + }, %% If we didn't recover the msg location index then we need to %% rebuild it now. @@ -1250,7 +1256,7 @@ safe_file_delete(File, Dir, FileHandlesEts) -> close_all_indicated(#client_msstate { file_handles_ets = FileHandlesEts, client_ref = Ref } = - CState) -> + CState) -> Objs = ets:match_object(FileHandlesEts, {{Ref, '_'}, close}), {ok, lists:foldl(fun ({Key = {_Ref, File}, close}, CStateM) -> true = ets:delete(FileHandlesEts, Key), @@ -1459,7 +1465,7 @@ recover_file_summary(true, Dir) -> Path = filename:join(Dir, ?FILE_SUMMARY_FILENAME), case ets:file2tab(Path) of {ok, Tid} -> file:delete(Path), - {true, Tid}; + {true, Tid}; {error, _Error} -> recover_file_summary(false, Dir) end. @@ -1523,7 +1529,8 @@ scan_file_for_valid_messages(Dir, FileName) -> case open_file(Dir, FileName, ?READ_MODE) of {ok, Hdl} -> Valid = rabbit_msg_file:scan( Hdl, filelib:file_size( - form_filename(Dir, FileName))), + form_filename(Dir, FileName)), + fun scan_fun/2, []), %% if something really bad has happened, %% the close could fail, but ignore file_handle_cache:close(Hdl), @@ -1532,6 +1539,9 @@ scan_file_for_valid_messages(Dir, FileName) -> {error, Reason} -> {error, {unable_to_scan_file, FileName, Reason}} end. +scan_fun({Guid, TotalSize, Offset, _Msg}, Acc) -> + [{Guid, TotalSize, Offset} | Acc]. + %% Takes the list in *ascending* order (i.e. eldest message %% first). This is the opposite of what scan_file_for_valid_messages %% produces. The list of msgs that is produced is youngest first. @@ -1683,8 +1693,8 @@ maybe_compact(State = #msstate { sum_valid_data = SumValid, pending_gc_completion = Pending, file_summary_ets = FileSummaryEts, file_size_limit = FileSizeLimit }) - when (SumFileSize > 2 * FileSizeLimit andalso - (SumFileSize - SumValid) / SumFileSize > ?GARBAGE_FRACTION) -> + when SumFileSize > 2 * FileSizeLimit andalso + (SumFileSize - SumValid) / SumFileSize > ?GARBAGE_FRACTION -> %% TODO: the algorithm here is sub-optimal - it may result in a %% complete traversal of FileSummaryEts. case ets:first(FileSummaryEts) of @@ -1747,10 +1757,10 @@ delete_file_if_empty(File, State = #msstate { locked = false }] = ets:lookup(FileSummaryEts, File), case ValidData of - 0 -> %% don't delete the file_summary_ets entry for File here - %% because we could have readers which need to be able to - %% decrement the readers count. - true = ets:update_element(FileSummaryEts, File, + %% don't delete the file_summary_ets entry for File here + %% because we could have readers which need to be able to + %% decrement the readers count. + 0 -> true = ets:update_element(FileSummaryEts, File, {#file_summary.locked, true}), ok = rabbit_msg_store_gc:delete(GCPid, File), Pending1 = orddict_store(File, [], Pending), @@ -1803,17 +1813,17 @@ combine_files(Source, Destination, dir = Dir, msg_store = Server }) -> [#file_summary { - readers = 0, - left = Destination, - valid_total_size = SourceValid, - file_size = SourceFileSize, - locked = true }] = ets:lookup(FileSummaryEts, Source), + readers = 0, + left = Destination, + valid_total_size = SourceValid, + file_size = SourceFileSize, + locked = true }] = ets:lookup(FileSummaryEts, Source), [#file_summary { - readers = 0, - right = Source, - valid_total_size = DestinationValid, - file_size = DestinationFileSize, - locked = true }] = ets:lookup(FileSummaryEts, Destination), + readers = 0, + right = Source, + valid_total_size = DestinationValid, + file_size = DestinationFileSize, + locked = true }] = ets:lookup(FileSummaryEts, Destination), SourceName = filenum_to_name(Source), DestinationName = filenum_to_name(Destination), @@ -1956,3 +1966,47 @@ copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl, {got, FinalOffsetZ}, {destination, Destination}]} end. + +force_recovery(BaseDir, Store) -> + Dir = filename:join(BaseDir, atom_to_list(Store)), + file:delete(filename:join(Dir, ?CLEAN_FILENAME)), + recover_crashed_compactions(BaseDir), + ok. + +foreach_file(D, Fun, Files) -> + [Fun(filename:join(D, File)) || File <- Files]. + +foreach_file(D1, D2, Fun, Files) -> + [Fun(filename:join(D1, File), filename:join(D2, File)) || File <- Files]. + +transform_dir(BaseDir, Store, TransformFun) -> + Dir = filename:join(BaseDir, atom_to_list(Store)), + TmpDir = filename:join(Dir, ?TRANSFORM_TMP), + TransformFile = fun (A, B) -> transform_msg_file(A, B, TransformFun) end, + case filelib:is_dir(TmpDir) of + true -> throw({error, transform_failed_previously}); + false -> FileList = list_sorted_file_names(Dir, ?FILE_EXTENSION), + foreach_file(Dir, TmpDir, TransformFile, FileList), + foreach_file(Dir, fun file:delete/1, FileList), + foreach_file(TmpDir, Dir, fun file:copy/2, FileList), + foreach_file(TmpDir, fun file:delete/1, FileList), + ok = file:del_dir(TmpDir) + end. + +transform_msg_file(FileOld, FileNew, TransformFun) -> + rabbit_misc:ensure_parent_dirs_exist(FileNew), + {ok, RefOld} = file_handle_cache:open(FileOld, [raw, binary, read], []), + {ok, RefNew} = file_handle_cache:open(FileNew, [raw, binary, write], + [{write_buffer, + ?HANDLE_CACHE_BUFFER_SIZE}]), + {ok, _Acc, _IgnoreSize} = + rabbit_msg_file:scan( + RefOld, filelib:file_size(FileOld), + fun({Guid, _Size, _Offset, BinMsg}, ok) -> + {ok, MsgNew} = TransformFun(binary_to_term(BinMsg)), + {ok, _} = rabbit_msg_file:append(RefNew, Guid, MsgNew), + ok + end, ok), + file_handle_cache:close(RefOld), + file_handle_cache:close(RefNew), + ok. diff --git a/src/rabbit_multi.erl b/src/rabbit_multi.erl deleted file mode 100644 index ebd7fe8a06..0000000000 --- a/src/rabbit_multi.erl +++ /dev/null @@ -1,349 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License -%% at http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and -%% limitations under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. -%% - --module(rabbit_multi). --include("rabbit.hrl"). - --export([start/0, stop/0]). - --define(RPC_SLEEP, 500). - -%%---------------------------------------------------------------------------- - --ifdef(use_specs). - --spec(start/0 :: () -> no_return()). --spec(stop/0 :: () -> 'ok'). --spec(usage/0 :: () -> no_return()). - --endif. - -%%---------------------------------------------------------------------------- - -start() -> - RpcTimeout = - case init:get_argument(maxwait) of - {ok,[[N1]]} -> 1000 * list_to_integer(N1); - _ -> ?MAX_WAIT - end, - case init:get_plain_arguments() of - [] -> - usage(); - FullCommand -> - {Command, Args} = parse_args(FullCommand), - case catch action(Command, Args, RpcTimeout) of - ok -> - io:format("done.~n"), - halt(); - {'EXIT', {function_clause, [{?MODULE, action, _} | _]}} -> - print_error("invalid command '~s'", - [string:join(FullCommand, " ")]), - usage(); - timeout -> - print_error("timeout starting some nodes.", []), - halt(1); - Other -> - print_error("~p", [Other]), - halt(2) - end - end. - -print_error(Format, Args) -> - rabbit_misc:format_stderr("Error: " ++ Format ++ "~n", Args). - -parse_args([Command | Args]) -> - {list_to_atom(Command), Args}. - -stop() -> - ok. - -usage() -> - io:format("~s", [rabbit_multi_usage:usage()]), - halt(1). - -action(start_all, [NodeCount], RpcTimeout) -> - io:format("Starting all nodes...~n", []), - application:load(rabbit), - {_NodeNamePrefix, NodeHost} = NodeName = rabbit_misc:nodeparts( - getenv("RABBITMQ_NODENAME")), - case net_adm:names(NodeHost) of - {error, EpmdReason} -> - throw({cannot_connect_to_epmd, NodeHost, EpmdReason}); - {ok, _} -> - ok - end, - {NodePids, Running} = - case list_to_integer(NodeCount) of - 1 -> {NodePid, Started} = start_node(rabbit_misc:makenode(NodeName), - RpcTimeout), - {[NodePid], Started}; - N -> start_nodes(N, N, [], true, NodeName, - get_node_tcp_listener(), RpcTimeout) - end, - write_pids_file(NodePids), - case Running of - true -> ok; - false -> timeout - end; - -action(status, [], RpcTimeout) -> - io:format("Status of all running nodes...~n", []), - call_all_nodes( - fun ({Node, Pid}) -> - RabbitRunning = - case is_rabbit_running(Node, RpcTimeout) of - false -> not_running; - true -> running - end, - io:format("Node '~p' with Pid ~p: ~p~n", - [Node, Pid, RabbitRunning]) - end); - -action(stop_all, [], RpcTimeout) -> - io:format("Stopping all nodes...~n", []), - call_all_nodes(fun ({Node, Pid}) -> - io:format("Stopping node ~p~n", [Node]), - rpc:call(Node, rabbit, stop_and_halt, []), - case kill_wait(Pid, RpcTimeout, false) of - false -> kill_wait(Pid, RpcTimeout, true); - true -> ok - end, - io:format("OK~n", []) - end), - delete_pids_file(); - -action(rotate_logs, [], RpcTimeout) -> - action(rotate_logs, [""], RpcTimeout); - -action(rotate_logs, [Suffix], RpcTimeout) -> - io:format("Rotating logs for all nodes...~n", []), - BinarySuffix = list_to_binary(Suffix), - call_all_nodes( - fun ({Node, _}) -> - io:format("Rotating logs for node ~p", [Node]), - case rpc:call(Node, rabbit, rotate_logs, - [BinarySuffix], RpcTimeout) of - {badrpc, Error} -> io:format(": ~p.~n", [Error]); - ok -> io:format(": ok.~n", []) - end - end). - -%% PNodePid is the list of PIDs -%% Running is a boolean exhibiting success at some moment -start_nodes(0, _, PNodePid, Running, _, _, _) -> {PNodePid, Running}; - -start_nodes(N, Total, PNodePid, Running, NodeNameBase, Listener, RpcTimeout) -> - {NodePre, NodeSuff} = NodeNameBase, - NodeNumber = Total - N, - NodePre1 = case NodeNumber of - %% For compatibility with running a single node - 0 -> NodePre; - _ -> NodePre ++ "_" ++ integer_to_list(NodeNumber) - end, - Node = rabbit_misc:makenode({NodePre1, NodeSuff}), - os:putenv("RABBITMQ_NODENAME", atom_to_list(Node)), - case Listener of - {NodeIpAddress, NodePortBase} -> - NodePort = NodePortBase + NodeNumber, - os:putenv("RABBITMQ_NODE_PORT", integer_to_list(NodePort)), - os:putenv("RABBITMQ_NODE_IP_ADDRESS", NodeIpAddress); - undefined -> - ok - end, - {NodePid, Started} = start_node(Node, RpcTimeout), - start_nodes(N - 1, Total, [NodePid | PNodePid], - Started and Running, NodeNameBase, Listener, RpcTimeout). - -start_node(Node, RpcTimeout) -> - io:format("Starting node ~s...~n", [Node]), - case rpc:call(Node, os, getpid, []) of - {badrpc, _} -> - Port = run_rabbitmq_server(), - Started = wait_for_rabbit_to_start(Node, RpcTimeout, Port), - Pid = case rpc:call(Node, os, getpid, []) of - {badrpc, _} -> throw(cannot_get_pid); - PidS -> list_to_integer(PidS) - end, - io:format("~s~n", [case Started of - true -> "OK"; - false -> "timeout" - end]), - {{Node, Pid}, Started}; - PidS -> - Pid = list_to_integer(PidS), - throw({node_already_running, Node, Pid}) - end. - -wait_for_rabbit_to_start(_ , RpcTimeout, _) when RpcTimeout < 0 -> - false; -wait_for_rabbit_to_start(Node, RpcTimeout, Port) -> - case is_rabbit_running(Node, RpcTimeout) of - true -> true; - false -> receive - {'EXIT', Port, PosixCode} -> - throw({node_start_failed, PosixCode}) - after ?RPC_SLEEP -> - wait_for_rabbit_to_start( - Node, RpcTimeout - ?RPC_SLEEP, Port) - end - end. - -run_rabbitmq_server() -> - with_os([{unix, fun run_rabbitmq_server_unix/0}, - {win32, fun run_rabbitmq_server_win32/0}]). - -run_rabbitmq_server_unix() -> - CmdLine = getenv("RABBITMQ_SCRIPT_HOME") ++ "/rabbitmq-server -noinput", - erlang:open_port({spawn, CmdLine}, [nouse_stdio]). - -run_rabbitmq_server_win32() -> - Cmd = filename:nativename(os:find_executable("cmd")), - CmdLine = "\"" ++ getenv("RABBITMQ_SCRIPT_HOME") ++ - "\\rabbitmq-server.bat\" -noinput -detached", - erlang:open_port({spawn_executable, Cmd}, - [{arg0, Cmd}, {args, ["/q", "/s", "/c", CmdLine]}, - nouse_stdio]). - -is_rabbit_running(Node, RpcTimeout) -> - case rpc:call(Node, rabbit, status, [], RpcTimeout) of - {badrpc, _} -> false; - Status -> case proplists:get_value(running_applications, Status) of - undefined -> false; - Apps -> lists:keymember(rabbit, 1, Apps) - end - end. - -with_os(Handlers) -> - {OsFamily, _} = os:type(), - case proplists:get_value(OsFamily, Handlers) of - undefined -> throw({unsupported_os, OsFamily}); - Handler -> Handler() - end. - -pids_file() -> getenv("RABBITMQ_PIDS_FILE"). - -write_pids_file(Pids) -> - FileName = pids_file(), - Handle = case file:open(FileName, [write]) of - {ok, Device} -> - Device; - {error, Reason} -> - throw({cannot_create_pids_file, FileName, Reason}) - end, - try - ok = io:write(Handle, Pids), - ok = io:put_chars(Handle, [$.]) - after - case file:close(Handle) of - ok -> ok; - {error, Reason1} -> - throw({cannot_create_pids_file, FileName, Reason1}) - end - end, - ok. - -delete_pids_file() -> - FileName = pids_file(), - case file:delete(FileName) of - ok -> ok; - {error, enoent} -> ok; - {error, Reason} -> throw({cannot_delete_pids_file, FileName, Reason}) - end. - -read_pids_file() -> - FileName = pids_file(), - case file:consult(FileName) of - {ok, [Pids]} -> Pids; - {error, enoent} -> []; - {error, Reason} -> throw({cannot_read_pids_file, FileName, Reason}) - end. - -kill_wait(Pid, TimeLeft, Forceful) when TimeLeft < 0 -> - Cmd = with_os([{unix, fun () -> if Forceful -> "kill -9"; - true -> "kill" - end - end}, - %% Kill forcefully always on Windows, since erl.exe - %% seems to completely ignore non-forceful killing - %% even when everything is working - {win32, fun () -> "taskkill /f /pid" end}]), - os:cmd(Cmd ++ " " ++ integer_to_list(Pid)), - false; % Don't assume what we did just worked! - -% Returns true if the process is dead, false otherwise. -kill_wait(Pid, TimeLeft, Forceful) -> - timer:sleep(?RPC_SLEEP), - io:format(".", []), - is_dead(Pid) orelse kill_wait(Pid, TimeLeft - ?RPC_SLEEP, Forceful). - -% Test using some OS clunkiness since we shouldn't trust -% rpc:call(os, getpid, []) at this point -is_dead(Pid) -> - PidS = integer_to_list(Pid), - with_os([{unix, fun () -> - system("kill -0 " ++ PidS - ++ " >/dev/null 2>&1") /= 0 - end}, - {win32, fun () -> - Res = os:cmd("tasklist /nh /fi \"pid eq " ++ - PidS ++ "\" 2>&1"), - case re:run(Res, "erl\\.exe", [{capture, none}]) of - match -> false; - _ -> true - end - end}]). - -% Like system(3) -system(Cmd) -> - ShCmd = "sh -c '" ++ escape_quotes(Cmd) ++ "'", - Port = erlang:open_port({spawn, ShCmd}, [exit_status,nouse_stdio]), - receive {Port, {exit_status, Status}} -> Status end. - -% Escape the quotes in a shell command so that it can be used in "sh -c 'cmd'" -escape_quotes(Cmd) -> - lists:flatten(lists:map(fun ($') -> "'\\''"; (Ch) -> Ch end, Cmd)). - -call_all_nodes(Func) -> - case read_pids_file() of - [] -> throw(no_nodes_running); - NodePids -> lists:foreach(Func, NodePids) - end. - -getenv(Var) -> - case os:getenv(Var) of - false -> throw({missing_env_var, Var}); - Value -> Value - end. - -get_node_tcp_listener() -> - try - {getenv("RABBITMQ_NODE_IP_ADDRESS"), - list_to_integer(getenv("RABBITMQ_NODE_PORT"))} - catch _ -> - case application:get_env(rabbit, tcp_listeners) of - {ok, [{_IpAddy, _Port} = Listener]} -> - Listener; - {ok, [Port]} when is_number(Port) -> - {"0.0.0.0", Port}; - {ok, []} -> - undefined; - {ok, Other} -> - throw({cannot_start_multiple_nodes, multiple_tcp_listeners, - Other}); - undefined -> - throw({missing_configuration, tcp_listeners}) - end - end. diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 36f61628b8..877d2cf7da 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -67,7 +67,7 @@ -spec(close_connection/2 :: (pid(), string()) -> 'ok'). -spec(on_node_down/1 :: (node()) -> 'ok'). -spec(check_tcp_listener_address/2 :: (atom(), listener_config()) - -> [{inet:ip_address(), ip_port(), family(), atom()}]). + -> [{inet:ip_address(), ip_port(), family(), atom()}]). -endif. @@ -90,15 +90,15 @@ boot_ssl() -> {ok, SslListeners} -> ok = rabbit_misc:start_applications([crypto, public_key, ssl]), {ok, SslOptsConfig} = application:get_env(ssl_options), - % unknown_ca errors are silently ignored prior to R14B unless we - % supply this verify_fun - remove when at least R14B is required + %% unknown_ca errors are silently ignored prior to R14B unless we + %% supply this verify_fun - remove when at least R14B is required SslOpts = case proplists:get_value(verify, SslOptsConfig, verify_none) of verify_none -> SslOptsConfig; verify_peer -> [{verify_fun, fun([]) -> true; ([_|_]) -> false end} - | SslOptsConfig] + | SslOptsConfig] end, [start_ssl_listener(Listener, SslOpts) || Listener <- SslListeners], ok diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl index 817abaa2bd..1f30a2fc35 100644 --- a/src/rabbit_node_monitor.erl +++ b/src/rabbit_node_monitor.erl @@ -69,6 +69,7 @@ handle_call(_Request, _From, State) -> handle_cast({rabbit_running_on, Node}, State) -> rabbit_log:info("node ~p up~n", [Node]), erlang:monitor(process, {rabbit, Node}), + ok = rabbit_alarm:on_node_up(Node), {noreply, State}; handle_cast(_Msg, State) -> {noreply, State}. @@ -76,7 +77,7 @@ handle_cast(_Msg, State) -> handle_info({nodedown, Node}, State) -> rabbit_log:info("node ~p down~n", [Node]), ok = handle_dead_rabbit(Node), - {noreply, State}; + {noreply, State}; handle_info({'DOWN', _MRef, process, {rabbit, Node}, _Reason}, State) -> rabbit_log:info("node ~p lost 'rabbit'~n", [Node]), ok = handle_dead_rabbit(Node), @@ -92,10 +93,10 @@ code_change(_OldVsn, State, _Extra) -> %%-------------------------------------------------------------------- -%% TODO: This may turn out to be a performance hog when there are -%% lots of nodes. We really only need to execute this code on -%% *one* node, rather than all of them. +%% TODO: This may turn out to be a performance hog when there are lots +%% of nodes. We really only need to execute some of these statements +%% on *one* node, rather than all of them. handle_dead_rabbit(Node) -> ok = rabbit_networking:on_node_down(Node), - ok = rabbit_amqqueue:on_node_down(Node). - + ok = rabbit_amqqueue:on_node_down(Node), + ok = rabbit_alarm:on_node_down(Node). diff --git a/src/rabbit_prelaunch.erl b/src/rabbit_prelaunch.erl index d9d92788e1..7bb8c0ea3b 100644 --- a/src/rabbit_prelaunch.erl +++ b/src/rabbit_prelaunch.erl @@ -250,13 +250,13 @@ duplicate_node_check(NodeStr) -> case net_adm:names(NodeHost) of {ok, NamePorts} -> case proplists:is_defined(NodeName, NamePorts) of - true -> io:format("node with name ~p " - "already running on ~p~n", - [NodeName, NodeHost]), - [io:format(Fmt ++ "~n", Args) || - {Fmt, Args} <- rabbit_control:diagnostics(Node)], - terminate(?ERROR_CODE); - false -> ok + true -> io:format("node with name ~p " + "already running on ~p~n", + [NodeName, NodeHost]), + [io:format(Fmt ++ "~n", Args) || + {Fmt, Args} <- rabbit_control:diagnostics(Node)], + terminate(?ERROR_CODE); + false -> ok end; {error, EpmdReason} -> terminate("unexpected epmd error: ~p~n", [EpmdReason]) diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 76b1136f8b..00f5a75219 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -145,8 +145,8 @@ %% 1 publish, 1 deliver, 1 ack per msg -define(SEGMENT_TOTAL_SIZE, ?SEGMENT_ENTRY_COUNT * - (?PUBLISH_RECORD_LENGTH_BYTES + - (2 * ?REL_SEQ_ONLY_ENTRY_LENGTH_BYTES))). + (?PUBLISH_RECORD_LENGTH_BYTES + + (2 * ?REL_SEQ_ONLY_ENTRY_LENGTH_BYTES))). %% ---- misc ---- @@ -177,7 +177,7 @@ path :: file:filename(), journal_entries :: array(), unacked :: non_neg_integer() - })). + })). -type(seq_id() :: integer()). -type(seg_dict() :: {dict(), [segment()]}). -type(on_sync_fun() :: fun ((gb_set()) -> ok)). @@ -188,10 +188,10 @@ max_journal_entries :: non_neg_integer(), on_sync :: on_sync_fun(), unsynced_guids :: [rabbit_guid:guid()] - }). + }). -type(startup_fun_state() :: {fun ((A) -> 'finished' | {rabbit_guid:guid(), non_neg_integer(), A}), - A}). + A}). -type(shutdown_terms() :: [any()]). -spec(init/2 :: (rabbit_amqqueue:name(), on_sync_fun()) -> qistate()). @@ -272,7 +272,7 @@ publish(Guid, SeqId, MsgProps, IsPersistent, false -> ?PUB_TRANS_JPREFIX end):?JPREFIX_BITS, SeqId:?SEQ_BITS>>, - create_pub_record_body(Guid, MsgProps)]), + create_pub_record_body(Guid, MsgProps)]), maybe_flush_journal( add_to_journal(SeqId, {Guid, MsgProps, IsPersistent}, State1)). @@ -666,8 +666,8 @@ recover_journal(State) -> journal_minus_segment(JEntries, SegEntries), Segment #segment { journal_entries = JEntries1, unacked = (UnackedCountInJournal + - UnackedCountInSeg - - UnackedCountDuplicates) } + UnackedCountInSeg - + UnackedCountDuplicates) } end, Segments), State1 #qistate { segments = Segments1 }. @@ -799,16 +799,16 @@ write_entry_to_segment(RelSeq, {Pub, Del, Ack}, Hdl) -> {Guid, MsgProps, IsPersistent} -> file_handle_cache:append( Hdl, [<<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS, - (bool_to_int(IsPersistent)):1, - RelSeq:?REL_SEQ_BITS>>, - create_pub_record_body(Guid, MsgProps)]) + (bool_to_int(IsPersistent)):1, + RelSeq:?REL_SEQ_BITS>>, + create_pub_record_body(Guid, MsgProps)]) end, ok = case {Del, Ack} of {no_del, no_ack} -> ok; _ -> Binary = <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, - RelSeq:?REL_SEQ_BITS>>, + RelSeq:?REL_SEQ_BITS>>, file_handle_cache:append( Hdl, case {Del, Ack} of {del, ack} -> [Binary, Binary]; @@ -853,14 +853,14 @@ load_segment(KeepAcked, #segment { path = Path }) -> load_segment_entries(KeepAcked, Hdl, SegEntries, UnackedCount) -> case file_handle_cache:read(Hdl, ?REL_SEQ_ONLY_ENTRY_LENGTH_BYTES) of {ok, <<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS, - IsPersistentNum:1, RelSeq:?REL_SEQ_BITS>>} -> + IsPersistentNum:1, RelSeq:?REL_SEQ_BITS>>} -> {Guid, MsgProps} = read_pub_record_body(Hdl), Obj = {{Guid, MsgProps, 1 == IsPersistentNum}, no_del, no_ack}, SegEntries1 = array:set(RelSeq, Obj, SegEntries), load_segment_entries(KeepAcked, Hdl, SegEntries1, UnackedCount + 1); {ok, <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, - RelSeq:?REL_SEQ_BITS>>} -> + RelSeq:?REL_SEQ_BITS>>} -> {UnackedCountDelta, SegEntries1} = case array:get(RelSeq, SegEntries) of {Pub, no_del, no_ack} -> diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 3908b64692..710e687809 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -37,7 +37,7 @@ -define(SILENT_CLOSE_DELAY, 3). -define(FRAME_MAX, 131072). %% set to zero once QPid fix their negotiation -%--------------------------------------------------------------------------- +%%-------------------------------------------------------------------------- -record(v1, {parent, sock, connection, callback, recv_length, recv_ref, connection_state, queue_collector, heartbeater, stats_timer, @@ -62,7 +62,7 @@ State#v1.connection_state =:= blocking orelse State#v1.connection_state =:= blocked)). -%%---------------------------------------------------------------------------- +%%-------------------------------------------------------------------------- -ifdef(use_specs). @@ -158,7 +158,7 @@ server_properties(Protocol) -> {copyright, ?COPYRIGHT_MESSAGE}, {information, ?INFORMATION_MESSAGE}]]], - %% Filter duplicated properties in favor of config file provided values + %% Filter duplicated properties in favour of config file provided values lists:usort(fun ({K1,_,_}, {K2,_,_}) -> K1 =< K2 end, NormalizedConfigServerProps). @@ -564,7 +564,7 @@ start_connection({ProtocolMajor, ProtocolMinor, _ProtocolRevision}, version_major = ProtocolMajor, version_minor = ProtocolMinor, server_properties = server_properties(Protocol), - mechanisms = auth_mechanisms_binary(), + mechanisms = auth_mechanisms_binary(Sock), locales = <<"en_US">> }, ok = send_on_channel0(Sock, Start, Protocol), switch_callback(State#v1{connection = Connection#connection{ @@ -592,14 +592,14 @@ handle_method0(MethodName, FieldsBin, State = #v1{connection = #connection{protocol = Protocol}}) -> HandleException = fun(R) -> - case ?IS_RUNNING(State) of - true -> send_exception(State, 0, R); - %% We don't trust the client at this point - force - %% them to wait for a bit so they can't DOS us with - %% repeated failed logins etc. - false -> timer:sleep(?SILENT_CLOSE_DELAY * 1000), - throw({channel0_error, State#v1.connection_state, R}) - end + case ?IS_RUNNING(State) of + true -> send_exception(State, 0, R); + %% We don't trust the client at this point - force + %% them to wait for a bit so they can't DOS us with + %% repeated failed logins etc. + false -> timer:sleep(?SILENT_CLOSE_DELAY * 1000), + throw({channel0_error, State#v1.connection_state, R}) + end end, try handle_method0(Protocol:decode_method_fields(MethodName, FieldsBin), @@ -616,7 +616,7 @@ handle_method0(#'connection.start_ok'{mechanism = Mechanism, State0 = #v1{connection_state = starting, connection = Connection, sock = Sock}) -> - AuthMechanism = auth_mechanism_to_module(Mechanism), + AuthMechanism = auth_mechanism_to_module(Mechanism, Sock), Capabilities = case rabbit_misc:table_lookup(ClientProperties, <<"capabilities">>) of {table, Capabilities1} -> Capabilities1; @@ -709,14 +709,14 @@ handle_method0(_Method, #v1{connection_state = S}) -> send_on_channel0(Sock, Method, Protocol) -> ok = rabbit_writer:internal_send_command(Sock, 0, Method, Protocol). -auth_mechanism_to_module(TypeBin) -> +auth_mechanism_to_module(TypeBin, Sock) -> case rabbit_registry:binary_to_type(TypeBin) of {error, not_found} -> rabbit_misc:protocol_error( command_invalid, "unknown authentication mechanism '~s'", [TypeBin]); T -> - case {lists:member(T, auth_mechanisms()), + case {lists:member(T, auth_mechanisms(Sock)), rabbit_registry:lookup_module(auth_mechanism, T)} of {true, {ok, Module}} -> Module; @@ -727,15 +727,14 @@ auth_mechanism_to_module(TypeBin) -> end end. -auth_mechanisms() -> +auth_mechanisms(Sock) -> {ok, Configured} = application:get_env(auth_mechanisms), - [Name || {Name, _Module} <- rabbit_registry:lookup_all(auth_mechanism), - lists:member(Name, Configured)]. + [Name || {Name, Module} <- rabbit_registry:lookup_all(auth_mechanism), + Module:should_offer(Sock), lists:member(Name, Configured)]. -auth_mechanisms_binary() -> +auth_mechanisms_binary(Sock) -> list_to_binary( - string:join( - [atom_to_list(A) || A <- auth_mechanisms()], " ")). + string:join([atom_to_list(A) || A <- auth_mechanisms(Sock)], " ")). auth_phase(Response, State = #v1{auth_mechanism = AuthMechanism, diff --git a/src/rabbit_registry.erl b/src/rabbit_registry.erl index 795413aa5c..9821ae7b86 100644 --- a/src/rabbit_registry.erl +++ b/src/rabbit_registry.erl @@ -48,7 +48,7 @@ start_link() -> %%--------------------------------------------------------------------------- register(Class, TypeName, ModuleName) -> - gen_server:call(?SERVER, {register, Class, TypeName, ModuleName}). + gen_server:call(?SERVER, {register, Class, TypeName, ModuleName}, infinity). %% This is used with user-supplied arguments (e.g., on exchange %% declare), so we restrict it to existing atoms only. This means it diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index 692d2473b8..f6a1c92fcc 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -37,7 +37,8 @@ fun ((rabbit_types:binding()) -> boolean())) -> match_result()). -spec(match_routing_key/2 :: (rabbit_types:binding_source(), - routing_key() | '_') -> match_result()). + [routing_key()] | ['_']) -> + match_result()). -endif. @@ -58,7 +59,7 @@ deliver(QNames, Delivery = #delivery{mandatory = false, {routed, QPids}; deliver(QNames, Delivery = #delivery{mandatory = Mandatory, - immediate = Immediate}) -> + immediate = Immediate}) -> QPids = lookup_qpids(QNames), {Success, _} = delegate:invoke(QPids, @@ -66,7 +67,7 @@ deliver(QNames, Delivery = #delivery{mandatory = Mandatory, rabbit_amqqueue:deliver(Pid, Delivery) end), {Routed, Handled} = - lists:foldl(fun fold_deliveries/2, {false, []}, Success), + lists:foldl(fun fold_deliveries/2, {false, []}, Success), check_delivery(Mandatory, Immediate, {Routed, Handled}). @@ -82,12 +83,22 @@ match_bindings(SrcName, Match) -> Match(Binding)]), mnesia:async_dirty(fun qlc:e/1, [Query]). -match_routing_key(SrcName, RoutingKey) -> +match_routing_key(SrcName, [RoutingKey]) -> MatchHead = #route{binding = #binding{source = SrcName, destination = '$1', key = RoutingKey, _ = '_'}}, - mnesia:dirty_select(rabbit_route, [{MatchHead, [], ['$1']}]). + mnesia:dirty_select(rabbit_route, [{MatchHead, [], ['$1']}]); +match_routing_key(SrcName, [_|_] = RoutingKeys) -> + Condition = list_to_tuple(['orelse' | [{'=:=', '$2', RKey} || + RKey <- RoutingKeys]]), + MatchHead = #route{binding = #binding{source = SrcName, + destination = '$1', + key = '$2', + _ = '_'}}, + mnesia:dirty_select(rabbit_route, [{MatchHead, [Condition], ['$1']}]). + + %%-------------------------------------------------------------------- diff --git a/src/rabbit_ssl.erl b/src/rabbit_ssl.erl index e831ee5141..1953b6b85c 100644 --- a/src/rabbit_ssl.erl +++ b/src/rabbit_ssl.erl @@ -87,8 +87,8 @@ cert_info(F, Cert) -> find_by_type(Type, {rdnSequence, RDNs}) -> case [V || #'AttributeTypeAndValue'{type = T, value = V} - <- lists:flatten(RDNs), - T == Type] of + <- lists:flatten(RDNs), + T == Type] of [{printableString, S}] -> S; [] -> not_found end. @@ -166,7 +166,7 @@ format_asn1_value({ST, S}) when ST =:= teletexString; ST =:= printableString; true -> S end; format_asn1_value({utcTime, [Y1, Y2, M1, M2, D1, D2, H1, H2, - Min1, Min2, S1, S2, $Z]}) -> + Min1, Min2, S1, S2, $Z]}) -> io_lib:format("20~c~c-~c~c-~c~cT~c~c:~c~c:~c~cZ", [Y1, Y2, M1, M2, D1, D2, H1, H2, Min1, Min2, S1, S2]); format_asn1_value(V) -> diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 4e526f5d4f..d7587fe04b 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -35,6 +35,7 @@ test_content_prop_roundtrip(Datum, Binary) -> Binary = rabbit_binary_generator:encode_properties(Types, Values). %% assertion all_tests() -> + passed = gm_tests:all_tests(), application:set_env(rabbit, file_handles_high_watermark, 10, infinity), ok = file_handle_cache:set_limit(10), passed = test_file_handle_cache(), @@ -424,35 +425,35 @@ test_content_properties() -> [{<<"one">>, signedint, 1}, {<<"two">>, signedint, 2}]}]}], << - % property-flags - 16#8000:16, + %% property-flags + 16#8000:16, - % property-list: + %% property-list: - % table - 117:32, % table length in bytes + %% table + 117:32, % table length in bytes - 11,"a signedint", % name - "I",12345678:32, % type and value + 11,"a signedint", % name + "I",12345678:32, % type and value - 9,"a longstr", - "S",10:32,"yes please", + 9,"a longstr", + "S",10:32,"yes please", - 9,"a decimal", - "D",123,12345678:32, + 9,"a decimal", + "D",123,12345678:32, - 11,"a timestamp", - "T", 123456789012345:64, + 11,"a timestamp", + "T", 123456789012345:64, - 14,"a nested table", - "F", - 18:32, + 14,"a nested table", + "F", + 18:32, - 3,"one", - "I",1:32, + 3,"one", + "I",1:32, - 3,"two", - "I",2:32 >>), + 3,"two", + "I",2:32 >>), case catch rabbit_binary_parser:parse_properties([bit, bit, bit, bit], <<16#A0,0,1>>) of {'EXIT', content_properties_binary_overflow} -> passed; V -> exit({got_success_but_expected_failure, V}) @@ -479,28 +480,28 @@ test_field_values() -> ]}], << - % property-flags - 16#8000:16, - % table length in bytes - 228:32, - - 7,"longstr", "S", 21:32, "Here is a long string", % = 34 - 9,"signedint", "I", 12345:32/signed, % + 15 = 49 - 7,"decimal", "D", 3, 123456:32, % + 14 = 63 - 9,"timestamp", "T", 109876543209876:64, % + 19 = 82 - 5,"table", "F", 31:32, % length of table % + 11 = 93 - 3,"one", "I", 54321:32, % + 9 = 102 - 3,"two", "S", 13:32, "A long string",% + 22 = 124 - 4,"byte", "b", 255:8, % + 7 = 131 - 4,"long", "l", 1234567890:64, % + 14 = 145 - 5,"short", "s", 655:16, % + 9 = 154 - 4,"bool", "t", 1, % + 7 = 161 - 6,"binary", "x", 15:32, "a binary string", % + 27 = 188 - 4,"void", "V", % + 6 = 194 - 5,"array", "A", 23:32, % + 11 = 205 - "I", 54321:32, % + 5 = 210 - "S", 13:32, "A long string" % + 18 = 228 - >>), + %% property-flags + 16#8000:16, + %% table length in bytes + 228:32, + + 7,"longstr", "S", 21:32, "Here is a long string", % = 34 + 9,"signedint", "I", 12345:32/signed, % + 15 = 49 + 7,"decimal", "D", 3, 123456:32, % + 14 = 63 + 9,"timestamp", "T", 109876543209876:64, % + 19 = 82 + 5,"table", "F", 31:32, % length of table % + 11 = 93 + 3,"one", "I", 54321:32, % + 9 = 102 + 3,"two", "S", 13:32, "A long string", % + 22 = 124 + 4,"byte", "b", 255:8, % + 7 = 131 + 4,"long", "l", 1234567890:64, % + 14 = 145 + 5,"short", "s", 655:16, % + 9 = 154 + 4,"bool", "t", 1, % + 7 = 161 + 6,"binary", "x", 15:32, "a binary string", % + 27 = 188 + 4,"void", "V", % + 6 = 194 + 5,"array", "A", 23:32, % + 11 = 205 + "I", 54321:32, % + 5 = 210 + "S", 13:32, "A long string" % + 18 = 228 + >>), passed. %% Test that content frames don't exceed frame-max @@ -585,32 +586,131 @@ sequence_with_content(Sequence) -> rabbit_framing_amqp_0_9_1), Sequence). -test_topic_match(P, R) -> - test_topic_match(P, R, true). - -test_topic_match(P, R, Expected) -> - case rabbit_exchange_type_topic:topic_matches(list_to_binary(P), - list_to_binary(R)) of - Expected -> - passed; - _ -> - {topic_match_failure, P, R} - end. - test_topic_matching() -> - passed = test_topic_match("#", "test.test"), - passed = test_topic_match("#", ""), - passed = test_topic_match("#.T.R", "T.T.R"), - passed = test_topic_match("#.T.R", "T.R.T.R"), - passed = test_topic_match("#.Y.Z", "X.Y.Z.X.Y.Z"), - passed = test_topic_match("#.test", "test"), - passed = test_topic_match("#.test", "test.test"), - passed = test_topic_match("#.test", "ignored.test"), - passed = test_topic_match("#.test", "more.ignored.test"), - passed = test_topic_match("#.test", "notmatched", false), - passed = test_topic_match("#.z", "one.two.three.four", false), + XName = #resource{virtual_host = <<"/">>, + kind = exchange, + name = <<"test_exchange">>}, + X = #exchange{name = XName, type = topic, durable = false, + auto_delete = false, arguments = []}, + %% create + rabbit_exchange_type_topic:validate(X), + exchange_op_callback(X, create, []), + + %% add some bindings + Bindings = lists:map( + fun ({Key, Q}) -> + #binding{source = XName, + key = list_to_binary(Key), + destination = #resource{virtual_host = <<"/">>, + kind = queue, + name = list_to_binary(Q)}} + end, [{"a.b.c", "t1"}, + {"a.*.c", "t2"}, + {"a.#.b", "t3"}, + {"a.b.b.c", "t4"}, + {"#", "t5"}, + {"#.#", "t6"}, + {"#.b", "t7"}, + {"*.*", "t8"}, + {"a.*", "t9"}, + {"*.b.c", "t10"}, + {"a.#", "t11"}, + {"a.#.#", "t12"}, + {"b.b.c", "t13"}, + {"a.b.b", "t14"}, + {"a.b", "t15"}, + {"b.c", "t16"}, + {"", "t17"}, + {"*.*.*", "t18"}, + {"vodka.martini", "t19"}, + {"a.b.c", "t20"}, + {"*.#", "t21"}, + {"#.*.#", "t22"}, + {"*.#.#", "t23"}, + {"#.#.#", "t24"}, + {"*", "t25"}, + {"#.b.#", "t26"}]), + lists:foreach(fun (B) -> exchange_op_callback(X, add_binding, [B]) end, + Bindings), + + %% test some matches + test_topic_expect_match( + X, [{"a.b.c", ["t1", "t2", "t5", "t6", "t10", "t11", "t12", + "t18", "t20", "t21", "t22", "t23", "t24", + "t26"]}, + {"a.b", ["t3", "t5", "t6", "t7", "t8", "t9", "t11", + "t12", "t15", "t21", "t22", "t23", "t24", + "t26"]}, + {"a.b.b", ["t3", "t5", "t6", "t7", "t11", "t12", "t14", + "t18", "t21", "t22", "t23", "t24", "t26"]}, + {"", ["t5", "t6", "t17", "t24"]}, + {"b.c.c", ["t5", "t6", "t18", "t21", "t22", "t23", + "t24", "t26"]}, + {"a.a.a.a.a", ["t5", "t6", "t11", "t12", "t21", "t22", + "t23", "t24"]}, + {"vodka.gin", ["t5", "t6", "t8", "t21", "t22", "t23", + "t24"]}, + {"vodka.martini", ["t5", "t6", "t8", "t19", "t21", "t22", "t23", + "t24"]}, + {"b.b.c", ["t5", "t6", "t10", "t13", "t18", "t21", + "t22", "t23", "t24", "t26"]}, + {"nothing.here.at.all", ["t5", "t6", "t21", "t22", "t23", "t24"]}, + {"oneword", ["t5", "t6", "t21", "t22", "t23", "t24", + "t25"]}]), + + %% remove some bindings + RemovedBindings = [lists:nth(1, Bindings), lists:nth(5, Bindings), + lists:nth(11, Bindings), lists:nth(19, Bindings), + lists:nth(21, Bindings)], + exchange_op_callback(X, remove_bindings, [RemovedBindings]), + RemainingBindings = ordsets:to_list( + ordsets:subtract(ordsets:from_list(Bindings), + ordsets:from_list(RemovedBindings))), + + %% test some matches + test_topic_expect_match(X, + [{"a.b.c", ["t2", "t6", "t10", "t12", "t18", "t20", "t22", + "t23", "t24", "t26"]}, + {"a.b", ["t3", "t6", "t7", "t8", "t9", "t12", "t15", + "t22", "t23", "t24", "t26"]}, + {"a.b.b", ["t3", "t6", "t7", "t12", "t14", "t18", "t22", + "t23", "t24", "t26"]}, + {"", ["t6", "t17", "t24"]}, + {"b.c.c", ["t6", "t18", "t22", "t23", "t24", "t26"]}, + {"a.a.a.a.a", ["t6", "t12", "t22", "t23", "t24"]}, + {"vodka.gin", ["t6", "t8", "t22", "t23", "t24"]}, + {"vodka.martini", ["t6", "t8", "t22", "t23", "t24"]}, + {"b.b.c", ["t6", "t10", "t13", "t18", "t22", "t23", + "t24", "t26"]}, + {"nothing.here.at.all", ["t6", "t22", "t23", "t24"]}, + {"oneword", ["t6", "t22", "t23", "t24", "t25"]}]), + + %% remove the entire exchange + exchange_op_callback(X, delete, [RemainingBindings]), + %% none should match now + test_topic_expect_match(X, [{"a.b.c", []}, {"b.b.c", []}, {"", []}]), passed. +exchange_op_callback(X, Fun, ExtraArgs) -> + rabbit_misc:execute_mnesia_transaction( + fun () -> rabbit_exchange:callback(X, Fun, [true, X] ++ ExtraArgs) end), + rabbit_exchange:callback(X, Fun, [false, X] ++ ExtraArgs). + +test_topic_expect_match(X, List) -> + lists:foreach( + fun ({Key, Expected}) -> + BinKey = list_to_binary(Key), + Res = rabbit_exchange_type_topic:route( + X, #delivery{message = #basic_message{routing_keys = + [BinKey]}}), + ExpectedRes = lists:map( + fun (Q) -> #resource{virtual_host = <<"/">>, + kind = queue, + name = list_to_binary(Q)} + end, Expected), + true = (lists:usort(ExpectedRes) =:= lists:usort(Res)) + end, List). + test_app_management() -> %% starting, stopping, status ok = control_action(stop_app, []), @@ -718,7 +818,7 @@ test_log_management_during_startup() -> ok = delete_log_handlers([sasl_report_tty_h]), ok = case catch control_action(start_app, []) of ok -> exit({got_success_but_expected_failure, - log_rotation_tty_no_handlers_test}); + log_rotation_tty_no_handlers_test}); {error, {cannot_log_to_tty, _, _}} -> ok end, @@ -743,8 +843,8 @@ test_log_management_during_startup() -> ok = add_log_handlers([{error_logger_file_h, MainLog}]), ok = case control_action(start_app, []) of ok -> exit({got_success_but_expected_failure, - log_rotation_no_write_permission_dir_test}); - {error, {cannot_log_to_file, _, _}} -> ok + log_rotation_no_write_permission_dir_test}); + {error, {cannot_log_to_file, _, _}} -> ok end, %% start application with logging to a subdirectory which @@ -754,9 +854,9 @@ test_log_management_during_startup() -> ok = add_log_handlers([{error_logger_file_h, MainLog}]), ok = case control_action(start_app, []) of ok -> exit({got_success_but_expected_failure, - log_rotatation_parent_dirs_test}); + log_rotatation_parent_dirs_test}); {error, {cannot_log_to_file, _, - {error, {cannot_create_parent_dirs, _, eacces}}}} -> ok + {error, {cannot_create_parent_dirs, _, eacces}}}} -> ok end, ok = set_permissions(TmpDir, 8#00700), ok = set_permissions(TmpLog, 8#00600), @@ -776,22 +876,22 @@ test_log_management_during_startup() -> passed. test_option_parser() -> - % command and arguments should just pass through + %% command and arguments should just pass through ok = check_get_options({["mock_command", "arg1", "arg2"], []}, [], ["mock_command", "arg1", "arg2"]), - % get flags + %% get flags ok = check_get_options( {["mock_command", "arg1"], [{"-f", true}, {"-f2", false}]}, [{flag, "-f"}, {flag, "-f2"}], ["mock_command", "arg1", "-f"]), - % get options + %% get options ok = check_get_options( {["mock_command"], [{"-foo", "bar"}, {"-baz", "notbaz"}]}, [{option, "-foo", "notfoo"}, {option, "-baz", "notbaz"}], ["mock_command", "-foo", "bar"]), - % shuffled and interleaved arguments and options + %% shuffled and interleaved arguments and options ok = check_get_options( {["a1", "a2", "a3"], [{"-o1", "hello"}, {"-o2", "noto2"}, {"-f", true}]}, [{option, "-o1", "noto1"}, {flag, "-f"}, {option, "-o2", "noto2"}], @@ -1043,7 +1143,7 @@ test_server_status() -> [_|_] = rabbit_binding:list_for_source( rabbit_misc:r(<<"/">>, exchange, <<"">>)), [_] = rabbit_binding:list_for_destination( - rabbit_misc:r(<<"/">>, queue, <<"foo">>)), + rabbit_misc:r(<<"/">>, queue, <<"foo">>)), [_] = rabbit_binding:list_for_source_and_destination( rabbit_misc:r(<<"/">>, exchange, <<"">>), rabbit_misc:r(<<"/">>, queue, <<"foo">>)), @@ -1205,9 +1305,9 @@ test_delegates_async(SecondaryNode) -> make_responder(FMsg) -> make_responder(FMsg, timeout). make_responder(FMsg, Throw) -> fun () -> - receive Msg -> FMsg(Msg) - after 1000 -> throw(Throw) - end + receive Msg -> FMsg(Msg) + after 1000 -> throw(Throw) + end end. spawn_responders(Node, Responder, Count) -> @@ -1218,10 +1318,10 @@ await_response(0) -> await_response(Count) -> receive response -> ok, - await_response(Count - 1) + await_response(Count - 1) after 1000 -> - io:format("Async reply not received~n"), - throw(timeout) + io:format("Async reply not received~n"), + throw(timeout) end. must_exit(Fun) -> @@ -1233,11 +1333,11 @@ must_exit(Fun) -> end. test_delegates_sync(SecondaryNode) -> - Sender = fun (Pid) -> gen_server:call(Pid, invoked) end, + Sender = fun (Pid) -> gen_server:call(Pid, invoked, infinity) end, BadSender = fun (_Pid) -> exit(exception) end, Responder = make_responder(fun ({'$gen_call', From, invoked}) -> - gen_server:reply(From, response) + gen_server:reply(From, response) end), BadResponder = make_responder(fun ({'$gen_call', From, invoked}) -> @@ -1249,7 +1349,7 @@ test_delegates_sync(SecondaryNode) -> must_exit(fun () -> delegate:invoke(spawn(BadResponder), BadSender) end), must_exit(fun () -> - delegate:invoke(spawn(SecondaryNode, BadResponder), BadSender) end), + delegate:invoke(spawn(SecondaryNode, BadResponder), BadSender) end), LocalGoodPids = spawn_responders(node(), Responder, 2), RemoteGoodPids = spawn_responders(SecondaryNode, Responder, 2), @@ -1338,7 +1438,7 @@ test_declare_on_dead_queue(SecondaryNode) -> throw(failed_to_create_and_kill_queue) end. -%--------------------------------------------------------------------- +%%--------------------------------------------------------------------- control_action(Command, Args) -> control_action(Command, node(), Args, default_options()). @@ -1853,7 +1953,7 @@ test_queue_index() -> with_empty_test_queue( fun (Qi0) -> {Qi1, _SeqIdsGuidsD} = queue_index_publish(SeqIdsD, - false, Qi0), + false, Qi0), Qi2 = rabbit_queue_index:deliver(SeqIdsD, Qi1), Qi3 = rabbit_queue_index:ack(SeqIdsD, Qi2), rabbit_queue_index:flush(Qi3) @@ -2095,7 +2195,7 @@ check_variable_queue_status(VQ0, Props) -> variable_queue_wait_for_shuffling_end(VQ) -> case rabbit_variable_queue:needs_idle_timeout(VQ) of true -> variable_queue_wait_for_shuffling_end( - rabbit_variable_queue:idle_timeout(VQ)); + rabbit_variable_queue:idle_timeout(VQ)); false -> VQ end. diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl index 3dbe740f27..a11595e55d 100644 --- a/src/rabbit_types.erl +++ b/src/rabbit_types.erl @@ -42,39 +42,39 @@ %% TODO: make this more precise by tying specific class_ids to %% specific properties -type(undecoded_content() :: - #content{class_id :: rabbit_framing:amqp_class_id(), - properties :: 'none', - properties_bin :: binary(), - payload_fragments_rev :: [binary()]} | - #content{class_id :: rabbit_framing:amqp_class_id(), - properties :: rabbit_framing:amqp_property_record(), - properties_bin :: 'none', - payload_fragments_rev :: [binary()]}). + #content{class_id :: rabbit_framing:amqp_class_id(), + properties :: 'none', + properties_bin :: binary(), + payload_fragments_rev :: [binary()]} | + #content{class_id :: rabbit_framing:amqp_class_id(), + properties :: rabbit_framing:amqp_property_record(), + properties_bin :: 'none', + payload_fragments_rev :: [binary()]}). -type(unencoded_content() :: undecoded_content()). -type(decoded_content() :: - #content{class_id :: rabbit_framing:amqp_class_id(), - properties :: rabbit_framing:amqp_property_record(), - properties_bin :: maybe(binary()), - payload_fragments_rev :: [binary()]}). + #content{class_id :: rabbit_framing:amqp_class_id(), + properties :: rabbit_framing:amqp_property_record(), + properties_bin :: maybe(binary()), + payload_fragments_rev :: [binary()]}). -type(encoded_content() :: - #content{class_id :: rabbit_framing:amqp_class_id(), - properties :: maybe(rabbit_framing:amqp_property_record()), - properties_bin :: binary(), - payload_fragments_rev :: [binary()]}). + #content{class_id :: rabbit_framing:amqp_class_id(), + properties :: maybe(rabbit_framing:amqp_property_record()), + properties_bin :: binary(), + payload_fragments_rev :: [binary()]}). -type(content() :: undecoded_content() | decoded_content()). -type(basic_message() :: - #basic_message{exchange_name :: rabbit_exchange:name(), - routing_key :: rabbit_router:routing_key(), - content :: content(), - guid :: rabbit_guid:guid(), - is_persistent :: boolean()}). + #basic_message{exchange_name :: rabbit_exchange:name(), + routing_keys :: [rabbit_router:routing_key()], + content :: content(), + guid :: rabbit_guid:guid(), + is_persistent :: boolean()}). -type(message() :: basic_message()). -type(delivery() :: - #delivery{mandatory :: boolean(), - immediate :: boolean(), - txn :: maybe(txn()), - sender :: pid(), - message :: message()}). + #delivery{mandatory :: boolean(), + immediate :: boolean(), + txn :: maybe(txn()), + sender :: pid(), + message :: message()}). -type(message_properties() :: #message_properties{expiry :: pos_integer() | 'undefined', needs_confirming :: boolean()}). @@ -89,9 +89,9 @@ -type(infos() :: [info()]). -type(amqp_error() :: - #amqp_error{name :: rabbit_framing:amqp_exception(), - explanation :: string(), - method :: rabbit_framing:amqp_method_name()}). + #amqp_error{name :: rabbit_framing:amqp_exception(), + explanation :: string(), + method :: rabbit_framing:amqp_method_name()}). -type(r(Kind) :: r2(vhost(), Kind)). @@ -103,34 +103,34 @@ name :: Name}). -type(listener() :: - #listener{node :: node(), - protocol :: atom(), - host :: rabbit_networking:hostname(), - port :: rabbit_networking:ip_port()}). + #listener{node :: node(), + protocol :: atom(), + host :: rabbit_networking:hostname(), + port :: rabbit_networking:ip_port()}). -type(binding_source() :: rabbit_exchange:name()). -type(binding_destination() :: rabbit_amqqueue:name() | rabbit_exchange:name()). -type(binding() :: - #binding{source :: rabbit_exchange:name(), - destination :: binding_destination(), - key :: rabbit_binding:key(), - args :: rabbit_framing:amqp_table()}). + #binding{source :: rabbit_exchange:name(), + destination :: binding_destination(), + key :: rabbit_binding:key(), + args :: rabbit_framing:amqp_table()}). -type(amqqueue() :: - #amqqueue{name :: rabbit_amqqueue:name(), - durable :: boolean(), - auto_delete :: boolean(), - exclusive_owner :: rabbit_types:maybe(pid()), - arguments :: rabbit_framing:amqp_table(), - pid :: rabbit_types:maybe(pid())}). + #amqqueue{name :: rabbit_amqqueue:name(), + durable :: boolean(), + auto_delete :: boolean(), + exclusive_owner :: rabbit_types:maybe(pid()), + arguments :: rabbit_framing:amqp_table(), + pid :: rabbit_types:maybe(pid())}). -type(exchange() :: - #exchange{name :: rabbit_exchange:name(), - type :: rabbit_exchange:type(), - durable :: boolean(), - auto_delete :: boolean(), - arguments :: rabbit_framing:amqp_table()}). + #exchange{name :: rabbit_exchange:name(), + type :: rabbit_exchange:type(), + durable :: boolean(), + auto_delete :: boolean(), + arguments :: rabbit_framing:amqp_table()}). -type(connection() :: pid()). diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index b0a715233a..ebda5d03a7 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -98,7 +98,6 @@ vertices(Module, Steps) -> edges(_Module, Steps) -> [{Require, StepName} || {StepName, Requires} <- Steps, Require <- Requires]. - unknown_heads(Heads, G) -> [H || H <- Heads, digraph:vertex(G, H) =:= false]. @@ -107,9 +106,9 @@ upgrades_to_apply(Heads, G) -> %% everything we've already applied. Subtract that from all %% vertices: that's what we have to apply. Unsorted = sets:to_list( - sets:subtract( - sets:from_list(digraph:vertices(G)), - sets:from_list(digraph_utils:reaching(Heads, G)))), + sets:subtract( + sets:from_list(digraph:vertices(G)), + sets:from_list(digraph_utils:reaching(Heads, G)))), %% Form a subgraph from that list and find a topological ordering %% so we can invoke them in order. [element(2, digraph:vertex(G, StepName)) || diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index 68b88b3e45..b9dbe418d9 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -25,6 +25,7 @@ -rabbit_upgrade({add_ip_to_listener, []}). -rabbit_upgrade({internal_exchanges, []}). -rabbit_upgrade({user_to_internal_user, [hash_passwords]}). +-rabbit_upgrade({topic_trie, []}). %% ------------------------------------------------------------------- @@ -35,6 +36,7 @@ -spec(add_ip_to_listener/0 :: () -> 'ok'). -spec(internal_exchanges/0 :: () -> 'ok'). -spec(user_to_internal_user/0 :: () -> 'ok'). +-spec(topic_trie/0 :: () -> 'ok'). -endif. @@ -47,7 +49,7 @@ %% point. remove_user_scope() -> - mnesia( + transform( rabbit_user_permission, fun ({user_permission, UV, {permission, _Scope, Conf, Write, Read}}) -> {user_permission, UV, {permission, Conf, Write, Read}} @@ -55,7 +57,7 @@ remove_user_scope() -> [user_vhost, permission]). hash_passwords() -> - mnesia( + transform( rabbit_user, fun ({user, Username, Password, IsAdmin}) -> Hash = rabbit_auth_backend_internal:hash_password(Password), @@ -64,7 +66,7 @@ hash_passwords() -> [username, password_hash, is_admin]). add_ip_to_listener() -> - mnesia( + transform( rabbit_listener, fun ({listener, Node, Protocol, Host, Port}) -> {listener, Node, Protocol, Host, {0,0,0,0}, Port} @@ -77,27 +79,41 @@ internal_exchanges() -> fun ({exchange, Name, Type, Durable, AutoDelete, Args}) -> {exchange, Name, Type, Durable, AutoDelete, false, Args} end, - [ ok = mnesia(T, - AddInternalFun, - [name, type, durable, auto_delete, internal, arguments]) + [ ok = transform(T, + AddInternalFun, + [name, type, durable, auto_delete, internal, arguments]) || T <- Tables ], ok. user_to_internal_user() -> - mnesia( + transform( rabbit_user, fun({user, Username, PasswordHash, IsAdmin}) -> {internal_user, Username, PasswordHash, IsAdmin} end, [username, password_hash, is_admin], internal_user). +topic_trie() -> + create(rabbit_topic_trie_edge, [{record_name, topic_trie_edge}, + {attributes, [trie_edge, node_id]}, + {type, ordered_set}]), + create(rabbit_topic_trie_binding, [{record_name, topic_trie_binding}, + {attributes, [trie_binding, value]}, + {type, ordered_set}]). + %%-------------------------------------------------------------------- -mnesia(TableName, Fun, FieldList) -> +transform(TableName, Fun, FieldList) -> + rabbit_mnesia:wait_for_tables([TableName]), {atomic, ok} = mnesia:transform_table(TableName, Fun, FieldList), ok. -mnesia(TableName, Fun, FieldList, NewRecordName) -> +transform(TableName, Fun, FieldList, NewRecordName) -> + rabbit_mnesia:wait_for_tables([TableName]), {atomic, ok} = mnesia:transform_table(TableName, Fun, FieldList, NewRecordName), ok. + +create(Tab, TabDef) -> + {atomic, ok} = mnesia:create_table(Tab, TabDef), + ok. diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 7142d56072..591e5a6611 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -22,7 +22,7 @@ requeue/3, len/1, is_empty/1, dropwhile/2, set_ram_duration_target/2, ram_duration/1, needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1, - status/1]). + status/1, multiple_routing_keys/0]). -export([start/1, stop/0]). @@ -268,13 +268,13 @@ msg_on_disk, index_on_disk, msg_props - }). + }). -record(delta, { start_seq_id, %% start_seq_id is inclusive count, end_seq_id %% end_seq_id is exclusive - }). + }). -record(tx, { pending_messages, pending_acks }). @@ -294,6 +294,8 @@ %%---------------------------------------------------------------------------- +-rabbit_upgrade({multiple_routing_keys, []}). + -ifdef(use_specs). -type(timestamp() :: {non_neg_integer(), non_neg_integer(), non_neg_integer()}). @@ -351,6 +353,8 @@ -include("rabbit_backing_queue_spec.hrl"). +-spec(multiple_routing_keys/0 :: () -> 'ok'). + -endif. -define(BLANK_DELTA, #delta { start_seq_id = undefined, @@ -506,8 +510,12 @@ publish(Msg, MsgProps, State) -> a(reduce_memory_use(State1)). publish_delivered(false, #basic_message { guid = Guid }, - _MsgProps, State = #vqstate { len = 0 }) -> - blind_confirm(self(), gb_sets:singleton(Guid)), + #message_properties { needs_confirming = NeedsConfirming }, + State = #vqstate { len = 0 }) -> + case NeedsConfirming of + true -> blind_confirm(self(), gb_sets:singleton(Guid)); + false -> ok + end, {undefined, a(State)}; publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent, guid = Guid }, @@ -536,7 +544,7 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent, dropwhile(Pred, State) -> {_OkOrEmpty, State1} = dropwhile1(Pred, State), - State1. + a(State1). dropwhile1(Pred, State) -> internal_queue_out( @@ -623,12 +631,12 @@ internal_fetch(AckRequired, MsgStatus = #msg_status { %% 3. If an ack is required, add something sensible to PA {AckTag, State1} = case AckRequired of - true -> StateN = record_pending_ack( - MsgStatus #msg_status { - is_delivered = true }, State), - {SeqId, StateN}; - false -> {undefined, State} - end, + true -> StateN = record_pending_ack( + MsgStatus #msg_status { + is_delivered = true }, State), + {SeqId, StateN}; + false -> {undefined, State} + end, PCount1 = PCount - one_if(IsPersistent andalso not AckRequired), Len1 = Len - 1, @@ -768,8 +776,8 @@ ram_duration(State = #vqstate { RamAckCount = gb_trees:size(RamAckIndex), Duration = %% msgs+acks / (msgs+acks/sec) == sec - case AvgEgressRate == 0 andalso AvgIngressRate == 0 andalso - AvgAckEgressRate == 0 andalso AvgAckIngressRate == 0 of + case (AvgEgressRate == 0 andalso AvgIngressRate == 0 andalso + AvgAckEgressRate == 0 andalso AvgAckIngressRate == 0) of true -> infinity; false -> (RamMsgCountPrev + RamMsgCount + RamAckCount + RamAckCountPrev) / @@ -1384,7 +1392,7 @@ accumulate_ack_init() -> {[], orddict:new()}. accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS msg_on_disk = false, index_on_disk = false }, - {PersistentSeqIdsAcc, GuidsByStore}) -> + {PersistentSeqIdsAcc, GuidsByStore}) -> {PersistentSeqIdsAcc, GuidsByStore}; accumulate_ack(SeqId, {IsPersistent, Guid, _MsgProps}, {PersistentSeqIdsAcc, GuidsByStore}) -> @@ -1447,8 +1455,8 @@ msgs_written_to_disk(QPid, GuidSet, written) -> msgs_confirmed(gb_sets:intersection(GuidSet, MIOD), State #vqstate { msgs_on_disk = - gb_sets:intersection( - gb_sets:union(MOD, GuidSet), UC) }) + gb_sets:union( + MOD, gb_sets:intersection(UC, GuidSet)) }) end). msg_indices_written_to_disk(QPid, GuidSet) -> @@ -1459,8 +1467,8 @@ msg_indices_written_to_disk(QPid, GuidSet) -> msgs_confirmed(gb_sets:intersection(GuidSet, MOD), State #vqstate { msg_indices_on_disk = - gb_sets:intersection( - gb_sets:union(MIOD, GuidSet), UC) }) + gb_sets:union( + MIOD, gb_sets:intersection(UC, GuidSet)) }) end). %%---------------------------------------------------------------------------- @@ -1801,3 +1809,27 @@ push_betas_to_deltas(Generator, Limit, Q, Count, RamIndexCount, IndexState) -> push_betas_to_deltas( Generator, Limit, Qa, Count + 1, RamIndexCount1, IndexState1) end. + +%%---------------------------------------------------------------------------- +%% Upgrading +%%---------------------------------------------------------------------------- + +multiple_routing_keys() -> + transform_storage( + fun ({basic_message, ExchangeName, Routing_Key, Content, + Guid, Persistent}) -> + {ok, {basic_message, ExchangeName, [Routing_Key], Content, + Guid, Persistent}}; + (_) -> {error, corrupt_message} + end), + ok. + + +%% Assumes message store is not running +transform_storage(TransformFun) -> + transform_store(?PERSISTENT_MSG_STORE, TransformFun), + transform_store(?TRANSIENT_MSG_STORE, TransformFun). + +transform_store(Store, TransformFun) -> + rabbit_msg_store:force_recovery(rabbit_mnesia:dir(), Store), + rabbit_msg_store:transform_dir(rabbit_mnesia:dir(), Store, TransformFun). diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl index efebef0699..24c130edd6 100644 --- a/src/rabbit_vhost.erl +++ b/src/rabbit_vhost.erl @@ -48,15 +48,15 @@ add(VHostPath) -> ok; (ok, false) -> [rabbit_exchange:declare( - rabbit_misc:r(VHostPath, exchange, Name), - Type, true, false, false, []) || - {Name,Type} <- - [{<<"">>, direct}, - {<<"amq.direct">>, direct}, - {<<"amq.topic">>, topic}, - {<<"amq.match">>, headers}, %% per 0-9-1 pdf - {<<"amq.headers">>, headers}, %% per 0-9-1 xml - {<<"amq.fanout">>, fanout}]], + rabbit_misc:r(VHostPath, exchange, Name), + Type, true, false, false, []) || + {Name,Type} <- + [{<<"">>, direct}, + {<<"amq.direct">>, direct}, + {<<"amq.topic">>, topic}, + {<<"amq.match">>, headers}, %% per 0-9-1 pdf + {<<"amq.headers">>, headers}, %% per 0-9-1 xml + {<<"amq.fanout">>, fanout}]], ok end), rabbit_log:info("Added vhost ~p~n", [VHostPath]), diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl index eba86a5551..ac3434d253 100644 --- a/src/rabbit_writer.erl +++ b/src/rabbit_writer.erl @@ -28,7 +28,7 @@ -define(HIBERNATE_AFTER, 5000). -%%---------------------------------------------------------------------------- +%%--------------------------------------------------------------------------- -ifdef(use_specs). @@ -69,7 +69,7 @@ -endif. -%%---------------------------------------------------------------------------- +%%--------------------------------------------------------------------------- start(Sock, Channel, FrameMax, Protocol, ReaderPid) -> {ok, @@ -133,7 +133,7 @@ handle_message({inet_reply, _, Status}, _State) -> handle_message(Message, _State) -> exit({writer, message_not_understood, Message}). -%--------------------------------------------------------------------------- +%%--------------------------------------------------------------------------- send_command(W, MethodRecord) -> W ! {send_command, MethodRecord}, @@ -157,13 +157,13 @@ send_command_and_notify(W, Q, ChPid, MethodRecord, Content) -> W ! {send_command_and_notify, Q, ChPid, MethodRecord, Content}, ok. -%--------------------------------------------------------------------------- +%%--------------------------------------------------------------------------- call(Pid, Msg) -> {ok, Res} = gen:call(Pid, '$gen_call', Msg, infinity), Res. -%--------------------------------------------------------------------------- +%%--------------------------------------------------------------------------- assemble_frame(Channel, MethodRecord, Protocol) -> ?LOGMESSAGE(out, Channel, MethodRecord, none), diff --git a/src/vm_memory_monitor.erl b/src/vm_memory_monitor.erl index 44e1e4b5ae..dcc6aff5c8 100644 --- a/src/vm_memory_monitor.erl +++ b/src/vm_memory_monitor.erl @@ -175,10 +175,10 @@ internal_update(State = #state { memory_limit = MemLimit, case {Alarmed, NewAlarmed} of {false, true} -> emit_update_info(set, MemUsed, MemLimit), - alarm_handler:set_alarm({vm_memory_high_watermark, []}); + alarm_handler:set_alarm({{vm_memory_high_watermark, node()}, []}); {true, false} -> emit_update_info(clear, MemUsed, MemLimit), - alarm_handler:clear_alarm(vm_memory_high_watermark); + alarm_handler:clear_alarm({vm_memory_high_watermark, node()}); _ -> ok end, |
