summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/file_handle_cache.erl110
-rw-r--r--src/gen_server2.erl4
-rw-r--r--src/gm.erl1321
-rw-r--r--src/gm_soak_test.erl130
-rw-r--r--src/gm_tests.erl182
-rw-r--r--src/pg_local.erl2
-rw-r--r--src/rabbit.erl6
-rw-r--r--src/rabbit_alarm.erl111
-rw-r--r--src/rabbit_amqqueue.erl50
-rw-r--r--src/rabbit_amqqueue_process.erl134
-rw-r--r--src/rabbit_auth_backend_internal.erl4
-rw-r--r--src/rabbit_auth_mechanism.erl4
-rw-r--r--src/rabbit_auth_mechanism_amqplain.erl7
-rw-r--r--src/rabbit_auth_mechanism_cr_demo.erl5
-rw-r--r--src/rabbit_auth_mechanism_plain.erl5
-rw-r--r--src/rabbit_basic.erl71
-rw-r--r--src/rabbit_binary_generator.erl13
-rw-r--r--src/rabbit_binding.erl2
-rw-r--r--src/rabbit_channel.erl84
-rw-r--r--src/rabbit_channel_sup.erl12
-rw-r--r--src/rabbit_client_sup.erl4
-rw-r--r--src/rabbit_control.erl72
-rw-r--r--src/rabbit_direct.erl22
-rw-r--r--src/rabbit_event.erl19
-rw-r--r--src/rabbit_exchange.erl8
-rw-r--r--src/rabbit_exchange_type_direct.erl4
-rw-r--r--src/rabbit_exchange_type_fanout.erl2
-rw-r--r--src/rabbit_exchange_type_topic.erl258
-rw-r--r--src/rabbit_limiter.erl2
-rw-r--r--src/rabbit_memory_monitor.erl10
-rw-r--r--src/rabbit_misc.erl24
-rw-r--r--src/rabbit_mnesia.erl118
-rw-r--r--src/rabbit_msg_file.erl49
-rw-r--r--src/rabbit_msg_store.erl102
-rw-r--r--src/rabbit_multi.erl349
-rw-r--r--src/rabbit_networking.erl8
-rw-r--r--src/rabbit_node_monitor.erl13
-rw-r--r--src/rabbit_prelaunch.erl14
-rw-r--r--src/rabbit_queue_index.erl28
-rw-r--r--src/rabbit_reader.erl41
-rw-r--r--src/rabbit_registry.erl2
-rw-r--r--src/rabbit_router.erl21
-rw-r--r--src/rabbit_ssl.erl6
-rw-r--r--src/rabbit_tests.erl274
-rw-r--r--src/rabbit_types.erl96
-rw-r--r--src/rabbit_upgrade.erl7
-rw-r--r--src/rabbit_upgrade_functions.erl34
-rw-r--r--src/rabbit_variable_queue.erl70
-rw-r--r--src/rabbit_vhost.erl18
-rw-r--r--src/rabbit_writer.erl10
-rw-r--r--src/vm_memory_monitor.erl4
51 files changed, 2877 insertions, 1069 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index 1e1f37cb3d..6f8241b3be 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -146,7 +146,8 @@
-export([open/3, close/1, read/2, append/2, sync/1, position/2, truncate/1,
last_sync_offset/1, current_virtual_offset/1, current_raw_offset/1,
flush/1, copy/3, set_maximum_since_use/1, delete/1, clear/1]).
--export([obtain/0, transfer/1, set_limit/1, get_limit/0]).
+-export([obtain/0, transfer/1, set_limit/1, get_limit/0, info_keys/0, info/0,
+ info/1]).
-export([ulimit/0]).
-export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2,
@@ -241,7 +242,7 @@
-> val_or_error(ref())).
-spec(close/1 :: (ref()) -> ok_or_error()).
-spec(read/2 :: (ref(), non_neg_integer()) ->
- val_or_error([char()] | binary()) | 'eof').
+ val_or_error([char()] | binary()) | 'eof').
-spec(append/2 :: (ref(), iodata()) -> ok_or_error()).
-spec(sync/1 :: (ref()) -> ok_or_error()).
-spec(position/2 :: (ref(), position()) -> val_or_error(offset())).
@@ -251,7 +252,7 @@
-spec(current_raw_offset/1 :: (ref()) -> val_or_error(offset())).
-spec(flush/1 :: (ref()) -> ok_or_error()).
-spec(copy/3 :: (ref(), ref(), non_neg_integer()) ->
- val_or_error(non_neg_integer())).
+ val_or_error(non_neg_integer())).
-spec(set_maximum_since_use/1 :: (non_neg_integer()) -> 'ok').
-spec(delete/1 :: (ref()) -> ok_or_error()).
-spec(clear/1 :: (ref()) -> ok_or_error()).
@@ -259,11 +260,17 @@
-spec(transfer/1 :: (pid()) -> 'ok').
-spec(set_limit/1 :: (non_neg_integer()) -> 'ok').
-spec(get_limit/0 :: () -> non_neg_integer()).
+-spec(info_keys/0 :: () -> [atom()]).
+-spec(info/0 :: () -> [{atom(), any()}]).
+-spec(info/1 :: ([atom()]) -> [{atom(), any()}]).
-spec(ulimit/0 :: () -> 'infinity' | 'unknown' | non_neg_integer()).
-endif.
%%----------------------------------------------------------------------------
+-define(INFO_KEYS, [obtain_count, obtain_limit]).
+
+%%----------------------------------------------------------------------------
%% Public API
%%----------------------------------------------------------------------------
@@ -494,6 +501,11 @@ set_limit(Limit) ->
get_limit() ->
gen_server:call(?SERVER, get_limit, infinity).
+info_keys() -> ?INFO_KEYS.
+
+info() -> info(?INFO_KEYS).
+info(Items) -> gen_server:call(?SERVER, {info, Items}, infinity).
+
%%----------------------------------------------------------------------------
%% Internal functions
%%----------------------------------------------------------------------------
@@ -789,6 +801,12 @@ write_buffer(Handle = #handle { hdl = Hdl, offset = Offset,
{Error, Handle}
end.
+infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
+
+i(obtain_count, #fhc_state{obtain_count = Count}) -> Count;
+i(obtain_limit, #fhc_state{obtain_limit = Limit}) -> Limit;
+i(Item, _) -> throw({bad_argument, Item}).
+
%%----------------------------------------------------------------------------
%% gen_server callbacks
%%----------------------------------------------------------------------------
@@ -849,35 +867,41 @@ handle_call({open, Pid, Requested, EldestUnusedSince}, From,
false -> {noreply, run_pending_item(Item, State1)}
end;
-handle_call({obtain, Pid}, From, State = #fhc_state { obtain_limit = Limit,
- obtain_count = Count,
- obtain_pending = Pending,
- clients = Clients })
- when Limit =/= infinity andalso Count >= Limit ->
- ok = track_client(Pid, Clients),
- true = ets:update_element(Clients, Pid, {#cstate.blocked, true}),
- Item = #pending { kind = obtain, pid = Pid, requested = 1, from = From },
- {noreply, State #fhc_state { obtain_pending = pending_in(Item, Pending) }};
handle_call({obtain, Pid}, From, State = #fhc_state { obtain_count = Count,
obtain_pending = Pending,
clients = Clients }) ->
- Item = #pending { kind = obtain, pid = Pid, requested = 1, from = From },
ok = track_client(Pid, Clients),
- case needs_reduce(State #fhc_state { obtain_count = Count + 1 }) of
- true ->
- true = ets:update_element(Clients, Pid, {#cstate.blocked, true}),
- {noreply, reduce(State #fhc_state {
- obtain_pending = pending_in(Item, Pending) })};
- false ->
- {noreply, run_pending_item(Item, State)}
- end;
+ Item = #pending { kind = obtain, pid = Pid, requested = 1, from = From },
+ Enqueue = fun () ->
+ true = ets:update_element(Clients, Pid,
+ {#cstate.blocked, true}),
+ State #fhc_state {
+ obtain_pending = pending_in(Item, Pending) }
+ end,
+ {noreply,
+ case obtain_limit_reached(State) of
+ true -> Enqueue();
+ false -> case needs_reduce(State #fhc_state {
+ obtain_count = Count + 1 }) of
+ true -> reduce(Enqueue());
+ false -> adjust_alarm(
+ State, run_pending_item(Item, State))
+ end
+ end};
+
handle_call({set_limit, Limit}, _From, State) ->
- {reply, ok, maybe_reduce(
- process_pending(State #fhc_state {
- limit = Limit,
- obtain_limit = obtain_limit(Limit) }))};
+ {reply, ok, adjust_alarm(
+ State, maybe_reduce(
+ process_pending(
+ State #fhc_state {
+ limit = Limit,
+ obtain_limit = obtain_limit(Limit) })))};
+
handle_call(get_limit, _From, State = #fhc_state { limit = Limit }) ->
- {reply, Limit, State}.
+ {reply, Limit, State};
+
+handle_call({info, Items}, _From, State) ->
+ {reply, infos(Items, State), State}.
handle_cast({register_callback, Pid, MFA},
State = #fhc_state { clients = Clients }) ->
@@ -900,9 +924,9 @@ handle_cast({close, Pid, EldestUnusedSince},
_ -> dict:store(Pid, EldestUnusedSince, Elders)
end,
ets:update_counter(Clients, Pid, {#cstate.pending_closes, -1, 0, 0}),
- {noreply, process_pending(
+ {noreply, adjust_alarm(State, process_pending(
update_counts(open, Pid, -1,
- State #fhc_state { elders = Elders1 }))};
+ State #fhc_state { elders = Elders1 })))};
handle_cast({transfer, FromPid, ToPid}, State) ->
ok = track_client(ToPid, State#fhc_state.clients),
@@ -924,13 +948,15 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason},
ets:lookup(Clients, Pid),
true = ets:delete(Clients, Pid),
FilterFun = fun (#pending { pid = Pid1 }) -> Pid1 =/= Pid end,
- {noreply, process_pending(
- State #fhc_state {
- open_count = OpenCount - Opened,
- open_pending = filter_pending(FilterFun, OpenPending),
- obtain_count = ObtainCount - Obtained,
- obtain_pending = filter_pending(FilterFun, ObtainPending),
- elders = dict:erase(Pid, Elders) })}.
+ {noreply, adjust_alarm(
+ State,
+ process_pending(
+ State #fhc_state {
+ open_count = OpenCount - Opened,
+ open_pending = filter_pending(FilterFun, OpenPending),
+ obtain_count = ObtainCount - Obtained,
+ obtain_pending = filter_pending(FilterFun, ObtainPending),
+ elders = dict:erase(Pid, Elders) }))}.
terminate(_Reason, State = #fhc_state { clients = Clients }) ->
ets:delete(Clients),
@@ -990,6 +1016,18 @@ obtain_limit(Limit) -> case ?OBTAIN_LIMIT(Limit) of
OLimit -> OLimit
end.
+obtain_limit_reached(#fhc_state { obtain_limit = Limit,
+ obtain_count = Count}) ->
+ Limit =/= infinity andalso Count >= Limit.
+
+adjust_alarm(OldState, NewState) ->
+ case {obtain_limit_reached(OldState), obtain_limit_reached(NewState)} of
+ {false, true} -> alarm_handler:set_alarm({file_descriptor_limit, []});
+ {true, false} -> alarm_handler:clear_alarm(file_descriptor_limit);
+ _ -> ok
+ end,
+ NewState.
+
requested({_Kind, _Pid, Requested, _From}) ->
Requested.
@@ -1094,7 +1132,7 @@ reduce(State = #fhc_state { open_pending = OpenPending,
case CStates of
[] -> ok;
_ -> case (Sum / ClientCount) -
- (1000 * ?FILE_HANDLES_CHECK_INTERVAL) of
+ (1000 * ?FILE_HANDLES_CHECK_INTERVAL) of
AverageAge when AverageAge > 0 ->
notify_age(CStates, AverageAge);
_ ->
diff --git a/src/gen_server2.erl b/src/gen_server2.erl
index 94296f9751..43e0a8f5df 100644
--- a/src/gen_server2.erl
+++ b/src/gen_server2.erl
@@ -453,8 +453,8 @@ unregister_name({global,Name}) ->
_ = global:unregister_name(Name);
unregister_name(Pid) when is_pid(Pid) ->
Pid;
-% Under R12 let's just ignore it, as we have a single term as Name.
-% On R13 it will never get here, as we get tuple with 'local/global' atom.
+%% Under R12 let's just ignore it, as we have a single term as Name.
+%% On R13 it will never get here, as we get tuple with 'local/global' atom.
unregister_name(_Name) -> ok.
extend_backoff(undefined) ->
diff --git a/src/gm.erl b/src/gm.erl
new file mode 100644
index 0000000000..fd8d9b7792
--- /dev/null
+++ b/src/gm.erl
@@ -0,0 +1,1321 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%%
+
+-module(gm).
+
+%% Guaranteed Multicast
+%% ====================
+%%
+%% This module provides the ability to create named groups of
+%% processes to which members can be dynamically added and removed,
+%% and for messages to be broadcast within the group that are
+%% guaranteed to reach all members of the group during the lifetime of
+%% the message. The lifetime of a message is defined as being, at a
+%% minimum, the time from which the message is first sent to any
+%% member of the group, up until the time at which it is known by the
+%% member who published the message that the message has reached all
+%% group members.
+%%
+%% The guarantee given is that provided a message, once sent, makes it
+%% to members who do not all leave the group, the message will
+%% continue to propagate to all group members.
+%%
+%% Another way of stating the guarantee is that if member P publishes
+%% messages m and m', then for all members P', if P' is a member of
+%% the group prior to the publication of m, and P' receives m', then
+%% P' will receive m.
+%%
+%% Note that only local-ordering is enforced: i.e. if member P sends
+%% message m and then message m', then for-all members P', if P'
+%% receives m and m', then they will receive m' after m. Causality
+%% ordering is _not_ enforced. I.e. if member P receives message m
+%% and as a result publishes message m', there is no guarantee that
+%% other members P' will receive m before m'.
+%%
+%%
+%% API Use
+%% -------
+%%
+%% Mnesia must be started. Use the idempotent create_tables/0 function
+%% to create the tables required.
+%%
+%% start_link/3
+%% Provide the group name, the callback module name, and any arguments
+%% you wish to be passed into the callback module's functions. The
+%% joined/2 function will be called when we have joined the group,
+%% with the arguments passed to start_link and a list of the current
+%% members of the group. See the comments in behaviour_info/1 below
+%% for further details of the callback functions.
+%%
+%% leave/1
+%% Provide the Pid. Removes the Pid from the group. The callback
+%% terminate/2 function will be called.
+%%
+%% broadcast/2
+%% Provide the Pid and a Message. The message will be sent to all
+%% members of the group as per the guarantees given above. This is a
+%% cast and the function call will return immediately. There is no
+%% guarantee that the message will reach any member of the group.
+%%
+%% confirmed_broadcast/2
+%% Provide the Pid and a Message. As per broadcast/2 except that this
+%% is a call, not a cast, and only returns 'ok' once the Message has
+%% reached every member of the group. Do not call
+%% confirmed_broadcast/2 directly from the callback module otherwise
+%% you will deadlock the entire group.
+%%
+%% group_members/1
+%% Provide the Pid. Returns a list of the current group members.
+%%
+%%
+%% Implementation Overview
+%% -----------------------
+%%
+%% One possible means of implementation would be a fan-out from the
+%% sender to every member of the group. This would require that the
+%% group is fully connected, and, in the event that the original
+%% sender of the message disappears from the group before the message
+%% has made it to every member of the group, raises questions as to
+%% who is responsible for sending on the message to new group members.
+%% In particular, the issue is with [ Pid ! Msg || Pid <- Members ] -
+%% if the sender dies part way through, who is responsible for
+%% ensuring that the remaining Members receive the Msg? In the event
+%% that within the group, messages sent are broadcast from a subset of
+%% the members, the fan-out arrangement has the potential to
+%% substantially impact the CPU and network workload of such members,
+%% as such members would have to accommodate the cost of sending each
+%% message to every group member.
+%%
+%% Instead, if the members of the group are arranged in a chain, then
+%% it becomes easier to reason about who within the group has received
+%% each message and who has not. It eases issues of responsibility: in
+%% the event of a group member disappearing, the nearest upstream
+%% member of the chain is responsible for ensuring that messages
+%% continue to propagate down the chain. It also results in equal
+%% distribution of sending and receiving workload, even if all
+%% messages are being sent from just a single group member. This
+%% configuration has the further advantage that it is not necessary
+%% for every group member to know of every other group member, and
+%% even that a group member does not have to be accessible from all
+%% other group members.
+%%
+%% Performance is kept high by permitting pipelining and all
+%% communication between joined group members is asynchronous. In the
+%% chain A -> B -> C -> D, if A sends a message to the group, it will
+%% not directly contact C or D. However, it must know that D receives
+%% the message (in addition to B and C) before it can consider the
+%% message fully sent. A simplistic implementation would require that
+%% D replies to C, C replies to B and B then replies to A. This would
+%% result in a propagation delay of twice the length of the chain. It
+%% would also require, in the event of the failure of C, that D knows
+%% to directly contact B and issue the necessary replies. Instead, the
+%% chain forms a ring: D sends the message on to A: D does not
+%% distinguish A as the sender, merely as the next member (downstream)
+%% within the chain (which has now become a ring). When A receives
+%% from D messages that A sent, it knows that all members have
+%% received the message. However, the message is not dead yet: if C
+%% died as B was sending to C, then B would need to detect the death
+%% of C and forward the message on to D instead: thus every node has
+%% to remember every message published until it is told that it can
+%% forget about the message. This is essential not just for dealing
+%% with failure of members, but also for the addition of new members.
+%%
+%% Thus once A receives the message back again, it then sends to B an
+%% acknowledgement for the message, indicating that B can now forget
+%% about the message. B does so, and forwards the ack to C. C forgets
+%% the message, and forwards the ack to D, which forgets the message
+%% and finally forwards the ack back to A. At this point, A takes no
+%% further action: the message and its acknowledgement have made it to
+%% every member of the group. The message is now dead, and any new
+%% member joining the group at this point will not receive the
+%% message.
+%%
+%% We therefore have two roles:
+%%
+%% 1. The sender, who upon receiving their own messages back, must
+%% then send out acknowledgements, and upon receiving their own
+%% acknowledgements back perform no further action.
+%%
+%% 2. The other group members who upon receiving messages and
+%% acknowledgements must update their own internal state accordingly
+%% (the sending member must also do this in order to be able to
+%% accommodate failures), and forwards messages on to their downstream
+%% neighbours.
+%%
+%%
+%% Implementation: It gets trickier
+%% --------------------------------
+%%
+%% Chain A -> B -> C -> D
+%%
+%% A publishes a message which B receives. A now dies. B and D will
+%% detect the death of A, and will link up, thus the chain is now B ->
+%% C -> D. B forwards A's message on to C, who forwards it to D, who
+%% forwards it to B. Thus B is now responsible for A's messages - both
+%% publications and acknowledgements that were in flight at the point
+%% at which A died. Even worse is that this is transitive: after B
+%% forwards A's message to C, B dies as well. Now C is not only
+%% responsible for B's in-flight messages, but is also responsible for
+%% A's in-flight messages.
+%%
+%% Lemma 1: A member can only determine which dead members they have
+%% inherited responsibility for if there is a total ordering on the
+%% conflicting additions and subtractions of members from the group.
+%%
+%% Consider the simultaneous death of B and addition of B' that
+%% transitions a chain from A -> B -> C to A -> B' -> C. Either B' or
+%% C is responsible for in-flight messages from B. It is easy to
+%% ensure that at least one of them thinks they have inherited B, but
+%% if we do not ensure that exactly one of them inherits B, then we
+%% could have B' converting publishes to acks, which then will crash C
+%% as C does not believe it has issued acks for those messages.
+%%
+%% More complex scenarios are easy to concoct: A -> B -> C -> D -> E
+%% becoming A -> C' -> E. Who has inherited which of B, C and D?
+%%
+%% However, for non-conflicting membership changes, only a partial
+%% ordering is required. For example, A -> B -> C becoming A -> A' ->
+%% B. The addition of A', between A and B can have no conflicts with
+%% the death of C: it is clear that A has inherited C's messages.
+%%
+%% For ease of implementation, we adopt the simple solution, of
+%% imposing a total order on all membership changes.
+%%
+%% On the death of a member, it is ensured the dead member's
+%% neighbours become aware of the death, and the upstream neighbour
+%% now sends to its new downstream neighbour its state, including the
+%% messages pending acknowledgement. The downstream neighbour can then
+%% use this to calculate which publishes and acknowledgements it has
+%% missed out on, due to the death of its old upstream. Thus the
+%% downstream can catch up, and continues the propagation of messages
+%% through the group.
+%%
+%% Lemma 2: When a member is joining, it must synchronously
+%% communicate with its upstream member in order to receive its
+%% starting state atomically with its addition to the group.
+%%
+%% New members must start with the same state as their nearest
+%% upstream neighbour. This ensures that it is not surprised by
+%% acknowledgements they are sent, and that should their downstream
+%% neighbour die, they are able to send the correct state to their new
+%% downstream neighbour to ensure it can catch up. Thus in the
+%% transition A -> B -> C becomes A -> A' -> B -> C becomes A -> A' ->
+%% C, A' must start with the state of A, so that it can send C the
+%% correct state when B dies, allowing C to detect any missed
+%% messages.
+%%
+%% If A' starts by adding itself to the group membership, A could then
+%% die, without A' having received the necessary state from A. This
+%% would leave A' responsible for in-flight messages from A, but
+%% having the least knowledge of all, of those messages. Thus A' must
+%% start by synchronously calling A, which then immediately sends A'
+%% back its state. A then adds A' to the group. If A dies at this
+%% point then A' will be able to see this (as A' will fail to appear
+%% in the group membership), and thus A' will ignore the state it
+%% receives from A, and will simply repeat the process, trying to now
+%% join downstream from some other member. This ensures that should
+%% the upstream die as soon as the new member has been joined, the new
+%% member is guaranteed to receive the correct state, allowing it to
+%% correctly process messages inherited due to the death of its
+%% upstream neighbour.
+%%
+%% The canonical definition of the group membership is held by a
+%% distributed database. Whilst this allows the total ordering of
+%% changes to be achieved, it is nevertheless undesirable to have to
+%% query this database for the current view, upon receiving each
+%% message. Instead, we wish for members to be able to cache a view of
+%% the group membership, which then requires a cache invalidation
+%% mechanism. Each member maintains its own view of the group
+%% membership. Thus when the group's membership changes, members may
+%% need to become aware of such changes in order to be able to
+%% accurately process messages they receive. Because of the
+%% requirement of a total ordering of conflicting membership changes,
+%% it is not possible to use the guaranteed broadcast mechanism to
+%% communicate these changes: to achieve the necessary ordering, it
+%% would be necessary for such messages to be published by exactly one
+%% member, which can not be guaranteed given that such a member could
+%% die.
+%%
+%% The total ordering we enforce on membership changes gives rise to a
+%% view version number: every change to the membership creates a
+%% different view, and the total ordering permits a simple
+%% monotonically increasing view version number.
+%%
+%% Lemma 3: If a message is sent from a member that holds view version
+%% N, it can be correctly processed by any member receiving the
+%% message with a view version >= N.
+%%
+%% Initially, let us suppose that each view contains the ordering of
+%% every member that was ever part of the group. Dead members are
+%% marked as such. Thus we have a ring of members, some of which are
+%% dead, and are thus inherited by the nearest alive downstream
+%% member.
+%%
+%% In the chain A -> B -> C, all three members initially have view
+%% version 1, which reflects reality. B publishes a message, which is
+%% forward by C to A. B now dies, which A notices very quickly. Thus A
+%% updates the view, creating version 2. It now forwards B's
+%% publication, sending that message to its new downstream neighbour,
+%% C. This happens before C is aware of the death of B. C must become
+%% aware of the view change before it interprets the message its
+%% received, otherwise it will fail to learn of the death of B, and
+%% thus will not realise it has inherited B's messages (and will
+%% likely crash).
+%%
+%% Thus very simply, we have that each subsequent view contains more
+%% information than the preceding view.
+%%
+%% However, to avoid the views growing indefinitely, we need to be
+%% able to delete members which have died _and_ for which no messages
+%% are in-flight. This requires that upon inheriting a dead member, we
+%% know the last publication sent by the dead member (this is easy: we
+%% inherit a member because we are the nearest downstream member which
+%% implies that we know at least as much than everyone else about the
+%% publications of the dead member), and we know the earliest message
+%% for which the acknowledgement is still in flight.
+%%
+%% In the chain A -> B -> C, when B dies, A will send to C its state
+%% (as C is the new downstream from A), allowing C to calculate which
+%% messages it has missed out on (described above). At this point, C
+%% also inherits B's messages. If that state from A also includes the
+%% last message published by B for which an acknowledgement has been
+%% seen, then C knows exactly which further acknowledgements it must
+%% receive (also including issuing acknowledgements for publications
+%% still in-flight that it receives), after which it is known there
+%% are no more messages in flight for B, thus all evidence that B was
+%% ever part of the group can be safely removed from the canonical
+%% group membership.
+%%
+%% Thus, for every message that a member sends, it includes with that
+%% message its view version. When a member receives a message it will
+%% update its view from the canonical copy, should its view be older
+%% than the view version included in the message it has received.
+%%
+%% The state held by each member therefore includes the messages from
+%% each publisher pending acknowledgement, the last publication seen
+%% from that publisher, and the last acknowledgement from that
+%% publisher. In the case of the member's own publications or
+%% inherited members, this last acknowledgement seen state indicates
+%% the last acknowledgement retired, rather than sent.
+%%
+%%
+%% Proof sketch
+%% ------------
+%%
+%% We need to prove that with the provided operational semantics, we
+%% can never reach a state that is not well formed from a well-formed
+%% starting state.
+%%
+%% Operational semantics (small step): straight-forward message
+%% sending, process monitoring, state updates.
+%%
+%% Well formed state: dead members inherited by exactly one non-dead
+%% member; for every entry in anyone's pending-acks, either (the
+%% publication of the message is in-flight downstream from the member
+%% and upstream from the publisher) or (the acknowledgement of the
+%% message is in-flight downstream from the publisher and upstream
+%% from the member).
+%%
+%% Proof by induction on the applicable operational semantics.
+%%
+%%
+%% Related work
+%% ------------
+%%
+%% The ring configuration and double traversal of messages around the
+%% ring is similar (though developed independently) to the LCR
+%% protocol by [Levy 2008]. However, LCR differs in several
+%% ways. Firstly, by using vector clocks, it enforces a total order of
+%% message delivery, which is unnecessary for our purposes. More
+%% significantly, it is built on top of a "group communication system"
+%% which performs the group management functions, taking
+%% responsibility away from the protocol as to how to cope with safely
+%% adding and removing members. When membership changes do occur, the
+%% protocol stipulates that every member must perform communication
+%% with every other member of the group, to ensure all outstanding
+%% deliveries complete, before the entire group transitions to the new
+%% view. This, in total, requires two sets of all-to-all synchronous
+%% communications.
+%%
+%% This is not only rather inefficient, but also does not explain what
+%% happens upon the failure of a member during this process. It does
+%% though entirely avoid the need for inheritance of responsibility of
+%% dead members that our protocol incorporates.
+%%
+%% In [Marandi et al 2010], a Paxos-based protocol is described. This
+%% work explicitly focuses on the efficiency of communication. LCR
+%% (and our protocol too) are more efficient, but at the cost of
+%% higher latency. The Ring-Paxos protocol is itself built on top of
+%% IP-multicast, which rules it out for many applications where
+%% point-to-point communication is all that can be required. They also
+%% have an excellent related work section which I really ought to
+%% read...
+%%
+%%
+%% [Levy 2008] The Complexity of Reliable Distributed Storage, 2008.
+%% [Marandi et al 2010] Ring Paxos: A High-Throughput Atomic Broadcast
+%% Protocol
+
+
+-behaviour(gen_server2).
+
+-export([create_tables/0, start_link/3, leave/1, broadcast/2,
+ confirmed_broadcast/2, group_members/1]).
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+ code_change/3, prioritise_info/2]).
+
+-export([behaviour_info/1]).
+
+-export([table_definitions/0]).
+
+-define(GROUP_TABLE, gm_group).
+-define(HIBERNATE_AFTER_MIN, 1000).
+-define(DESIRED_HIBERNATE, 10000).
+-define(SETS, ordsets).
+-define(DICT, orddict).
+
+-record(state,
+ { self,
+ left,
+ right,
+ group_name,
+ module,
+ view,
+ pub_count,
+ members_state,
+ callback_args,
+ confirms
+ }).
+
+-record(gm_group, { name, version, members }).
+
+-record(view_member, { id, aliases, left, right }).
+
+-record(member, { pending_ack, last_pub, last_ack }).
+
+-define(TABLE, {?GROUP_TABLE, [{record_name, gm_group},
+ {attributes, record_info(fields, gm_group)}]}).
+-define(TABLE_MATCH, {match, #gm_group { _ = '_' }}).
+
+-define(TAG, '$gm').
+
+-ifdef(use_specs).
+
+-export_type([group_name/0]).
+
+-type(group_name() :: any()).
+
+-spec(create_tables/0 :: () -> 'ok').
+-spec(start_link/3 :: (group_name(), atom(), any()) ->
+ {'ok', pid()} | {'error', any()}).
+-spec(leave/1 :: (pid()) -> 'ok').
+-spec(broadcast/2 :: (pid(), any()) -> 'ok').
+-spec(confirmed_broadcast/2 :: (pid(), any()) -> 'ok').
+-spec(group_members/1 :: (pid()) -> [pid()]).
+
+-endif.
+
+behaviour_info(callbacks) ->
+ [
+ %% The joined, members_changed and handle_msg callbacks can all
+ %% return any of the following terms:
+ %%
+ %% 'ok' - the callback function returns normally
+ %%
+ %% {'stop', Reason} - the callback indicates the member should
+ %% stop with reason Reason and should leave the group.
+ %%
+ %% {'become', Module, Args} - the callback indicates that the
+ %% callback module should be changed to Module and that the
+ %% callback functions should now be passed the arguments
+ %% Args. This allows the callback module to be dynamically
+ %% changed.
+
+ %% Called when we've successfully joined the group. Supplied with
+ %% Args provided in start_link, plus current group members.
+ {joined, 2},
+
+ %% Supplied with Args provided in start_link, the list of new
+ %% members and the list of members previously known to us that
+ %% have since died. Note that if a member joins and dies very
+ %% quickly, it's possible that we will never see that member
+ %% appear in either births or deaths. However we are guaranteed
+ %% that (1) we will see a member joining either in the births
+ %% here, or in the members passed to joined/2 before receiving
+ %% any messages from it; and (2) we will not see members die that
+ %% we have not seen born (or supplied in the members to
+ %% joined/2).
+ {members_changed, 3},
+
+ %% Supplied with Args provided in start_link, the sender, and the
+ %% message. This does get called for messages injected by this
+ %% member, however, in such cases, there is no special
+ %% significance of this invocation: it does not indicate that the
+ %% message has made it to any other members, let alone all other
+ %% members.
+ {handle_msg, 3},
+
+ %% Called on gm member termination as per rules in gen_server,
+ %% with the Args provided in start_link plus the termination
+ %% Reason.
+ {terminate, 2}
+ ];
+behaviour_info(_Other) ->
+ undefined.
+
+create_tables() ->
+ create_tables([?TABLE]).
+
+create_tables([]) ->
+ ok;
+create_tables([{Table, Attributes} | Tables]) ->
+ case mnesia:create_table(Table, Attributes) of
+ {atomic, ok} -> create_tables(Tables);
+ {aborted, {already_exists, gm_group}} -> create_tables(Tables);
+ Err -> Err
+ end.
+
+table_definitions() ->
+ {Name, Attributes} = ?TABLE,
+ [{Name, [?TABLE_MATCH | Attributes]}].
+
+start_link(GroupName, Module, Args) ->
+ gen_server2:start_link(?MODULE, [GroupName, Module, Args], []).
+
+leave(Server) ->
+ gen_server2:cast(Server, leave).
+
+broadcast(Server, Msg) ->
+ gen_server2:cast(Server, {broadcast, Msg}).
+
+confirmed_broadcast(Server, Msg) ->
+ gen_server2:call(Server, {confirmed_broadcast, Msg}, infinity).
+
+group_members(Server) ->
+ gen_server2:call(Server, group_members, infinity).
+
+
+init([GroupName, Module, Args]) ->
+ random:seed(now()),
+ gen_server2:cast(self(), join),
+ Self = self(),
+ {ok, #state { self = Self,
+ left = {Self, undefined},
+ right = {Self, undefined},
+ group_name = GroupName,
+ module = Module,
+ view = undefined,
+ pub_count = 0,
+ members_state = undefined,
+ callback_args = Args,
+ confirms = queue:new() }, hibernate,
+ {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
+
+
+handle_call({confirmed_broadcast, _Msg}, _From,
+ State = #state { members_state = undefined }) ->
+ reply(not_joined, State);
+
+handle_call({confirmed_broadcast, Msg}, _From,
+ State = #state { self = Self,
+ right = {Self, undefined},
+ module = Module,
+ callback_args = Args }) ->
+ handle_callback_result({Module:handle_msg(Args, Self, Msg), ok, State});
+
+handle_call({confirmed_broadcast, Msg}, From, State) ->
+ internal_broadcast(Msg, From, State);
+
+handle_call(group_members, _From,
+ State = #state { members_state = undefined }) ->
+ reply(not_joined, State);
+
+handle_call(group_members, _From, State = #state { view = View }) ->
+ reply(alive_view_members(View), State);
+
+handle_call({add_on_right, _NewMember}, _From,
+ State = #state { members_state = undefined }) ->
+ reply(not_ready, State);
+
+handle_call({add_on_right, NewMember}, _From,
+ State = #state { self = Self,
+ group_name = GroupName,
+ view = View,
+ members_state = MembersState,
+ module = Module,
+ callback_args = Args }) ->
+ Group = record_new_member_in_group(
+ GroupName, Self, NewMember,
+ fun (Group1) ->
+ View1 = group_to_view(Group1),
+ ok = send_right(NewMember, View1,
+ {catchup, Self, prepare_members_state(
+ MembersState)})
+ end),
+ View2 = group_to_view(Group),
+ State1 = check_neighbours(State #state { view = View2 }),
+ Result = callback_view_changed(Args, Module, View, View2),
+ handle_callback_result({Result, {ok, Group}, State1}).
+
+
+handle_cast({?TAG, ReqVer, Msg},
+ State = #state { view = View,
+ group_name = GroupName,
+ module = Module,
+ callback_args = Args }) ->
+ {Result, State1} =
+ case needs_view_update(ReqVer, View) of
+ true ->
+ View1 = group_to_view(read_group(GroupName)),
+ {callback_view_changed(Args, Module, View, View1),
+ check_neighbours(State #state { view = View1 })};
+ false ->
+ {ok, State}
+ end,
+ handle_callback_result(
+ if_callback_success(
+ Result, fun handle_msg_true/3, fun handle_msg_false/3, Msg, State1));
+
+handle_cast({broadcast, _Msg}, State = #state { members_state = undefined }) ->
+ noreply(State);
+
+handle_cast({broadcast, Msg},
+ State = #state { self = Self,
+ right = {Self, undefined},
+ module = Module,
+ callback_args = Args }) ->
+ handle_callback_result({Module:handle_msg(Args, Self, Msg), State});
+
+handle_cast({broadcast, Msg}, State) ->
+ internal_broadcast(Msg, none, State);
+
+handle_cast(join, State = #state { self = Self,
+ group_name = GroupName,
+ members_state = undefined,
+ module = Module,
+ callback_args = Args }) ->
+ View = join_group(Self, GroupName),
+ MembersState =
+ case alive_view_members(View) of
+ [Self] -> blank_member_state();
+ _ -> undefined
+ end,
+ State1 = check_neighbours(State #state { view = View,
+ members_state = MembersState }),
+ handle_callback_result(
+ {Module:joined(Args, all_known_members(View)), State1});
+
+handle_cast(leave, State) ->
+ {stop, normal, State}.
+
+
+handle_info({'DOWN', MRef, process, _Pid, _Reason},
+ State = #state { self = Self,
+ left = Left,
+ right = Right,
+ group_name = GroupName,
+ view = View,
+ module = Module,
+ callback_args = Args,
+ confirms = Confirms }) ->
+ Member = case {Left, Right} of
+ {{Member1, MRef}, _} -> Member1;
+ {_, {Member1, MRef}} -> Member1;
+ _ -> undefined
+ end,
+ case Member of
+ undefined ->
+ noreply(State);
+ _ ->
+ View1 =
+ group_to_view(record_dead_member_in_group(Member, GroupName)),
+ State1 = State #state { view = View1 },
+ {Result, State2} =
+ case alive_view_members(View1) of
+ [Self] ->
+ maybe_erase_aliases(
+ State1 #state {
+ members_state = blank_member_state(),
+ confirms = purge_confirms(Confirms) });
+ _ ->
+ %% here we won't be pointing out any deaths:
+ %% the concern is that there maybe births
+ %% which we'd otherwise miss.
+ {callback_view_changed(Args, Module, View, View1),
+ State1}
+ end,
+ handle_callback_result({Result, check_neighbours(State2)})
+ end.
+
+
+terminate(Reason, #state { module = Module,
+ callback_args = Args }) ->
+ Module:terminate(Args, Reason).
+
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+
+prioritise_info({'DOWN', _MRef, process, _Pid, _Reason}, _State) -> 1;
+prioritise_info(_ , _State) -> 0.
+
+
+handle_msg(check_neighbours, State) ->
+ %% no-op - it's already been done by the calling handle_cast
+ {ok, State};
+
+handle_msg({catchup, Left, MembersStateLeft},
+ State = #state { self = Self,
+ left = {Left, _MRefL},
+ right = {Right, _MRefR},
+ view = View,
+ members_state = undefined }) ->
+ ok = send_right(Right, View, {catchup, Self, MembersStateLeft}),
+ MembersStateLeft1 = build_members_state(MembersStateLeft),
+ {ok, State #state { members_state = MembersStateLeft1 }};
+
+handle_msg({catchup, Left, MembersStateLeft},
+ State = #state { self = Self,
+ left = {Left, _MRefL},
+ view = View,
+ members_state = MembersState })
+ when MembersState =/= undefined ->
+ MembersStateLeft1 = build_members_state(MembersStateLeft),
+ AllMembers = lists:usort(?DICT:fetch_keys(MembersState) ++
+ ?DICT:fetch_keys(MembersStateLeft1)),
+ {MembersState1, Activity} =
+ lists:foldl(
+ fun (Id, MembersStateActivity) ->
+ #member { pending_ack = PALeft, last_ack = LA } =
+ find_member_or_blank(Id, MembersStateLeft1),
+ with_member_acc(
+ fun (#member { pending_ack = PA } = Member, Activity1) ->
+ case is_member_alias(Id, Self, View) of
+ true ->
+ {_AcksInFlight, Pubs, _PA1} =
+ find_prefix_common_suffix(PALeft, PA),
+ {Member #member { last_ack = LA },
+ activity_cons(Id, pubs_from_queue(Pubs),
+ [], Activity1)};
+ false ->
+ {Acks, _Common, Pubs} =
+ find_prefix_common_suffix(PA, PALeft),
+ {Member,
+ activity_cons(Id, pubs_from_queue(Pubs),
+ acks_from_queue(Acks),
+ Activity1)}
+ end
+ end, Id, MembersStateActivity)
+ end, {MembersState, activity_nil()}, AllMembers),
+ handle_msg({activity, Left, activity_finalise(Activity)},
+ State #state { members_state = MembersState1 });
+
+handle_msg({catchup, _NotLeft, _MembersState}, State) ->
+ {ok, State};
+
+handle_msg({activity, Left, Activity},
+ State = #state { self = Self,
+ left = {Left, _MRefL},
+ view = View,
+ members_state = MembersState,
+ confirms = Confirms })
+ when MembersState =/= undefined ->
+ {MembersState1, {Confirms1, Activity1}} =
+ lists:foldl(
+ fun ({Id, Pubs, Acks}, MembersStateConfirmsActivity) ->
+ with_member_acc(
+ fun (Member = #member { pending_ack = PA,
+ last_pub = LP,
+ last_ack = LA },
+ {Confirms2, Activity2}) ->
+ case is_member_alias(Id, Self, View) of
+ true ->
+ {ToAck, PA1} =
+ find_common(queue_from_pubs(Pubs), PA,
+ queue:new()),
+ LA1 = last_ack(Acks, LA),
+ AckNums = acks_from_queue(ToAck),
+ Confirms3 = maybe_confirm(
+ Self, Id, Confirms2, AckNums),
+ {Member #member { pending_ack = PA1,
+ last_ack = LA1 },
+ {Confirms3,
+ activity_cons(
+ Id, [], AckNums, Activity2)}};
+ false ->
+ PA1 = apply_acks(Acks, join_pubs(PA, Pubs)),
+ LA1 = last_ack(Acks, LA),
+ LP1 = last_pub(Pubs, LP),
+ {Member #member { pending_ack = PA1,
+ last_pub = LP1,
+ last_ack = LA1 },
+ {Confirms2,
+ activity_cons(Id, Pubs, Acks, Activity2)}}
+ end
+ end, Id, MembersStateConfirmsActivity)
+ end, {MembersState, {Confirms, activity_nil()}}, Activity),
+ State1 = State #state { members_state = MembersState1,
+ confirms = Confirms1 },
+ Activity3 = activity_finalise(Activity1),
+ {Result, State2} = maybe_erase_aliases(State1),
+ ok = maybe_send_activity(Activity3, State2),
+ if_callback_success(
+ Result, fun activity_true/3, fun activity_false/3, Activity3, State2);
+
+handle_msg({activity, _NotLeft, _Activity}, State) ->
+ {ok, State}.
+
+
+noreply(State) ->
+ {noreply, State, hibernate}.
+
+reply(Reply, State) ->
+ {reply, Reply, State, hibernate}.
+
+internal_broadcast(Msg, From, State = #state { self = Self,
+ pub_count = PubCount,
+ members_state = MembersState,
+ module = Module,
+ confirms = Confirms,
+ callback_args = Args }) ->
+ PubMsg = {PubCount, Msg},
+ Activity = activity_cons(Self, [PubMsg], [], activity_nil()),
+ ok = maybe_send_activity(activity_finalise(Activity), State),
+ MembersState1 =
+ with_member(
+ fun (Member = #member { pending_ack = PA }) ->
+ Member #member { pending_ack = queue:in(PubMsg, PA) }
+ end, Self, MembersState),
+ Confirms1 = case From of
+ none -> Confirms;
+ _ -> queue:in({PubCount, From}, Confirms)
+ end,
+ handle_callback_result({Module:handle_msg(Args, Self, Msg),
+ State #state { pub_count = PubCount + 1,
+ members_state = MembersState1,
+ confirms = Confirms1 }}).
+
+
+%% ---------------------------------------------------------------------------
+%% View construction and inspection
+%% ---------------------------------------------------------------------------
+
+needs_view_update(ReqVer, {Ver, _View}) ->
+ Ver < ReqVer.
+
+view_version({Ver, _View}) ->
+ Ver.
+
+is_member_alive({dead, _Member}) -> false;
+is_member_alive(_) -> true.
+
+is_member_alias(Self, Self, _View) ->
+ true;
+is_member_alias(Member, Self, View) ->
+ ?SETS:is_element(Member,
+ ((fetch_view_member(Self, View)) #view_member.aliases)).
+
+dead_member_id({dead, Member}) -> Member.
+
+store_view_member(VMember = #view_member { id = Id }, {Ver, View}) ->
+ {Ver, ?DICT:store(Id, VMember, View)}.
+
+with_view_member(Fun, View, Id) ->
+ store_view_member(Fun(fetch_view_member(Id, View)), View).
+
+fetch_view_member(Id, {_Ver, View}) ->
+ ?DICT:fetch(Id, View).
+
+find_view_member(Id, {_Ver, View}) ->
+ ?DICT:find(Id, View).
+
+blank_view(Ver) ->
+ {Ver, ?DICT:new()}.
+
+alive_view_members({_Ver, View}) ->
+ ?DICT:fetch_keys(View).
+
+all_known_members({_Ver, View}) ->
+ ?DICT:fold(
+ fun (Member, #view_member { aliases = Aliases }, Acc) ->
+ ?SETS:to_list(Aliases) ++ [Member | Acc]
+ end, [], View).
+
+group_to_view(#gm_group { members = Members, version = Ver }) ->
+ Alive = lists:filter(fun is_member_alive/1, Members),
+ [_|_] = Alive, %% ASSERTION - can't have all dead members
+ add_aliases(link_view(Alive ++ Alive ++ Alive, blank_view(Ver)), Members).
+
+link_view([Left, Middle, Right | Rest], View) ->
+ case find_view_member(Middle, View) of
+ error ->
+ link_view(
+ [Middle, Right | Rest],
+ store_view_member(#view_member { id = Middle,
+ aliases = ?SETS:new(),
+ left = Left,
+ right = Right }, View));
+ {ok, _} ->
+ View
+ end;
+link_view(_, View) ->
+ View.
+
+add_aliases(View, Members) ->
+ Members1 = ensure_alive_suffix(Members),
+ {EmptyDeadSet, View1} =
+ lists:foldl(
+ fun (Member, {DeadAcc, ViewAcc}) ->
+ case is_member_alive(Member) of
+ true ->
+ {?SETS:new(),
+ with_view_member(
+ fun (VMember =
+ #view_member { aliases = Aliases }) ->
+ VMember #view_member {
+ aliases = ?SETS:union(Aliases, DeadAcc) }
+ end, ViewAcc, Member)};
+ false ->
+ {?SETS:add_element(dead_member_id(Member), DeadAcc),
+ ViewAcc}
+ end
+ end, {?SETS:new(), View}, Members1),
+ 0 = ?SETS:size(EmptyDeadSet), %% ASSERTION
+ View1.
+
+ensure_alive_suffix(Members) ->
+ queue:to_list(ensure_alive_suffix1(queue:from_list(Members))).
+
+ensure_alive_suffix1(MembersQ) ->
+ {{value, Member}, MembersQ1} = queue:out_r(MembersQ),
+ case is_member_alive(Member) of
+ true -> MembersQ;
+ false -> ensure_alive_suffix1(queue:in_r(Member, MembersQ1))
+ end.
+
+
+%% ---------------------------------------------------------------------------
+%% View modification
+%% ---------------------------------------------------------------------------
+
+join_group(Self, GroupName) ->
+ join_group(Self, GroupName, read_group(GroupName)).
+
+join_group(Self, GroupName, {error, not_found}) ->
+ join_group(Self, GroupName, prune_or_create_group(Self, GroupName));
+join_group(Self, _GroupName, #gm_group { members = [Self] } = Group) ->
+ group_to_view(Group);
+join_group(Self, GroupName, #gm_group { members = Members } = Group) ->
+ case lists:member(Self, Members) of
+ true ->
+ group_to_view(Group);
+ false ->
+ case lists:filter(fun is_member_alive/1, Members) of
+ [] ->
+ join_group(Self, GroupName,
+ prune_or_create_group(Self, GroupName));
+ Alive ->
+ Left = lists:nth(random:uniform(length(Alive)), Alive),
+ try
+ case gen_server2:call(
+ Left, {add_on_right, Self}, infinity) of
+ {ok, Group1} -> group_to_view(Group1);
+ not_ready -> join_group(Self, GroupName)
+ end
+ catch
+ exit:{R, _}
+ when R =:= noproc; R =:= normal; R =:= shutdown ->
+ join_group(
+ Self, GroupName,
+ record_dead_member_in_group(Left, GroupName))
+ end
+ end
+ end.
+
+read_group(GroupName) ->
+ case mnesia:dirty_read(?GROUP_TABLE, GroupName) of
+ [] -> {error, not_found};
+ [Group] -> Group
+ end.
+
+prune_or_create_group(Self, GroupName) ->
+ {atomic, Group} =
+ mnesia:sync_transaction(
+ fun () -> GroupNew = #gm_group { name = GroupName,
+ members = [Self],
+ version = 0 },
+ case mnesia:read(?GROUP_TABLE, GroupName) of
+ [] ->
+ mnesia:write(GroupNew),
+ GroupNew;
+ [Group1 = #gm_group { members = Members }] ->
+ case lists:any(fun is_member_alive/1, Members) of
+ true -> Group1;
+ false -> mnesia:write(GroupNew),
+ GroupNew
+ end
+ end
+ end),
+ Group.
+
+record_dead_member_in_group(Member, GroupName) ->
+ {atomic, Group} =
+ mnesia:sync_transaction(
+ fun () -> [Group1 = #gm_group { members = Members, version = Ver }] =
+ mnesia:read(?GROUP_TABLE, GroupName),
+ case lists:splitwith(
+ fun (Member1) -> Member1 =/= Member end, Members) of
+ {_Members1, []} -> %% not found - already recorded dead
+ Group1;
+ {Members1, [Member | Members2]} ->
+ Members3 = Members1 ++ [{dead, Member} | Members2],
+ Group2 = Group1 #gm_group { members = Members3,
+ version = Ver + 1 },
+ mnesia:write(Group2),
+ Group2
+ end
+ end),
+ Group.
+
+record_new_member_in_group(GroupName, Left, NewMember, Fun) ->
+ {atomic, Group} =
+ mnesia:sync_transaction(
+ fun () ->
+ [#gm_group { members = Members, version = Ver } = Group1] =
+ mnesia:read(?GROUP_TABLE, GroupName),
+ {Prefix, [Left | Suffix]} =
+ lists:splitwith(fun (M) -> M =/= Left end, Members),
+ Members1 = Prefix ++ [Left, NewMember | Suffix],
+ Group2 = Group1 #gm_group { members = Members1,
+ version = Ver + 1 },
+ ok = Fun(Group2),
+ mnesia:write(Group2),
+ Group2
+ end),
+ Group.
+
+erase_members_in_group(Members, GroupName) ->
+ DeadMembers = [{dead, Id} || Id <- Members],
+ {atomic, Group} =
+ mnesia:sync_transaction(
+ fun () ->
+ [Group1 = #gm_group { members = [_|_] = Members1,
+ version = Ver }] =
+ mnesia:read(?GROUP_TABLE, GroupName),
+ case Members1 -- DeadMembers of
+ Members1 -> Group1;
+ Members2 -> Group2 =
+ Group1 #gm_group { members = Members2,
+ version = Ver + 1 },
+ mnesia:write(Group2),
+ Group2
+ end
+ end),
+ Group.
+
+maybe_erase_aliases(State = #state { self = Self,
+ group_name = GroupName,
+ view = View,
+ members_state = MembersState,
+ module = Module,
+ callback_args = Args }) ->
+ #view_member { aliases = Aliases } = fetch_view_member(Self, View),
+ {Erasable, MembersState1}
+ = ?SETS:fold(
+ fun (Id, {ErasableAcc, MembersStateAcc} = Acc) ->
+ #member { last_pub = LP, last_ack = LA } =
+ find_member_or_blank(Id, MembersState),
+ case can_erase_view_member(Self, Id, LA, LP) of
+ true -> {[Id | ErasableAcc],
+ erase_member(Id, MembersStateAcc)};
+ false -> Acc
+ end
+ end, {[], MembersState}, Aliases),
+ State1 = State #state { members_state = MembersState1 },
+ case Erasable of
+ [] -> {ok, State1};
+ _ -> View1 = group_to_view(
+ erase_members_in_group(Erasable, GroupName)),
+ {callback_view_changed(Args, Module, View, View1),
+ State1 #state { view = View1 }}
+ end.
+
+can_erase_view_member(Self, Self, _LA, _LP) -> false;
+can_erase_view_member(_Self, _Id, N, N) -> true;
+can_erase_view_member(_Self, _Id, _LA, _LP) -> false.
+
+
+%% ---------------------------------------------------------------------------
+%% View monitoring and maintanence
+%% ---------------------------------------------------------------------------
+
+ensure_neighbour(_Ver, Self, {Self, undefined}, Self) ->
+ {Self, undefined};
+ensure_neighbour(Ver, Self, {Self, undefined}, RealNeighbour) ->
+ ok = gen_server2:cast(RealNeighbour, {?TAG, Ver, check_neighbours}),
+ {RealNeighbour, maybe_monitor(RealNeighbour, Self)};
+ensure_neighbour(_Ver, _Self, {RealNeighbour, MRef}, RealNeighbour) ->
+ {RealNeighbour, MRef};
+ensure_neighbour(Ver, Self, {RealNeighbour, MRef}, Neighbour) ->
+ true = erlang:demonitor(MRef),
+ Msg = {?TAG, Ver, check_neighbours},
+ ok = gen_server2:cast(RealNeighbour, Msg),
+ ok = case Neighbour of
+ Self -> ok;
+ _ -> gen_server2:cast(Neighbour, Msg)
+ end,
+ {Neighbour, maybe_monitor(Neighbour, Self)}.
+
+maybe_monitor(Self, Self) ->
+ undefined;
+maybe_monitor(Other, _Self) ->
+ erlang:monitor(process, Other).
+
+check_neighbours(State = #state { self = Self,
+ left = Left,
+ right = Right,
+ view = View }) ->
+ #view_member { left = VLeft, right = VRight }
+ = fetch_view_member(Self, View),
+ Ver = view_version(View),
+ Left1 = ensure_neighbour(Ver, Self, Left, VLeft),
+ Right1 = ensure_neighbour(Ver, Self, Right, VRight),
+ State1 = State #state { left = Left1, right = Right1 },
+ ok = maybe_send_catchup(Right, State1),
+ State1.
+
+maybe_send_catchup(Right, #state { right = Right }) ->
+ ok;
+maybe_send_catchup(_Right, #state { self = Self,
+ right = {Self, undefined} }) ->
+ ok;
+maybe_send_catchup(_Right, #state { members_state = undefined }) ->
+ ok;
+maybe_send_catchup(_Right, #state { self = Self,
+ right = {Right, _MRef},
+ view = View,
+ members_state = MembersState }) ->
+ send_right(Right, View,
+ {catchup, Self, prepare_members_state(MembersState)}).
+
+
+%% ---------------------------------------------------------------------------
+%% Catch_up delta detection
+%% ---------------------------------------------------------------------------
+
+find_prefix_common_suffix(A, B) ->
+ {Prefix, A1} = find_prefix(A, B, queue:new()),
+ {Common, Suffix} = find_common(A1, B, queue:new()),
+ {Prefix, Common, Suffix}.
+
+%% Returns the elements of A that occur before the first element of B,
+%% plus the remainder of A.
+find_prefix(A, B, Prefix) ->
+ case {queue:out(A), queue:out(B)} of
+ {{{value, Val}, _A1}, {{value, Val}, _B1}} ->
+ {Prefix, A};
+ {{empty, A1}, {{value, _A}, _B1}} ->
+ {Prefix, A1};
+ {{{value, {NumA, _MsgA} = Val}, A1},
+ {{value, {NumB, _MsgB}}, _B1}} when NumA < NumB ->
+ find_prefix(A1, B, queue:in(Val, Prefix));
+ {_, {empty, _B1}} ->
+ {A, Prefix} %% Prefix well be empty here
+ end.
+
+%% A should be a prefix of B. Returns the commonality plus the
+%% remainder of B.
+find_common(A, B, Common) ->
+ case {queue:out(A), queue:out(B)} of
+ {{{value, Val}, A1}, {{value, Val}, B1}} ->
+ find_common(A1, B1, queue:in(Val, Common));
+ {{empty, _A}, _} ->
+ {Common, B}
+ end.
+
+
+%% ---------------------------------------------------------------------------
+%% Members helpers
+%% ---------------------------------------------------------------------------
+
+with_member(Fun, Id, MembersState) ->
+ store_member(
+ Id, Fun(find_member_or_blank(Id, MembersState)), MembersState).
+
+with_member_acc(Fun, Id, {MembersState, Acc}) ->
+ {MemberState, Acc1} = Fun(find_member_or_blank(Id, MembersState), Acc),
+ {store_member(Id, MemberState, MembersState), Acc1}.
+
+find_member_or_blank(Id, MembersState) ->
+ case ?DICT:find(Id, MembersState) of
+ {ok, Result} -> Result;
+ error -> blank_member()
+ end.
+
+erase_member(Id, MembersState) ->
+ ?DICT:erase(Id, MembersState).
+
+blank_member() ->
+ #member { pending_ack = queue:new(), last_pub = -1, last_ack = -1 }.
+
+blank_member_state() ->
+ ?DICT:new().
+
+store_member(Id, MemberState, MembersState) ->
+ ?DICT:store(Id, MemberState, MembersState).
+
+prepare_members_state(MembersState) ->
+ ?DICT:to_list(MembersState).
+
+build_members_state(MembersStateList) ->
+ ?DICT:from_list(MembersStateList).
+
+
+%% ---------------------------------------------------------------------------
+%% Activity assembly
+%% ---------------------------------------------------------------------------
+
+activity_nil() ->
+ queue:new().
+
+activity_cons(_Id, [], [], Tail) ->
+ Tail;
+activity_cons(Sender, Pubs, Acks, Tail) ->
+ queue:in({Sender, Pubs, Acks}, Tail).
+
+activity_finalise(Activity) ->
+ queue:to_list(Activity).
+
+maybe_send_activity([], _State) ->
+ ok;
+maybe_send_activity(Activity, #state { self = Self,
+ right = {Right, _MRefR},
+ view = View }) ->
+ send_right(Right, View, {activity, Self, Activity}).
+
+send_right(Right, View, Msg) ->
+ ok = gen_server2:cast(Right, {?TAG, view_version(View), Msg}).
+
+callback(Args, Module, Activity) ->
+ lists:foldl(
+ fun ({Id, Pubs, _Acks}, ok) ->
+ lists:foldl(fun ({_PubNum, Pub}, ok) ->
+ Module:handle_msg(Args, Id, Pub);
+ (_, Error) ->
+ Error
+ end, ok, Pubs);
+ (_, Error) ->
+ Error
+ end, ok, Activity).
+
+callback_view_changed(Args, Module, OldView, NewView) ->
+ OldMembers = all_known_members(OldView),
+ NewMembers = all_known_members(NewView),
+ Births = NewMembers -- OldMembers,
+ Deaths = OldMembers -- NewMembers,
+ case {Births, Deaths} of
+ {[], []} -> ok;
+ _ -> Module:members_changed(Args, Births, Deaths)
+ end.
+
+handle_callback_result({Result, State}) ->
+ if_callback_success(
+ Result, fun no_reply_true/3, fun no_reply_false/3, undefined, State);
+handle_callback_result({Result, Reply, State}) ->
+ if_callback_success(
+ Result, fun reply_true/3, fun reply_false/3, Reply, State).
+
+no_reply_true (_Result, _Undefined, State) -> noreply(State).
+no_reply_false({stop, Reason}, _Undefined, State) -> {stop, Reason, State}.
+
+reply_true (_Result, Reply, State) -> reply(Reply, State).
+reply_false({stop, Reason}, Reply, State) -> {stop, Reason, Reply, State}.
+
+handle_msg_true (_Result, Msg, State) -> handle_msg(Msg, State).
+handle_msg_false(Result, _Msg, State) -> {Result, State}.
+
+activity_true(_Result, Activity, State = #state { module = Module,
+ callback_args = Args }) ->
+ {callback(Args, Module, Activity), State}.
+activity_false(Result, _Activity, State) ->
+ {Result, State}.
+
+if_callback_success(ok, True, _False, Arg, State) ->
+ True(ok, Arg, State);
+if_callback_success(
+ {become, Module, Args} = Result, True, _False, Arg, State) ->
+ True(Result, Arg, State #state { module = Module,
+ callback_args = Args });
+if_callback_success({stop, _Reason} = Result, _True, False, Arg, State) ->
+ False(Result, Arg, State).
+
+maybe_confirm(_Self, _Id, Confirms, []) ->
+ Confirms;
+maybe_confirm(Self, Self, Confirms, [PubNum | PubNums]) ->
+ case queue:out(Confirms) of
+ {empty, _Confirms} ->
+ Confirms;
+ {{value, {PubNum, From}}, Confirms1} ->
+ gen_server2:reply(From, ok),
+ maybe_confirm(Self, Self, Confirms1, PubNums);
+ {{value, {PubNum1, _From}}, _Confirms} when PubNum1 > PubNum ->
+ maybe_confirm(Self, Self, Confirms, PubNums)
+ end;
+maybe_confirm(_Self, _Id, Confirms, _PubNums) ->
+ Confirms.
+
+purge_confirms(Confirms) ->
+ [gen_server2:reply(From, ok) || {_PubNum, From} <- queue:to_list(Confirms)],
+ queue:new().
+
+
+%% ---------------------------------------------------------------------------
+%% Msg transformation
+%% ---------------------------------------------------------------------------
+
+acks_from_queue(Q) ->
+ [PubNum || {PubNum, _Msg} <- queue:to_list(Q)].
+
+pubs_from_queue(Q) ->
+ queue:to_list(Q).
+
+queue_from_pubs(Pubs) ->
+ queue:from_list(Pubs).
+
+apply_acks([], Pubs) ->
+ Pubs;
+apply_acks(List, Pubs) ->
+ {_, Pubs1} = queue:split(length(List), Pubs),
+ Pubs1.
+
+join_pubs(Q, []) -> Q;
+join_pubs(Q, Pubs) -> queue:join(Q, queue_from_pubs(Pubs)).
+
+last_ack([], LA) ->
+ LA;
+last_ack(List, LA) ->
+ LA1 = lists:last(List),
+ true = LA1 > LA, %% ASSERTION
+ LA1.
+
+last_pub([], LP) ->
+ LP;
+last_pub(List, LP) ->
+ {PubNum, _Msg} = lists:last(List),
+ true = PubNum > LP, %% ASSERTION
+ PubNum.
diff --git a/src/gm_soak_test.erl b/src/gm_soak_test.erl
new file mode 100644
index 0000000000..1f8832a6b2
--- /dev/null
+++ b/src/gm_soak_test.erl
@@ -0,0 +1,130 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%%
+
+-module(gm_soak_test).
+
+-export([test/0]).
+-export([joined/2, members_changed/3, handle_msg/3, terminate/2]).
+
+-behaviour(gm).
+
+-include("gm_specs.hrl").
+
+%% ---------------------------------------------------------------------------
+%% Soak test
+%% ---------------------------------------------------------------------------
+
+get_state() ->
+ get(state).
+
+with_state(Fun) ->
+ put(state, Fun(get_state())).
+
+inc() ->
+ case 1 + get(count) of
+ 100000 -> Now = os:timestamp(),
+ Start = put(ts, Now),
+ Diff = timer:now_diff(Now, Start),
+ Rate = 100000 / (Diff / 1000000),
+ io:format("~p seeing ~p msgs/sec~n", [self(), Rate]),
+ put(count, 0);
+ N -> put(count, N)
+ end.
+
+joined([], Members) ->
+ io:format("Joined ~p (~p members)~n", [self(), length(Members)]),
+ put(state, dict:from_list([{Member, empty} || Member <- Members])),
+ put(count, 0),
+ put(ts, os:timestamp()),
+ ok.
+
+members_changed([], Births, Deaths) ->
+ with_state(
+ fun (State) ->
+ State1 =
+ lists:foldl(
+ fun (Born, StateN) ->
+ false = dict:is_key(Born, StateN),
+ dict:store(Born, empty, StateN)
+ end, State, Births),
+ lists:foldl(
+ fun (Died, StateN) ->
+ true = dict:is_key(Died, StateN),
+ dict:store(Died, died, StateN)
+ end, State1, Deaths)
+ end),
+ ok.
+
+handle_msg([], From, {test_msg, Num}) ->
+ inc(),
+ with_state(
+ fun (State) ->
+ ok = case dict:find(From, State) of
+ {ok, died} ->
+ exit({{from, From},
+ {received_posthumous_delivery, Num}});
+ {ok, empty} -> ok;
+ {ok, Num} -> ok;
+ {ok, Num1} when Num < Num1 ->
+ exit({{from, From},
+ {duplicate_delivery_of, Num1},
+ {expecting, Num}});
+ {ok, Num1} ->
+ exit({{from, From},
+ {missing_delivery_of, Num},
+ {received_early, Num1}});
+ error ->
+ exit({{from, From},
+ {received_premature_delivery, Num}})
+ end,
+ dict:store(From, Num + 1, State)
+ end),
+ ok.
+
+terminate([], Reason) ->
+ io:format("Left ~p (~p)~n", [self(), Reason]),
+ ok.
+
+spawn_member() ->
+ spawn_link(
+ fun () ->
+ random:seed(now()),
+ %% start up delay of no more than 10 seconds
+ timer:sleep(random:uniform(10000)),
+ {ok, Pid} = gm:start_link(?MODULE, ?MODULE, []),
+ Start = random:uniform(10000),
+ send_loop(Pid, Start, Start + random:uniform(10000)),
+ gm:leave(Pid),
+ spawn_more()
+ end).
+
+spawn_more() ->
+ [spawn_member() || _ <- lists:seq(1, 4 - random:uniform(4))].
+
+send_loop(_Pid, Target, Target) ->
+ ok;
+send_loop(Pid, Count, Target) when Target > Count ->
+ case random:uniform(3) of
+ 3 -> gm:confirmed_broadcast(Pid, {test_msg, Count});
+ _ -> gm:broadcast(Pid, {test_msg, Count})
+ end,
+ timer:sleep(random:uniform(5) - 1), %% sleep up to 4 ms
+ send_loop(Pid, Count + 1, Target).
+
+test() ->
+ ok = gm:create_tables(),
+ spawn_member(),
+ spawn_member().
diff --git a/src/gm_tests.erl b/src/gm_tests.erl
new file mode 100644
index 0000000000..ca0ffd6483
--- /dev/null
+++ b/src/gm_tests.erl
@@ -0,0 +1,182 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%%
+
+-module(gm_tests).
+
+-export([test_join_leave/0,
+ test_broadcast/0,
+ test_confirmed_broadcast/0,
+ test_member_death/0,
+ test_receive_in_order/0,
+ all_tests/0]).
+-export([joined/2, members_changed/3, handle_msg/3, terminate/2]).
+
+-behaviour(gm).
+
+-include("gm_specs.hrl").
+
+-define(RECEIVE_OR_THROW(Body, Bool, Error),
+ receive Body ->
+ true = Bool,
+ passed
+ after 1000 ->
+ throw(Error)
+ end).
+
+joined(Pid, Members) ->
+ Pid ! {joined, self(), Members},
+ ok.
+
+members_changed(Pid, Births, Deaths) ->
+ Pid ! {members_changed, self(), Births, Deaths},
+ ok.
+
+handle_msg(Pid, From, Msg) ->
+ Pid ! {msg, self(), From, Msg},
+ ok.
+
+terminate(Pid, Reason) ->
+ Pid ! {termination, self(), Reason},
+ ok.
+
+%% ---------------------------------------------------------------------------
+%% Functional tests
+%% ---------------------------------------------------------------------------
+
+all_tests() ->
+ passed = test_join_leave(),
+ passed = test_broadcast(),
+ passed = test_confirmed_broadcast(),
+ passed = test_member_death(),
+ passed = test_receive_in_order(),
+ passed.
+
+test_join_leave() ->
+ with_two_members(fun (_Pid, _Pid2) -> passed end).
+
+test_broadcast() ->
+ test_broadcast(fun gm:broadcast/2).
+
+test_confirmed_broadcast() ->
+ test_broadcast(fun gm:confirmed_broadcast/2).
+
+test_member_death() ->
+ with_two_members(
+ fun (Pid, Pid2) ->
+ {ok, Pid3} = gm:start_link(?MODULE, ?MODULE, self()),
+ passed = receive_joined(Pid3, [Pid, Pid2, Pid3],
+ timeout_joining_gm_group_3),
+ passed = receive_birth(Pid, Pid3, timeout_waiting_for_birth_3_1),
+ passed = receive_birth(Pid2, Pid3, timeout_waiting_for_birth_3_2),
+
+ unlink(Pid3),
+ exit(Pid3, kill),
+
+ %% Have to do some broadcasts to ensure that all members
+ %% find out about the death.
+ passed = (test_broadcast_fun(fun gm:confirmed_broadcast/2))(
+ Pid, Pid2),
+
+ passed = receive_death(Pid, Pid3, timeout_waiting_for_death_3_1),
+ passed = receive_death(Pid2, Pid3, timeout_waiting_for_death_3_2),
+
+ passed
+ end).
+
+test_receive_in_order() ->
+ with_two_members(
+ fun (Pid, Pid2) ->
+ Numbers = lists:seq(1,1000),
+ [begin ok = gm:broadcast(Pid, N), ok = gm:broadcast(Pid2, N) end
+ || N <- Numbers],
+ passed = receive_numbers(
+ Pid, Pid, {timeout_for_msgs, Pid, Pid}, Numbers),
+ passed = receive_numbers(
+ Pid, Pid2, {timeout_for_msgs, Pid, Pid2}, Numbers),
+ passed = receive_numbers(
+ Pid2, Pid, {timeout_for_msgs, Pid2, Pid}, Numbers),
+ passed = receive_numbers(
+ Pid2, Pid2, {timeout_for_msgs, Pid2, Pid2}, Numbers),
+ passed
+ end).
+
+test_broadcast(Fun) ->
+ with_two_members(test_broadcast_fun(Fun)).
+
+test_broadcast_fun(Fun) ->
+ fun (Pid, Pid2) ->
+ ok = Fun(Pid, magic_message),
+ passed = receive_or_throw({msg, Pid, Pid, magic_message},
+ timeout_waiting_for_msg),
+ passed = receive_or_throw({msg, Pid2, Pid, magic_message},
+ timeout_waiting_for_msg)
+ end.
+
+with_two_members(Fun) ->
+ ok = gm:create_tables(),
+
+ {ok, Pid} = gm:start_link(?MODULE, ?MODULE, self()),
+ passed = receive_joined(Pid, [Pid], timeout_joining_gm_group_1),
+
+ {ok, Pid2} = gm:start_link(?MODULE, ?MODULE, self()),
+ passed = receive_joined(Pid2, [Pid, Pid2], timeout_joining_gm_group_2),
+ passed = receive_birth(Pid, Pid2, timeout_waiting_for_birth_2),
+
+ passed = Fun(Pid, Pid2),
+
+ ok = gm:leave(Pid),
+ passed = receive_death(Pid2, Pid, timeout_waiting_for_death_1),
+ passed =
+ receive_termination(Pid, normal, timeout_waiting_for_termination_1),
+
+ ok = gm:leave(Pid2),
+ passed =
+ receive_termination(Pid2, normal, timeout_waiting_for_termination_2),
+
+ receive X -> throw({unexpected_message, X})
+ after 0 -> passed
+ end.
+
+receive_or_throw(Pattern, Error) ->
+ ?RECEIVE_OR_THROW(Pattern, true, Error).
+
+receive_birth(From, Born, Error) ->
+ ?RECEIVE_OR_THROW({members_changed, From, Birth, Death},
+ ([Born] == Birth) andalso ([] == Death),
+ Error).
+
+receive_death(From, Died, Error) ->
+ ?RECEIVE_OR_THROW({members_changed, From, Birth, Death},
+ ([] == Birth) andalso ([Died] == Death),
+ Error).
+
+receive_joined(From, Members, Error) ->
+ ?RECEIVE_OR_THROW({joined, From, Members1},
+ lists:usort(Members) == lists:usort(Members1),
+ Error).
+
+receive_termination(From, Reason, Error) ->
+ ?RECEIVE_OR_THROW({termination, From, Reason1},
+ Reason == Reason1,
+ Error).
+
+receive_numbers(_Pid, _Sender, _Error, []) ->
+ passed;
+receive_numbers(Pid, Sender, Error, [N | Numbers]) ->
+ ?RECEIVE_OR_THROW({msg, Pid, Sender, M},
+ M == N,
+ Error),
+ receive_numbers(Pid, Sender, Error, Numbers).
diff --git a/src/pg_local.erl b/src/pg_local.erl
index fd515747e5..c9c3a3a715 100644
--- a/src/pg_local.erl
+++ b/src/pg_local.erl
@@ -83,7 +83,7 @@ get_members(Name) ->
sync() ->
ensure_started(),
- gen_server:call(?MODULE, sync).
+ gen_server:call(?MODULE, sync, infinity).
%%%
%%% Callback functions from gen_server
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 1beed5c1a7..c9a929ae00 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -38,6 +38,7 @@
-rabbit_boot_step({database,
[{mfa, {rabbit_mnesia, init, []}},
+ {requires, file_handle_cache},
{enables, external_infrastructure}]}).
-rabbit_boot_step({file_handle_cache,
@@ -214,7 +215,8 @@ stop_and_halt() ->
ok.
status() ->
- [{running_applications, application:which_applications()}] ++
+ [{pid, list_to_integer(os:getpid())},
+ {running_applications, application:which_applications()}] ++
rabbit_mnesia:status().
rotate_logs(BinarySuffix) ->
@@ -373,7 +375,7 @@ config_files() ->
error -> []
end.
-%---------------------------------------------------------------------------
+%%---------------------------------------------------------------------------
print_banner() ->
{ok, Product} = application:get_key(id),
diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl
index 37e40981a6..d38ecb91fe 100644
--- a/src/rabbit_alarm.erl
+++ b/src/rabbit_alarm.erl
@@ -18,12 +18,14 @@
-behaviour(gen_event).
--export([start/0, stop/0, register/2]).
+-export([start/0, stop/0, register/2, on_node_up/1, on_node_down/1]).
-export([init/1, handle_call/2, handle_event/2, handle_info/2,
terminate/2, code_change/3]).
--record(alarms, {alertees, vm_memory_high_watermark = false}).
+-export([remote_conserve_memory/2]). %% Internal use only
+
+-record(alarms, {alertees, alarmed_nodes}).
%%----------------------------------------------------------------------------
@@ -33,6 +35,8 @@
-spec(start/0 :: () -> 'ok').
-spec(stop/0 :: () -> 'ok').
-spec(register/2 :: (pid(), mfa_tuple()) -> boolean()).
+-spec(on_node_up/1 :: (node()) -> 'ok').
+-spec(on_node_down/1 :: (node()) -> 'ok').
-endif.
@@ -56,39 +60,57 @@ register(Pid, HighMemMFA) ->
{register, Pid, HighMemMFA},
infinity).
+on_node_up(Node) -> gen_event:notify(alarm_handler, {node_up, Node}).
+
+on_node_down(Node) -> gen_event:notify(alarm_handler, {node_down, Node}).
+
+%% Can't use alarm_handler:{set,clear}_alarm because that doesn't
+%% permit notifying a remote node.
+remote_conserve_memory(Pid, true) ->
+ gen_event:notify({alarm_handler, node(Pid)},
+ {set_alarm, {{vm_memory_high_watermark, node()}, []}});
+remote_conserve_memory(Pid, false) ->
+ gen_event:notify({alarm_handler, node(Pid)},
+ {clear_alarm, {vm_memory_high_watermark, node()}}).
+
%%----------------------------------------------------------------------------
init([]) ->
- {ok, #alarms{alertees = dict:new()}}.
+ {ok, #alarms{alertees = dict:new(),
+ alarmed_nodes = sets:new()}}.
-handle_call({register, Pid, {M, F, A} = HighMemMFA},
- State = #alarms{alertees = Alertess}) ->
- _MRef = erlang:monitor(process, Pid),
- ok = case State#alarms.vm_memory_high_watermark of
- true -> apply(M, F, A ++ [Pid, true]);
- false -> ok
- end,
- NewAlertees = dict:store(Pid, HighMemMFA, Alertess),
- {ok, State#alarms.vm_memory_high_watermark,
- State#alarms{alertees = NewAlertees}};
+handle_call({register, Pid, HighMemMFA}, State) ->
+ {ok, 0 < sets:size(State#alarms.alarmed_nodes),
+ internal_register(Pid, HighMemMFA, State)};
handle_call(_Request, State) ->
{ok, not_understood, State}.
-handle_event({set_alarm, {vm_memory_high_watermark, []}}, State) ->
- ok = alert(true, State#alarms.alertees),
- {ok, State#alarms{vm_memory_high_watermark = true}};
+handle_event({set_alarm, {{vm_memory_high_watermark, Node}, []}}, State) ->
+ {ok, maybe_alert(fun sets:add_element/2, Node, State)};
-handle_event({clear_alarm, vm_memory_high_watermark}, State) ->
- ok = alert(false, State#alarms.alertees),
- {ok, State#alarms{vm_memory_high_watermark = false}};
+handle_event({clear_alarm, {vm_memory_high_watermark, Node}}, State) ->
+ {ok, maybe_alert(fun sets:del_element/2, Node, State)};
+
+handle_event({node_up, Node}, State) ->
+ %% Must do this via notify and not call to avoid possible deadlock.
+ ok = gen_event:notify(
+ {alarm_handler, Node},
+ {register, self(), {?MODULE, remote_conserve_memory, []}}),
+ {ok, State};
+
+handle_event({node_down, Node}, State) ->
+ {ok, maybe_alert(fun sets:del_element/2, Node, State)};
+
+handle_event({register, Pid, HighMemMFA}, State) ->
+ {ok, internal_register(Pid, HighMemMFA, State)};
handle_event(_Event, State) ->
{ok, State}.
handle_info({'DOWN', _MRef, process, Pid, _Reason},
- State = #alarms{alertees = Alertess}) ->
- {ok, State#alarms{alertees = dict:erase(Pid, Alertess)}};
+ State = #alarms{alertees = Alertees}) ->
+ {ok, State#alarms{alertees = dict:erase(Pid, Alertees)}};
handle_info(_Info, State) ->
{ok, State}.
@@ -100,10 +122,45 @@ code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%----------------------------------------------------------------------------
-alert(_Alert, undefined) ->
- ok;
-alert(Alert, Alertees) ->
- dict:fold(fun (Pid, {M, F, A}, Acc) ->
- ok = erlang:apply(M, F, A ++ [Pid, Alert]),
- Acc
+
+maybe_alert(SetFun, Node, State = #alarms{alarmed_nodes = AN,
+ alertees = Alertees}) ->
+ AN1 = SetFun(Node, AN),
+ BeforeSz = sets:size(AN),
+ AfterSz = sets:size(AN1),
+ %% If we have changed our alarm state, inform the remotes.
+ IsLocal = Node =:= node(),
+ if IsLocal andalso BeforeSz < AfterSz -> ok = alert_remote(true, Alertees);
+ IsLocal andalso BeforeSz > AfterSz -> ok = alert_remote(false, Alertees);
+ true -> ok
+ end,
+ %% If the overall alarm state has changed, inform the locals.
+ case {BeforeSz, AfterSz} of
+ {0, 1} -> ok = alert_local(true, Alertees);
+ {1, 0} -> ok = alert_local(false, Alertees);
+ {_, _} -> ok
+ end,
+ State#alarms{alarmed_nodes = AN1}.
+
+alert_local(Alert, Alertees) -> alert(Alert, Alertees, fun erlang:'=:='/2).
+
+alert_remote(Alert, Alertees) -> alert(Alert, Alertees, fun erlang:'=/='/2).
+
+alert(Alert, Alertees, NodeComparator) ->
+ Node = node(),
+ dict:fold(fun (Pid, {M, F, A}, ok) ->
+ case NodeComparator(Node, node(Pid)) of
+ true -> apply(M, F, A ++ [Pid, Alert]);
+ false -> ok
+ end
end, ok, Alertees).
+
+internal_register(Pid, {M, F, A} = HighMemMFA,
+ State = #alarms{alertees = Alertees}) ->
+ _MRef = erlang:monitor(process, Pid),
+ case sets:is_element(node(), State#alarms.alarmed_nodes) of
+ true -> ok = apply(M, F, A ++ [Pid, true]);
+ false -> ok
+ end,
+ NewAlertees = dict:store(Pid, HighMemMFA, Alertees),
+ State#alarms{alertees = NewAlertees}.
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 1c89539fc0..8e4ca8e3c5 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -52,7 +52,7 @@
-type(qmsg() :: {name(), pid(), msg_id(), boolean(), rabbit_types:message()}).
-type(msg_id() :: non_neg_integer()).
-type(ok_or_errors() ::
- 'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}).
+ 'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}).
-type(queue_or_not_found() :: rabbit_types:amqqueue() | 'not_found').
@@ -100,13 +100,13 @@
-spec(emit_stats/1 :: (rabbit_types:amqqueue()) -> 'ok').
-spec(delete_immediately/1 :: (rabbit_types:amqqueue()) -> 'ok').
-spec(delete/3 ::
- (rabbit_types:amqqueue(), 'false', 'false')
+ (rabbit_types:amqqueue(), 'false', 'false')
-> qlen();
- (rabbit_types:amqqueue(), 'true' , 'false')
+ (rabbit_types:amqqueue(), 'true' , 'false')
-> qlen() | rabbit_types:error('in_use');
- (rabbit_types:amqqueue(), 'false', 'true' )
+ (rabbit_types:amqqueue(), 'false', 'true' )
-> qlen() | rabbit_types:error('not_empty');
- (rabbit_types:amqqueue(), 'true' , 'true' )
+ (rabbit_types:amqqueue(), 'true' , 'true' )
-> qlen() |
rabbit_types:error('in_use') |
rabbit_types:error('not_empty')).
@@ -122,10 +122,10 @@
-spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()).
-spec(limit_all/3 :: ([pid()], pid(), pid() | 'undefined') -> ok_or_errors()).
-spec(basic_get/3 :: (rabbit_types:amqqueue(), pid(), boolean()) ->
- {'ok', non_neg_integer(), qmsg()} | 'empty').
+ {'ok', non_neg_integer(), qmsg()} | 'empty').
-spec(basic_consume/7 ::
- (rabbit_types:amqqueue(), boolean(), pid(), pid() | 'undefined',
- rabbit_types:ctag(), boolean(), any())
+ (rabbit_types:amqqueue(), boolean(), pid(), pid() | 'undefined',
+ rabbit_types:ctag(), boolean(), any())
-> rabbit_types:ok_or_error('exclusive_consume_unavailable')).
-spec(basic_cancel/4 ::
(rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), any()) -> 'ok').
@@ -197,7 +197,7 @@ declare(QueueName, Durable, AutoDelete, Args, Owner) ->
arguments = Args,
exclusive_owner = Owner,
pid = none}),
- case gen_server2:call(Q#amqqueue.pid, {init, false}) of
+ case gen_server2:call(Q#amqqueue.pid, {init, false}, infinity) of
not_found -> rabbit_misc:not_found(QueueName);
Q1 -> Q1
end.
@@ -214,8 +214,8 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) ->
[] -> ok = store_queue(Q),
B = add_default_binding(Q),
fun (Tx) -> B(Tx), Q end;
- [_] -> %% Q exists on stopped node
- rabbit_misc:const(not_found)
+ %% Q exists on stopped node
+ [_] -> rabbit_misc:const(not_found)
end;
[ExistingQ = #amqqueue{pid = QPid}] ->
case rabbit_misc:is_process_alive(QPid) of
@@ -288,7 +288,7 @@ with_exclusive_access_or_die(Name, ReaderPid, F) ->
fun (Q) -> check_exclusive_access(Q, ReaderPid), F(Q) end).
assert_args_equivalence(#amqqueue{name = QueueName, arguments = Args},
- RequiredArgs) ->
+ RequiredArgs) ->
rabbit_misc:assert_args_equivalence(Args, RequiredArgs, QueueName,
[<<"x-expires">>]).
@@ -324,10 +324,10 @@ info_keys() -> rabbit_amqqueue_process:info_keys().
map(VHostPath, F) -> rabbit_misc:filter_exit_map(F, list(VHostPath)).
info(#amqqueue{ pid = QPid }) ->
- delegate_call(QPid, info, infinity).
+ delegate_call(QPid, info).
info(#amqqueue{ pid = QPid }, Items) ->
- case delegate_call(QPid, {info, Items}, infinity) of
+ case delegate_call(QPid, {info, Items}) of
{ok, Res} -> Res;
{error, Error} -> throw(Error)
end.
@@ -337,7 +337,7 @@ info_all(VHostPath) -> map(VHostPath, fun (Q) -> info(Q) end).
info_all(VHostPath, Items) -> map(VHostPath, fun (Q) -> info(Q, Items) end).
consumers(#amqqueue{ pid = QPid }) ->
- delegate_call(QPid, consumers, infinity).
+ delegate_call(QPid, consumers).
consumers_all(VHostPath) ->
lists:append(
@@ -347,7 +347,7 @@ consumers_all(VHostPath) ->
end)).
stat(#amqqueue{pid = QPid}) ->
- delegate_call(QPid, stat, infinity).
+ delegate_call(QPid, stat).
emit_stats(#amqqueue{pid = QPid}) ->
delegate_cast(QPid, emit_stats).
@@ -356,9 +356,9 @@ delete_immediately(#amqqueue{ pid = QPid }) ->
gen_server2:cast(QPid, delete_immediately).
delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) ->
- delegate_call(QPid, {delete, IfUnused, IfEmpty}, infinity).
+ delegate_call(QPid, {delete, IfUnused, IfEmpty}).
-purge(#amqqueue{ pid = QPid }) -> delegate_call(QPid, purge, infinity).
+purge(#amqqueue{ pid = QPid }) -> delegate_call(QPid, purge).
deliver(QPid, Delivery = #delivery{immediate = true}) ->
gen_server2:call(QPid, {deliver_immediately, Delivery}, infinity);
@@ -370,7 +370,7 @@ deliver(QPid, Delivery) ->
true.
requeue(QPid, MsgIds, ChPid) ->
- delegate_call(QPid, {requeue, MsgIds, ChPid}, infinity).
+ delegate_call(QPid, {requeue, MsgIds, ChPid}).
ack(QPid, Txn, MsgIds, ChPid) ->
delegate_cast(QPid, {ack, Txn, MsgIds, ChPid}).
@@ -399,17 +399,15 @@ limit_all(QPids, ChPid, LimiterPid) ->
end).
basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) ->
- delegate_call(QPid, {basic_get, ChPid, NoAck}, infinity).
+ delegate_call(QPid, {basic_get, ChPid, NoAck}).
basic_consume(#amqqueue{pid = QPid}, NoAck, ChPid, LimiterPid,
ConsumerTag, ExclusiveConsume, OkMsg) ->
delegate_call(QPid, {basic_consume, NoAck, ChPid,
- LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg},
- infinity).
+ LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg}).
basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) ->
- ok = delegate_call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg},
- infinity).
+ ok = delegate_call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}).
notify_sent(QPid, ChPid) ->
gen_server2:cast(QPid, {notify_sent, ChPid}).
@@ -500,8 +498,8 @@ safe_delegate_call_ok(F, Pids) ->
{_, Bad} -> {error, Bad}
end.
-delegate_call(Pid, Msg, Timeout) ->
- delegate:invoke(Pid, fun (P) -> gen_server2:call(P, Msg, Timeout) end).
+delegate_call(Pid, Msg) ->
+ delegate:invoke(Pid, fun (P) -> gen_server2:call(P, Msg, infinity) end).
delegate_cast(Pid, Msg) ->
delegate:invoke_no_result(Pid, fun (P) -> gen_server2:cast(P, Msg) end).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index e794b4aa1e..24de941595 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -33,7 +33,7 @@
handle_info/2, handle_pre_hibernate/1, prioritise_call/3,
prioritise_cast/2, prioritise_info/2]).
-% Queue's state
+%% Queue's state
-record(q, {q,
exclusive_consumer,
has_had_consumers,
@@ -283,17 +283,16 @@ lookup_ch(ChPid) ->
ch_record(ChPid) ->
Key = {ch, ChPid},
case get(Key) of
- undefined ->
- MonitorRef = erlang:monitor(process, ChPid),
- C = #cr{consumer_count = 0,
- ch_pid = ChPid,
- monitor_ref = MonitorRef,
- acktags = sets:new(),
- is_limit_active = false,
- txn = none,
- unsent_message_count = 0},
- put(Key, C),
- C;
+ undefined -> MonitorRef = erlang:monitor(process, ChPid),
+ C = #cr{consumer_count = 0,
+ ch_pid = ChPid,
+ monitor_ref = MonitorRef,
+ acktags = sets:new(),
+ is_limit_active = false,
+ txn = none,
+ unsent_message_count = 0},
+ put(Key, C),
+ C;
C = #cr{} -> C
end.
@@ -319,18 +318,16 @@ erase_ch_record(#cr{ch_pid = ChPid,
erase({ch, ChPid}),
ok.
-all_ch_record() ->
- [C || {{ch, _}, C} <- get()].
+all_ch_record() -> [C || {{ch, _}, C} <- get()].
is_ch_blocked(#cr{unsent_message_count = Count, is_limit_active = Limited}) ->
Limited orelse Count >= ?UNSENT_MESSAGE_LIMIT.
ch_record_state_transition(OldCR, NewCR) ->
- BlockedOld = is_ch_blocked(OldCR),
- BlockedNew = is_ch_blocked(NewCR),
- if BlockedOld andalso not(BlockedNew) -> unblock;
- BlockedNew andalso not(BlockedOld) -> block;
- true -> ok
+ case {is_ch_blocked(OldCR), is_ch_blocked(NewCR)} of
+ {true, false} -> unblock;
+ {false, true} -> block;
+ {_, _} -> ok
end.
deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
@@ -365,13 +362,12 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
case ch_record_state_transition(C, NewC) of
ok -> {queue:in(QEntry, ActiveConsumersTail),
BlockedConsumers};
- block ->
- {ActiveConsumers1, BlockedConsumers1} =
- move_consumers(ChPid,
- ActiveConsumersTail,
- BlockedConsumers),
- {ActiveConsumers1,
- queue:in(QEntry, BlockedConsumers1)}
+ block -> {ActiveConsumers1, BlockedConsumers1} =
+ move_consumers(ChPid,
+ ActiveConsumersTail,
+ BlockedConsumers),
+ {ActiveConsumers1,
+ queue:in(QEntry, BlockedConsumers1)}
end,
State2 = State1#q{
active_consumers = NewActiveConsumers,
@@ -396,8 +392,7 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
{FunAcc, State}
end.
-deliver_from_queue_pred(IsEmpty, _State) ->
- not IsEmpty.
+deliver_from_queue_pred(IsEmpty, _State) -> not IsEmpty.
deliver_from_queue_deliver(AckRequired, false, State) ->
{{Message, IsDelivered, AckTag, Remaining}, State1} =
@@ -405,32 +400,26 @@ deliver_from_queue_deliver(AckRequired, false, State) ->
{{Message, IsDelivered, AckTag}, 0 == Remaining, State1}.
confirm_messages(Guids, State = #q{guid_to_channel = GTC}) ->
- {CMs, GTC1} =
- lists:foldl(
- fun(Guid, {CMs, GTC0}) ->
- case dict:find(Guid, GTC0) of
- {ok, {ChPid, MsgSeqNo}} ->
- {[{ChPid, MsgSeqNo} | CMs], dict:erase(Guid, GTC0)};
- _ ->
- {CMs, GTC0}
- end
- end, {[], GTC}, Guids),
- case lists:usort(CMs) of
- [{Ch, MsgSeqNo} | CMs1] ->
- [rabbit_channel:confirm(ChPid, MsgSeqNos) ||
- {ChPid, MsgSeqNos} <- group_confirms_by_channel(
- CMs1, [{Ch, [MsgSeqNo]}])];
- [] ->
- ok
- end,
+ {CMs, GTC1} = lists:foldl(
+ fun(Guid, {CMs, GTC0}) ->
+ case dict:find(Guid, GTC0) of
+ {ok, {ChPid, MsgSeqNo}} ->
+ {gb_trees_cons(ChPid, MsgSeqNo, CMs),
+ dict:erase(Guid, GTC0)};
+ _ ->
+ {CMs, GTC0}
+ end
+ end, {gb_trees:empty(), GTC}, Guids),
+ gb_trees:map(fun(ChPid, MsgSeqNos) ->
+ rabbit_channel:confirm(ChPid, MsgSeqNos)
+ end, CMs),
State#q{guid_to_channel = GTC1}.
-group_confirms_by_channel([], Acc) ->
- Acc;
-group_confirms_by_channel([{Ch, Msg1} | CMs], [{Ch, Msgs} | Acc]) ->
- group_confirms_by_channel(CMs, [{Ch, [Msg1 | Msgs]} | Acc]);
-group_confirms_by_channel([{Ch, Msg1} | CMs], Acc) ->
- group_confirms_by_channel(CMs, [{Ch, [Msg1]} | Acc]).
+gb_trees_cons(Key, Value, Tree) ->
+ case gb_trees:lookup(Key, Tree) of
+ {value, Values} -> gb_trees:update(Key, [Value | Values], Tree);
+ none -> gb_trees:insert(Key, [Value], Tree)
+ end.
record_confirm_message(#delivery{msg_seq_no = undefined}, State) ->
{no_confirm, State};
@@ -485,17 +474,14 @@ attempt_delivery(#delivery{txn = none,
{Delivered, State1} =
deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State),
{Delivered, NeedsConfirming, State1};
-attempt_delivery(#delivery{txn = Txn,
+attempt_delivery(#delivery{txn = Txn,
sender = ChPid,
message = Message},
- {NeedsConfirming,
- State = #q{backing_queue = BQ,
- backing_queue_state = BQS}}) ->
+ {NeedsConfirming, State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}}) ->
store_ch_record((ch_record(ChPid))#cr{txn = Txn}),
- {true,
- NeedsConfirming,
- State#q{backing_queue_state =
- BQ:tx_publish(Txn, Message, ?BASE_MESSAGE_PROPERTIES, BQS)}}.
+ BQS1 = BQ:tx_publish(Txn, Message, ?BASE_MESSAGE_PROPERTIES, BQS),
+ {true, NeedsConfirming, State#q{backing_queue_state = BQS1}}.
deliver_or_enqueue(Delivery, State) ->
case attempt_delivery(Delivery, record_confirm_message(Delivery, State)) of
@@ -666,9 +652,8 @@ drop_expired_messages(State = #q{backing_queue_state = BQS,
backing_queue = BQ}) ->
Now = now_micros(),
BQS1 = BQ:dropwhile(
- fun (#message_properties{expiry = Expiry}) ->
- Now > Expiry
- end, BQS),
+ fun (#message_properties{expiry = Expiry}) -> Now > Expiry end,
+ BQS),
ensure_ttl_timer(State#q{backing_queue_state = BQS1}).
ensure_ttl_timer(State = #q{backing_queue = BQ,
@@ -727,10 +712,10 @@ i(Item, _) ->
consumers(#q{active_consumers = ActiveConsumers,
blocked_consumers = BlockedConsumers}) ->
rabbit_misc:queue_fold(
- fun ({ChPid, #consumer{tag = ConsumerTag,
- ack_required = AckRequired}}, Acc) ->
- [{ChPid, ConsumerTag, AckRequired} | Acc]
- end, [], queue:join(ActiveConsumers, BlockedConsumers)).
+ fun ({ChPid, #consumer{tag = ConsumerTag,
+ ack_required = AckRequired}}, Acc) ->
+ [{ChPid, ConsumerTag, AckRequired} | Acc]
+ end, [], queue:join(ActiveConsumers, BlockedConsumers)).
emit_stats(State) ->
emit_stats(State, []).
@@ -752,7 +737,7 @@ emit_consumer_deleted(ChPid, ConsumerTag) ->
{channel, ChPid},
{queue, self()}]).
-%---------------------------------------------------------------------------
+%%----------------------------------------------------------------------------
prioritise_call(Msg, _From, _State) ->
case Msg of
@@ -819,8 +804,7 @@ handle_call({info, Items}, _From, State) ->
handle_call(consumers, _From, State) ->
reply(consumers(State), State);
-handle_call({deliver_immediately, Delivery},
- _From, State) ->
+handle_call({deliver_immediately, Delivery}, _From, State) ->
%% Synchronous, "immediate" delivery mode
%%
%% FIXME: Is this correct semantics?
@@ -911,15 +895,13 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid,
case is_ch_blocked(C) of
true -> State1#q{
blocked_consumers =
- add_consumer(
- ChPid, Consumer,
- State1#q.blocked_consumers)};
+ add_consumer(ChPid, Consumer,
+ State1#q.blocked_consumers)};
false -> run_message_queue(
State1#q{
active_consumers =
- add_consumer(
- ChPid, Consumer,
- State1#q.active_consumers)})
+ add_consumer(ChPid, Consumer,
+ State1#q.active_consumers)})
end,
emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
not NoAck),
diff --git a/src/rabbit_auth_backend_internal.erl b/src/rabbit_auth_backend_internal.erl
index a564480b0e..3d00584551 100644
--- a/src/rabbit_auth_backend_internal.erl
+++ b/src/rabbit_auth_backend_internal.erl
@@ -52,8 +52,8 @@
-spec(clear_admin/1 :: (rabbit_types:username()) -> 'ok').
-spec(list_users/0 :: () -> [{rabbit_types:username(), boolean()}]).
-spec(lookup_user/1 :: (rabbit_types:username())
- -> rabbit_types:ok(rabbit_types:internal_user())
- | rabbit_types:error('not_found')).
+ -> rabbit_types:ok(rabbit_types:internal_user())
+ | rabbit_types:error('not_found')).
-spec(set_permissions/5 ::(rabbit_types:username(), rabbit_types:vhost(),
regexp(), regexp(), regexp()) -> 'ok').
-spec(clear_permissions/2 :: (rabbit_types:username(), rabbit_types:vhost())
diff --git a/src/rabbit_auth_mechanism.erl b/src/rabbit_auth_mechanism.erl
index 1d14f9f0b8..897199ee78 100644
--- a/src/rabbit_auth_mechanism.erl
+++ b/src/rabbit_auth_mechanism.erl
@@ -23,6 +23,10 @@ behaviour_info(callbacks) ->
%% A description.
{description, 0},
+ %% If this mechanism is enabled, should it be offered for a given socket?
+ %% (primarily so EXTERNAL can be SSL-only)
+ {should_offer, 1},
+
%% Called before authentication starts. Should create a state
%% object to be passed through all the stages of authentication.
{init, 1},
diff --git a/src/rabbit_auth_mechanism_amqplain.erl b/src/rabbit_auth_mechanism_amqplain.erl
index 5e422eee89..b8682a465e 100644
--- a/src/rabbit_auth_mechanism_amqplain.erl
+++ b/src/rabbit_auth_mechanism_amqplain.erl
@@ -19,7 +19,7 @@
-behaviour(rabbit_auth_mechanism).
--export([description/0, init/1, handle_response/2]).
+-export([description/0, should_offer/1, init/1, handle_response/2]).
-include("rabbit_auth_mechanism_spec.hrl").
@@ -38,6 +38,9 @@ description() ->
[{name, <<"AMQPLAIN">>},
{description, <<"QPid AMQPLAIN mechanism">>}].
+should_offer(_Sock) ->
+ true.
+
init(_Sock) ->
[].
@@ -51,5 +54,5 @@ handle_response(Response, _State) ->
_ ->
{protocol_error,
"AMQPLAIN auth info ~w is missing LOGIN or PASSWORD field",
- [LoginTable]}
+ [LoginTable]}
end.
diff --git a/src/rabbit_auth_mechanism_cr_demo.erl b/src/rabbit_auth_mechanism_cr_demo.erl
index 7fd20f8b32..77aa34ea0a 100644
--- a/src/rabbit_auth_mechanism_cr_demo.erl
+++ b/src/rabbit_auth_mechanism_cr_demo.erl
@@ -19,7 +19,7 @@
-behaviour(rabbit_auth_mechanism).
--export([description/0, init/1, handle_response/2]).
+-export([description/0, should_offer/1, init/1, handle_response/2]).
-include("rabbit_auth_mechanism_spec.hrl").
@@ -43,6 +43,9 @@ description() ->
{description, <<"RabbitMQ Demo challenge-response authentication "
"mechanism">>}].
+should_offer(_Sock) ->
+ true.
+
init(_Sock) ->
#state{}.
diff --git a/src/rabbit_auth_mechanism_plain.erl b/src/rabbit_auth_mechanism_plain.erl
index 1ca07018e4..e2f9bff9c5 100644
--- a/src/rabbit_auth_mechanism_plain.erl
+++ b/src/rabbit_auth_mechanism_plain.erl
@@ -19,7 +19,7 @@
-behaviour(rabbit_auth_mechanism).
--export([description/0, init/1, handle_response/2]).
+-export([description/0, should_offer/1, init/1, handle_response/2]).
-include("rabbit_auth_mechanism_spec.hrl").
@@ -41,6 +41,9 @@ description() ->
[{name, <<"PLAIN">>},
{description, <<"SASL PLAIN authentication mechanism">>}].
+should_offer(_Sock) ->
+ true.
+
init(_Sock) ->
[].
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index c5bd9575e3..f9a8ee1d50 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -18,10 +18,9 @@
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
--export([publish/1, message/4, properties/1, delivery/5]).
+-export([publish/1, message/3, message/4, properties/1, delivery/5]).
-export([publish/4, publish/7]).
-export([build_content/2, from_content/1]).
--export([is_message_persistent/1]).
%%----------------------------------------------------------------------------
@@ -41,8 +40,11 @@
rabbit_types:delivery()).
-spec(message/4 ::
(rabbit_exchange:name(), rabbit_router:routing_key(),
- properties_input(), binary()) ->
- (rabbit_types:message() | rabbit_types:error(any()))).
+ properties_input(), binary()) -> rabbit_types:message()).
+-spec(message/3 ::
+ (rabbit_exchange:name(), rabbit_router:routing_key(),
+ rabbit_types:decoded_content()) ->
+ rabbit_types:ok_or_error2(rabbit_types:message(), any())).
-spec(properties/1 ::
(properties_input()) -> rabbit_framing:amqp_property_record()).
-spec(publish/4 ::
@@ -56,9 +58,6 @@
rabbit_types:content()).
-spec(from_content/1 :: (rabbit_types:content()) ->
{rabbit_framing:amqp_property_record(), binary()}).
--spec(is_message_persistent/1 :: (rabbit_types:decoded_content()) ->
- (boolean() |
- {'invalid', non_neg_integer()})).
-endif.
@@ -98,19 +97,40 @@ from_content(Content) ->
rabbit_framing_amqp_0_9_1:method_id('basic.publish'),
{Props, list_to_binary(lists:reverse(FragmentsRev))}.
-message(ExchangeName, RoutingKeyBin, RawProperties, BodyBin) ->
+%% This breaks the spec rule forbidding message modification
+strip_header(#content{properties = #'P_basic'{headers = undefined}}
+ = DecodedContent, _Key) ->
+ DecodedContent;
+strip_header(#content{properties = Props = #'P_basic'{headers = Headers}}
+ = DecodedContent, Key) ->
+ case lists:keysearch(Key, 1, Headers) of
+ false -> DecodedContent;
+ {value, Found} -> Headers0 = lists:delete(Found, Headers),
+ rabbit_binary_generator:clear_encoded_content(
+ DecodedContent#content{
+ properties = Props#'P_basic'{
+ headers = Headers0}})
+ end.
+
+message(ExchangeName, RoutingKey,
+ #content{properties = Props} = DecodedContent) ->
+ try
+ {ok, #basic_message{
+ exchange_name = ExchangeName,
+ content = strip_header(DecodedContent, ?DELETED_HEADER),
+ guid = rabbit_guid:guid(),
+ is_persistent = is_message_persistent(DecodedContent),
+ routing_keys = [RoutingKey |
+ header_routes(Props#'P_basic'.headers)]}}
+ catch
+ {error, _Reason} = Error -> Error
+ end.
+
+message(ExchangeName, RoutingKey, RawProperties, BodyBin) ->
Properties = properties(RawProperties),
Content = build_content(Properties, BodyBin),
- case is_message_persistent(Content) of
- {invalid, Other} ->
- {error, {invalid_delivery_mode, Other}};
- IsPersistent when is_boolean(IsPersistent) ->
- #basic_message{exchange_name = ExchangeName,
- routing_key = RoutingKeyBin,
- content = Content,
- guid = rabbit_guid:guid(),
- is_persistent = IsPersistent}
- end.
+ {ok, Msg} = message(ExchangeName, RoutingKey, Content),
+ Msg.
properties(P = #'P_basic'{}) ->
P;
@@ -152,5 +172,18 @@ is_message_persistent(#content{properties = #'P_basic'{
1 -> false;
2 -> true;
undefined -> false;
- Other -> {invalid, Other}
+ Other -> throw({error, {delivery_mode_unknown, Other}})
end.
+
+%% Extract CC routes from headers
+header_routes(undefined) ->
+ [];
+header_routes(HeadersTable) ->
+ lists:append(
+ [case rabbit_misc:table_lookup(HeadersTable, HeaderKey) of
+ {array, Routes} -> [Route || {longstr, Route} <- Routes];
+ undefined -> [];
+ {Type, _Val} -> throw({error, {unacceptable_type_in_header,
+ Type,
+ binary_to_list(HeaderKey)}})
+ end || HeaderKey <- ?ROUTING_HEADERS]).
diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl
index dc81ace6bf..68511a326c 100644
--- a/src/rabbit_binary_generator.erl
+++ b/src/rabbit_binary_generator.erl
@@ -18,12 +18,13 @@
-include("rabbit_framing.hrl").
-include("rabbit.hrl").
-% EMPTY_CONTENT_BODY_FRAME_SIZE, 8 = 1 + 2 + 4 + 1
-% - 1 byte of frame type
-% - 2 bytes of channel number
-% - 4 bytes of frame payload length
-% - 1 byte of payload trailer FRAME_END byte
-% See definition of check_empty_content_body_frame_size/0, an assertion called at startup.
+%% EMPTY_CONTENT_BODY_FRAME_SIZE, 8 = 1 + 2 + 4 + 1
+%% - 1 byte of frame type
+%% - 2 bytes of channel number
+%% - 4 bytes of frame payload length
+%% - 1 byte of payload trailer FRAME_END byte
+%% See definition of check_empty_content_body_frame_size/0,
+%% an assertion called at startup.
-define(EMPTY_CONTENT_BODY_FRAME_SIZE, 8).
-export([build_simple_method_frame/3,
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index 96a22dcaf1..7ddb781412 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -70,7 +70,7 @@
rabbit_types:infos()).
-spec(info_all/1 :: (rabbit_types:vhost()) -> [rabbit_types:infos()]).
-spec(info_all/2 ::(rabbit_types:vhost(), rabbit_types:info_keys())
- -> [rabbit_types:infos()]).
+ -> [rabbit_types:infos()]).
-spec(has_for_source/1 :: (rabbit_types:binding_source()) -> boolean()).
-spec(remove_for_source/1 :: (rabbit_types:binding_source()) -> bindings()).
-spec(remove_for_destination/1 ::
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index a4ffd7822e..526fb42881 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -68,9 +68,9 @@
-type(channel_number() :: non_neg_integer()).
-spec(start_link/9 ::
- (channel_number(), pid(), pid(), rabbit_types:protocol(),
- rabbit_types:user(), rabbit_types:vhost(), rabbit_framing:amqp_table(),
- pid(), fun ((non_neg_integer()) -> rabbit_types:ok(pid()))) ->
+ (channel_number(), pid(), pid(), rabbit_types:protocol(),
+ rabbit_types:user(), rabbit_types:vhost(), rabbit_framing:amqp_table(),
+ pid(), fun ((non_neg_integer()) -> rabbit_types:ok(pid()))) ->
rabbit_types:ok_pid_or_error()).
-spec(do/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok').
-spec(do/3 :: (pid(), rabbit_framing:amqp_method_record(),
@@ -109,7 +109,7 @@ do(Pid, Method, Content) ->
gen_server2:cast(Pid, {method, Method, Content}).
flush(Pid) ->
- gen_server2:call(Pid, flush).
+ gen_server2:call(Pid, flush, infinity).
shutdown(Pid) ->
gen_server2:cast(Pid, terminate).
@@ -254,7 +254,7 @@ handle_cast({command, Msg}, State = #ch{writer_pid = WriterPid}) ->
handle_cast({deliver, ConsumerTag, AckRequired,
Msg = {_QName, QPid, _MsgId, Redelivered,
#basic_message{exchange_name = ExchangeName,
- routing_key = RoutingKey,
+ routing_keys = [RoutingKey | _CcRoutes],
content = Content}}},
State = #ch{writer_pid = WriterPid,
next_tag = DeliveryTag}) ->
@@ -301,8 +301,8 @@ handle_info({'DOWN', _MRef, process, QPid, Reason},
{MXs, State2} = process_confirms(MsgSeqNos, QPid, State1),
erase_queue_stats(QPid),
State3 = (case Reason of
- normal -> fun record_confirms/2;
- _ -> fun send_nacks/2
+ normal -> fun record_confirms/2;
+ _ -> fun send_nacks/2
end)(MXs, State2),
noreply(queue_blocked(QPid, State3)).
@@ -593,32 +593,33 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
%% certain to want to look at delivery-mode and priority.
DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content),
check_user_id_header(DecodedContent#content.properties, State),
- IsPersistent = is_message_persistent(DecodedContent),
{MsgSeqNo, State1} =
case ConfirmEnabled of
false -> {undefined, State};
true -> SeqNo = State#ch.publish_seqno,
{SeqNo, State#ch{publish_seqno = SeqNo + 1}}
end,
- Message = #basic_message{exchange_name = ExchangeName,
- routing_key = RoutingKey,
- content = DecodedContent,
- guid = rabbit_guid:guid(),
- is_persistent = IsPersistent},
- {RoutingRes, DeliveredQPids} =
- rabbit_exchange:publish(
- Exchange,
- rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message,
- MsgSeqNo)),
- State2 = process_routing_result(RoutingRes, DeliveredQPids, ExchangeName,
- MsgSeqNo, Message, State1),
- maybe_incr_stats([{ExchangeName, 1} |
- [{{QPid, ExchangeName}, 1} ||
- QPid <- DeliveredQPids]], publish, State2),
- {noreply, case TxnKey of
- none -> State2;
- _ -> add_tx_participants(DeliveredQPids, State2)
- end};
+ case rabbit_basic:message(ExchangeName, RoutingKey, DecodedContent) of
+ {ok, Message} ->
+ {RoutingRes, DeliveredQPids} =
+ rabbit_exchange:publish(
+ Exchange,
+ rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message,
+ MsgSeqNo)),
+ State2 = process_routing_result(RoutingRes, DeliveredQPids,
+ ExchangeName, MsgSeqNo, Message,
+ State1),
+ maybe_incr_stats([{ExchangeName, 1} |
+ [{{QPid, ExchangeName}, 1} ||
+ QPid <- DeliveredQPids]], publish, State2),
+ {noreply, case TxnKey of
+ none -> State2;
+ _ -> add_tx_participants(DeliveredQPids, State2)
+ end};
+ {error, Reason} ->
+ rabbit_misc:protocol_error(precondition_failed,
+ "invalid message: ~p", [Reason])
+ end;
handle_method(#'basic.nack'{delivery_tag = DeliveryTag,
multiple = Multiple,
@@ -658,7 +659,7 @@ handle_method(#'basic.get'{queue = QueueNameBin,
{ok, MessageCount,
Msg = {_QName, QPid, _MsgId, Redelivered,
#basic_message{exchange_name = ExchangeName,
- routing_key = RoutingKey,
+ routing_keys = [RoutingKey | _CcRoutes],
content = Content}}} ->
State1 = lock_message(not(NoAck),
ack_record(DeliveryTag, none, Msg),
@@ -714,9 +715,9 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
end) of
ok ->
{noreply, State#ch{consumer_mapping =
- dict:store(ActualConsumerTag,
- QueueName,
- ConsumerMapping)}};
+ dict:store(ActualConsumerTag,
+ QueueName,
+ ConsumerMapping)}};
{error, exclusive_consume_unavailable} ->
rabbit_misc:protocol_error(
access_refused, "~s in exclusive use",
@@ -738,8 +739,8 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag,
return_ok(State, NoWait, OkMsg);
{ok, QueueName} ->
NewState = State#ch{consumer_mapping =
- dict:erase(ConsumerTag,
- ConsumerMapping)},
+ dict:erase(ConsumerTag,
+ ConsumerMapping)},
case rabbit_amqqueue:with(
QueueName,
fun (Q) ->
@@ -1123,7 +1124,7 @@ binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin,
end.
basic_return(#basic_message{exchange_name = ExchangeName,
- routing_key = RoutingKey,
+ routing_keys = [RoutingKey | _CcRoutes],
content = Content},
#ch{protocol = Protocol, writer_pid = WriterPid}, Reason) ->
{_Close, ReplyCode, ReplyText} = Protocol:lookup_amqp_exception(Reason),
@@ -1274,22 +1275,15 @@ notify_limiter(LimiterPid, Acked) ->
Count -> rabbit_limiter:ack(LimiterPid, Count)
end.
-is_message_persistent(Content) ->
- case rabbit_basic:is_message_persistent(Content) of
- {invalid, Other} ->
- rabbit_log:warning("Unknown delivery mode ~p - "
- "treating as 1, non-persistent~n",
- [Other]),
- false;
- IsPersistent when is_boolean(IsPersistent) ->
- IsPersistent
- end.
-
process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) ->
ok = basic_return(Msg, State, no_route),
+ maybe_incr_stats([{Msg#basic_message.exchange_name, 1}],
+ return_unroutable, State),
record_confirm(MsgSeqNo, XName, State);
process_routing_result(not_delivered, _, XName, MsgSeqNo, Msg, State) ->
ok = basic_return(Msg, State, no_consumers),
+ maybe_incr_stats([{Msg#basic_message.exchange_name, 1}],
+ return_not_delivered, State),
record_confirm(MsgSeqNo, XName, State);
process_routing_result(routed, [], XName, MsgSeqNo, _, State) ->
record_confirm(MsgSeqNo, XName, State);
diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl
index 9cc407bc34..8175ad806c 100644
--- a/src/rabbit_channel_sup.erl
+++ b/src/rabbit_channel_sup.erl
@@ -68,12 +68,12 @@ start_link({direct, Channel, ClientChannelPid, Protocol, User, VHost,
{ok, SupPid} = supervisor2:start_link(?MODULE, []),
{ok, ChannelPid} =
supervisor2:start_child(
- SupPid,
- {channel, {rabbit_channel, start_link,
- [Channel, ClientChannelPid, ClientChannelPid, Protocol,
- User, VHost, Capabilities, Collector,
- start_limiter_fun(SupPid)]},
- intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}),
+ SupPid,
+ {channel, {rabbit_channel, start_link,
+ [Channel, ClientChannelPid, ClientChannelPid, Protocol,
+ User, VHost, Capabilities, Collector,
+ start_limiter_fun(SupPid)]},
+ intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}),
{ok, SupPid, {ChannelPid, none}}.
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_client_sup.erl b/src/rabbit_client_sup.erl
index dbdc6cd429..15e92542a2 100644
--- a/src/rabbit_client_sup.erl
+++ b/src/rabbit_client_sup.erl
@@ -29,9 +29,9 @@
-ifdef(use_specs).
-spec(start_link/1 :: (mfa()) ->
- rabbit_types:ok_pid_or_error()).
+ rabbit_types:ok_pid_or_error()).
-spec(start_link/2 :: ({'local', atom()}, mfa()) ->
- rabbit_types:ok_pid_or_error()).
+ rabbit_types:ok_pid_or_error()).
-endif.
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 3a18950f84..8364ecd8d7 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -20,6 +20,7 @@
-export([start/0, stop/0, action/5, diagnostics/1]).
-define(RPC_TIMEOUT, infinity).
+-define(WAIT_FOR_VM_ATTEMPTS, 5).
-define(QUIET_OPT, "-q").
-define(NODE_OPT, "-n").
@@ -102,24 +103,22 @@ print_badrpc_diagnostics(Node) ->
diagnostics(Node) ->
{_NodeName, NodeHost} = rabbit_misc:nodeparts(Node),
- [
- {"diagnostics:", []},
- case net_adm:names(NodeHost) of
- {error, EpmdReason} ->
- {"- unable to connect to epmd on ~s: ~w",
- [NodeHost, EpmdReason]};
- {ok, NamePorts} ->
- {"- nodes and their ports on ~s: ~p",
- [NodeHost, [{list_to_atom(Name), Port} ||
- {Name, Port} <- NamePorts]]}
- end,
- {"- current node: ~w", [node()]},
- case init:get_argument(home) of
- {ok, [[Home]]} -> {"- current node home dir: ~s", [Home]};
- Other -> {"- no current node home dir: ~p", [Other]}
- end,
- {"- current node cookie hash: ~s", [rabbit_misc:cookie_hash()]}
- ].
+ [{"diagnostics:", []},
+ case net_adm:names(NodeHost) of
+ {error, EpmdReason} ->
+ {"- unable to connect to epmd on ~s: ~w",
+ [NodeHost, EpmdReason]};
+ {ok, NamePorts} ->
+ {"- nodes and their ports on ~s: ~p",
+ [NodeHost, [{list_to_atom(Name), Port} ||
+ {Name, Port} <- NamePorts]]}
+ end,
+ {"- current node: ~w", [node()]},
+ case init:get_argument(home) of
+ {ok, [[Home]]} -> {"- current node home dir: ~s", [Home]};
+ Other -> {"- no current node home dir: ~p", [Other]}
+ end,
+ {"- current node cookie hash: ~s", [rabbit_misc:cookie_hash()]}].
stop() ->
ok.
@@ -151,13 +150,13 @@ action(force_reset, Node, [], _Opts, Inform) ->
action(cluster, Node, ClusterNodeSs, _Opts, Inform) ->
ClusterNodes = lists:map(fun list_to_atom/1, ClusterNodeSs),
Inform("Clustering node ~p with ~p",
- [Node, ClusterNodes]),
+ [Node, ClusterNodes]),
rpc_call(Node, rabbit_mnesia, cluster, [ClusterNodes]);
action(force_cluster, Node, ClusterNodeSs, _Opts, Inform) ->
ClusterNodes = lists:map(fun list_to_atom/1, ClusterNodeSs),
Inform("Forcefully clustering node ~p with ~p (ignoring offline nodes)",
- [Node, ClusterNodes]),
+ [Node, ClusterNodes]),
rpc_call(Node, rabbit_mnesia, force_cluster, [ClusterNodes]);
action(status, Node, [], _Opts, Inform) ->
@@ -293,13 +292,34 @@ action(list_permissions, Node, [], Opts, Inform) ->
VHost = proplists:get_value(?VHOST_OPT, Opts),
Inform("Listing permissions in vhost ~p", [VHost]),
display_list(call(Node, {rabbit_auth_backend_internal,
- list_vhost_permissions, [VHost]})).
+ list_vhost_permissions, [VHost]}));
+
+action(wait, Node, [], _Opts, Inform) ->
+ Inform("Waiting for ~p", [Node]),
+ wait_for_application(Node, ?WAIT_FOR_VM_ATTEMPTS).
+
+wait_for_application(Node, Attempts) ->
+ case rpc_call(Node, application, which_applications, [infinity]) of
+ {badrpc, _} = E -> NewAttempts = Attempts - 1,
+ case NewAttempts of
+ 0 -> E;
+ _ -> wait_for_application0(Node, NewAttempts)
+ end;
+ Apps -> case proplists:is_defined(rabbit, Apps) of
+ %% We've seen the node up; if it goes down
+ %% die immediately.
+ true -> ok;
+ false -> wait_for_application0(Node, 0)
+ end
+ end.
+
+wait_for_application0(Node, Attempts) ->
+ timer:sleep(1000),
+ wait_for_application(Node, Attempts).
default_if_empty(List, Default) when is_list(List) ->
- if List == [] ->
- Default;
- true ->
- [list_to_atom(X) || X <- List]
+ if List == [] -> Default;
+ true -> [list_to_atom(X) || X <- List]
end.
display_info_list(Results, InfoItemKeys) when is_list(Results) ->
@@ -390,7 +410,7 @@ prettify_typed_amqp_value(Type, Value) ->
_ -> Value
end.
-% the slower shutdown on windows required to flush stdout
+%% the slower shutdown on windows required to flush stdout
quit(Status) ->
case os:type() of
{unix, _} ->
diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl
index 586563f61e..a2693c6919 100644
--- a/src/rabbit_direct.erl
+++ b/src/rabbit_direct.erl
@@ -26,8 +26,8 @@
-spec(boot/0 :: () -> 'ok').
-spec(connect/4 :: (binary(), binary(), binary(), rabbit_types:protocol()) ->
- {'ok', {rabbit_types:user(),
- rabbit_framing:amqp_table()}}).
+ {'ok', {rabbit_types:user(),
+ rabbit_framing:amqp_table()}}).
-spec(start_channel/7 ::
(rabbit_channel:channel_number(), pid(), rabbit_types:protocol(),
rabbit_types:user(), rabbit_types:vhost(), rabbit_framing:amqp_table(),
@@ -40,12 +40,12 @@
boot() ->
{ok, _} =
supervisor2:start_child(
- rabbit_sup,
- {rabbit_direct_client_sup,
- {rabbit_client_sup, start_link,
- [{local, rabbit_direct_client_sup},
- {rabbit_channel_sup, start_link, []}]},
- transient, infinity, supervisor, [rabbit_client_sup]}),
+ rabbit_sup,
+ {rabbit_direct_client_sup,
+ {rabbit_client_sup, start_link,
+ [{local, rabbit_direct_client_sup},
+ {rabbit_channel_sup, start_link, []}]},
+ transient, infinity, supervisor, [rabbit_client_sup]}),
ok.
%%----------------------------------------------------------------------------
@@ -73,7 +73,7 @@ start_channel(Number, ClientChannelPid, Protocol, User, VHost, Capabilities,
Collector) ->
{ok, _, {ChannelPid, _}} =
supervisor2:start_child(
- rabbit_direct_client_sup,
- [{direct, Number, ClientChannelPid, Protocol, User, VHost,
- Capabilities, Collector}]),
+ rabbit_direct_client_sup,
+ [{direct, Number, ClientChannelPid, Protocol, User, VHost,
+ Capabilities, Collector}]),
{ok, ChannelPid}.
diff --git a/src/rabbit_event.erl b/src/rabbit_event.erl
index 40ade4b742..9ed532db2a 100644
--- a/src/rabbit_event.erl
+++ b/src/rabbit_event.erl
@@ -101,7 +101,7 @@ ensure_stats_timer(State = #state{level = none}, _Fun) ->
State;
ensure_stats_timer(State = #state{timer = undefined}, Fun) ->
{ok, TRef} = timer:apply_after(?STATS_INTERVAL,
- erlang, apply, [Fun, []]),
+ erlang, apply, [Fun, []]),
State#state{timer = TRef};
ensure_stats_timer(State, _Fun) ->
State.
@@ -130,15 +130,8 @@ notify_if(true, Type, Props) -> notify(Type, Props);
notify_if(false, _Type, _Props) -> ok.
notify(Type, Props) ->
- try
- %% TODO: switch to os:timestamp() when we drop support for
- %% Erlang/OTP < R13B01
- gen_event:notify(rabbit_event, #event{type = Type,
- props = Props,
- timestamp = now()})
- catch error:badarg ->
- %% badarg means rabbit_event is no longer registered. We never
- %% unregister it so the great likelihood is that we're shutting
- %% down the broker but some events were backed up. Ignore it.
- ok
- end.
+ %% TODO: switch to os:timestamp() when we drop support for
+ %% Erlang/OTP < R13B01
+ gen_event:notify(rabbit_event, #event{type = Type,
+ props = Props,
+ timestamp = now()}).
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 92259195c3..a463e57067 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -62,7 +62,7 @@
-> rabbit_types:infos()).
-spec(info_all/1 :: (rabbit_types:vhost()) -> [rabbit_types:infos()]).
-spec(info_all/2 ::(rabbit_types:vhost(), rabbit_types:info_keys())
- -> [rabbit_types:infos()]).
+ -> [rabbit_types:infos()]).
-spec(publish/2 :: (rabbit_types:exchange(), rabbit_types:delivery())
-> {rabbit_router:routing_result(), [pid()]}).
-spec(delete/2 ::
@@ -266,9 +266,9 @@ process_route(#resource{kind = queue} = QName,
call_with_exchange(XName, Fun, PrePostCommitFun) ->
rabbit_misc:execute_mnesia_transaction(
fun () -> case mnesia:read({rabbit_exchange, XName}) of
- [] -> {error, not_found};
- [X] -> Fun(X)
- end
+ [] -> {error, not_found};
+ [X] -> Fun(X)
+ end
end, PrePostCommitFun).
delete(XName, IfUnused) ->
diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl
index c51b0913a0..349c2f6ee4 100644
--- a/src/rabbit_exchange_type_direct.erl
+++ b/src/rabbit_exchange_type_direct.erl
@@ -36,8 +36,8 @@ description() ->
{description, <<"AMQP direct exchange, as per the AMQP specification">>}].
route(#exchange{name = Name},
- #delivery{message = #basic_message{routing_key = RoutingKey}}) ->
- rabbit_router:match_routing_key(Name, RoutingKey).
+ #delivery{message = #basic_message{routing_keys = Routes}}) ->
+ rabbit_router:match_routing_key(Name, Routes).
validate(_X) -> ok.
create(_Tx, _X) -> ok.
diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl
index 382fb6270d..bc5293c81d 100644
--- a/src/rabbit_exchange_type_fanout.erl
+++ b/src/rabbit_exchange_type_fanout.erl
@@ -36,7 +36,7 @@ description() ->
{description, <<"AMQP fanout exchange, as per the AMQP specification">>}].
route(#exchange{name = Name}, _Delivery) ->
- rabbit_router:match_routing_key(Name, '_').
+ rabbit_router:match_routing_key(Name, ['_']).
validate(_X) -> ok.
create(_Tx, _X) -> ok.
diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl
index 9cbf8100e2..f12661d48f 100644
--- a/src/rabbit_exchange_type_topic.erl
+++ b/src/rabbit_exchange_type_topic.erl
@@ -15,6 +15,7 @@
%%
-module(rabbit_exchange_type_topic).
+
-include("rabbit.hrl").
-behaviour(rabbit_exchange_type).
@@ -31,58 +32,225 @@
{requires, rabbit_registry},
{enables, kernel_ready}]}).
--export([topic_matches/2]).
-
--ifdef(use_specs).
-
--spec(topic_matches/2 :: (binary(), binary()) -> boolean()).
-
--endif.
+%%----------------------------------------------------------------------------
description() ->
[{name, <<"topic">>},
{description, <<"AMQP topic exchange, as per the AMQP specification">>}].
-route(#exchange{name = Name},
- #delivery{message = #basic_message{routing_key = RoutingKey}}) ->
- rabbit_router:match_bindings(Name,
- fun (#binding{key = BindingKey}) ->
- topic_matches(BindingKey, RoutingKey)
- end).
-
-split_topic_key(Key) ->
- string:tokens(binary_to_list(Key), ".").
-
-topic_matches(PatternKey, RoutingKey) ->
- P = split_topic_key(PatternKey),
- R = split_topic_key(RoutingKey),
- topic_matches1(P, R).
-
-topic_matches1(["#"], _R) ->
- true;
-topic_matches1(["#" | PTail], R) ->
- last_topic_match(PTail, [], lists:reverse(R));
-topic_matches1([], []) ->
- true;
-topic_matches1(["*" | PatRest], [_ | ValRest]) ->
- topic_matches1(PatRest, ValRest);
-topic_matches1([PatElement | PatRest], [ValElement | ValRest])
- when PatElement == ValElement ->
- topic_matches1(PatRest, ValRest);
-topic_matches1(_, _) ->
- false.
-
-last_topic_match(P, R, []) ->
- topic_matches1(P, R);
-last_topic_match(P, R, [BacktrackNext | BacktrackList]) ->
- topic_matches1(P, R) or
- last_topic_match(P, [BacktrackNext | R], BacktrackList).
+%% NB: This may return duplicate results in some situations (that's ok)
+route(#exchange{name = X},
+ #delivery{message = #basic_message{routing_keys = Routes}}) ->
+ lists:append([begin
+ Words = split_topic_key(RKey),
+ mnesia:async_dirty(fun trie_match/2, [X, Words])
+ end || RKey <- Routes]).
validate(_X) -> ok.
create(_Tx, _X) -> ok.
-recover(_X, _Bs) -> ok.
-delete(_Tx, _X, _Bs) -> ok.
-add_binding(_Tx, _X, _B) -> ok.
-remove_bindings(_Tx, _X, _Bs) -> ok.
+
+recover(_Exchange, Bs) ->
+ rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ lists:foreach(fun (B) -> internal_add_binding(B) end, Bs)
+ end).
+
+delete(true, #exchange{name = X}, _Bs) ->
+ trie_remove_all_edges(X),
+ trie_remove_all_bindings(X),
+ ok;
+delete(false, _Exchange, _Bs) ->
+ ok.
+
+add_binding(true, _Exchange, Binding) ->
+ internal_add_binding(Binding);
+add_binding(false, _Exchange, _Binding) ->
+ ok.
+
+remove_bindings(true, _X, Bs) ->
+ lists:foreach(fun remove_binding/1, Bs),
+ ok;
+remove_bindings(false, _X, _Bs) ->
+ ok.
+
+remove_binding(#binding{source = X, key = K, destination = D}) ->
+ Path = [{FinalNode, _} | _] = follow_down_get_path(X, split_topic_key(K)),
+ trie_remove_binding(X, FinalNode, D),
+ remove_path_if_empty(X, Path),
+ ok.
+
assert_args_equivalence(X, Args) ->
rabbit_exchange:assert_args_equivalence(X, Args).
+
+%%----------------------------------------------------------------------------
+
+internal_add_binding(#binding{source = X, key = K, destination = D}) ->
+ FinalNode = follow_down_create(X, split_topic_key(K)),
+ trie_add_binding(X, FinalNode, D),
+ ok.
+
+trie_match(X, Words) ->
+ trie_match(X, root, Words, []).
+
+trie_match(X, Node, [], ResAcc) ->
+ trie_match_part(X, Node, "#", fun trie_match_skip_any/4, [],
+ trie_bindings(X, Node) ++ ResAcc);
+trie_match(X, Node, [W | RestW] = Words, ResAcc) ->
+ lists:foldl(fun ({WArg, MatchFun, RestWArg}, Acc) ->
+ trie_match_part(X, Node, WArg, MatchFun, RestWArg, Acc)
+ end, ResAcc, [{W, fun trie_match/4, RestW},
+ {"*", fun trie_match/4, RestW},
+ {"#", fun trie_match_skip_any/4, Words}]).
+
+trie_match_part(X, Node, Search, MatchFun, RestW, ResAcc) ->
+ case trie_child(X, Node, Search) of
+ {ok, NextNode} -> MatchFun(X, NextNode, RestW, ResAcc);
+ error -> ResAcc
+ end.
+
+trie_match_skip_any(X, Node, [], ResAcc) ->
+ trie_match(X, Node, [], ResAcc);
+trie_match_skip_any(X, Node, [_ | RestW] = Words, ResAcc) ->
+ trie_match_skip_any(X, Node, RestW,
+ trie_match(X, Node, Words, ResAcc)).
+
+follow_down_create(X, Words) ->
+ case follow_down_last_node(X, Words) of
+ {ok, FinalNode} -> FinalNode;
+ {error, Node, RestW} -> lists:foldl(
+ fun (W, CurNode) ->
+ NewNode = new_node_id(),
+ trie_add_edge(X, CurNode, NewNode, W),
+ NewNode
+ end, Node, RestW)
+ end.
+
+follow_down_last_node(X, Words) ->
+ follow_down(X, fun (_, Node, _) -> Node end, root, Words).
+
+follow_down_get_path(X, Words) ->
+ {ok, Path} =
+ follow_down(X, fun (W, Node, PathAcc) -> [{Node, W} | PathAcc] end,
+ [{root, none}], Words),
+ Path.
+
+follow_down(X, AccFun, Acc0, Words) ->
+ follow_down(X, root, AccFun, Acc0, Words).
+
+follow_down(_X, _CurNode, _AccFun, Acc, []) ->
+ {ok, Acc};
+follow_down(X, CurNode, AccFun, Acc, Words = [W | RestW]) ->
+ case trie_child(X, CurNode, W) of
+ {ok, NextNode} -> follow_down(X, NextNode, AccFun,
+ AccFun(W, NextNode, Acc), RestW);
+ error -> {error, Acc, Words}
+ end.
+
+remove_path_if_empty(_, [{root, none}]) ->
+ ok;
+remove_path_if_empty(X, [{Node, W} | [{Parent, _} | _] = RestPath]) ->
+ case trie_has_any_bindings(X, Node) orelse trie_has_any_children(X, Node) of
+ true -> ok;
+ false -> trie_remove_edge(X, Parent, Node, W),
+ remove_path_if_empty(X, RestPath)
+ end.
+
+trie_child(X, Node, Word) ->
+ case mnesia:read(rabbit_topic_trie_edge,
+ #trie_edge{exchange_name = X,
+ node_id = Node,
+ word = Word}) of
+ [#topic_trie_edge{node_id = NextNode}] -> {ok, NextNode};
+ [] -> error
+ end.
+
+trie_bindings(X, Node) ->
+ MatchHead = #topic_trie_binding{
+ trie_binding = #trie_binding{exchange_name = X,
+ node_id = Node,
+ destination = '$1'}},
+ mnesia:select(rabbit_topic_trie_binding, [{MatchHead, [], ['$1']}]).
+
+trie_add_edge(X, FromNode, ToNode, W) ->
+ trie_edge_op(X, FromNode, ToNode, W, fun mnesia:write/3).
+
+trie_remove_edge(X, FromNode, ToNode, W) ->
+ trie_edge_op(X, FromNode, ToNode, W, fun mnesia:delete_object/3).
+
+trie_edge_op(X, FromNode, ToNode, W, Op) ->
+ ok = Op(rabbit_topic_trie_edge,
+ #topic_trie_edge{trie_edge = #trie_edge{exchange_name = X,
+ node_id = FromNode,
+ word = W},
+ node_id = ToNode},
+ write).
+
+trie_add_binding(X, Node, D) ->
+ trie_binding_op(X, Node, D, fun mnesia:write/3).
+
+trie_remove_binding(X, Node, D) ->
+ trie_binding_op(X, Node, D, fun mnesia:delete_object/3).
+
+trie_binding_op(X, Node, D, Op) ->
+ ok = Op(rabbit_topic_trie_binding,
+ #topic_trie_binding{
+ trie_binding = #trie_binding{exchange_name = X,
+ node_id = Node,
+ destination = D}},
+ write).
+
+trie_has_any_children(X, Node) ->
+ has_any(rabbit_topic_trie_edge,
+ #topic_trie_edge{trie_edge = #trie_edge{exchange_name = X,
+ node_id = Node,
+ _ = '_'},
+ _ = '_'}).
+
+trie_has_any_bindings(X, Node) ->
+ has_any(rabbit_topic_trie_binding,
+ #topic_trie_binding{
+ trie_binding = #trie_binding{exchange_name = X,
+ node_id = Node,
+ _ = '_'},
+ _ = '_'}).
+
+trie_remove_all_edges(X) ->
+ remove_all(rabbit_topic_trie_edge,
+ #topic_trie_edge{trie_edge = #trie_edge{exchange_name = X,
+ _ = '_'},
+ _ = '_'}).
+
+trie_remove_all_bindings(X) ->
+ remove_all(rabbit_topic_trie_binding,
+ #topic_trie_binding{
+ trie_binding = #trie_binding{exchange_name = X, _ = '_'},
+ _ = '_'}).
+
+has_any(Table, MatchHead) ->
+ Select = mnesia:select(Table, [{MatchHead, [], ['$_']}], 1, read),
+ select_while_no_result(Select) /= '$end_of_table'.
+
+select_while_no_result({[], Cont}) ->
+ select_while_no_result(mnesia:select(Cont));
+select_while_no_result(Other) ->
+ Other.
+
+remove_all(Table, Pattern) ->
+ lists:foreach(fun (R) -> mnesia:delete_object(Table, R, write) end,
+ mnesia:match_object(Table, Pattern, write)).
+
+new_node_id() ->
+ rabbit_guid:guid().
+
+split_topic_key(Key) ->
+ split_topic_key(Key, [], []).
+
+split_topic_key(<<>>, [], []) ->
+ [];
+split_topic_key(<<>>, RevWordAcc, RevResAcc) ->
+ lists:reverse([lists:reverse(RevWordAcc) | RevResAcc]);
+split_topic_key(<<$., Rest/binary>>, RevWordAcc, RevResAcc) ->
+ split_topic_key(Rest, [], [lists:reverse(RevWordAcc) | RevResAcc]);
+split_topic_key(<<C:8, Rest/binary>>, RevWordAcc, RevResAcc) ->
+ split_topic_key(Rest, [C | RevWordAcc], RevResAcc).
+
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 86ea7282d9..1b72dd761a 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -65,7 +65,7 @@ start_link(ChPid, UnackedMsgCount) ->
limit(undefined, 0) ->
ok;
limit(LimiterPid, PrefetchCount) ->
- gen_server2:call(LimiterPid, {limit, PrefetchCount}).
+ gen_server2:call(LimiterPid, {limit, PrefetchCount}, infinity).
%% Ask the limiter whether the queue can deliver a message without
%% breaching a limit
diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl
index 2f8c940b75..996b0a980f 100644
--- a/src/rabbit_memory_monitor.erl
+++ b/src/rabbit_memory_monitor.erl
@@ -111,11 +111,11 @@ stop() ->
init([]) ->
MemoryLimit = trunc(?MEMORY_LIMIT_SCALING *
- (try
- vm_memory_monitor:get_memory_limit()
- catch
- exit:{noproc, _} -> ?MEMORY_SIZE_FOR_DISABLED_VMM
- end)),
+ (try
+ vm_memory_monitor:get_memory_limit()
+ catch
+ exit:{noproc, _} -> ?MEMORY_SIZE_FOR_DISABLED_VMM
+ end)),
{ok, TRef} = timer:apply_interval(?DEFAULT_UPDATE_INTERVAL,
?SERVER, update, []),
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index abc27c5f7d..e79a58a1b6 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -105,7 +105,7 @@
({atom(), any()}) -> rabbit_types:ok_or_error2(any(), 'not_found')).
-spec(table_lookup/2 ::
(rabbit_framing:amqp_table(), binary())
- -> 'undefined' | {rabbit_framing:amqp_field_type(), any()}).
+ -> 'undefined' | {rabbit_framing:amqp_field_type(), any()}).
-spec(r/2 :: (rabbit_types:vhost(), K)
-> rabbit_types:r3(rabbit_types:vhost(), K, '_')
when is_subtype(K, atom())).
@@ -469,11 +469,11 @@ map_in_order(F, L) ->
table_fold(F, Acc0, TableName) ->
lists:foldl(
fun (E, Acc) -> execute_mnesia_transaction(
- fun () -> case mnesia:match_object(TableName, E, read) of
- [] -> Acc;
- _ -> F(E, Acc)
- end
- end)
+ fun () -> case mnesia:match_object(TableName, E, read) of
+ [] -> Acc;
+ _ -> F(E, Acc)
+ end
+ end)
end, Acc0, dirty_read_all(TableName)).
dirty_read_all(TableName) ->
@@ -755,12 +755,12 @@ unlink_and_capture_exit(Pid) ->
after 0 -> ok
end.
-% Separate flags and options from arguments.
-% get_options([{flag, "-q"}, {option, "-p", "/"}],
-% ["set_permissions","-p","/","guest",
-% "-q",".*",".*",".*"])
-% == {["set_permissions","guest",".*",".*",".*"],
-% [{"-q",true},{"-p","/"}]}
+%% Separate flags and options from arguments.
+%% get_options([{flag, "-q"}, {option, "-p", "/"}],
+%% ["set_permissions","-p","/","guest",
+%% "-q",".*",".*",".*"])
+%% == {["set_permissions","guest",".*",".*",".*"],
+%% [{"-q",true},{"-p","/"}]}
get_options(Defs, As) ->
lists:foldl(fun(Def, {AsIn, RsIn}) ->
{AsOut, Value} = case Def of
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index a9b4e17745..66436920d4 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -20,7 +20,7 @@
-export([ensure_mnesia_dir/0, dir/0, status/0, init/0, is_db_empty/0,
cluster/1, force_cluster/1, reset/0, force_reset/0,
is_clustered/0, running_clustered_nodes/0, all_clustered_nodes/0,
- empty_ram_only_tables/0, copy_db/1]).
+ empty_ram_only_tables/0, copy_db/1, wait_for_tables/1]).
-export([table_names/0]).
@@ -54,6 +54,7 @@
-spec(empty_ram_only_tables/0 :: () -> 'ok').
-spec(create_tables/0 :: () -> 'ok').
-spec(copy_db/1 :: (file:filename()) -> rabbit_types:ok_or_error(any())).
+-spec(wait_for_tables/1 :: ([atom()]) -> 'ok').
-endif.
@@ -128,10 +129,10 @@ empty_ram_only_tables() ->
Node = node(),
lists:foreach(
fun (TabName) ->
- case lists:member(Node, mnesia:table_info(TabName, ram_copies)) of
- true -> {atomic, ok} = mnesia:clear_table(TabName);
- false -> ok
- end
+ case lists:member(Node, mnesia:table_info(TabName, ram_copies)) of
+ true -> {atomic, ok} = mnesia:clear_table(TabName);
+ false -> ok
+ end
end, table_names()),
ok.
@@ -185,6 +186,17 @@ table_definitions() ->
{type, ordered_set},
{match, #reverse_route{reverse_binding = reverse_binding_match(),
_='_'}}]},
+ {rabbit_topic_trie_edge,
+ [{record_name, topic_trie_edge},
+ {attributes, record_info(fields, topic_trie_edge)},
+ {type, ordered_set},
+ {match, #topic_trie_edge{trie_edge = trie_edge_match(), _='_'}}]},
+ {rabbit_topic_trie_binding,
+ [{record_name, topic_trie_binding},
+ {attributes, record_info(fields, topic_trie_binding)},
+ {type, ordered_set},
+ {match, #topic_trie_binding{trie_binding = trie_binding_match(),
+ _='_'}}]},
%% Consider the implications to nodes_of_type/1 before altering
%% the next entry.
{rabbit_durable_exchange,
@@ -216,6 +228,12 @@ reverse_binding_match() ->
_='_'}.
binding_destination_match() ->
resource_match('_').
+trie_edge_match() ->
+ #trie_edge{exchange_name = exchange_name_match(),
+ _='_'}.
+trie_binding_match() ->
+ #trie_binding{exchange_name = exchange_name_match(),
+ _='_'}.
exchange_name_match() ->
resource_match(exchange).
queue_name_match() ->
@@ -264,45 +282,48 @@ ensure_schema_integrity() ->
check_schema_integrity() ->
Tables = mnesia:system_info(tables),
- case [Error || {Tab, TabDef} <- table_definitions(),
- case lists:member(Tab, Tables) of
- false ->
- Error = {table_missing, Tab},
- true;
- true ->
- {_, ExpAttrs} = proplists:lookup(attributes, TabDef),
- Attrs = mnesia:table_info(Tab, attributes),
- Error = {table_attributes_mismatch, Tab,
- ExpAttrs, Attrs},
- Attrs /= ExpAttrs
- end] of
- [] -> check_table_integrity();
- Errors -> {error, Errors}
+ case check_tables(fun (Tab, TabDef) ->
+ case lists:member(Tab, Tables) of
+ false -> {error, {table_missing, Tab}};
+ true -> check_table_attributes(Tab, TabDef)
+ end
+ end) of
+ ok -> ok = wait_for_tables(),
+ check_tables(fun check_table_content/2);
+ Other -> Other
end.
-check_table_integrity() ->
- ok = wait_for_tables(),
- case lists:all(fun ({Tab, TabDef}) ->
- {_, Match} = proplists:lookup(match, TabDef),
- read_test_table(Tab, Match)
- end, table_definitions()) of
- true -> ok;
- false -> {error, invalid_table_content}
+check_table_attributes(Tab, TabDef) ->
+ {_, ExpAttrs} = proplists:lookup(attributes, TabDef),
+ case mnesia:table_info(Tab, attributes) of
+ ExpAttrs -> ok;
+ Attrs -> {error, {table_attributes_mismatch, Tab, ExpAttrs, Attrs}}
end.
-read_test_table(Tab, Match) ->
+check_table_content(Tab, TabDef) ->
+ {_, Match} = proplists:lookup(match, TabDef),
case mnesia:dirty_first(Tab) of
'$end_of_table' ->
- true;
+ ok;
Key ->
ObjList = mnesia:dirty_read(Tab, Key),
MatchComp = ets:match_spec_compile([{Match, [], ['$_']}]),
case ets:match_spec_run(ObjList, MatchComp) of
- ObjList -> true;
- _ -> false
+ ObjList -> ok;
+ _ -> {error, {table_content_invalid, Tab, Match, ObjList}}
end
end.
+check_tables(Fun) ->
+ case [Error || {Tab, TabDef} <- table_definitions(),
+ case Fun(Tab, TabDef) of
+ ok -> Error = none, false;
+ {error, Error} -> true
+ end] of
+ [] -> ok;
+ Errors -> {error, Errors}
+ end.
+
%% The cluster node config file contains some or all of the disk nodes
%% that are members of the cluster this node is / should be a part of.
%%
@@ -369,17 +390,15 @@ init_db(ClusterNodes, Force) ->
case {Nodes, mnesia:system_info(use_dir), all_clustered_nodes()} of
{[], true, [_]} ->
%% True single disc node, attempt upgrade
- ok = wait_for_tables(),
case rabbit_upgrade:maybe_upgrade() of
- ok -> ensure_schema_ok();
+ ok -> ensure_schema_integrity();
version_not_available -> schema_ok_or_move()
end;
{[], true, _} ->
%% "Master" (i.e. without config) disc node in cluster,
%% verify schema
- ok = wait_for_tables(),
ensure_version_ok(rabbit_upgrade:read_version()),
- ensure_schema_ok();
+ ensure_schema_integrity();
{[], false, _} ->
%% Nothing there at all, start from scratch
ok = create_schema();
@@ -396,7 +415,7 @@ init_db(ClusterNodes, Force) ->
true -> disc;
false -> ram
end),
- ensure_schema_ok()
+ ensure_schema_integrity()
end;
{error, Reason} ->
%% one reason we may end up here is if we try to join
@@ -429,12 +448,6 @@ ensure_version_ok({ok, DiscVersion}) ->
ensure_version_ok({error, _}) ->
ok = rabbit_upgrade:write_version().
-ensure_schema_ok() ->
- case check_schema_integrity() of
- ok -> ok;
- {error, Reason} -> throw({error, {schema_invalid, Reason}})
- end.
-
create_schema() ->
mnesia:stop(),
rabbit_misc:ensure_ok(mnesia:create_schema([node()]),
@@ -443,7 +456,6 @@ create_schema() ->
cannot_start_mnesia),
ok = create_tables(),
ok = ensure_schema_integrity(),
- ok = wait_for_tables(),
ok = rabbit_upgrade:write_version().
move_db() ->
@@ -472,8 +484,7 @@ copy_db(Destination) ->
mnesia:stop(),
case rabbit_misc:recursive_copy(dir(), Destination) of
ok ->
- rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia),
- ok = wait_for_tables();
+ rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia);
{error, E} ->
{error, E}
end.
@@ -508,13 +519,13 @@ create_local_table_copies(Type) ->
HasDiscOnlyCopies -> disc_only_copies;
true -> ram_copies
end;
-%% unused code - commented out to keep dialyzer happy
-%% Type =:= disc_only ->
-%% if
-%% HasDiscCopies or HasDiscOnlyCopies ->
-%% disc_only_copies;
-%% true -> ram_copies
-%% end;
+%%% unused code - commented out to keep dialyzer happy
+%%% Type =:= disc_only ->
+%%% if
+%%% HasDiscCopies or HasDiscOnlyCopies ->
+%%% disc_only_copies;
+%%% true -> ram_copies
+%%% end;
Type =:= ram ->
ram_copies
end,
@@ -541,7 +552,8 @@ wait_for_tables() -> wait_for_tables(table_names()).
wait_for_tables(TableNames) ->
case mnesia:wait_for_tables(TableNames, 30000) of
- ok -> ok;
+ ok ->
+ ok;
{timeout, BadTabs} ->
throw({error, {timeout_waiting_for_tables, BadTabs}});
{error, Reason} ->
diff --git a/src/rabbit_msg_file.erl b/src/rabbit_msg_file.erl
index cfea4982b4..ea7cf80cc5 100644
--- a/src/rabbit_msg_file.erl
+++ b/src/rabbit_msg_file.erl
@@ -16,7 +16,7 @@
-module(rabbit_msg_file).
--export([append/3, read/2, scan/2]).
+-export([append/3, read/2, scan/4]).
%%----------------------------------------------------------------------------
@@ -45,9 +45,9 @@
-spec(read/2 :: (io_device(), msg_size()) ->
rabbit_types:ok_or_error2({rabbit_guid:guid(), msg()},
any())).
--spec(scan/2 :: (io_device(), file_size()) ->
- {'ok', [{rabbit_guid:guid(), msg_size(), position()}],
- position()}).
+-spec(scan/4 :: (io_device(), file_size(),
+ fun (({rabbit_guid:guid(), msg_size(), position(), binary()}, A) -> A),
+ A) -> {'ok', A, position()}).
-endif.
@@ -60,9 +60,9 @@ append(FileHdl, Guid, MsgBody)
Size = MsgBodyBinSize + ?GUID_SIZE_BYTES,
case file_handle_cache:append(FileHdl,
<<Size:?INTEGER_SIZE_BITS,
- Guid:?GUID_SIZE_BYTES/binary,
- MsgBodyBin:MsgBodyBinSize/binary,
- ?WRITE_OK_MARKER:?WRITE_OK_SIZE_BITS>>) of
+ Guid:?GUID_SIZE_BYTES/binary,
+ MsgBodyBin:MsgBodyBinSize/binary,
+ ?WRITE_OK_MARKER:?WRITE_OK_SIZE_BITS>>) of
ok -> {ok, Size + ?FILE_PACKING_ADJUSTMENT};
KO -> KO
end.
@@ -72,36 +72,36 @@ read(FileHdl, TotalSize) ->
BodyBinSize = Size - ?GUID_SIZE_BYTES,
case file_handle_cache:read(FileHdl, TotalSize) of
{ok, <<Size:?INTEGER_SIZE_BITS,
- Guid:?GUID_SIZE_BYTES/binary,
- MsgBodyBin:BodyBinSize/binary,
- ?WRITE_OK_MARKER:?WRITE_OK_SIZE_BITS>>} ->
+ Guid:?GUID_SIZE_BYTES/binary,
+ MsgBodyBin:BodyBinSize/binary,
+ ?WRITE_OK_MARKER:?WRITE_OK_SIZE_BITS>>} ->
{ok, {Guid, binary_to_term(MsgBodyBin)}};
KO -> KO
end.
-scan(FileHdl, FileSize) when FileSize >= 0 ->
- scan(FileHdl, FileSize, <<>>, 0, [], 0).
+scan(FileHdl, FileSize, Fun, Acc) when FileSize >= 0 ->
+ scan(FileHdl, FileSize, <<>>, 0, 0, Fun, Acc).
-scan(_FileHdl, FileSize, _Data, FileSize, Acc, ScanOffset) ->
+scan(_FileHdl, FileSize, _Data, FileSize, ScanOffset, _Fun, Acc) ->
{ok, Acc, ScanOffset};
-scan(FileHdl, FileSize, Data, ReadOffset, Acc, ScanOffset) ->
+scan(FileHdl, FileSize, Data, ReadOffset, ScanOffset, Fun, Acc) ->
Read = lists:min([?SCAN_BLOCK_SIZE, (FileSize - ReadOffset)]),
case file_handle_cache:read(FileHdl, Read) of
{ok, Data1} ->
{Data2, Acc1, ScanOffset1} =
- scan(<<Data/binary, Data1/binary>>, Acc, ScanOffset),
+ scanner(<<Data/binary, Data1/binary>>, ScanOffset, Fun, Acc),
ReadOffset1 = ReadOffset + size(Data1),
- scan(FileHdl, FileSize, Data2, ReadOffset1, Acc1, ScanOffset1);
+ scan(FileHdl, FileSize, Data2, ReadOffset1, ScanOffset1, Fun, Acc1);
_KO ->
{ok, Acc, ScanOffset}
end.
-scan(<<>>, Acc, Offset) ->
+scanner(<<>>, Offset, _Fun, Acc) ->
{<<>>, Acc, Offset};
-scan(<<0:?INTEGER_SIZE_BITS, _Rest/binary>>, Acc, Offset) ->
+scanner(<<0:?INTEGER_SIZE_BITS, _Rest/binary>>, Offset, _Fun, Acc) ->
{<<>>, Acc, Offset}; %% Nothing to do other than stop.
-scan(<<Size:?INTEGER_SIZE_BITS, GuidAndMsg:Size/binary,
- WriteMarker:?WRITE_OK_SIZE_BITS, Rest/binary>>, Acc, Offset) ->
+scanner(<<Size:?INTEGER_SIZE_BITS, GuidAndMsg:Size/binary,
+ WriteMarker:?WRITE_OK_SIZE_BITS, Rest/binary>>, Offset, Fun, Acc) ->
TotalSize = Size + ?FILE_PACKING_ADJUSTMENT,
case WriteMarker of
?WRITE_OK_MARKER ->
@@ -110,12 +110,13 @@ scan(<<Size:?INTEGER_SIZE_BITS, GuidAndMsg:Size/binary,
%% which we read the Guid as a number, and then convert it
%% back to a binary in order to work around bugs in
%% Erlang's GC.
- <<GuidNum:?GUID_SIZE_BITS, _Msg/binary>> =
+ <<GuidNum:?GUID_SIZE_BITS, Msg/binary>> =
<<GuidAndMsg:Size/binary>>,
<<Guid:?GUID_SIZE_BYTES/binary>> = <<GuidNum:?GUID_SIZE_BITS>>,
- scan(Rest, [{Guid, TotalSize, Offset} | Acc], Offset + TotalSize);
+ scanner(Rest, Offset + TotalSize, Fun,
+ Fun({Guid, TotalSize, Offset, Msg}, Acc));
_ ->
- scan(Rest, Acc, Offset + TotalSize)
+ scanner(Rest, Offset + TotalSize, Fun, Acc)
end;
-scan(Data, Acc, Offset) ->
+scanner(Data, Offset, _Fun, Acc) ->
{Data, Acc, Offset}.
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 7f3cf35faa..8e1b2ac482 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -26,6 +26,8 @@
-export([sync/1, set_maximum_since_use/2,
has_readers/2, combine_files/3, delete_file/2]). %% internal
+-export([transform_dir/3, force_recovery/2]). %% upgrade
+
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3, prioritise_call/3, prioritise_cast/2]).
@@ -33,9 +35,10 @@
-include("rabbit_msg_store.hrl").
--define(SYNC_INTERVAL, 25). %% milliseconds
+-define(SYNC_INTERVAL, 5). %% milliseconds
-define(CLEAN_FILENAME, "clean.dot").
-define(FILE_SUMMARY_FILENAME, "file_summary.ets").
+-define(TRANSFORM_TMP, "transform_tmp").
-define(BINARY_MODE, [raw, binary]).
-define(READ_MODE, [read]).
@@ -72,7 +75,7 @@
successfully_recovered, %% boolean: did we recover state?
file_size_limit, %% how big are our files allowed to get?
cref_to_guids %% client ref to synced messages mapping
- }).
+ }).
-record(client_msstate,
{ server,
@@ -86,7 +89,7 @@
file_summary_ets,
dedup_cache_ets,
cur_file_cache_ets
- }).
+ }).
-record(file_summary,
{file, valid_total_size, left, right, file_size, locked, readers}).
@@ -160,6 +163,9 @@
-spec(combine_files/3 :: (non_neg_integer(), non_neg_integer(), gc_state()) ->
deletion_thunk()).
-spec(delete_file/2 :: (non_neg_integer(), gc_state()) -> deletion_thunk()).
+-spec(force_recovery/2 :: (file:filename(), server()) -> 'ok').
+-spec(transform_dir/3 :: (file:filename(), server(),
+ fun ((any()) -> (rabbit_types:ok_or_error2(msg(), any())))) -> 'ok').
-endif.
@@ -543,7 +549,7 @@ client_read3(#msg_location { guid = Guid, file = File }, Defer,
%% GC ends, we +1 readers, msg_store ets:deletes (and
%% unlocks the dest)
try Release(),
- Defer()
+ Defer()
catch error:badarg -> read(Guid, CState)
end;
[#file_summary { locked = false }] ->
@@ -661,7 +667,7 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
successfully_recovered = CleanShutdown,
file_size_limit = FileSizeLimit,
cref_to_guids = dict:new()
- },
+ },
%% If we didn't recover the msg location index then we need to
%% rebuild it now.
@@ -1250,7 +1256,7 @@ safe_file_delete(File, Dir, FileHandlesEts) ->
close_all_indicated(#client_msstate { file_handles_ets = FileHandlesEts,
client_ref = Ref } =
- CState) ->
+ CState) ->
Objs = ets:match_object(FileHandlesEts, {{Ref, '_'}, close}),
{ok, lists:foldl(fun ({Key = {_Ref, File}, close}, CStateM) ->
true = ets:delete(FileHandlesEts, Key),
@@ -1459,7 +1465,7 @@ recover_file_summary(true, Dir) ->
Path = filename:join(Dir, ?FILE_SUMMARY_FILENAME),
case ets:file2tab(Path) of
{ok, Tid} -> file:delete(Path),
- {true, Tid};
+ {true, Tid};
{error, _Error} -> recover_file_summary(false, Dir)
end.
@@ -1523,7 +1529,8 @@ scan_file_for_valid_messages(Dir, FileName) ->
case open_file(Dir, FileName, ?READ_MODE) of
{ok, Hdl} -> Valid = rabbit_msg_file:scan(
Hdl, filelib:file_size(
- form_filename(Dir, FileName))),
+ form_filename(Dir, FileName)),
+ fun scan_fun/2, []),
%% if something really bad has happened,
%% the close could fail, but ignore
file_handle_cache:close(Hdl),
@@ -1532,6 +1539,9 @@ scan_file_for_valid_messages(Dir, FileName) ->
{error, Reason} -> {error, {unable_to_scan_file, FileName, Reason}}
end.
+scan_fun({Guid, TotalSize, Offset, _Msg}, Acc) ->
+ [{Guid, TotalSize, Offset} | Acc].
+
%% Takes the list in *ascending* order (i.e. eldest message
%% first). This is the opposite of what scan_file_for_valid_messages
%% produces. The list of msgs that is produced is youngest first.
@@ -1683,8 +1693,8 @@ maybe_compact(State = #msstate { sum_valid_data = SumValid,
pending_gc_completion = Pending,
file_summary_ets = FileSummaryEts,
file_size_limit = FileSizeLimit })
- when (SumFileSize > 2 * FileSizeLimit andalso
- (SumFileSize - SumValid) / SumFileSize > ?GARBAGE_FRACTION) ->
+ when SumFileSize > 2 * FileSizeLimit andalso
+ (SumFileSize - SumValid) / SumFileSize > ?GARBAGE_FRACTION ->
%% TODO: the algorithm here is sub-optimal - it may result in a
%% complete traversal of FileSummaryEts.
case ets:first(FileSummaryEts) of
@@ -1747,10 +1757,10 @@ delete_file_if_empty(File, State = #msstate {
locked = false }] =
ets:lookup(FileSummaryEts, File),
case ValidData of
- 0 -> %% don't delete the file_summary_ets entry for File here
- %% because we could have readers which need to be able to
- %% decrement the readers count.
- true = ets:update_element(FileSummaryEts, File,
+ %% don't delete the file_summary_ets entry for File here
+ %% because we could have readers which need to be able to
+ %% decrement the readers count.
+ 0 -> true = ets:update_element(FileSummaryEts, File,
{#file_summary.locked, true}),
ok = rabbit_msg_store_gc:delete(GCPid, File),
Pending1 = orddict_store(File, [], Pending),
@@ -1803,17 +1813,17 @@ combine_files(Source, Destination,
dir = Dir,
msg_store = Server }) ->
[#file_summary {
- readers = 0,
- left = Destination,
- valid_total_size = SourceValid,
- file_size = SourceFileSize,
- locked = true }] = ets:lookup(FileSummaryEts, Source),
+ readers = 0,
+ left = Destination,
+ valid_total_size = SourceValid,
+ file_size = SourceFileSize,
+ locked = true }] = ets:lookup(FileSummaryEts, Source),
[#file_summary {
- readers = 0,
- right = Source,
- valid_total_size = DestinationValid,
- file_size = DestinationFileSize,
- locked = true }] = ets:lookup(FileSummaryEts, Destination),
+ readers = 0,
+ right = Source,
+ valid_total_size = DestinationValid,
+ file_size = DestinationFileSize,
+ locked = true }] = ets:lookup(FileSummaryEts, Destination),
SourceName = filenum_to_name(Source),
DestinationName = filenum_to_name(Destination),
@@ -1956,3 +1966,47 @@ copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl,
{got, FinalOffsetZ},
{destination, Destination}]}
end.
+
+force_recovery(BaseDir, Store) ->
+ Dir = filename:join(BaseDir, atom_to_list(Store)),
+ file:delete(filename:join(Dir, ?CLEAN_FILENAME)),
+ recover_crashed_compactions(BaseDir),
+ ok.
+
+foreach_file(D, Fun, Files) ->
+ [Fun(filename:join(D, File)) || File <- Files].
+
+foreach_file(D1, D2, Fun, Files) ->
+ [Fun(filename:join(D1, File), filename:join(D2, File)) || File <- Files].
+
+transform_dir(BaseDir, Store, TransformFun) ->
+ Dir = filename:join(BaseDir, atom_to_list(Store)),
+ TmpDir = filename:join(Dir, ?TRANSFORM_TMP),
+ TransformFile = fun (A, B) -> transform_msg_file(A, B, TransformFun) end,
+ case filelib:is_dir(TmpDir) of
+ true -> throw({error, transform_failed_previously});
+ false -> FileList = list_sorted_file_names(Dir, ?FILE_EXTENSION),
+ foreach_file(Dir, TmpDir, TransformFile, FileList),
+ foreach_file(Dir, fun file:delete/1, FileList),
+ foreach_file(TmpDir, Dir, fun file:copy/2, FileList),
+ foreach_file(TmpDir, fun file:delete/1, FileList),
+ ok = file:del_dir(TmpDir)
+ end.
+
+transform_msg_file(FileOld, FileNew, TransformFun) ->
+ rabbit_misc:ensure_parent_dirs_exist(FileNew),
+ {ok, RefOld} = file_handle_cache:open(FileOld, [raw, binary, read], []),
+ {ok, RefNew} = file_handle_cache:open(FileNew, [raw, binary, write],
+ [{write_buffer,
+ ?HANDLE_CACHE_BUFFER_SIZE}]),
+ {ok, _Acc, _IgnoreSize} =
+ rabbit_msg_file:scan(
+ RefOld, filelib:file_size(FileOld),
+ fun({Guid, _Size, _Offset, BinMsg}, ok) ->
+ {ok, MsgNew} = TransformFun(binary_to_term(BinMsg)),
+ {ok, _} = rabbit_msg_file:append(RefNew, Guid, MsgNew),
+ ok
+ end, ok),
+ file_handle_cache:close(RefOld),
+ file_handle_cache:close(RefNew),
+ ok.
diff --git a/src/rabbit_multi.erl b/src/rabbit_multi.erl
deleted file mode 100644
index ebd7fe8a06..0000000000
--- a/src/rabbit_multi.erl
+++ /dev/null
@@ -1,349 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License
-%% at http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
-%% the License for the specific language governing rights and
-%% limitations under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
-%%
-
--module(rabbit_multi).
--include("rabbit.hrl").
-
--export([start/0, stop/0]).
-
--define(RPC_SLEEP, 500).
-
-%%----------------------------------------------------------------------------
-
--ifdef(use_specs).
-
--spec(start/0 :: () -> no_return()).
--spec(stop/0 :: () -> 'ok').
--spec(usage/0 :: () -> no_return()).
-
--endif.
-
-%%----------------------------------------------------------------------------
-
-start() ->
- RpcTimeout =
- case init:get_argument(maxwait) of
- {ok,[[N1]]} -> 1000 * list_to_integer(N1);
- _ -> ?MAX_WAIT
- end,
- case init:get_plain_arguments() of
- [] ->
- usage();
- FullCommand ->
- {Command, Args} = parse_args(FullCommand),
- case catch action(Command, Args, RpcTimeout) of
- ok ->
- io:format("done.~n"),
- halt();
- {'EXIT', {function_clause, [{?MODULE, action, _} | _]}} ->
- print_error("invalid command '~s'",
- [string:join(FullCommand, " ")]),
- usage();
- timeout ->
- print_error("timeout starting some nodes.", []),
- halt(1);
- Other ->
- print_error("~p", [Other]),
- halt(2)
- end
- end.
-
-print_error(Format, Args) ->
- rabbit_misc:format_stderr("Error: " ++ Format ++ "~n", Args).
-
-parse_args([Command | Args]) ->
- {list_to_atom(Command), Args}.
-
-stop() ->
- ok.
-
-usage() ->
- io:format("~s", [rabbit_multi_usage:usage()]),
- halt(1).
-
-action(start_all, [NodeCount], RpcTimeout) ->
- io:format("Starting all nodes...~n", []),
- application:load(rabbit),
- {_NodeNamePrefix, NodeHost} = NodeName = rabbit_misc:nodeparts(
- getenv("RABBITMQ_NODENAME")),
- case net_adm:names(NodeHost) of
- {error, EpmdReason} ->
- throw({cannot_connect_to_epmd, NodeHost, EpmdReason});
- {ok, _} ->
- ok
- end,
- {NodePids, Running} =
- case list_to_integer(NodeCount) of
- 1 -> {NodePid, Started} = start_node(rabbit_misc:makenode(NodeName),
- RpcTimeout),
- {[NodePid], Started};
- N -> start_nodes(N, N, [], true, NodeName,
- get_node_tcp_listener(), RpcTimeout)
- end,
- write_pids_file(NodePids),
- case Running of
- true -> ok;
- false -> timeout
- end;
-
-action(status, [], RpcTimeout) ->
- io:format("Status of all running nodes...~n", []),
- call_all_nodes(
- fun ({Node, Pid}) ->
- RabbitRunning =
- case is_rabbit_running(Node, RpcTimeout) of
- false -> not_running;
- true -> running
- end,
- io:format("Node '~p' with Pid ~p: ~p~n",
- [Node, Pid, RabbitRunning])
- end);
-
-action(stop_all, [], RpcTimeout) ->
- io:format("Stopping all nodes...~n", []),
- call_all_nodes(fun ({Node, Pid}) ->
- io:format("Stopping node ~p~n", [Node]),
- rpc:call(Node, rabbit, stop_and_halt, []),
- case kill_wait(Pid, RpcTimeout, false) of
- false -> kill_wait(Pid, RpcTimeout, true);
- true -> ok
- end,
- io:format("OK~n", [])
- end),
- delete_pids_file();
-
-action(rotate_logs, [], RpcTimeout) ->
- action(rotate_logs, [""], RpcTimeout);
-
-action(rotate_logs, [Suffix], RpcTimeout) ->
- io:format("Rotating logs for all nodes...~n", []),
- BinarySuffix = list_to_binary(Suffix),
- call_all_nodes(
- fun ({Node, _}) ->
- io:format("Rotating logs for node ~p", [Node]),
- case rpc:call(Node, rabbit, rotate_logs,
- [BinarySuffix], RpcTimeout) of
- {badrpc, Error} -> io:format(": ~p.~n", [Error]);
- ok -> io:format(": ok.~n", [])
- end
- end).
-
-%% PNodePid is the list of PIDs
-%% Running is a boolean exhibiting success at some moment
-start_nodes(0, _, PNodePid, Running, _, _, _) -> {PNodePid, Running};
-
-start_nodes(N, Total, PNodePid, Running, NodeNameBase, Listener, RpcTimeout) ->
- {NodePre, NodeSuff} = NodeNameBase,
- NodeNumber = Total - N,
- NodePre1 = case NodeNumber of
- %% For compatibility with running a single node
- 0 -> NodePre;
- _ -> NodePre ++ "_" ++ integer_to_list(NodeNumber)
- end,
- Node = rabbit_misc:makenode({NodePre1, NodeSuff}),
- os:putenv("RABBITMQ_NODENAME", atom_to_list(Node)),
- case Listener of
- {NodeIpAddress, NodePortBase} ->
- NodePort = NodePortBase + NodeNumber,
- os:putenv("RABBITMQ_NODE_PORT", integer_to_list(NodePort)),
- os:putenv("RABBITMQ_NODE_IP_ADDRESS", NodeIpAddress);
- undefined ->
- ok
- end,
- {NodePid, Started} = start_node(Node, RpcTimeout),
- start_nodes(N - 1, Total, [NodePid | PNodePid],
- Started and Running, NodeNameBase, Listener, RpcTimeout).
-
-start_node(Node, RpcTimeout) ->
- io:format("Starting node ~s...~n", [Node]),
- case rpc:call(Node, os, getpid, []) of
- {badrpc, _} ->
- Port = run_rabbitmq_server(),
- Started = wait_for_rabbit_to_start(Node, RpcTimeout, Port),
- Pid = case rpc:call(Node, os, getpid, []) of
- {badrpc, _} -> throw(cannot_get_pid);
- PidS -> list_to_integer(PidS)
- end,
- io:format("~s~n", [case Started of
- true -> "OK";
- false -> "timeout"
- end]),
- {{Node, Pid}, Started};
- PidS ->
- Pid = list_to_integer(PidS),
- throw({node_already_running, Node, Pid})
- end.
-
-wait_for_rabbit_to_start(_ , RpcTimeout, _) when RpcTimeout < 0 ->
- false;
-wait_for_rabbit_to_start(Node, RpcTimeout, Port) ->
- case is_rabbit_running(Node, RpcTimeout) of
- true -> true;
- false -> receive
- {'EXIT', Port, PosixCode} ->
- throw({node_start_failed, PosixCode})
- after ?RPC_SLEEP ->
- wait_for_rabbit_to_start(
- Node, RpcTimeout - ?RPC_SLEEP, Port)
- end
- end.
-
-run_rabbitmq_server() ->
- with_os([{unix, fun run_rabbitmq_server_unix/0},
- {win32, fun run_rabbitmq_server_win32/0}]).
-
-run_rabbitmq_server_unix() ->
- CmdLine = getenv("RABBITMQ_SCRIPT_HOME") ++ "/rabbitmq-server -noinput",
- erlang:open_port({spawn, CmdLine}, [nouse_stdio]).
-
-run_rabbitmq_server_win32() ->
- Cmd = filename:nativename(os:find_executable("cmd")),
- CmdLine = "\"" ++ getenv("RABBITMQ_SCRIPT_HOME") ++
- "\\rabbitmq-server.bat\" -noinput -detached",
- erlang:open_port({spawn_executable, Cmd},
- [{arg0, Cmd}, {args, ["/q", "/s", "/c", CmdLine]},
- nouse_stdio]).
-
-is_rabbit_running(Node, RpcTimeout) ->
- case rpc:call(Node, rabbit, status, [], RpcTimeout) of
- {badrpc, _} -> false;
- Status -> case proplists:get_value(running_applications, Status) of
- undefined -> false;
- Apps -> lists:keymember(rabbit, 1, Apps)
- end
- end.
-
-with_os(Handlers) ->
- {OsFamily, _} = os:type(),
- case proplists:get_value(OsFamily, Handlers) of
- undefined -> throw({unsupported_os, OsFamily});
- Handler -> Handler()
- end.
-
-pids_file() -> getenv("RABBITMQ_PIDS_FILE").
-
-write_pids_file(Pids) ->
- FileName = pids_file(),
- Handle = case file:open(FileName, [write]) of
- {ok, Device} ->
- Device;
- {error, Reason} ->
- throw({cannot_create_pids_file, FileName, Reason})
- end,
- try
- ok = io:write(Handle, Pids),
- ok = io:put_chars(Handle, [$.])
- after
- case file:close(Handle) of
- ok -> ok;
- {error, Reason1} ->
- throw({cannot_create_pids_file, FileName, Reason1})
- end
- end,
- ok.
-
-delete_pids_file() ->
- FileName = pids_file(),
- case file:delete(FileName) of
- ok -> ok;
- {error, enoent} -> ok;
- {error, Reason} -> throw({cannot_delete_pids_file, FileName, Reason})
- end.
-
-read_pids_file() ->
- FileName = pids_file(),
- case file:consult(FileName) of
- {ok, [Pids]} -> Pids;
- {error, enoent} -> [];
- {error, Reason} -> throw({cannot_read_pids_file, FileName, Reason})
- end.
-
-kill_wait(Pid, TimeLeft, Forceful) when TimeLeft < 0 ->
- Cmd = with_os([{unix, fun () -> if Forceful -> "kill -9";
- true -> "kill"
- end
- end},
- %% Kill forcefully always on Windows, since erl.exe
- %% seems to completely ignore non-forceful killing
- %% even when everything is working
- {win32, fun () -> "taskkill /f /pid" end}]),
- os:cmd(Cmd ++ " " ++ integer_to_list(Pid)),
- false; % Don't assume what we did just worked!
-
-% Returns true if the process is dead, false otherwise.
-kill_wait(Pid, TimeLeft, Forceful) ->
- timer:sleep(?RPC_SLEEP),
- io:format(".", []),
- is_dead(Pid) orelse kill_wait(Pid, TimeLeft - ?RPC_SLEEP, Forceful).
-
-% Test using some OS clunkiness since we shouldn't trust
-% rpc:call(os, getpid, []) at this point
-is_dead(Pid) ->
- PidS = integer_to_list(Pid),
- with_os([{unix, fun () ->
- system("kill -0 " ++ PidS
- ++ " >/dev/null 2>&1") /= 0
- end},
- {win32, fun () ->
- Res = os:cmd("tasklist /nh /fi \"pid eq " ++
- PidS ++ "\" 2>&1"),
- case re:run(Res, "erl\\.exe", [{capture, none}]) of
- match -> false;
- _ -> true
- end
- end}]).
-
-% Like system(3)
-system(Cmd) ->
- ShCmd = "sh -c '" ++ escape_quotes(Cmd) ++ "'",
- Port = erlang:open_port({spawn, ShCmd}, [exit_status,nouse_stdio]),
- receive {Port, {exit_status, Status}} -> Status end.
-
-% Escape the quotes in a shell command so that it can be used in "sh -c 'cmd'"
-escape_quotes(Cmd) ->
- lists:flatten(lists:map(fun ($') -> "'\\''"; (Ch) -> Ch end, Cmd)).
-
-call_all_nodes(Func) ->
- case read_pids_file() of
- [] -> throw(no_nodes_running);
- NodePids -> lists:foreach(Func, NodePids)
- end.
-
-getenv(Var) ->
- case os:getenv(Var) of
- false -> throw({missing_env_var, Var});
- Value -> Value
- end.
-
-get_node_tcp_listener() ->
- try
- {getenv("RABBITMQ_NODE_IP_ADDRESS"),
- list_to_integer(getenv("RABBITMQ_NODE_PORT"))}
- catch _ ->
- case application:get_env(rabbit, tcp_listeners) of
- {ok, [{_IpAddy, _Port} = Listener]} ->
- Listener;
- {ok, [Port]} when is_number(Port) ->
- {"0.0.0.0", Port};
- {ok, []} ->
- undefined;
- {ok, Other} ->
- throw({cannot_start_multiple_nodes, multiple_tcp_listeners,
- Other});
- undefined ->
- throw({missing_configuration, tcp_listeners})
- end
- end.
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 36f61628b8..877d2cf7da 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -67,7 +67,7 @@
-spec(close_connection/2 :: (pid(), string()) -> 'ok').
-spec(on_node_down/1 :: (node()) -> 'ok').
-spec(check_tcp_listener_address/2 :: (atom(), listener_config())
- -> [{inet:ip_address(), ip_port(), family(), atom()}]).
+ -> [{inet:ip_address(), ip_port(), family(), atom()}]).
-endif.
@@ -90,15 +90,15 @@ boot_ssl() ->
{ok, SslListeners} ->
ok = rabbit_misc:start_applications([crypto, public_key, ssl]),
{ok, SslOptsConfig} = application:get_env(ssl_options),
- % unknown_ca errors are silently ignored prior to R14B unless we
- % supply this verify_fun - remove when at least R14B is required
+ %% unknown_ca errors are silently ignored prior to R14B unless we
+ %% supply this verify_fun - remove when at least R14B is required
SslOpts =
case proplists:get_value(verify, SslOptsConfig, verify_none) of
verify_none -> SslOptsConfig;
verify_peer -> [{verify_fun, fun([]) -> true;
([_|_]) -> false
end}
- | SslOptsConfig]
+ | SslOptsConfig]
end,
[start_ssl_listener(Listener, SslOpts) || Listener <- SslListeners],
ok
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index 817abaa2bd..1f30a2fc35 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -69,6 +69,7 @@ handle_call(_Request, _From, State) ->
handle_cast({rabbit_running_on, Node}, State) ->
rabbit_log:info("node ~p up~n", [Node]),
erlang:monitor(process, {rabbit, Node}),
+ ok = rabbit_alarm:on_node_up(Node),
{noreply, State};
handle_cast(_Msg, State) ->
{noreply, State}.
@@ -76,7 +77,7 @@ handle_cast(_Msg, State) ->
handle_info({nodedown, Node}, State) ->
rabbit_log:info("node ~p down~n", [Node]),
ok = handle_dead_rabbit(Node),
- {noreply, State};
+ {noreply, State};
handle_info({'DOWN', _MRef, process, {rabbit, Node}, _Reason}, State) ->
rabbit_log:info("node ~p lost 'rabbit'~n", [Node]),
ok = handle_dead_rabbit(Node),
@@ -92,10 +93,10 @@ code_change(_OldVsn, State, _Extra) ->
%%--------------------------------------------------------------------
-%% TODO: This may turn out to be a performance hog when there are
-%% lots of nodes. We really only need to execute this code on
-%% *one* node, rather than all of them.
+%% TODO: This may turn out to be a performance hog when there are lots
+%% of nodes. We really only need to execute some of these statements
+%% on *one* node, rather than all of them.
handle_dead_rabbit(Node) ->
ok = rabbit_networking:on_node_down(Node),
- ok = rabbit_amqqueue:on_node_down(Node).
-
+ ok = rabbit_amqqueue:on_node_down(Node),
+ ok = rabbit_alarm:on_node_down(Node).
diff --git a/src/rabbit_prelaunch.erl b/src/rabbit_prelaunch.erl
index d9d92788e1..7bb8c0ea3b 100644
--- a/src/rabbit_prelaunch.erl
+++ b/src/rabbit_prelaunch.erl
@@ -250,13 +250,13 @@ duplicate_node_check(NodeStr) ->
case net_adm:names(NodeHost) of
{ok, NamePorts} ->
case proplists:is_defined(NodeName, NamePorts) of
- true -> io:format("node with name ~p "
- "already running on ~p~n",
- [NodeName, NodeHost]),
- [io:format(Fmt ++ "~n", Args) ||
- {Fmt, Args} <- rabbit_control:diagnostics(Node)],
- terminate(?ERROR_CODE);
- false -> ok
+ true -> io:format("node with name ~p "
+ "already running on ~p~n",
+ [NodeName, NodeHost]),
+ [io:format(Fmt ++ "~n", Args) ||
+ {Fmt, Args} <- rabbit_control:diagnostics(Node)],
+ terminate(?ERROR_CODE);
+ false -> ok
end;
{error, EpmdReason} -> terminate("unexpected epmd error: ~p~n",
[EpmdReason])
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 76b1136f8b..00f5a75219 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -145,8 +145,8 @@
%% 1 publish, 1 deliver, 1 ack per msg
-define(SEGMENT_TOTAL_SIZE, ?SEGMENT_ENTRY_COUNT *
- (?PUBLISH_RECORD_LENGTH_BYTES +
- (2 * ?REL_SEQ_ONLY_ENTRY_LENGTH_BYTES))).
+ (?PUBLISH_RECORD_LENGTH_BYTES +
+ (2 * ?REL_SEQ_ONLY_ENTRY_LENGTH_BYTES))).
%% ---- misc ----
@@ -177,7 +177,7 @@
path :: file:filename(),
journal_entries :: array(),
unacked :: non_neg_integer()
- })).
+ })).
-type(seq_id() :: integer()).
-type(seg_dict() :: {dict(), [segment()]}).
-type(on_sync_fun() :: fun ((gb_set()) -> ok)).
@@ -188,10 +188,10 @@
max_journal_entries :: non_neg_integer(),
on_sync :: on_sync_fun(),
unsynced_guids :: [rabbit_guid:guid()]
- }).
+ }).
-type(startup_fun_state() ::
{fun ((A) -> 'finished' | {rabbit_guid:guid(), non_neg_integer(), A}),
- A}).
+ A}).
-type(shutdown_terms() :: [any()]).
-spec(init/2 :: (rabbit_amqqueue:name(), on_sync_fun()) -> qistate()).
@@ -272,7 +272,7 @@ publish(Guid, SeqId, MsgProps, IsPersistent,
false -> ?PUB_TRANS_JPREFIX
end):?JPREFIX_BITS,
SeqId:?SEQ_BITS>>,
- create_pub_record_body(Guid, MsgProps)]),
+ create_pub_record_body(Guid, MsgProps)]),
maybe_flush_journal(
add_to_journal(SeqId, {Guid, MsgProps, IsPersistent}, State1)).
@@ -666,8 +666,8 @@ recover_journal(State) ->
journal_minus_segment(JEntries, SegEntries),
Segment #segment { journal_entries = JEntries1,
unacked = (UnackedCountInJournal +
- UnackedCountInSeg -
- UnackedCountDuplicates) }
+ UnackedCountInSeg -
+ UnackedCountDuplicates) }
end, Segments),
State1 #qistate { segments = Segments1 }.
@@ -799,16 +799,16 @@ write_entry_to_segment(RelSeq, {Pub, Del, Ack}, Hdl) ->
{Guid, MsgProps, IsPersistent} ->
file_handle_cache:append(
Hdl, [<<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS,
- (bool_to_int(IsPersistent)):1,
- RelSeq:?REL_SEQ_BITS>>,
- create_pub_record_body(Guid, MsgProps)])
+ (bool_to_int(IsPersistent)):1,
+ RelSeq:?REL_SEQ_BITS>>,
+ create_pub_record_body(Guid, MsgProps)])
end,
ok = case {Del, Ack} of
{no_del, no_ack} ->
ok;
_ ->
Binary = <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
- RelSeq:?REL_SEQ_BITS>>,
+ RelSeq:?REL_SEQ_BITS>>,
file_handle_cache:append(
Hdl, case {Del, Ack} of
{del, ack} -> [Binary, Binary];
@@ -853,14 +853,14 @@ load_segment(KeepAcked, #segment { path = Path }) ->
load_segment_entries(KeepAcked, Hdl, SegEntries, UnackedCount) ->
case file_handle_cache:read(Hdl, ?REL_SEQ_ONLY_ENTRY_LENGTH_BYTES) of
{ok, <<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS,
- IsPersistentNum:1, RelSeq:?REL_SEQ_BITS>>} ->
+ IsPersistentNum:1, RelSeq:?REL_SEQ_BITS>>} ->
{Guid, MsgProps} = read_pub_record_body(Hdl),
Obj = {{Guid, MsgProps, 1 == IsPersistentNum}, no_del, no_ack},
SegEntries1 = array:set(RelSeq, Obj, SegEntries),
load_segment_entries(KeepAcked, Hdl, SegEntries1,
UnackedCount + 1);
{ok, <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
- RelSeq:?REL_SEQ_BITS>>} ->
+ RelSeq:?REL_SEQ_BITS>>} ->
{UnackedCountDelta, SegEntries1} =
case array:get(RelSeq, SegEntries) of
{Pub, no_del, no_ack} ->
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 3908b64692..710e687809 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -37,7 +37,7 @@
-define(SILENT_CLOSE_DELAY, 3).
-define(FRAME_MAX, 131072). %% set to zero once QPid fix their negotiation
-%---------------------------------------------------------------------------
+%%--------------------------------------------------------------------------
-record(v1, {parent, sock, connection, callback, recv_length, recv_ref,
connection_state, queue_collector, heartbeater, stats_timer,
@@ -62,7 +62,7 @@
State#v1.connection_state =:= blocking orelse
State#v1.connection_state =:= blocked)).
-%%----------------------------------------------------------------------------
+%%--------------------------------------------------------------------------
-ifdef(use_specs).
@@ -158,7 +158,7 @@ server_properties(Protocol) ->
{copyright, ?COPYRIGHT_MESSAGE},
{information, ?INFORMATION_MESSAGE}]]],
- %% Filter duplicated properties in favor of config file provided values
+ %% Filter duplicated properties in favour of config file provided values
lists:usort(fun ({K1,_,_}, {K2,_,_}) -> K1 =< K2 end,
NormalizedConfigServerProps).
@@ -564,7 +564,7 @@ start_connection({ProtocolMajor, ProtocolMinor, _ProtocolRevision},
version_major = ProtocolMajor,
version_minor = ProtocolMinor,
server_properties = server_properties(Protocol),
- mechanisms = auth_mechanisms_binary(),
+ mechanisms = auth_mechanisms_binary(Sock),
locales = <<"en_US">> },
ok = send_on_channel0(Sock, Start, Protocol),
switch_callback(State#v1{connection = Connection#connection{
@@ -592,14 +592,14 @@ handle_method0(MethodName, FieldsBin,
State = #v1{connection = #connection{protocol = Protocol}}) ->
HandleException =
fun(R) ->
- case ?IS_RUNNING(State) of
- true -> send_exception(State, 0, R);
- %% We don't trust the client at this point - force
- %% them to wait for a bit so they can't DOS us with
- %% repeated failed logins etc.
- false -> timer:sleep(?SILENT_CLOSE_DELAY * 1000),
- throw({channel0_error, State#v1.connection_state, R})
- end
+ case ?IS_RUNNING(State) of
+ true -> send_exception(State, 0, R);
+ %% We don't trust the client at this point - force
+ %% them to wait for a bit so they can't DOS us with
+ %% repeated failed logins etc.
+ false -> timer:sleep(?SILENT_CLOSE_DELAY * 1000),
+ throw({channel0_error, State#v1.connection_state, R})
+ end
end,
try
handle_method0(Protocol:decode_method_fields(MethodName, FieldsBin),
@@ -616,7 +616,7 @@ handle_method0(#'connection.start_ok'{mechanism = Mechanism,
State0 = #v1{connection_state = starting,
connection = Connection,
sock = Sock}) ->
- AuthMechanism = auth_mechanism_to_module(Mechanism),
+ AuthMechanism = auth_mechanism_to_module(Mechanism, Sock),
Capabilities =
case rabbit_misc:table_lookup(ClientProperties, <<"capabilities">>) of
{table, Capabilities1} -> Capabilities1;
@@ -709,14 +709,14 @@ handle_method0(_Method, #v1{connection_state = S}) ->
send_on_channel0(Sock, Method, Protocol) ->
ok = rabbit_writer:internal_send_command(Sock, 0, Method, Protocol).
-auth_mechanism_to_module(TypeBin) ->
+auth_mechanism_to_module(TypeBin, Sock) ->
case rabbit_registry:binary_to_type(TypeBin) of
{error, not_found} ->
rabbit_misc:protocol_error(
command_invalid, "unknown authentication mechanism '~s'",
[TypeBin]);
T ->
- case {lists:member(T, auth_mechanisms()),
+ case {lists:member(T, auth_mechanisms(Sock)),
rabbit_registry:lookup_module(auth_mechanism, T)} of
{true, {ok, Module}} ->
Module;
@@ -727,15 +727,14 @@ auth_mechanism_to_module(TypeBin) ->
end
end.
-auth_mechanisms() ->
+auth_mechanisms(Sock) ->
{ok, Configured} = application:get_env(auth_mechanisms),
- [Name || {Name, _Module} <- rabbit_registry:lookup_all(auth_mechanism),
- lists:member(Name, Configured)].
+ [Name || {Name, Module} <- rabbit_registry:lookup_all(auth_mechanism),
+ Module:should_offer(Sock), lists:member(Name, Configured)].
-auth_mechanisms_binary() ->
+auth_mechanisms_binary(Sock) ->
list_to_binary(
- string:join(
- [atom_to_list(A) || A <- auth_mechanisms()], " ")).
+ string:join([atom_to_list(A) || A <- auth_mechanisms(Sock)], " ")).
auth_phase(Response,
State = #v1{auth_mechanism = AuthMechanism,
diff --git a/src/rabbit_registry.erl b/src/rabbit_registry.erl
index 795413aa5c..9821ae7b86 100644
--- a/src/rabbit_registry.erl
+++ b/src/rabbit_registry.erl
@@ -48,7 +48,7 @@ start_link() ->
%%---------------------------------------------------------------------------
register(Class, TypeName, ModuleName) ->
- gen_server:call(?SERVER, {register, Class, TypeName, ModuleName}).
+ gen_server:call(?SERVER, {register, Class, TypeName, ModuleName}, infinity).
%% This is used with user-supplied arguments (e.g., on exchange
%% declare), so we restrict it to existing atoms only. This means it
diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl
index 692d2473b8..f6a1c92fcc 100644
--- a/src/rabbit_router.erl
+++ b/src/rabbit_router.erl
@@ -37,7 +37,8 @@
fun ((rabbit_types:binding()) -> boolean())) ->
match_result()).
-spec(match_routing_key/2 :: (rabbit_types:binding_source(),
- routing_key() | '_') -> match_result()).
+ [routing_key()] | ['_']) ->
+ match_result()).
-endif.
@@ -58,7 +59,7 @@ deliver(QNames, Delivery = #delivery{mandatory = false,
{routed, QPids};
deliver(QNames, Delivery = #delivery{mandatory = Mandatory,
- immediate = Immediate}) ->
+ immediate = Immediate}) ->
QPids = lookup_qpids(QNames),
{Success, _} =
delegate:invoke(QPids,
@@ -66,7 +67,7 @@ deliver(QNames, Delivery = #delivery{mandatory = Mandatory,
rabbit_amqqueue:deliver(Pid, Delivery)
end),
{Routed, Handled} =
- lists:foldl(fun fold_deliveries/2, {false, []}, Success),
+ lists:foldl(fun fold_deliveries/2, {false, []}, Success),
check_delivery(Mandatory, Immediate, {Routed, Handled}).
@@ -82,12 +83,22 @@ match_bindings(SrcName, Match) ->
Match(Binding)]),
mnesia:async_dirty(fun qlc:e/1, [Query]).
-match_routing_key(SrcName, RoutingKey) ->
+match_routing_key(SrcName, [RoutingKey]) ->
MatchHead = #route{binding = #binding{source = SrcName,
destination = '$1',
key = RoutingKey,
_ = '_'}},
- mnesia:dirty_select(rabbit_route, [{MatchHead, [], ['$1']}]).
+ mnesia:dirty_select(rabbit_route, [{MatchHead, [], ['$1']}]);
+match_routing_key(SrcName, [_|_] = RoutingKeys) ->
+ Condition = list_to_tuple(['orelse' | [{'=:=', '$2', RKey} ||
+ RKey <- RoutingKeys]]),
+ MatchHead = #route{binding = #binding{source = SrcName,
+ destination = '$1',
+ key = '$2',
+ _ = '_'}},
+ mnesia:dirty_select(rabbit_route, [{MatchHead, [Condition], ['$1']}]).
+
+
%%--------------------------------------------------------------------
diff --git a/src/rabbit_ssl.erl b/src/rabbit_ssl.erl
index e831ee5141..1953b6b85c 100644
--- a/src/rabbit_ssl.erl
+++ b/src/rabbit_ssl.erl
@@ -87,8 +87,8 @@ cert_info(F, Cert) ->
find_by_type(Type, {rdnSequence, RDNs}) ->
case [V || #'AttributeTypeAndValue'{type = T, value = V}
- <- lists:flatten(RDNs),
- T == Type] of
+ <- lists:flatten(RDNs),
+ T == Type] of
[{printableString, S}] -> S;
[] -> not_found
end.
@@ -166,7 +166,7 @@ format_asn1_value({ST, S}) when ST =:= teletexString; ST =:= printableString;
true -> S
end;
format_asn1_value({utcTime, [Y1, Y2, M1, M2, D1, D2, H1, H2,
- Min1, Min2, S1, S2, $Z]}) ->
+ Min1, Min2, S1, S2, $Z]}) ->
io_lib:format("20~c~c-~c~c-~c~cT~c~c:~c~c:~c~cZ",
[Y1, Y2, M1, M2, D1, D2, H1, H2, Min1, Min2, S1, S2]);
format_asn1_value(V) ->
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 4e526f5d4f..d7587fe04b 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -35,6 +35,7 @@ test_content_prop_roundtrip(Datum, Binary) ->
Binary = rabbit_binary_generator:encode_properties(Types, Values). %% assertion
all_tests() ->
+ passed = gm_tests:all_tests(),
application:set_env(rabbit, file_handles_high_watermark, 10, infinity),
ok = file_handle_cache:set_limit(10),
passed = test_file_handle_cache(),
@@ -424,35 +425,35 @@ test_content_properties() ->
[{<<"one">>, signedint, 1},
{<<"two">>, signedint, 2}]}]}],
<<
- % property-flags
- 16#8000:16,
+ %% property-flags
+ 16#8000:16,
- % property-list:
+ %% property-list:
- % table
- 117:32, % table length in bytes
+ %% table
+ 117:32, % table length in bytes
- 11,"a signedint", % name
- "I",12345678:32, % type and value
+ 11,"a signedint", % name
+ "I",12345678:32, % type and value
- 9,"a longstr",
- "S",10:32,"yes please",
+ 9,"a longstr",
+ "S",10:32,"yes please",
- 9,"a decimal",
- "D",123,12345678:32,
+ 9,"a decimal",
+ "D",123,12345678:32,
- 11,"a timestamp",
- "T", 123456789012345:64,
+ 11,"a timestamp",
+ "T", 123456789012345:64,
- 14,"a nested table",
- "F",
- 18:32,
+ 14,"a nested table",
+ "F",
+ 18:32,
- 3,"one",
- "I",1:32,
+ 3,"one",
+ "I",1:32,
- 3,"two",
- "I",2:32 >>),
+ 3,"two",
+ "I",2:32 >>),
case catch rabbit_binary_parser:parse_properties([bit, bit, bit, bit], <<16#A0,0,1>>) of
{'EXIT', content_properties_binary_overflow} -> passed;
V -> exit({got_success_but_expected_failure, V})
@@ -479,28 +480,28 @@ test_field_values() ->
]}],
<<
- % property-flags
- 16#8000:16,
- % table length in bytes
- 228:32,
-
- 7,"longstr", "S", 21:32, "Here is a long string", % = 34
- 9,"signedint", "I", 12345:32/signed, % + 15 = 49
- 7,"decimal", "D", 3, 123456:32, % + 14 = 63
- 9,"timestamp", "T", 109876543209876:64, % + 19 = 82
- 5,"table", "F", 31:32, % length of table % + 11 = 93
- 3,"one", "I", 54321:32, % + 9 = 102
- 3,"two", "S", 13:32, "A long string",% + 22 = 124
- 4,"byte", "b", 255:8, % + 7 = 131
- 4,"long", "l", 1234567890:64, % + 14 = 145
- 5,"short", "s", 655:16, % + 9 = 154
- 4,"bool", "t", 1, % + 7 = 161
- 6,"binary", "x", 15:32, "a binary string", % + 27 = 188
- 4,"void", "V", % + 6 = 194
- 5,"array", "A", 23:32, % + 11 = 205
- "I", 54321:32, % + 5 = 210
- "S", 13:32, "A long string" % + 18 = 228
- >>),
+ %% property-flags
+ 16#8000:16,
+ %% table length in bytes
+ 228:32,
+
+ 7,"longstr", "S", 21:32, "Here is a long string", % = 34
+ 9,"signedint", "I", 12345:32/signed, % + 15 = 49
+ 7,"decimal", "D", 3, 123456:32, % + 14 = 63
+ 9,"timestamp", "T", 109876543209876:64, % + 19 = 82
+ 5,"table", "F", 31:32, % length of table % + 11 = 93
+ 3,"one", "I", 54321:32, % + 9 = 102
+ 3,"two", "S", 13:32, "A long string", % + 22 = 124
+ 4,"byte", "b", 255:8, % + 7 = 131
+ 4,"long", "l", 1234567890:64, % + 14 = 145
+ 5,"short", "s", 655:16, % + 9 = 154
+ 4,"bool", "t", 1, % + 7 = 161
+ 6,"binary", "x", 15:32, "a binary string", % + 27 = 188
+ 4,"void", "V", % + 6 = 194
+ 5,"array", "A", 23:32, % + 11 = 205
+ "I", 54321:32, % + 5 = 210
+ "S", 13:32, "A long string" % + 18 = 228
+ >>),
passed.
%% Test that content frames don't exceed frame-max
@@ -585,32 +586,131 @@ sequence_with_content(Sequence) ->
rabbit_framing_amqp_0_9_1),
Sequence).
-test_topic_match(P, R) ->
- test_topic_match(P, R, true).
-
-test_topic_match(P, R, Expected) ->
- case rabbit_exchange_type_topic:topic_matches(list_to_binary(P),
- list_to_binary(R)) of
- Expected ->
- passed;
- _ ->
- {topic_match_failure, P, R}
- end.
-
test_topic_matching() ->
- passed = test_topic_match("#", "test.test"),
- passed = test_topic_match("#", ""),
- passed = test_topic_match("#.T.R", "T.T.R"),
- passed = test_topic_match("#.T.R", "T.R.T.R"),
- passed = test_topic_match("#.Y.Z", "X.Y.Z.X.Y.Z"),
- passed = test_topic_match("#.test", "test"),
- passed = test_topic_match("#.test", "test.test"),
- passed = test_topic_match("#.test", "ignored.test"),
- passed = test_topic_match("#.test", "more.ignored.test"),
- passed = test_topic_match("#.test", "notmatched", false),
- passed = test_topic_match("#.z", "one.two.three.four", false),
+ XName = #resource{virtual_host = <<"/">>,
+ kind = exchange,
+ name = <<"test_exchange">>},
+ X = #exchange{name = XName, type = topic, durable = false,
+ auto_delete = false, arguments = []},
+ %% create
+ rabbit_exchange_type_topic:validate(X),
+ exchange_op_callback(X, create, []),
+
+ %% add some bindings
+ Bindings = lists:map(
+ fun ({Key, Q}) ->
+ #binding{source = XName,
+ key = list_to_binary(Key),
+ destination = #resource{virtual_host = <<"/">>,
+ kind = queue,
+ name = list_to_binary(Q)}}
+ end, [{"a.b.c", "t1"},
+ {"a.*.c", "t2"},
+ {"a.#.b", "t3"},
+ {"a.b.b.c", "t4"},
+ {"#", "t5"},
+ {"#.#", "t6"},
+ {"#.b", "t7"},
+ {"*.*", "t8"},
+ {"a.*", "t9"},
+ {"*.b.c", "t10"},
+ {"a.#", "t11"},
+ {"a.#.#", "t12"},
+ {"b.b.c", "t13"},
+ {"a.b.b", "t14"},
+ {"a.b", "t15"},
+ {"b.c", "t16"},
+ {"", "t17"},
+ {"*.*.*", "t18"},
+ {"vodka.martini", "t19"},
+ {"a.b.c", "t20"},
+ {"*.#", "t21"},
+ {"#.*.#", "t22"},
+ {"*.#.#", "t23"},
+ {"#.#.#", "t24"},
+ {"*", "t25"},
+ {"#.b.#", "t26"}]),
+ lists:foreach(fun (B) -> exchange_op_callback(X, add_binding, [B]) end,
+ Bindings),
+
+ %% test some matches
+ test_topic_expect_match(
+ X, [{"a.b.c", ["t1", "t2", "t5", "t6", "t10", "t11", "t12",
+ "t18", "t20", "t21", "t22", "t23", "t24",
+ "t26"]},
+ {"a.b", ["t3", "t5", "t6", "t7", "t8", "t9", "t11",
+ "t12", "t15", "t21", "t22", "t23", "t24",
+ "t26"]},
+ {"a.b.b", ["t3", "t5", "t6", "t7", "t11", "t12", "t14",
+ "t18", "t21", "t22", "t23", "t24", "t26"]},
+ {"", ["t5", "t6", "t17", "t24"]},
+ {"b.c.c", ["t5", "t6", "t18", "t21", "t22", "t23",
+ "t24", "t26"]},
+ {"a.a.a.a.a", ["t5", "t6", "t11", "t12", "t21", "t22",
+ "t23", "t24"]},
+ {"vodka.gin", ["t5", "t6", "t8", "t21", "t22", "t23",
+ "t24"]},
+ {"vodka.martini", ["t5", "t6", "t8", "t19", "t21", "t22", "t23",
+ "t24"]},
+ {"b.b.c", ["t5", "t6", "t10", "t13", "t18", "t21",
+ "t22", "t23", "t24", "t26"]},
+ {"nothing.here.at.all", ["t5", "t6", "t21", "t22", "t23", "t24"]},
+ {"oneword", ["t5", "t6", "t21", "t22", "t23", "t24",
+ "t25"]}]),
+
+ %% remove some bindings
+ RemovedBindings = [lists:nth(1, Bindings), lists:nth(5, Bindings),
+ lists:nth(11, Bindings), lists:nth(19, Bindings),
+ lists:nth(21, Bindings)],
+ exchange_op_callback(X, remove_bindings, [RemovedBindings]),
+ RemainingBindings = ordsets:to_list(
+ ordsets:subtract(ordsets:from_list(Bindings),
+ ordsets:from_list(RemovedBindings))),
+
+ %% test some matches
+ test_topic_expect_match(X,
+ [{"a.b.c", ["t2", "t6", "t10", "t12", "t18", "t20", "t22",
+ "t23", "t24", "t26"]},
+ {"a.b", ["t3", "t6", "t7", "t8", "t9", "t12", "t15",
+ "t22", "t23", "t24", "t26"]},
+ {"a.b.b", ["t3", "t6", "t7", "t12", "t14", "t18", "t22",
+ "t23", "t24", "t26"]},
+ {"", ["t6", "t17", "t24"]},
+ {"b.c.c", ["t6", "t18", "t22", "t23", "t24", "t26"]},
+ {"a.a.a.a.a", ["t6", "t12", "t22", "t23", "t24"]},
+ {"vodka.gin", ["t6", "t8", "t22", "t23", "t24"]},
+ {"vodka.martini", ["t6", "t8", "t22", "t23", "t24"]},
+ {"b.b.c", ["t6", "t10", "t13", "t18", "t22", "t23",
+ "t24", "t26"]},
+ {"nothing.here.at.all", ["t6", "t22", "t23", "t24"]},
+ {"oneword", ["t6", "t22", "t23", "t24", "t25"]}]),
+
+ %% remove the entire exchange
+ exchange_op_callback(X, delete, [RemainingBindings]),
+ %% none should match now
+ test_topic_expect_match(X, [{"a.b.c", []}, {"b.b.c", []}, {"", []}]),
passed.
+exchange_op_callback(X, Fun, ExtraArgs) ->
+ rabbit_misc:execute_mnesia_transaction(
+ fun () -> rabbit_exchange:callback(X, Fun, [true, X] ++ ExtraArgs) end),
+ rabbit_exchange:callback(X, Fun, [false, X] ++ ExtraArgs).
+
+test_topic_expect_match(X, List) ->
+ lists:foreach(
+ fun ({Key, Expected}) ->
+ BinKey = list_to_binary(Key),
+ Res = rabbit_exchange_type_topic:route(
+ X, #delivery{message = #basic_message{routing_keys =
+ [BinKey]}}),
+ ExpectedRes = lists:map(
+ fun (Q) -> #resource{virtual_host = <<"/">>,
+ kind = queue,
+ name = list_to_binary(Q)}
+ end, Expected),
+ true = (lists:usort(ExpectedRes) =:= lists:usort(Res))
+ end, List).
+
test_app_management() ->
%% starting, stopping, status
ok = control_action(stop_app, []),
@@ -718,7 +818,7 @@ test_log_management_during_startup() ->
ok = delete_log_handlers([sasl_report_tty_h]),
ok = case catch control_action(start_app, []) of
ok -> exit({got_success_but_expected_failure,
- log_rotation_tty_no_handlers_test});
+ log_rotation_tty_no_handlers_test});
{error, {cannot_log_to_tty, _, _}} -> ok
end,
@@ -743,8 +843,8 @@ test_log_management_during_startup() ->
ok = add_log_handlers([{error_logger_file_h, MainLog}]),
ok = case control_action(start_app, []) of
ok -> exit({got_success_but_expected_failure,
- log_rotation_no_write_permission_dir_test});
- {error, {cannot_log_to_file, _, _}} -> ok
+ log_rotation_no_write_permission_dir_test});
+ {error, {cannot_log_to_file, _, _}} -> ok
end,
%% start application with logging to a subdirectory which
@@ -754,9 +854,9 @@ test_log_management_during_startup() ->
ok = add_log_handlers([{error_logger_file_h, MainLog}]),
ok = case control_action(start_app, []) of
ok -> exit({got_success_but_expected_failure,
- log_rotatation_parent_dirs_test});
+ log_rotatation_parent_dirs_test});
{error, {cannot_log_to_file, _,
- {error, {cannot_create_parent_dirs, _, eacces}}}} -> ok
+ {error, {cannot_create_parent_dirs, _, eacces}}}} -> ok
end,
ok = set_permissions(TmpDir, 8#00700),
ok = set_permissions(TmpLog, 8#00600),
@@ -776,22 +876,22 @@ test_log_management_during_startup() ->
passed.
test_option_parser() ->
- % command and arguments should just pass through
+ %% command and arguments should just pass through
ok = check_get_options({["mock_command", "arg1", "arg2"], []},
[], ["mock_command", "arg1", "arg2"]),
- % get flags
+ %% get flags
ok = check_get_options(
{["mock_command", "arg1"], [{"-f", true}, {"-f2", false}]},
[{flag, "-f"}, {flag, "-f2"}], ["mock_command", "arg1", "-f"]),
- % get options
+ %% get options
ok = check_get_options(
{["mock_command"], [{"-foo", "bar"}, {"-baz", "notbaz"}]},
[{option, "-foo", "notfoo"}, {option, "-baz", "notbaz"}],
["mock_command", "-foo", "bar"]),
- % shuffled and interleaved arguments and options
+ %% shuffled and interleaved arguments and options
ok = check_get_options(
{["a1", "a2", "a3"], [{"-o1", "hello"}, {"-o2", "noto2"}, {"-f", true}]},
[{option, "-o1", "noto1"}, {flag, "-f"}, {option, "-o2", "noto2"}],
@@ -1043,7 +1143,7 @@ test_server_status() ->
[_|_] = rabbit_binding:list_for_source(
rabbit_misc:r(<<"/">>, exchange, <<"">>)),
[_] = rabbit_binding:list_for_destination(
- rabbit_misc:r(<<"/">>, queue, <<"foo">>)),
+ rabbit_misc:r(<<"/">>, queue, <<"foo">>)),
[_] = rabbit_binding:list_for_source_and_destination(
rabbit_misc:r(<<"/">>, exchange, <<"">>),
rabbit_misc:r(<<"/">>, queue, <<"foo">>)),
@@ -1205,9 +1305,9 @@ test_delegates_async(SecondaryNode) ->
make_responder(FMsg) -> make_responder(FMsg, timeout).
make_responder(FMsg, Throw) ->
fun () ->
- receive Msg -> FMsg(Msg)
- after 1000 -> throw(Throw)
- end
+ receive Msg -> FMsg(Msg)
+ after 1000 -> throw(Throw)
+ end
end.
spawn_responders(Node, Responder, Count) ->
@@ -1218,10 +1318,10 @@ await_response(0) ->
await_response(Count) ->
receive
response -> ok,
- await_response(Count - 1)
+ await_response(Count - 1)
after 1000 ->
- io:format("Async reply not received~n"),
- throw(timeout)
+ io:format("Async reply not received~n"),
+ throw(timeout)
end.
must_exit(Fun) ->
@@ -1233,11 +1333,11 @@ must_exit(Fun) ->
end.
test_delegates_sync(SecondaryNode) ->
- Sender = fun (Pid) -> gen_server:call(Pid, invoked) end,
+ Sender = fun (Pid) -> gen_server:call(Pid, invoked, infinity) end,
BadSender = fun (_Pid) -> exit(exception) end,
Responder = make_responder(fun ({'$gen_call', From, invoked}) ->
- gen_server:reply(From, response)
+ gen_server:reply(From, response)
end),
BadResponder = make_responder(fun ({'$gen_call', From, invoked}) ->
@@ -1249,7 +1349,7 @@ test_delegates_sync(SecondaryNode) ->
must_exit(fun () -> delegate:invoke(spawn(BadResponder), BadSender) end),
must_exit(fun () ->
- delegate:invoke(spawn(SecondaryNode, BadResponder), BadSender) end),
+ delegate:invoke(spawn(SecondaryNode, BadResponder), BadSender) end),
LocalGoodPids = spawn_responders(node(), Responder, 2),
RemoteGoodPids = spawn_responders(SecondaryNode, Responder, 2),
@@ -1338,7 +1438,7 @@ test_declare_on_dead_queue(SecondaryNode) ->
throw(failed_to_create_and_kill_queue)
end.
-%---------------------------------------------------------------------
+%%---------------------------------------------------------------------
control_action(Command, Args) ->
control_action(Command, node(), Args, default_options()).
@@ -1853,7 +1953,7 @@ test_queue_index() ->
with_empty_test_queue(
fun (Qi0) ->
{Qi1, _SeqIdsGuidsD} = queue_index_publish(SeqIdsD,
- false, Qi0),
+ false, Qi0),
Qi2 = rabbit_queue_index:deliver(SeqIdsD, Qi1),
Qi3 = rabbit_queue_index:ack(SeqIdsD, Qi2),
rabbit_queue_index:flush(Qi3)
@@ -2095,7 +2195,7 @@ check_variable_queue_status(VQ0, Props) ->
variable_queue_wait_for_shuffling_end(VQ) ->
case rabbit_variable_queue:needs_idle_timeout(VQ) of
true -> variable_queue_wait_for_shuffling_end(
- rabbit_variable_queue:idle_timeout(VQ));
+ rabbit_variable_queue:idle_timeout(VQ));
false -> VQ
end.
diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl
index 3dbe740f27..a11595e55d 100644
--- a/src/rabbit_types.erl
+++ b/src/rabbit_types.erl
@@ -42,39 +42,39 @@
%% TODO: make this more precise by tying specific class_ids to
%% specific properties
-type(undecoded_content() ::
- #content{class_id :: rabbit_framing:amqp_class_id(),
- properties :: 'none',
- properties_bin :: binary(),
- payload_fragments_rev :: [binary()]} |
- #content{class_id :: rabbit_framing:amqp_class_id(),
- properties :: rabbit_framing:amqp_property_record(),
- properties_bin :: 'none',
- payload_fragments_rev :: [binary()]}).
+ #content{class_id :: rabbit_framing:amqp_class_id(),
+ properties :: 'none',
+ properties_bin :: binary(),
+ payload_fragments_rev :: [binary()]} |
+ #content{class_id :: rabbit_framing:amqp_class_id(),
+ properties :: rabbit_framing:amqp_property_record(),
+ properties_bin :: 'none',
+ payload_fragments_rev :: [binary()]}).
-type(unencoded_content() :: undecoded_content()).
-type(decoded_content() ::
- #content{class_id :: rabbit_framing:amqp_class_id(),
- properties :: rabbit_framing:amqp_property_record(),
- properties_bin :: maybe(binary()),
- payload_fragments_rev :: [binary()]}).
+ #content{class_id :: rabbit_framing:amqp_class_id(),
+ properties :: rabbit_framing:amqp_property_record(),
+ properties_bin :: maybe(binary()),
+ payload_fragments_rev :: [binary()]}).
-type(encoded_content() ::
- #content{class_id :: rabbit_framing:amqp_class_id(),
- properties :: maybe(rabbit_framing:amqp_property_record()),
- properties_bin :: binary(),
- payload_fragments_rev :: [binary()]}).
+ #content{class_id :: rabbit_framing:amqp_class_id(),
+ properties :: maybe(rabbit_framing:amqp_property_record()),
+ properties_bin :: binary(),
+ payload_fragments_rev :: [binary()]}).
-type(content() :: undecoded_content() | decoded_content()).
-type(basic_message() ::
- #basic_message{exchange_name :: rabbit_exchange:name(),
- routing_key :: rabbit_router:routing_key(),
- content :: content(),
- guid :: rabbit_guid:guid(),
- is_persistent :: boolean()}).
+ #basic_message{exchange_name :: rabbit_exchange:name(),
+ routing_keys :: [rabbit_router:routing_key()],
+ content :: content(),
+ guid :: rabbit_guid:guid(),
+ is_persistent :: boolean()}).
-type(message() :: basic_message()).
-type(delivery() ::
- #delivery{mandatory :: boolean(),
- immediate :: boolean(),
- txn :: maybe(txn()),
- sender :: pid(),
- message :: message()}).
+ #delivery{mandatory :: boolean(),
+ immediate :: boolean(),
+ txn :: maybe(txn()),
+ sender :: pid(),
+ message :: message()}).
-type(message_properties() ::
#message_properties{expiry :: pos_integer() | 'undefined',
needs_confirming :: boolean()}).
@@ -89,9 +89,9 @@
-type(infos() :: [info()]).
-type(amqp_error() ::
- #amqp_error{name :: rabbit_framing:amqp_exception(),
- explanation :: string(),
- method :: rabbit_framing:amqp_method_name()}).
+ #amqp_error{name :: rabbit_framing:amqp_exception(),
+ explanation :: string(),
+ method :: rabbit_framing:amqp_method_name()}).
-type(r(Kind) ::
r2(vhost(), Kind)).
@@ -103,34 +103,34 @@
name :: Name}).
-type(listener() ::
- #listener{node :: node(),
- protocol :: atom(),
- host :: rabbit_networking:hostname(),
- port :: rabbit_networking:ip_port()}).
+ #listener{node :: node(),
+ protocol :: atom(),
+ host :: rabbit_networking:hostname(),
+ port :: rabbit_networking:ip_port()}).
-type(binding_source() :: rabbit_exchange:name()).
-type(binding_destination() :: rabbit_amqqueue:name() | rabbit_exchange:name()).
-type(binding() ::
- #binding{source :: rabbit_exchange:name(),
- destination :: binding_destination(),
- key :: rabbit_binding:key(),
- args :: rabbit_framing:amqp_table()}).
+ #binding{source :: rabbit_exchange:name(),
+ destination :: binding_destination(),
+ key :: rabbit_binding:key(),
+ args :: rabbit_framing:amqp_table()}).
-type(amqqueue() ::
- #amqqueue{name :: rabbit_amqqueue:name(),
- durable :: boolean(),
- auto_delete :: boolean(),
- exclusive_owner :: rabbit_types:maybe(pid()),
- arguments :: rabbit_framing:amqp_table(),
- pid :: rabbit_types:maybe(pid())}).
+ #amqqueue{name :: rabbit_amqqueue:name(),
+ durable :: boolean(),
+ auto_delete :: boolean(),
+ exclusive_owner :: rabbit_types:maybe(pid()),
+ arguments :: rabbit_framing:amqp_table(),
+ pid :: rabbit_types:maybe(pid())}).
-type(exchange() ::
- #exchange{name :: rabbit_exchange:name(),
- type :: rabbit_exchange:type(),
- durable :: boolean(),
- auto_delete :: boolean(),
- arguments :: rabbit_framing:amqp_table()}).
+ #exchange{name :: rabbit_exchange:name(),
+ type :: rabbit_exchange:type(),
+ durable :: boolean(),
+ auto_delete :: boolean(),
+ arguments :: rabbit_framing:amqp_table()}).
-type(connection() :: pid()).
diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl
index b0a715233a..ebda5d03a7 100644
--- a/src/rabbit_upgrade.erl
+++ b/src/rabbit_upgrade.erl
@@ -98,7 +98,6 @@ vertices(Module, Steps) ->
edges(_Module, Steps) ->
[{Require, StepName} || {StepName, Requires} <- Steps, Require <- Requires].
-
unknown_heads(Heads, G) ->
[H || H <- Heads, digraph:vertex(G, H) =:= false].
@@ -107,9 +106,9 @@ upgrades_to_apply(Heads, G) ->
%% everything we've already applied. Subtract that from all
%% vertices: that's what we have to apply.
Unsorted = sets:to_list(
- sets:subtract(
- sets:from_list(digraph:vertices(G)),
- sets:from_list(digraph_utils:reaching(Heads, G)))),
+ sets:subtract(
+ sets:from_list(digraph:vertices(G)),
+ sets:from_list(digraph_utils:reaching(Heads, G)))),
%% Form a subgraph from that list and find a topological ordering
%% so we can invoke them in order.
[element(2, digraph:vertex(G, StepName)) ||
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index 68b88b3e45..b9dbe418d9 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -25,6 +25,7 @@
-rabbit_upgrade({add_ip_to_listener, []}).
-rabbit_upgrade({internal_exchanges, []}).
-rabbit_upgrade({user_to_internal_user, [hash_passwords]}).
+-rabbit_upgrade({topic_trie, []}).
%% -------------------------------------------------------------------
@@ -35,6 +36,7 @@
-spec(add_ip_to_listener/0 :: () -> 'ok').
-spec(internal_exchanges/0 :: () -> 'ok').
-spec(user_to_internal_user/0 :: () -> 'ok').
+-spec(topic_trie/0 :: () -> 'ok').
-endif.
@@ -47,7 +49,7 @@
%% point.
remove_user_scope() ->
- mnesia(
+ transform(
rabbit_user_permission,
fun ({user_permission, UV, {permission, _Scope, Conf, Write, Read}}) ->
{user_permission, UV, {permission, Conf, Write, Read}}
@@ -55,7 +57,7 @@ remove_user_scope() ->
[user_vhost, permission]).
hash_passwords() ->
- mnesia(
+ transform(
rabbit_user,
fun ({user, Username, Password, IsAdmin}) ->
Hash = rabbit_auth_backend_internal:hash_password(Password),
@@ -64,7 +66,7 @@ hash_passwords() ->
[username, password_hash, is_admin]).
add_ip_to_listener() ->
- mnesia(
+ transform(
rabbit_listener,
fun ({listener, Node, Protocol, Host, Port}) ->
{listener, Node, Protocol, Host, {0,0,0,0}, Port}
@@ -77,27 +79,41 @@ internal_exchanges() ->
fun ({exchange, Name, Type, Durable, AutoDelete, Args}) ->
{exchange, Name, Type, Durable, AutoDelete, false, Args}
end,
- [ ok = mnesia(T,
- AddInternalFun,
- [name, type, durable, auto_delete, internal, arguments])
+ [ ok = transform(T,
+ AddInternalFun,
+ [name, type, durable, auto_delete, internal, arguments])
|| T <- Tables ],
ok.
user_to_internal_user() ->
- mnesia(
+ transform(
rabbit_user,
fun({user, Username, PasswordHash, IsAdmin}) ->
{internal_user, Username, PasswordHash, IsAdmin}
end,
[username, password_hash, is_admin], internal_user).
+topic_trie() ->
+ create(rabbit_topic_trie_edge, [{record_name, topic_trie_edge},
+ {attributes, [trie_edge, node_id]},
+ {type, ordered_set}]),
+ create(rabbit_topic_trie_binding, [{record_name, topic_trie_binding},
+ {attributes, [trie_binding, value]},
+ {type, ordered_set}]).
+
%%--------------------------------------------------------------------
-mnesia(TableName, Fun, FieldList) ->
+transform(TableName, Fun, FieldList) ->
+ rabbit_mnesia:wait_for_tables([TableName]),
{atomic, ok} = mnesia:transform_table(TableName, Fun, FieldList),
ok.
-mnesia(TableName, Fun, FieldList, NewRecordName) ->
+transform(TableName, Fun, FieldList, NewRecordName) ->
+ rabbit_mnesia:wait_for_tables([TableName]),
{atomic, ok} = mnesia:transform_table(TableName, Fun, FieldList,
NewRecordName),
ok.
+
+create(Tab, TabDef) ->
+ {atomic, ok} = mnesia:create_table(Tab, TabDef),
+ ok.
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 7142d56072..591e5a6611 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -22,7 +22,7 @@
requeue/3, len/1, is_empty/1, dropwhile/2,
set_ram_duration_target/2, ram_duration/1,
needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1,
- status/1]).
+ status/1, multiple_routing_keys/0]).
-export([start/1, stop/0]).
@@ -268,13 +268,13 @@
msg_on_disk,
index_on_disk,
msg_props
- }).
+ }).
-record(delta,
{ start_seq_id, %% start_seq_id is inclusive
count,
end_seq_id %% end_seq_id is exclusive
- }).
+ }).
-record(tx, { pending_messages, pending_acks }).
@@ -294,6 +294,8 @@
%%----------------------------------------------------------------------------
+-rabbit_upgrade({multiple_routing_keys, []}).
+
-ifdef(use_specs).
-type(timestamp() :: {non_neg_integer(), non_neg_integer(), non_neg_integer()}).
@@ -351,6 +353,8 @@
-include("rabbit_backing_queue_spec.hrl").
+-spec(multiple_routing_keys/0 :: () -> 'ok').
+
-endif.
-define(BLANK_DELTA, #delta { start_seq_id = undefined,
@@ -506,8 +510,12 @@ publish(Msg, MsgProps, State) ->
a(reduce_memory_use(State1)).
publish_delivered(false, #basic_message { guid = Guid },
- _MsgProps, State = #vqstate { len = 0 }) ->
- blind_confirm(self(), gb_sets:singleton(Guid)),
+ #message_properties { needs_confirming = NeedsConfirming },
+ State = #vqstate { len = 0 }) ->
+ case NeedsConfirming of
+ true -> blind_confirm(self(), gb_sets:singleton(Guid));
+ false -> ok
+ end,
{undefined, a(State)};
publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent,
guid = Guid },
@@ -536,7 +544,7 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent,
dropwhile(Pred, State) ->
{_OkOrEmpty, State1} = dropwhile1(Pred, State),
- State1.
+ a(State1).
dropwhile1(Pred, State) ->
internal_queue_out(
@@ -623,12 +631,12 @@ internal_fetch(AckRequired, MsgStatus = #msg_status {
%% 3. If an ack is required, add something sensible to PA
{AckTag, State1} = case AckRequired of
- true -> StateN = record_pending_ack(
- MsgStatus #msg_status {
- is_delivered = true }, State),
- {SeqId, StateN};
- false -> {undefined, State}
- end,
+ true -> StateN = record_pending_ack(
+ MsgStatus #msg_status {
+ is_delivered = true }, State),
+ {SeqId, StateN};
+ false -> {undefined, State}
+ end,
PCount1 = PCount - one_if(IsPersistent andalso not AckRequired),
Len1 = Len - 1,
@@ -768,8 +776,8 @@ ram_duration(State = #vqstate {
RamAckCount = gb_trees:size(RamAckIndex),
Duration = %% msgs+acks / (msgs+acks/sec) == sec
- case AvgEgressRate == 0 andalso AvgIngressRate == 0 andalso
- AvgAckEgressRate == 0 andalso AvgAckIngressRate == 0 of
+ case (AvgEgressRate == 0 andalso AvgIngressRate == 0 andalso
+ AvgAckEgressRate == 0 andalso AvgAckIngressRate == 0) of
true -> infinity;
false -> (RamMsgCountPrev + RamMsgCount +
RamAckCount + RamAckCountPrev) /
@@ -1384,7 +1392,7 @@ accumulate_ack_init() -> {[], orddict:new()}.
accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS
msg_on_disk = false,
index_on_disk = false },
- {PersistentSeqIdsAcc, GuidsByStore}) ->
+ {PersistentSeqIdsAcc, GuidsByStore}) ->
{PersistentSeqIdsAcc, GuidsByStore};
accumulate_ack(SeqId, {IsPersistent, Guid, _MsgProps},
{PersistentSeqIdsAcc, GuidsByStore}) ->
@@ -1447,8 +1455,8 @@ msgs_written_to_disk(QPid, GuidSet, written) ->
msgs_confirmed(gb_sets:intersection(GuidSet, MIOD),
State #vqstate {
msgs_on_disk =
- gb_sets:intersection(
- gb_sets:union(MOD, GuidSet), UC) })
+ gb_sets:union(
+ MOD, gb_sets:intersection(UC, GuidSet)) })
end).
msg_indices_written_to_disk(QPid, GuidSet) ->
@@ -1459,8 +1467,8 @@ msg_indices_written_to_disk(QPid, GuidSet) ->
msgs_confirmed(gb_sets:intersection(GuidSet, MOD),
State #vqstate {
msg_indices_on_disk =
- gb_sets:intersection(
- gb_sets:union(MIOD, GuidSet), UC) })
+ gb_sets:union(
+ MIOD, gb_sets:intersection(UC, GuidSet)) })
end).
%%----------------------------------------------------------------------------
@@ -1801,3 +1809,27 @@ push_betas_to_deltas(Generator, Limit, Q, Count, RamIndexCount, IndexState) ->
push_betas_to_deltas(
Generator, Limit, Qa, Count + 1, RamIndexCount1, IndexState1)
end.
+
+%%----------------------------------------------------------------------------
+%% Upgrading
+%%----------------------------------------------------------------------------
+
+multiple_routing_keys() ->
+ transform_storage(
+ fun ({basic_message, ExchangeName, Routing_Key, Content,
+ Guid, Persistent}) ->
+ {ok, {basic_message, ExchangeName, [Routing_Key], Content,
+ Guid, Persistent}};
+ (_) -> {error, corrupt_message}
+ end),
+ ok.
+
+
+%% Assumes message store is not running
+transform_storage(TransformFun) ->
+ transform_store(?PERSISTENT_MSG_STORE, TransformFun),
+ transform_store(?TRANSIENT_MSG_STORE, TransformFun).
+
+transform_store(Store, TransformFun) ->
+ rabbit_msg_store:force_recovery(rabbit_mnesia:dir(), Store),
+ rabbit_msg_store:transform_dir(rabbit_mnesia:dir(), Store, TransformFun).
diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl
index efebef0699..24c130edd6 100644
--- a/src/rabbit_vhost.erl
+++ b/src/rabbit_vhost.erl
@@ -48,15 +48,15 @@ add(VHostPath) ->
ok;
(ok, false) ->
[rabbit_exchange:declare(
- rabbit_misc:r(VHostPath, exchange, Name),
- Type, true, false, false, []) ||
- {Name,Type} <-
- [{<<"">>, direct},
- {<<"amq.direct">>, direct},
- {<<"amq.topic">>, topic},
- {<<"amq.match">>, headers}, %% per 0-9-1 pdf
- {<<"amq.headers">>, headers}, %% per 0-9-1 xml
- {<<"amq.fanout">>, fanout}]],
+ rabbit_misc:r(VHostPath, exchange, Name),
+ Type, true, false, false, []) ||
+ {Name,Type} <-
+ [{<<"">>, direct},
+ {<<"amq.direct">>, direct},
+ {<<"amq.topic">>, topic},
+ {<<"amq.match">>, headers}, %% per 0-9-1 pdf
+ {<<"amq.headers">>, headers}, %% per 0-9-1 xml
+ {<<"amq.fanout">>, fanout}]],
ok
end),
rabbit_log:info("Added vhost ~p~n", [VHostPath]),
diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl
index eba86a5551..ac3434d253 100644
--- a/src/rabbit_writer.erl
+++ b/src/rabbit_writer.erl
@@ -28,7 +28,7 @@
-define(HIBERNATE_AFTER, 5000).
-%%----------------------------------------------------------------------------
+%%---------------------------------------------------------------------------
-ifdef(use_specs).
@@ -69,7 +69,7 @@
-endif.
-%%----------------------------------------------------------------------------
+%%---------------------------------------------------------------------------
start(Sock, Channel, FrameMax, Protocol, ReaderPid) ->
{ok,
@@ -133,7 +133,7 @@ handle_message({inet_reply, _, Status}, _State) ->
handle_message(Message, _State) ->
exit({writer, message_not_understood, Message}).
-%---------------------------------------------------------------------------
+%%---------------------------------------------------------------------------
send_command(W, MethodRecord) ->
W ! {send_command, MethodRecord},
@@ -157,13 +157,13 @@ send_command_and_notify(W, Q, ChPid, MethodRecord, Content) ->
W ! {send_command_and_notify, Q, ChPid, MethodRecord, Content},
ok.
-%---------------------------------------------------------------------------
+%%---------------------------------------------------------------------------
call(Pid, Msg) ->
{ok, Res} = gen:call(Pid, '$gen_call', Msg, infinity),
Res.
-%---------------------------------------------------------------------------
+%%---------------------------------------------------------------------------
assemble_frame(Channel, MethodRecord, Protocol) ->
?LOGMESSAGE(out, Channel, MethodRecord, none),
diff --git a/src/vm_memory_monitor.erl b/src/vm_memory_monitor.erl
index 44e1e4b5ae..dcc6aff5c8 100644
--- a/src/vm_memory_monitor.erl
+++ b/src/vm_memory_monitor.erl
@@ -175,10 +175,10 @@ internal_update(State = #state { memory_limit = MemLimit,
case {Alarmed, NewAlarmed} of
{false, true} ->
emit_update_info(set, MemUsed, MemLimit),
- alarm_handler:set_alarm({vm_memory_high_watermark, []});
+ alarm_handler:set_alarm({{vm_memory_high_watermark, node()}, []});
{true, false} ->
emit_update_info(clear, MemUsed, MemLimit),
- alarm_handler:clear_alarm(vm_memory_high_watermark);
+ alarm_handler:clear_alarm({vm_memory_high_watermark, node()});
_ ->
ok
end,