summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2012-10-16 12:24:08 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2012-10-16 12:24:08 +0100
commitbf30a77325d614eab5ea1964c8d2536c7f7c947f (patch)
treea0395ef3d301f6bc2bf9101698ec22d241afc18e
parent62dc3ebde75180415710d216b3bd16680b6bae2b (diff)
parent811f35ceeac31ab66e8c57bb0962b97c2209896c (diff)
downloadrabbitmq-server-git-bf30a77325d614eab5ea1964c8d2536c7f7c947f.tar.gz
merge default into bug25202
...and in the process inline mq_slave:forget_slave and fix a bug in it (it was looking at the wrong element of the tuple)
-rw-r--r--src/gatherer.erl51
-rw-r--r--src/rabbit.erl2
-rw-r--r--src/rabbit_amqqueue_process.erl84
-rw-r--r--src/rabbit_backing_queue.erl7
-rw-r--r--src/rabbit_exchange.erl5
-rw-r--r--src/rabbit_mirror_queue_coordinator.erl30
-rw-r--r--src/rabbit_mirror_queue_master.erl6
-rw-r--r--src/rabbit_mirror_queue_misc.erl12
-rw-r--r--src/rabbit_mirror_queue_slave.erl98
-rw-r--r--src/rabbit_queue_index.erl2
-rw-r--r--src/rabbit_tests.erl45
-rw-r--r--src/rabbit_vm.erl129
12 files changed, 309 insertions, 162 deletions
diff --git a/src/gatherer.erl b/src/gatherer.erl
index 98b360389a..29d2d71366 100644
--- a/src/gatherer.erl
+++ b/src/gatherer.erl
@@ -18,7 +18,7 @@
-behaviour(gen_server2).
--export([start_link/0, stop/1, fork/1, finish/1, in/2, out/1]).
+-export([start_link/0, stop/1, fork/1, finish/1, in/2, sync_in/2, out/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -32,6 +32,7 @@
-spec(fork/1 :: (pid()) -> 'ok').
-spec(finish/1 :: (pid()) -> 'ok').
-spec(in/2 :: (pid(), any()) -> 'ok').
+-spec(sync_in/2 :: (pid(), any()) -> 'ok').
-spec(out/1 :: (pid()) -> {'value', any()} | 'empty').
-endif.
@@ -62,6 +63,9 @@ finish(Pid) ->
in(Pid, Value) ->
gen_server2:cast(Pid, {in, Value}).
+sync_in(Pid, Value) ->
+ gen_server2:call(Pid, {in, Value}, infinity).
+
out(Pid) ->
gen_server2:call(Pid, out, infinity).
@@ -78,19 +82,22 @@ handle_call(stop, _From, State) ->
handle_call(fork, _From, State = #gstate { forks = Forks }) ->
{reply, ok, State #gstate { forks = Forks + 1 }, hibernate};
+handle_call({in, Value}, From, State) ->
+ {noreply, in(Value, From, State), hibernate};
+
handle_call(out, From, State = #gstate { forks = Forks,
values = Values,
blocked = Blocked }) ->
case queue:out(Values) of
+ {empty, _} when Forks == 0 ->
+ {reply, empty, State, hibernate};
{empty, _} ->
- case Forks of
- 0 -> {reply, empty, State, hibernate};
- _ -> {noreply,
- State #gstate { blocked = queue:in(From, Blocked) },
- hibernate}
- end;
- {{value, _Value} = V, NewValues} ->
- {reply, V, State #gstate { values = NewValues }, hibernate}
+ {noreply, State #gstate { blocked = queue:in(From, Blocked) },
+ hibernate};
+ {{value, {PendingIn, Value}}, NewValues} ->
+ reply(PendingIn, ok),
+ {reply, {value, Value}, State #gstate { values = NewValues },
+ hibernate}
end;
handle_call(Msg, _From, State) ->
@@ -107,15 +114,8 @@ handle_cast(finish, State = #gstate { forks = Forks, blocked = Blocked }) ->
{noreply, State #gstate { forks = NewForks, blocked = NewBlocked },
hibernate};
-handle_cast({in, Value}, State = #gstate { values = Values,
- blocked = Blocked }) ->
- {noreply, case queue:out(Blocked) of
- {empty, _} ->
- State #gstate { values = queue:in(Value, Values) };
- {{value, From}, NewBlocked} ->
- gen_server2:reply(From, {value, Value}),
- State #gstate { blocked = NewBlocked }
- end, hibernate};
+handle_cast({in, Value}, State) ->
+ {noreply, in(Value, undefined, State), hibernate};
handle_cast(Msg, State) ->
{stop, {unexpected_cast, Msg}, State}.
@@ -128,3 +128,18 @@ code_change(_OldVsn, State, _Extra) ->
terminate(_Reason, State) ->
State.
+
+%%----------------------------------------------------------------------------
+
+in(Value, From, State = #gstate { values = Values, blocked = Blocked }) ->
+ case queue:out(Blocked) of
+ {empty, _} ->
+ State #gstate { values = queue:in({From, Value}, Values) };
+ {{value, PendingOut}, NewBlocked} ->
+ reply(From, ok),
+ gen_server2:reply(PendingOut, {value, Value}),
+ State #gstate { blocked = NewBlocked }
+ end.
+
+reply(undefined, _Reply) -> ok;
+reply(From, Reply) -> gen_server2:reply(From, Reply).
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 7b417b00c6..93808f8413 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -364,7 +364,7 @@ status() ->
{running_applications, application:which_applications(infinity)},
{os, os:type()},
{erlang_version, erlang:system_info(system_version)},
- {memory, erlang:memory()}],
+ {memory, rabbit_vm:memory()}],
S2 = rabbit_misc:filter_exit_map(
fun ({Key, {M, F, A}}) -> {Key, erlang:apply(M, F, A)} end,
[{vm_memory_high_watermark, {vm_memory_monitor,
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 30df2b5ca3..b8aad11afa 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -86,6 +86,7 @@
-define(STATISTICS_KEYS,
[pid,
+ policy,
exclusive_consumer_pid,
exclusive_consumer_tag,
messages_ready,
@@ -496,32 +497,21 @@ confirm_messages(MsgIds, State = #q{msg_id_to_channel = MTC}) ->
rabbit_misc:gb_trees_foreach(fun rabbit_misc:confirm_to_sender/2, CMs),
State#q{msg_id_to_channel = MTC1}.
-should_confirm_message(#delivery{msg_seq_no = undefined}, _State) ->
- never;
-should_confirm_message(#delivery{sender = SenderPid,
+send_or_record_confirm(#delivery{msg_seq_no = undefined}, State) ->
+ {never, State};
+send_or_record_confirm(#delivery{sender = SenderPid,
msg_seq_no = MsgSeqNo,
message = #basic_message {
is_persistent = true,
id = MsgId}},
- #q{q = #amqqueue{durable = true}}) ->
- {eventually, SenderPid, MsgSeqNo, MsgId};
-should_confirm_message(#delivery{sender = SenderPid,
- msg_seq_no = MsgSeqNo},
- _State) ->
- {immediately, SenderPid, MsgSeqNo}.
-
-needs_confirming({eventually, _, _, _}) -> true;
-needs_confirming(_) -> false.
-
-maybe_record_confirm_message({eventually, SenderPid, MsgSeqNo, MsgId},
- State = #q{msg_id_to_channel = MTC}) ->
- State#q{msg_id_to_channel =
- gb_trees:insert(MsgId, {SenderPid, MsgSeqNo}, MTC)};
-maybe_record_confirm_message({immediately, SenderPid, MsgSeqNo}, State) ->
+ State = #q{q = #amqqueue{durable = true},
+ msg_id_to_channel = MTC}) ->
+ MTC1 = gb_trees:insert(MsgId, {SenderPid, MsgSeqNo}, MTC),
+ {eventually, State#q{msg_id_to_channel = MTC1}};
+send_or_record_confirm(#delivery{sender = SenderPid,
+ msg_seq_no = MsgSeqNo}, State) ->
rabbit_misc:confirm_to_sender(SenderPid, [MsgSeqNo]),
- State;
-maybe_record_confirm_message(_Confirm, State) ->
- State.
+ {immediately, State}.
run_message_queue(State) ->
State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
@@ -543,33 +533,27 @@ attempt_delivery(#delivery{sender = SenderPid, message = Message}, Props,
{{Message, Props#message_properties.delivered, AckTag},
true, State1#q{backing_queue_state = BQS3}}
end, false, State#q{backing_queue_state = BQS1});
- {Duplicate, BQS1} ->
- %% if the message has previously been seen by the BQ then
- %% it must have been seen under the same circumstances as
- %% now: i.e. if it is now a deliver_immediately then it
- %% must have been before.
- {case Duplicate of
- published -> true;
- discarded -> false
- end,
- State#q{backing_queue_state = BQS1}}
+ {published, BQS1} ->
+ {true, State#q{backing_queue_state = BQS1}};
+ {discarded, BQS1} ->
+ {false, State#q{backing_queue_state = BQS1}}
end.
-deliver_or_enqueue(Delivery = #delivery{message = Message,
- sender = SenderPid}, Delivered,
- State) ->
- Confirm = should_confirm_message(Delivery, State),
+deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid},
+ Delivered, State) ->
+ {Confirm, State1} = send_or_record_confirm(Delivery, State),
Props = message_properties(Confirm, Delivered, State),
- case attempt_delivery(Delivery, Props, State) of
- {true, State1} ->
- maybe_record_confirm_message(Confirm, State1);
+ case attempt_delivery(Delivery, Props, State1) of
+ {true, State2} ->
+ State2;
%% the next one is an optimisations
%% TODO: optimise the Confirm =/= never case too
- {false, State1 = #q{ttl = 0, dlx = undefined}} when Confirm == never ->
- discard_delivery(Delivery, State1);
- {false, State1} ->
- State2 = #q{backing_queue = BQ, backing_queue_state = BQS} =
- maybe_record_confirm_message(Confirm, State1),
+ {false, State2 = #q{ttl = 0, dlx = undefined,
+ backing_queue = BQ, backing_queue_state = BQS}}
+ when Confirm == never ->
+ BQS1 = BQ:discard(Message, SenderPid, BQS),
+ State2#q{backing_queue_state = BQS1};
+ {false, State2 = #q{backing_queue = BQ, backing_queue_state = BQS}} ->
BQS1 = BQ:publish(Message, Props, SenderPid, BQS),
ensure_ttl_timer(Props#message_properties.expiry,
State2#q{backing_queue_state = BQS1})
@@ -689,15 +673,9 @@ subtract_acks(ChPid, AckTags, State, Fun) ->
Fun(State)
end.
-discard_delivery(#delivery{sender = SenderPid,
- message = Message},
- State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
- State#q{backing_queue_state = BQ:discard(Message, SenderPid, BQS)}.
-
message_properties(Confirm, Delivered, #q{ttl = TTL}) ->
#message_properties{expiry = calculate_msg_expiry(TTL),
- needs_confirming = needs_confirming(Confirm),
+ needs_confirming = Confirm == eventually,
delivered = Delivered}.
calculate_msg_expiry(undefined) -> undefined;
@@ -890,6 +868,12 @@ i(owner_pid, #q{q = #amqqueue{exclusive_owner = none}}) ->
'';
i(owner_pid, #q{q = #amqqueue{exclusive_owner = ExclusiveOwner}}) ->
ExclusiveOwner;
+i(policy, #q{q = #amqqueue{name = Name}}) ->
+ {ok, Q} = rabbit_amqqueue:lookup(Name),
+ case rabbit_policy:name(Q) of
+ none -> '';
+ Policy -> Policy
+ end;
i(exclusive_consumer_pid, #q{exclusive_consumer = none}) ->
'';
i(exclusive_consumer_pid, #q{exclusive_consumer = {ChPid, _ConsumerTag}}) ->
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index d69a6c3b98..c6d1778532 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -24,6 +24,7 @@
-type(ack() :: any()).
-type(state() :: any()).
+-type(msg_ids() :: [rabbit_types:msg_id()]).
-type(fetch_result(Ack) ::
('empty' |
%% Message, IsDelivered, AckTag, Remaining_Len
@@ -117,7 +118,7 @@
%% first time the message id appears in the result of
%% drain_confirmed. All subsequent appearances of that message id will
%% be ignored.
--callback drain_confirmed(state()) -> {[rabbit_guid:guid()], state()}.
+-callback drain_confirmed(state()) -> {msg_ids(), state()}.
%% Drop messages from the head of the queue while the supplied predicate returns
%% true. Also accepts a boolean parameter that determines whether the messages
@@ -136,7 +137,7 @@
%% Acktags supplied are for messages which can now be forgotten
%% about. Must return 1 msg_id per Ack, in the same order as Acks.
--callback ack([ack()], state()) -> {[rabbit_guid:guid()], state()}.
+-callback ack([ack()], state()) -> {msg_ids(), state()}.
%% Acktags supplied are for messages which should be processed. The
%% provided callback function is called with each message.
@@ -144,7 +145,7 @@
%% Reinsert messages into the queue which have already been delivered
%% and were pending acknowledgement.
--callback requeue([ack()], state()) -> {[rabbit_guid:guid()], state()}.
+-callback requeue([ack()], state()) -> {msg_ids(), state()}.
%% How long is my queue?
-callback len(state()) -> non_neg_integer().
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 4cc96ef552..a205b23d0b 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -298,7 +298,10 @@ i(durable, #exchange{durable = Durable}) -> Durable;
i(auto_delete, #exchange{auto_delete = AutoDelete}) -> AutoDelete;
i(internal, #exchange{internal = Internal}) -> Internal;
i(arguments, #exchange{arguments = Arguments}) -> Arguments;
-i(policy, X) -> rabbit_policy:name(X);
+i(policy, X) -> case rabbit_policy:name(X) of
+ none -> '';
+ Policy -> Policy
+ end;
i(Item, _) -> throw({bad_argument, Item}).
info(X = #exchange{}) -> infos(?INFO_KEYS, X).
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl
index 72dcfc95fc..6cd71fc314 100644
--- a/src/rabbit_mirror_queue_coordinator.erl
+++ b/src/rabbit_mirror_queue_coordinator.erl
@@ -101,19 +101,25 @@
%% channel during a publish, only some of the mirrors may receive that
%% publish. As a result of this problem, the messages broadcast over
%% the gm contain published content, and thus slaves can operate
-%% successfully on messages that they only receive via the gm. The key
-%% purpose of also sending messages directly from the channels to the
-%% slaves is that without this, in the event of the death of the
-%% master, messages could be lost until a suitable slave is promoted.
+%% successfully on messages that they only receive via the gm.
%%
-%% However, that is not the only reason. For example, if confirms are
-%% in use, then there is no guarantee that every slave will see the
-%% delivery with the same msg_seq_no. As a result, the slaves have to
-%% wait until they've seen both the publish via gm, and the publish
-%% via the channel before they have enough information to be able to
-%% perform the publish to their own bq, and subsequently issue the
-%% confirm, if necessary. Either form of publish can arrive first, and
-%% a slave can be upgraded to the master at any point during this
+%% The key purpose of also sending messages directly from the channels
+%% to the slaves is that without this, in the event of the death of
+%% the master, messages could be lost until a suitable slave is
+%% promoted. However, that is not the only reason. A slave cannot send
+%% confirms for a message until it has seen it from the
+%% channel. Otherwise, it might send a confirm to a channel for a
+%% message that it might *never* receive from that channel. This can
+%% happen because new slaves join the gm ring (and thus receive
+%% messages from the master) before inserting themselves in the
+%% queue's mnesia record (which is what channels look at for routing).
+%% As it turns out, channels will simply ignore such bogus confirms,
+%% but relying on that would introduce a dangerously tight coupling.
+%%
+%% Hence the slaves have to wait until they've seen both the publish
+%% via gm, and the publish via the channel before they issue the
+%% confirm. Either form of publish can arrive first, and a slave can
+%% be upgraded to the master at any point during this
%% process. Confirms continue to be issued correctly, however.
%%
%% Because the slave is a full process, it impersonates parts of the
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index e3c0b0286b..e6638b011b 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -385,8 +385,10 @@ discard(Msg = #basic_message { id = MsgId }, ChPid,
case dict:find(MsgId, SS) of
error ->
ok = gm:broadcast(GM, {discard, ChPid, Msg}),
- State #state { backing_queue_state = BQ:discard(Msg, ChPid, BQS),
- seen_status = dict:erase(MsgId, SS) };
+ ensure_monitoring(
+ ChPid, State #state {
+ backing_queue_state = BQ:discard(Msg, ChPid, BQS),
+ seen_status = dict:erase(MsgId, SS) });
{ok, discarded} ->
State
end.
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index e7a608b6b0..ae1004c3e2 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -277,16 +277,26 @@ suggested_queue_nodes(<<"nodes">>, Nodes0, {MNode, _SNodes}, Possible) ->
false -> promote_slave(Available)
end
end;
+%% When we need to add nodes, we randomise our candidate list as a
+%% crude form of load-balancing. TODO it would also be nice to
+%% randomise the list of ones to remove when we have too many - but
+%% that would fail to take account of synchronisation...
suggested_queue_nodes(<<"exactly">>, Count, {MNode, SNodes}, Possible) ->
SCount = Count - 1,
{MNode, case SCount > length(SNodes) of
- true -> Cand = (Possible -- [MNode]) -- SNodes,
+ true -> Cand = shuffle((Possible -- [MNode]) -- SNodes),
SNodes ++ lists:sublist(Cand, SCount - length(SNodes));
false -> lists:sublist(SNodes, SCount)
end};
suggested_queue_nodes(_, _, {MNode, _}, _) ->
{MNode, []}.
+shuffle(L) ->
+ {A1,A2,A3} = now(),
+ random:seed(A1, A2, A3),
+ {_, L1} = lists:unzip(lists:keysort(1, [{random:uniform(), N} || N <- L])),
+ L1.
+
actual_queue_nodes(#amqqueue{pid = MPid, slave_pids = SPids}) ->
{case MPid of
none -> none;
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 4debf55ff4..ff0ac9cd27 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -146,37 +146,36 @@ init(#amqqueue { name = QueueName } = Q) ->
end.
init_it(Self, GM, Node, QueueName) ->
- [Q1 = #amqqueue { pid = QPid, slave_pids = MPids, gm_pids = GMPids }] =
- mnesia:read({rabbit_queue, QueueName}),
- case [Pid || Pid <- [QPid | MPids], node(Pid) =:= Node] of
- [] -> add_slave(Q1, Self, GM),
+ [Q = #amqqueue { pid = QPid, slave_pids = SPids, gm_pids = GMPids }] =
+ mnesia:read({rabbit_queue, QueueName}),
+ case [Pid || Pid <- [QPid | SPids], node(Pid) =:= Node] of
+ [] -> add_slave(Q, Self, GM),
{new, QPid};
[QPid] -> case rabbit_misc:is_process_alive(QPid) of
true -> duplicate_live_master;
false -> {stale, QPid}
end;
[SPid] -> case rabbit_misc:is_process_alive(SPid) of
- true -> Q2 = Q1#amqqueue{gm_pids = [{GM, existing} |
- GMPids]},
- ok = rabbit_amqqueue:store_queue(Q2),
+ true -> Q1 = Q#amqqueue { gm_pids = [{GM, existing} |
+ GMPids] },
+ ok = rabbit_amqqueue:store_queue(Q1),
existing;
- false -> add_slave(forget_slave(SPid, Q1), Self, GM),
+ false -> Q1 = Q#amqqueue {
+ slave_pids = SPids -- [SPid],
+ gm_pids = [T || T = {_, S} <- GMPids,
+ S =/= SPid] },
+ add_slave(Q1, Self, GM),
{new, QPid}
end
end.
%% Add to the end, so they are in descending order of age, see
%% rabbit_mirror_queue_misc:promote_slave/1
-add_slave(Q = #amqqueue{gm_pids = GMPids, slave_pids = SPids}, New, GM) ->
+add_slave(Q = #amqqueue { slave_pids = SPids, gm_pids = GMPids }, New, GM) ->
rabbit_mirror_queue_misc:store_updated_slaves(
Q#amqqueue{slave_pids = SPids ++ [New],
gm_pids = [{GM, New} | GMPids]}).
-forget_slave(SPid, Q = #amqqueue{slave_pids = SPids,
- gm_pids = GMPids}) ->
- Q#amqqueue{slave_pids = SPids -- [SPid],
- gm_pids = [T || T = {S, _} <- GMPids, S =/= SPid]}.
-
handle_call({deliver, Delivery, true}, From, State) ->
%% Synchronous, "mandatory" deliver mode.
gen_server2:reply(From, ok),
@@ -417,16 +416,16 @@ confirm_messages(MsgIds, State = #state { msg_id_status = MS }) ->
%% If it needed confirming, it'll have
%% already been done.
Acc;
- {ok, {published, ChPid}} ->
+ {ok, published} ->
%% Still not seen it from the channel, just
%% record that it's been confirmed.
- {CMsN, dict:store(MsgId, {confirmed, ChPid}, MSN)};
+ {CMsN, dict:store(MsgId, confirmed, MSN)};
{ok, {published, ChPid, MsgSeqNo}} ->
%% Seen from both GM and Channel. Can now
%% confirm.
{rabbit_misc:gb_trees_cons(ChPid, MsgSeqNo, CMsN),
dict:erase(MsgId, MSN)};
- {ok, {confirmed, _ChPid}} ->
+ {ok, confirmed} ->
%% It's already been confirmed. This is
%% probably it's been both sync'd to disk
%% and then delivered and ack'd before we've
@@ -493,18 +492,18 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
%%
%% MS contains the following three entry types:
%%
- %% a) {published, ChPid}:
+ %% a) published:
%% published via gm only; pending arrival of publication from
%% channel, maybe pending confirm.
%%
%% b) {published, ChPid, MsgSeqNo}:
%% published via gm and channel; pending confirm.
%%
- %% c) {confirmed, ChPid}:
+ %% c) confirmed:
%% published via gm only, and confirmed; pending publication
%% from channel.
%%
- %% d) discarded
+ %% d) discarded:
%% seen via gm only as discarded. Pending publication from
%% channel
%%
@@ -522,22 +521,18 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
%% this does not affect MS, nor which bits go through to SS in
%% Master, or MTC in queue_process.
- MSList = dict:to_list(MS),
- SS = dict:from_list(
- [E || E = {_MsgId, discarded} <- MSList] ++
- [{MsgId, Status}
- || {MsgId, {Status, _ChPid}} <- MSList,
- Status =:= published orelse Status =:= confirmed]),
+ St = [published, confirmed, discarded],
+ SS = dict:filter(fun (_MsgId, Status) -> lists:member(Status, St) end, MS),
AckTags = [AckTag || {_MsgId, AckTag} <- dict:to_list(MA)],
MasterState = rabbit_mirror_queue_master:promote_backing_queue_state(
CPid, BQ, BQS, GM, AckTags, SS, MPids),
- MTC = lists:foldl(fun ({MsgId, {published, ChPid, MsgSeqNo}}, MTC0) ->
- gb_trees:insert(MsgId, {ChPid, MsgSeqNo}, MTC0);
- (_, MTC0) ->
- MTC0
- end, gb_trees:empty(), MSList),
+ MTC = dict:fold(fun (MsgId, {published, ChPid, MsgSeqNo}, MTC0) ->
+ gb_trees:insert(MsgId, {ChPid, MsgSeqNo}, MTC0);
+ (_Msgid, _Status, MTC0) ->
+ MTC0
+ end, gb_trees:empty(), MS),
Deliveries = [Delivery || {_ChPid, {PubQ, _PendCh}} <- dict:to_list(SQ),
Delivery <- queue:to_list(PubQ)],
rabbit_amqqueue_process:init_with_backing_queue_state(
@@ -648,27 +643,21 @@ maybe_enqueue_message(
MQ1 = queue:in(Delivery, MQ),
SQ1 = dict:store(ChPid, {MQ1, PendingCh}, SQ),
State1 #state { sender_queues = SQ1 };
- {ok, {confirmed, ChPid}} ->
- %% BQ has confirmed it but we didn't know what the
- %% msg_seq_no was at the time. We do now!
+ {ok, confirmed} ->
ok = rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]),
SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ),
State1 #state { msg_id_status = dict:erase(MsgId, MS),
sender_queues = SQ1 };
- {ok, {published, ChPid}} ->
- %% It was published to the BQ and we didn't know the
- %% msg_seq_no so couldn't confirm it at the time.
- {MS1, SQ1} =
- case needs_confirming(Delivery, State1) of
- never -> {dict:erase(MsgId, MS),
- remove_from_pending_ch(MsgId, ChPid, SQ)};
- eventually -> MMS = {published, ChPid, MsgSeqNo},
- {dict:store(MsgId, MMS, MS), SQ};
- immediately -> ok = rabbit_misc:confirm_to_sender(
- ChPid, [MsgSeqNo]),
- {dict:erase(MsgId, MS),
- remove_from_pending_ch(MsgId, ChPid, SQ)}
- end,
+ {ok, published} ->
+ MS1 = case needs_confirming(Delivery, State1) of
+ never -> dict:erase(MsgId, MS);
+ eventually -> MMS = {published, ChPid, MsgSeqNo},
+ dict:store(MsgId, MMS, MS);
+ immediately -> ok = rabbit_misc:confirm_to_sender(
+ ChPid, [MsgSeqNo]),
+ dict:erase(MsgId, MS)
+ end,
+ SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ),
State1 #state { msg_id_status = MS1,
sender_queues = SQ1 };
{ok, discarded} ->
@@ -701,20 +690,17 @@ process_instruction(
msg_id_status = MS }) ->
%% We really are going to do the publish right now, even though we
- %% may not have seen it directly from the channel. As a result, we
- %% may know that it needs confirming without knowing its
- %% msg_seq_no, which means that we can see the confirmation come
- %% back from the backing queue without knowing the msg_seq_no,
- %% which means that we're going to have to hang on to the fact
- %% that we've seen the msg_id confirmed until we can associate it
- %% with a msg_seq_no.
+ %% may not have seen it directly from the channel. But we cannot
+ %% issues confirms until the latter has happened. So we need to
+ %% keep track of the MsgId and its confirmation status in the
+ %% meantime.
State1 = ensure_monitoring(ChPid, State),
{MQ, PendingCh} = get_sender_queue(ChPid, SQ),
{MQ1, PendingCh1, MS1} =
case queue:out(MQ) of
{empty, _MQ2} ->
{MQ, sets:add_element(MsgId, PendingCh),
- dict:store(MsgId, {published, ChPid}, MS)};
+ dict:store(MsgId, published, MS)};
{{value, Delivery = #delivery {
msg_seq_no = MsgSeqNo,
message = #basic_message { id = MsgId } }}, MQ2} ->
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 6d6c648acb..21f581548d 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -537,7 +537,7 @@ queue_index_walker_reader(QueueName, Gatherer) ->
State = blank_state(QueueName),
ok = scan_segments(
fun (_SeqId, MsgId, _MsgProps, true, _IsDelivered, no_ack, ok) ->
- gatherer:in(Gatherer, {MsgId, 1});
+ gatherer:sync_in(Gatherer, {MsgId, 1});
(_SeqId, _MsgId, _MsgProps, _IsPersistent, _IsDelivered,
_IsAcked, Acc) ->
Acc
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 11f280bb7f..2e26837df0 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -886,38 +886,49 @@ test_arguments_parser() ->
test_dynamic_mirroring() ->
%% Just unit tests of the node selection logic, see multi node
%% tests for the rest...
- Test = fun ({NewM, NewSs}, Policy, Params, {OldM, OldSs}, All) ->
+ Test = fun ({NewM, NewSs, ExtraSs}, Policy, Params, {OldM, OldSs}, All) ->
{NewM, NewSs0} =
rabbit_mirror_queue_misc:suggested_queue_nodes(
Policy, Params, {OldM, OldSs}, All),
- NewSs = lists:sort(NewSs0)
+ NewSs1 = lists:sort(NewSs0),
+ case dm_list_match(NewSs, NewSs1, ExtraSs) of
+ ok -> ok;
+ error -> exit({no_match, NewSs, NewSs1, ExtraSs})
+ end
end,
- Test({a,[b,c]},<<"all">>,'_',{a,[]}, [a,b,c]),
- Test({a,[b,c]},<<"all">>,'_',{a,[b,c]},[a,b,c]),
- Test({a,[b,c]},<<"all">>,'_',{a,[d]}, [a,b,c]),
+ Test({a,[b,c],0},<<"all">>,'_',{a,[]}, [a,b,c]),
+ Test({a,[b,c],0},<<"all">>,'_',{a,[b,c]},[a,b,c]),
+ Test({a,[b,c],0},<<"all">>,'_',{a,[d]}, [a,b,c]),
%% Add a node
- Test({a,[b,c]},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{a,[b]},[a,b,c,d]),
- Test({b,[a,c]},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{b,[a]},[a,b,c,d]),
+ Test({a,[b,c],0},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{a,[b]},[a,b,c,d]),
+ Test({b,[a,c],0},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{b,[a]},[a,b,c,d]),
%% Add two nodes and drop one
- Test({a,[b,c]},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{a,[d]},[a,b,c,d]),
+ Test({a,[b,c],0},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{a,[d]},[a,b,c,d]),
%% Promote slave to master by policy
- Test({a,[b,c]},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{d,[a]},[a,b,c,d]),
+ Test({a,[b,c],0},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{d,[a]},[a,b,c,d]),
%% Don't try to include nodes that are not running
- Test({a,[b]}, <<"nodes">>,[<<"a">>,<<"b">>,<<"f">>],{a,[b]},[a,b,c,d]),
+ Test({a,[b], 0},<<"nodes">>,[<<"a">>,<<"b">>,<<"f">>],{a,[b]},[a,b,c,d]),
%% If we can't find any of the nodes listed then just keep the master
- Test({a,[]}, <<"nodes">>,[<<"f">>,<<"g">>,<<"h">>],{a,[b]},[a,b,c,d]),
+ Test({a,[], 0},<<"nodes">>,[<<"f">>,<<"g">>,<<"h">>],{a,[b]},[a,b,c,d]),
- Test({a,[b]}, <<"exactly">>,2,{a,[]}, [a,b,c,d]),
- Test({a,[b,c]},<<"exactly">>,3,{a,[]}, [a,b,c,d]),
- Test({a,[c]}, <<"exactly">>,2,{a,[c]}, [a,b,c,d]),
- Test({a,[b,c]},<<"exactly">>,3,{a,[c]}, [a,b,c,d]),
- Test({a,[c]}, <<"exactly">>,2,{a,[c,d]},[a,b,c,d]),
- Test({a,[c,d]},<<"exactly">>,3,{a,[c,d]},[a,b,c,d]),
+ Test({a,[], 1},<<"exactly">>,2,{a,[]}, [a,b,c,d]),
+ Test({a,[], 2},<<"exactly">>,3,{a,[]}, [a,b,c,d]),
+ Test({a,[c], 0},<<"exactly">>,2,{a,[c]}, [a,b,c,d]),
+ Test({a,[c], 1},<<"exactly">>,3,{a,[c]}, [a,b,c,d]),
+ Test({a,[c], 0},<<"exactly">>,2,{a,[c,d]},[a,b,c,d]),
+ Test({a,[c,d],0},<<"exactly">>,3,{a,[c,d]},[a,b,c,d]),
passed.
+%% Does the first list match the second where the second is required
+%% to have exactly Extra superfluous items?
+dm_list_match([], [], 0) -> ok;
+dm_list_match(_, [], _Extra) -> error;
+dm_list_match([H|T1], [H |T2], Extra) -> dm_list_match(T1, T2, Extra);
+dm_list_match(L1, [_H|T2], Extra) -> dm_list_match(L1, T2, Extra - 1).
+
test_user_management() ->
%% lots if stuff that should fail
diff --git a/src/rabbit_vm.erl b/src/rabbit_vm.erl
new file mode 100644
index 0000000000..53f3df18b3
--- /dev/null
+++ b/src/rabbit_vm.erl
@@ -0,0 +1,129 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_vm).
+
+-export([memory/0]).
+
+-define(MAGIC_PLUGINS, ["mochiweb", "webmachine", "cowboy", "sockjs",
+ "rfc4627_jsonrpc"]).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(memory/0 :: () -> rabbit_types:infos()).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+%% Like erlang:memory(), but with awareness of rabbit-y things
+memory() ->
+ Conns = (sup_memory(rabbit_tcp_client_sup) +
+ sup_memory(ssl_connection_sup) +
+ sup_memory(amqp_sup)),
+ Qs = (sup_memory(rabbit_amqqueue_sup) +
+ sup_memory(rabbit_mirror_queue_slave_sup)),
+ Mnesia = mnesia_memory(),
+ MsgIndexETS = ets_memory(rabbit_msg_store_ets_index),
+ MsgIndexProc = (pid_memory(msg_store_transient) +
+ pid_memory(msg_store_persistent)),
+ MgmtDbETS = ets_memory(rabbit_mgmt_db),
+ MgmtDbProc = sup_memory(rabbit_mgmt_sup),
+ Plugins = plugin_memory() - MgmtDbProc,
+
+ [{total, Total},
+ {processes, Processes},
+ {ets, ETS},
+ {atom, Atom},
+ {binary, Bin},
+ {code, Code},
+ {system, System}] =
+ erlang:memory([total, processes, ets, atom, binary, code, system]),
+
+ OtherProc = Processes - Conns - Qs - MsgIndexProc - MgmtDbProc - Plugins,
+
+ [{total, Total},
+ {connection_procs, Conns},
+ {queue_procs, Qs},
+ {plugins, Plugins},
+ {other_proc, lists:max([0, OtherProc])}, %% [1]
+ {mnesia, Mnesia},
+ {mgmt_db, MgmtDbETS + MgmtDbProc},
+ {msg_index, MsgIndexETS + MsgIndexProc},
+ {other_ets, ETS - Mnesia - MsgIndexETS - MgmtDbETS},
+ {binary, Bin},
+ {code, Code},
+ {atom, Atom},
+ {other_system, System - ETS - Atom - Bin - Code}].
+
+%% [1] - erlang:memory(processes) can be less than the sum of its
+%% parts. Rather than display something nonsensical, just silence any
+%% claims about negative memory. See
+%% http://erlang.org/pipermail/erlang-questions/2012-September/069320.html
+
+%%----------------------------------------------------------------------------
+
+sup_memory(Sup) ->
+ lists:sum([child_memory(P, T) || {_, P, T, _} <- sup_children(Sup)]) +
+ pid_memory(Sup).
+
+sup_children(Sup) ->
+ rabbit_misc:with_exit_handler(
+ rabbit_misc:const([]), fun () -> supervisor:which_children(Sup) end).
+
+pid_memory(Pid) when is_pid(Pid) -> case process_info(Pid, memory) of
+ {memory, M} -> M;
+ _ -> 0
+ end;
+pid_memory(Name) when is_atom(Name) -> case whereis(Name) of
+ P when is_pid(P) -> pid_memory(P);
+ _ -> 0
+ end.
+
+child_memory(Pid, worker) when is_pid (Pid) -> pid_memory(Pid);
+child_memory(Pid, supervisor) when is_pid (Pid) -> sup_memory(Pid);
+child_memory(_, _) -> 0.
+
+mnesia_memory() ->
+ case mnesia:system_info(is_running) of
+ yes -> lists:sum([bytes(mnesia:table_info(Tab, memory)) ||
+ Tab <- mnesia:system_info(tables)]);
+ no -> 0
+ end.
+
+ets_memory(Name) ->
+ lists:sum([bytes(ets:info(T, memory)) || T <- ets:all(),
+ N <- [ets:info(T, name)],
+ N =:= Name]).
+
+bytes(Words) -> Words * erlang:system_info(wordsize).
+
+plugin_memory() ->
+ lists:sum([plugin_memory(App) ||
+ {App, _, _} <- application:which_applications(),
+ is_plugin(atom_to_list(App))]).
+
+plugin_memory(App) ->
+ case catch application_master:get_child(
+ application_controller:get_master(App)) of
+ {Pid, _} -> sup_memory(Pid);
+ _ -> 0
+ end.
+
+is_plugin("rabbitmq_" ++ _) -> true;
+is_plugin(App) -> lists:member(App, ?MAGIC_PLUGINS).