summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_amqqueue.erl2
-rw-r--r--src/rabbit_backing_queue_qc.erl4
-rw-r--r--src/rabbit_control.erl23
-rw-r--r--src/rabbit_mirror_queue_slave.erl5
-rw-r--r--src/rabbit_misc.erl14
-rw-r--r--src/rabbit_queue_index.erl14
6 files changed, 47 insertions, 15 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index eae6312b1a..a7c92e51eb 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -224,7 +224,7 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) ->
end).
store_queue(Q = #amqqueue{durable = true}) ->
- ok = mnesia:write(rabbit_durable_queue, Q, write),
+ ok = mnesia:write(rabbit_durable_queue, Q#amqqueue{slave_pids = []}, write),
ok = mnesia:write(rabbit_queue, Q, write),
ok;
store_queue(Q = #amqqueue{durable = false}) ->
diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl
index d358a041ef..22691ef9d3 100644
--- a/src/rabbit_backing_queue_qc.erl
+++ b/src/rabbit_backing_queue_qc.erl
@@ -106,7 +106,7 @@ qc_publish(#state{bqstate = BQ}) ->
[qc_message(),
#message_properties{needs_confirming = frequency([{1, true},
{20, false}]),
- expiry = choose(0, 10)},
+ expiry = oneof([undefined | lists:seq(1, 10)])},
self(), BQ]}.
qc_publish_multiple(#state{bqstate = BQ}) ->
@@ -375,7 +375,7 @@ rand_choice(List) -> [lists:nth(random:uniform(length(List)), List)].
dropfun(Props) ->
Expiry = eval({call, erlang, element,
[?RECORD_INDEX(expiry, message_properties), Props]}),
- Expiry =/= 0.
+ Expiry =/= 1.
drop_messages(Messages) ->
case queue:out(Messages) of
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 6eb1aaba9a..e8afed0c11 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -17,7 +17,7 @@
-module(rabbit_control).
-include("rabbit.hrl").
--export([start/0, stop/0, action/5, diagnostics/1]).
+-export([start/0, stop/0, action/5, diagnostics/1, log_action/3]).
-define(RPC_TIMEOUT, infinity).
-define(WAIT_FOR_VM_ATTEMPTS, 5).
@@ -51,6 +51,7 @@
-> 'ok').
-spec(diagnostics/1 :: (node()) -> [{string(), [any()]}]).
-spec(usage/0 :: () -> no_return()).
+-spec(log_action/3 :: (node(), string(), [term()]) -> ok).
-endif.
@@ -73,6 +74,7 @@ start() ->
Command = list_to_atom(Command0),
Quiet = proplists:get_bool(?QUIET_OPT, Opts1),
Node = proplists:get_value(?NODE_OPT, Opts1),
+ rpc_call(Node, rabbit_control, log_action, [node(), Command0, Args]),
Inform = case Quiet of
true -> fun (_Format, _Args1) -> ok end;
false -> fun (Format, Args1) ->
@@ -474,3 +476,22 @@ quit(Status) ->
{unix, _} -> halt(Status);
{win32, _} -> init:stop(Status)
end.
+
+log_action(Node, Command, Args) ->
+ rabbit_misc:with_local_io(
+ fun () ->
+ error_logger:info_msg("~p executing~n rabbitmqctl ~s ~s~n",
+ [Node, Command,
+ format_args(mask_args(Command, Args))])
+ end).
+
+%% Mask passwords and other sensitive info before logging.
+mask_args("add_user", [Name, _Password | Args]) ->
+ [Name, "****" | Args];
+mask_args("change_password", [Name, _Password | Args]) ->
+ [Name, "****" | Args];
+mask_args(_, Args) ->
+ Args.
+
+format_args(Args) ->
+ string:join([io_lib:format("~p", [A]) || A <- Args], " ").
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 12b6f3ca2e..c918f388ea 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -89,9 +89,8 @@ init([#amqqueue { name = QueueName } = Q]) ->
%% ASSERTION
[] = [Pid || Pid <- [QPid | MPids], node(Pid) =:= Node],
MPids1 = MPids ++ [Self],
- mnesia:write(rabbit_queue,
- Q1 #amqqueue { slave_pids = MPids1 },
- write),
+ ok = rabbit_amqqueue:store_queue(
+ Q1 #amqqueue { slave_pids = MPids1 }),
{ok, QPid}
end),
erlang:monitor(process, MPid),
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 3bbfb1d75e..b98dbd462f 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -42,7 +42,7 @@
-export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]).
-export([read_term_file/1, write_term_file/2, write_file/2, write_file/3]).
-export([append_file/2, ensure_parent_dirs_exist/1]).
--export([format_stderr/2]).
+-export([format_stderr/2, with_local_io/1]).
-export([start_applications/1, stop_applications/1]).
-export([unfold/2, ceil/1, queue_fold/3]).
-export([sort_field_table/1]).
@@ -165,6 +165,7 @@
-spec(append_file/2 :: (file:filename(), string()) -> ok_or_error()).
-spec(ensure_parent_dirs_exist/1 :: (string()) -> 'ok').
-spec(format_stderr/2 :: (string(), [any()]) -> 'ok').
+-spec(with_local_io/1 :: (fun (() -> A)) -> A).
-spec(start_applications/1 :: ([atom()]) -> 'ok').
-spec(stop_applications/1 :: ([atom()]) -> 'ok').
-spec(unfold/2 :: (fun ((A) -> ({'true', B, A} | 'false')), A) -> {[B], A}).
@@ -605,6 +606,17 @@ format_stderr(Fmt, Args) ->
end,
ok.
+%% Execute Fun using the IO system of the local node (i.e. the node on
+%% which the code is executing).
+with_local_io(Fun) ->
+ GL = group_leader(),
+ group_leader(whereis(user), self()),
+ try
+ Fun()
+ after
+ group_leader(GL, self())
+ end.
+
manage_applications(Iterate, Do, Undo, SkipError, ErrorTag, Apps) ->
Iterate(fun (App, Acc) ->
case Do(App) of
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index bf89cdb26e..6388da8f79 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -569,13 +569,13 @@ add_to_journal(SeqId, Action, State = #qistate { dirty_count = DCount,
add_to_journal(RelSeq, Action,
Segment = #segment { journal_entries = JEntries,
unacked = UnackedCount }) ->
- Segment1 = Segment #segment {
- journal_entries = add_to_journal(RelSeq, Action, JEntries) },
- case Action of
- del -> Segment1;
- ack -> Segment1 #segment { unacked = UnackedCount - 1 };
- ?PUB -> Segment1 #segment { unacked = UnackedCount + 1 }
- end;
+ Segment #segment {
+ journal_entries = add_to_journal(RelSeq, Action, JEntries),
+ unacked = UnackedCount + case Action of
+ ?PUB -> +1;
+ del -> 0;
+ ack -> -1
+ end};
add_to_journal(RelSeq, Action, JEntries) ->
Val = case array:get(RelSeq, JEntries) of