summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorJean-Sébastien Pédron <jean-sebastien@rabbitmq.com>2015-03-11 16:17:25 +0100
committerJean-Sébastien Pédron <jean-sebastien@rabbitmq.com>2015-03-11 16:17:25 +0100
commit17a5da8f085e4d4acb349fc41ceca0e923ac67d3 (patch)
tree6d3b51561ea384f3b51f477b05d91237ebe1f3e7 /src
parente03e7da3115c54e8a7ca54ad3cc2c4c74f7ed417 (diff)
parent578cfc1916a4b6a8202b2f4698e35eb76942f061 (diff)
downloadrabbitmq-server-git-17a5da8f085e4d4acb349fc41ceca0e923ac67d3.tar.gz
Merge branch 'master' into stable
Diffstat (limited to 'src')
-rw-r--r--src/app_utils.erl8
-rw-r--r--src/delegate.erl29
-rw-r--r--src/file_handle_cache.erl170
-rw-r--r--src/file_handle_cache_stats.erl67
-rw-r--r--src/gatherer.erl14
-rw-r--r--src/lqueue.erl4
-rw-r--r--src/pmon.erl13
-rw-r--r--src/rabbit.erl45
-rw-r--r--src/rabbit_access_control.erl126
-rw-r--r--src/rabbit_amqqueue.erl124
-rw-r--r--src/rabbit_amqqueue_process.erl38
-rw-r--r--src/rabbit_auth_backend_dummy.erl25
-rw-r--r--src/rabbit_auth_backend_internal.erl34
-rw-r--r--src/rabbit_auth_mechanism.erl4
-rw-r--r--src/rabbit_authn_backend.erl49
-rw-r--r--src/rabbit_authz_backend.erl (renamed from src/rabbit_auth_backend.erl)40
-rw-r--r--src/rabbit_autoheal.erl199
-rw-r--r--src/rabbit_backing_queue.erl15
-rw-r--r--src/rabbit_basic.erl2
-rw-r--r--src/rabbit_binary_parser.erl102
-rw-r--r--src/rabbit_channel.erl17
-rw-r--r--src/rabbit_cli.erl21
-rw-r--r--src/rabbit_control_main.erl73
-rw-r--r--src/rabbit_direct.erl17
-rw-r--r--src/rabbit_epmd_monitor.erl101
-rw-r--r--src/rabbit_limiter.erl2
-rw-r--r--src/rabbit_log.erl11
-rw-r--r--src/rabbit_mirror_queue_coordinator.erl3
-rw-r--r--src/rabbit_mirror_queue_master.erl37
-rw-r--r--src/rabbit_mirror_queue_misc.erl117
-rw-r--r--src/rabbit_mirror_queue_slave.erl43
-rw-r--r--src/rabbit_mirror_queue_sync.erl5
-rw-r--r--src/rabbit_misc.erl73
-rw-r--r--src/rabbit_mnesia.erl50
-rw-r--r--src/rabbit_mnesia_rename.erl267
-rw-r--r--src/rabbit_msg_store.erl5
-rw-r--r--src/rabbit_networking.erl6
-rw-r--r--src/rabbit_node_monitor.erl252
-rw-r--r--src/rabbit_nodes.erl19
-rw-r--r--src/rabbit_priority_queue.erl574
-rw-r--r--src/rabbit_queue_index.erl431
-rw-r--r--src/rabbit_reader.erl87
-rw-r--r--src/rabbit_trace.erl20
-rw-r--r--src/rabbit_types.erl14
-rw-r--r--src/rabbit_upgrade.erl4
-rw-r--r--src/rabbit_upgrade_functions.erl14
-rw-r--r--src/rabbit_variable_queue.erl716
-rw-r--r--src/truncate.erl4
-rw-r--r--src/worker_pool.erl31
-rw-r--r--src/worker_pool_worker.erl5
50 files changed, 3112 insertions, 1015 deletions
diff --git a/src/app_utils.erl b/src/app_utils.erl
index ad270518ad..87e6fa0b69 100644
--- a/src/app_utils.erl
+++ b/src/app_utils.erl
@@ -62,13 +62,7 @@ start_applications(Apps, ErrorHandler) ->
stop_applications(Apps, ErrorHandler) ->
manage_applications(fun lists:foldr/3,
- %% Mitigation for bug 26467. TODO remove when we fix it.
- fun (mnesia) ->
- timer:sleep(1000),
- application:stop(mnesia);
- (App) ->
- application:stop(App)
- end,
+ fun application:stop/1,
fun application:start/1,
not_started,
ErrorHandler,
diff --git a/src/delegate.erl b/src/delegate.erl
index 378759a64b..1a0df98220 100644
--- a/src/delegate.erl
+++ b/src/delegate.erl
@@ -16,6 +16,35 @@
-module(delegate).
+%% delegate is an alternative way of doing remote calls. Compared to
+%% the rpc module, it reduces inter-node communication. For example,
+%% if a message is routed to 1,000 queues on node A and needs to be
+%% propagated to nodes B and C, it would be nice to avoid doing 2,000
+%% remote casts to queue processes.
+%%
+%% An important issue here is preserving order - we need to make sure
+%% that messages from a certain channel to a certain queue take a
+%% consistent route, to prevent them being reordered. In fact all
+%% AMQP-ish things (such as queue declaration results and basic.get)
+%% must take the same route as well, to ensure that clients see causal
+%% ordering correctly. Therefore we have a rather generic mechanism
+%% here rather than just a message-reflector. That's also why we pick
+%% the delegate process to use based on a hash of the source pid.
+%%
+%% When a function is invoked using delegate:invoke/2, delegate:call/2
+%% or delegate:cast/2 on a group of pids, the pids are first split
+%% into local and remote ones. Remote processes are then grouped by
+%% node. The function is then invoked locally and on every node (using
+%% gen_server2:multi/4) as many times as there are processes on that
+%% node, sequentially.
+%%
+%% Errors returned when executing functions on remote nodes are re-raised
+%% in the caller.
+%%
+%% RabbitMQ starts a pool of delegate processes on boot. The size of
+%% the pool is configurable, the aim is to make sure we don't have too
+%% few delegates and thus limit performance on many-CPU machines.
+
-behaviour(gen_server2).
-export([start_link/1, invoke_no_result/2, invoke/2,
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index 3a7a692c5c..5a916c75bc 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -178,6 +178,10 @@
write_buffer_size,
write_buffer_size_limit,
write_buffer,
+ read_buffer,
+ read_buffer_pos,
+ read_buffer_rem, %% Num of bytes from pos to end
+ read_buffer_size_limit,
at_eof,
path,
mode,
@@ -237,7 +241,8 @@
-spec(register_callback/3 :: (atom(), atom(), [any()]) -> 'ok').
-spec(open/3 ::
(file:filename(), [any()],
- [{'write_buffer', (non_neg_integer() | 'infinity' | 'unbuffered')}])
+ [{'write_buffer', (non_neg_integer() | 'infinity' | 'unbuffered')} |
+ {'read_buffer', (non_neg_integer() | 'unbuffered')}])
-> val_or_error(ref())).
-spec(close/1 :: (ref()) -> ok_or_error()).
-spec(read/2 :: (ref(), non_neg_integer()) ->
@@ -331,16 +336,48 @@ close(Ref) ->
read(Ref, Count) ->
with_flushed_handles(
- [Ref],
+ [Ref], keep,
fun ([#handle { is_read = false }]) ->
{error, not_open_for_reading};
- ([Handle = #handle { hdl = Hdl, offset = Offset }]) ->
- case prim_file:read(Hdl, Count) of
- {ok, Data} = Obj -> Offset1 = Offset + iolist_size(Data),
- {Obj,
- [Handle #handle { offset = Offset1 }]};
- eof -> {eof, [Handle #handle { at_eof = true }]};
- Error -> {Error, [Handle]}
+ ([Handle = #handle{read_buffer = Buf,
+ read_buffer_pos = BufPos,
+ read_buffer_rem = BufRem,
+ offset = Offset}]) when BufRem >= Count ->
+ <<_:BufPos/binary, Res:Count/binary, _/binary>> = Buf,
+ {{ok, Res}, [Handle#handle{offset = Offset + Count,
+ read_buffer_pos = BufPos + Count,
+ read_buffer_rem = BufRem - Count}]};
+ ([Handle = #handle{read_buffer = Buf,
+ read_buffer_pos = BufPos,
+ read_buffer_rem = BufRem,
+ read_buffer_size_limit = BufSzLimit,
+ hdl = Hdl,
+ offset = Offset}]) ->
+ WantedCount = Count - BufRem,
+ case prim_file_read(Hdl, lists:max([BufSzLimit, WantedCount])) of
+ {ok, Data} ->
+ <<_:BufPos/binary, BufTl/binary>> = Buf,
+ ReadCount = size(Data),
+ case ReadCount < WantedCount of
+ true ->
+ OffSet1 = Offset + BufRem + ReadCount,
+ {{ok, <<BufTl/binary, Data/binary>>},
+ [reset_read_buffer(
+ Handle#handle{offset = OffSet1})]};
+ false ->
+ <<Hd:WantedCount/binary, _/binary>> = Data,
+ OffSet1 = Offset + BufRem + WantedCount,
+ BufRem1 = ReadCount - WantedCount,
+ {{ok, <<BufTl/binary, Hd/binary>>},
+ [Handle#handle{offset = OffSet1,
+ read_buffer = Data,
+ read_buffer_pos = WantedCount,
+ read_buffer_rem = BufRem1}]}
+ end;
+ eof ->
+ {eof, [Handle #handle { at_eof = true }]};
+ Error ->
+ {Error, [reset_read_buffer(Handle)]}
end
end).
@@ -355,7 +392,7 @@ append(Ref, Data) ->
write_buffer_size_limit = 0,
at_eof = true } = Handle1} ->
Offset1 = Offset + iolist_size(Data),
- {prim_file:write(Hdl, Data),
+ {prim_file_write(Hdl, Data),
[Handle1 #handle { is_dirty = true, offset = Offset1 }]};
{{ok, _Offset}, #handle { write_buffer = WriteBuffer,
write_buffer_size = Size,
@@ -377,12 +414,12 @@ append(Ref, Data) ->
sync(Ref) ->
with_flushed_handles(
- [Ref],
+ [Ref], keep,
fun ([#handle { is_dirty = false, write_buffer = [] }]) ->
ok;
([Handle = #handle { hdl = Hdl,
is_dirty = true, write_buffer = [] }]) ->
- case prim_file:sync(Hdl) of
+ case prim_file_sync(Hdl) of
ok -> {ok, [Handle #handle { is_dirty = false }]};
Error -> {Error, [Handle]}
end
@@ -397,7 +434,7 @@ needs_sync(Ref) ->
position(Ref, NewOffset) ->
with_flushed_handles(
- [Ref],
+ [Ref], keep,
fun ([Handle]) -> {Result, Handle1} = maybe_seek(NewOffset, Handle),
{Result, [Handle1]}
end).
@@ -465,8 +502,8 @@ clear(Ref) ->
fun ([#handle { at_eof = true, write_buffer_size = 0, offset = 0 }]) ->
ok;
([Handle]) ->
- case maybe_seek(bof, Handle #handle { write_buffer = [],
- write_buffer_size = 0 }) of
+ case maybe_seek(bof, Handle#handle{write_buffer = [],
+ write_buffer_size = 0}) of
{{ok, 0}, Handle1 = #handle { hdl = Hdl }} ->
case prim_file:truncate(Hdl) of
ok -> {ok, [Handle1 #handle { at_eof = true }]};
@@ -539,6 +576,21 @@ info(Items) -> gen_server2:call(?SERVER, {info, Items}, infinity).
%% Internal functions
%%----------------------------------------------------------------------------
+prim_file_read(Hdl, Size) ->
+ file_handle_cache_stats:update(
+ io_read, Size, fun() -> prim_file:read(Hdl, Size) end).
+
+prim_file_write(Hdl, Bytes) ->
+ file_handle_cache_stats:update(
+ io_write, iolist_size(Bytes), fun() -> prim_file:write(Hdl, Bytes) end).
+
+prim_file_sync(Hdl) ->
+ file_handle_cache_stats:update(io_sync, fun() -> prim_file:sync(Hdl) end).
+
+prim_file_position(Hdl, NewOffset) ->
+ file_handle_cache_stats:update(
+ io_seek, fun() -> prim_file:position(Hdl, NewOffset) end).
+
is_reader(Mode) -> lists:member(read, Mode).
is_writer(Mode) -> lists:member(write, Mode).
@@ -550,8 +602,15 @@ append_to_write(Mode) ->
end.
with_handles(Refs, Fun) ->
+ with_handles(Refs, reset, Fun).
+
+with_handles(Refs, ReadBuffer, Fun) ->
case get_or_reopen([{Ref, reopen} || Ref <- Refs]) of
- {ok, Handles} ->
+ {ok, Handles0} ->
+ Handles = case ReadBuffer of
+ reset -> [reset_read_buffer(H) || H <- Handles0];
+ keep -> Handles0
+ end,
case Fun(Handles) of
{Result, Handles1} when is_list(Handles1) ->
lists:zipwith(fun put_handle/2, Refs, Handles1),
@@ -564,8 +623,11 @@ with_handles(Refs, Fun) ->
end.
with_flushed_handles(Refs, Fun) ->
+ with_flushed_handles(Refs, reset, Fun).
+
+with_flushed_handles(Refs, ReadBuffer, Fun) ->
with_handles(
- Refs,
+ Refs, ReadBuffer,
fun (Handles) ->
case lists:foldl(
fun (Handle, {ok, HandlesAcc}) ->
@@ -611,20 +673,23 @@ reopen([], Tree, RefHdls) ->
{ok, lists:reverse(RefHdls)};
reopen([{Ref, NewOrReopen, Handle = #handle { hdl = closed,
path = Path,
- mode = Mode,
+ mode = Mode0,
offset = Offset,
last_used_at = undefined }} |
RefNewOrReopenHdls] = ToOpen, Tree, RefHdls) ->
- case prim_file:open(Path, case NewOrReopen of
- new -> Mode;
- reopen -> [read | Mode]
- end) of
+ Mode = case NewOrReopen of
+ new -> Mode0;
+ reopen -> file_handle_cache_stats:update(io_reopen),
+ [read | Mode0]
+ end,
+ case prim_file:open(Path, Mode) of
{ok, Hdl} ->
Now = now(),
{{ok, _Offset}, Handle1} =
- maybe_seek(Offset, Handle #handle { hdl = Hdl,
- offset = 0,
- last_used_at = Now }),
+ maybe_seek(Offset, reset_read_buffer(
+ Handle#handle{hdl = Hdl,
+ offset = 0,
+ last_used_at = Now})),
put({Ref, fhc_handle}, Handle1),
reopen(RefNewOrReopenHdls, gb_trees:insert(Now, Ref, Tree),
[{Ref, Handle1} | RefHdls]);
@@ -709,6 +774,11 @@ new_closed_handle(Path, Mode, Options) ->
infinity -> infinity;
N when is_integer(N) -> N
end,
+ ReadBufferSize =
+ case proplists:get_value(read_buffer, Options, unbuffered) of
+ unbuffered -> 0;
+ N2 when is_integer(N2) -> N2
+ end,
Ref = make_ref(),
put({Ref, fhc_handle}, #handle { hdl = closed,
offset = 0,
@@ -716,6 +786,10 @@ new_closed_handle(Path, Mode, Options) ->
write_buffer_size = 0,
write_buffer_size_limit = WriteBufferSize,
write_buffer = [],
+ read_buffer = <<>>,
+ read_buffer_pos = 0,
+ read_buffer_rem = 0,
+ read_buffer_size_limit = ReadBufferSize,
at_eof = false,
path = Path,
mode = Mode,
@@ -742,7 +816,7 @@ soft_close(Handle) ->
is_dirty = IsDirty,
last_used_at = Then } = Handle1 } ->
ok = case IsDirty of
- true -> prim_file:sync(Hdl);
+ true -> prim_file_sync(Hdl);
false -> ok
end,
ok = prim_file:close(Hdl),
@@ -776,17 +850,31 @@ hard_close(Handle) ->
Result
end.
-maybe_seek(NewOffset, Handle = #handle { hdl = Hdl, offset = Offset,
- at_eof = AtEoF }) ->
- {AtEoF1, NeedsSeek} = needs_seek(AtEoF, Offset, NewOffset),
- case (case NeedsSeek of
- true -> prim_file:position(Hdl, NewOffset);
- false -> {ok, Offset}
- end) of
- {ok, Offset1} = Result ->
- {Result, Handle #handle { offset = Offset1, at_eof = AtEoF1 }};
- {error, _} = Error ->
- {Error, Handle}
+maybe_seek(New, Handle = #handle{hdl = Hdl,
+ offset = Old,
+ read_buffer_pos = BufPos,
+ read_buffer_rem = BufRem,
+ at_eof = AtEoF}) ->
+ {AtEoF1, NeedsSeek} = needs_seek(AtEoF, Old, New),
+ case NeedsSeek of
+ true when is_number(New) andalso
+ ((New >= Old andalso New =< BufRem + Old)
+ orelse (New < Old andalso Old - New =< BufPos)) ->
+ Diff = New - Old,
+ {{ok, New}, Handle#handle{offset = New,
+ at_eof = AtEoF1,
+ read_buffer_pos = BufPos + Diff,
+ read_buffer_rem = BufRem - Diff}};
+ true ->
+ case prim_file_position(Hdl, New) of
+ {ok, Offset1} = Result ->
+ {Result, reset_read_buffer(Handle#handle{offset = Offset1,
+ at_eof = AtEoF1})};
+ {error, _} = Error ->
+ {Error, Handle}
+ end;
+ false ->
+ {{ok, Old}, Handle}
end.
needs_seek( AtEoF, _CurOffset, cur ) -> {AtEoF, false};
@@ -817,7 +905,7 @@ write_buffer(Handle = #handle { hdl = Hdl, offset = Offset,
write_buffer = WriteBuffer,
write_buffer_size = DataSize,
at_eof = true }) ->
- case prim_file:write(Hdl, lists:reverse(WriteBuffer)) of
+ case prim_file_write(Hdl, lists:reverse(WriteBuffer)) of
ok ->
Offset1 = Offset + DataSize,
{ok, Handle #handle { offset = Offset1, is_dirty = true,
@@ -826,6 +914,11 @@ write_buffer(Handle = #handle { hdl = Hdl, offset = Offset,
{Error, Handle}
end.
+reset_read_buffer(Handle) ->
+ Handle#handle{read_buffer = <<>>,
+ read_buffer_pos = 0,
+ read_buffer_rem = 0}.
+
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
i(total_limit, #fhc_state{limit = Limit}) -> Limit;
@@ -843,6 +936,7 @@ used(#fhc_state{open_count = C1,
%%----------------------------------------------------------------------------
init([AlarmSet, AlarmClear]) ->
+ file_handle_cache_stats:init(),
Limit = case application:get_env(file_handles_high_watermark) of
{ok, Watermark} when (is_integer(Watermark) andalso
Watermark > 0) ->
diff --git a/src/file_handle_cache_stats.erl b/src/file_handle_cache_stats.erl
new file mode 100644
index 0000000000..de2f90c67a
--- /dev/null
+++ b/src/file_handle_cache_stats.erl
@@ -0,0 +1,67 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2014 GoPivotal, Inc. All rights reserved.
+%%
+
+-module(file_handle_cache_stats).
+
+%% stats about read / write operations that go through the fhc.
+
+-export([init/0, update/3, update/2, update/1, get/0]).
+
+-define(TABLE, ?MODULE).
+
+-define(COUNT,
+ [io_reopen, mnesia_ram_tx, mnesia_disk_tx,
+ msg_store_read, msg_store_write,
+ queue_index_journal_write, queue_index_write, queue_index_read]).
+-define(COUNT_TIME, [io_sync, io_seek]).
+-define(COUNT_TIME_BYTES, [io_read, io_write]).
+
+init() ->
+ ets:new(?TABLE, [public, named_table]),
+ [ets:insert(?TABLE, {{Op, Counter}, 0}) || Op <- ?COUNT_TIME_BYTES,
+ Counter <- [count, bytes, time]],
+ [ets:insert(?TABLE, {{Op, Counter}, 0}) || Op <- ?COUNT_TIME,
+ Counter <- [count, time]],
+ [ets:insert(?TABLE, {{Op, Counter}, 0}) || Op <- ?COUNT,
+ Counter <- [count]].
+
+update(Op, Bytes, Thunk) ->
+ {Time, Res} = timer_tc(Thunk),
+ ets:update_counter(?TABLE, {Op, count}, 1),
+ ets:update_counter(?TABLE, {Op, bytes}, Bytes),
+ ets:update_counter(?TABLE, {Op, time}, Time),
+ Res.
+
+update(Op, Thunk) ->
+ {Time, Res} = timer_tc(Thunk),
+ ets:update_counter(?TABLE, {Op, count}, 1),
+ ets:update_counter(?TABLE, {Op, time}, Time),
+ Res.
+
+update(Op) ->
+ ets:update_counter(?TABLE, {Op, count}, 1),
+ ok.
+
+get() ->
+ lists:sort(ets:tab2list(?TABLE)).
+
+%% TODO timer:tc/1 was introduced in R14B03; use that function once we
+%% require that version.
+timer_tc(Thunk) ->
+ T1 = os:timestamp(),
+ Res = Thunk(),
+ T2 = os:timestamp(),
+ {timer:now_diff(T2, T1), Res}.
diff --git a/src/gatherer.erl b/src/gatherer.erl
index 8bce170754..6a71021686 100644
--- a/src/gatherer.erl
+++ b/src/gatherer.erl
@@ -16,6 +16,20 @@
-module(gatherer).
+%% Gatherer is a queue which has producer and consumer processes. Before producers
+%% push items to the queue using gatherer:in/2 they need to declare their intent
+%% to do so with gatherer:fork/1. When a publisher's work is done, it states so
+%% using gatherer:finish/1.
+%%
+%% Consumers pop messages off queues with gatherer:out/1. If a queue is empty
+%% and there are producers that haven't finished working, the caller is blocked
+%% until an item is available. If there are no active producers, gatherer:out/1
+%% immediately returns 'empty'.
+%%
+%% This module is primarily used to collect results from asynchronous tasks
+%% running in a worker pool, e.g. when recovering bindings or rebuilding
+%% message store indices.
+
-behaviour(gen_server2).
-export([start_link/0, stop/1, fork/1, finish/1, in/2, sync_in/2, out/1]).
diff --git a/src/lqueue.erl b/src/lqueue.erl
index 62f60d5ffb..941941de2e 100644
--- a/src/lqueue.erl
+++ b/src/lqueue.erl
@@ -16,6 +16,10 @@
-module(lqueue).
+%% lqueue implements a subset of Erlang's queue module. lqueues
+%% maintain their own length, so lqueue:len/1
+%% is an O(1) operation, in contrast with queue:len/1 which is O(n).
+
-export([new/0, is_empty/1, len/1, in/2, in_r/2, out/1, out_r/1, join/2,
foldl/3, foldr/3, from_list/1, to_list/1, peek/1, peek_r/1]).
diff --git a/src/pmon.erl b/src/pmon.erl
index a94f5a67af..7981742284 100644
--- a/src/pmon.erl
+++ b/src/pmon.erl
@@ -16,6 +16,19 @@
-module(pmon).
+%% Process Monitor
+%% ================
+%%
+%% This module monitors processes so that every process has at most
+%% 1 monitor.
+%% Processes monitored can be dynamically added and removed.
+%%
+%% Unlike erlang:[de]monitor* functions, this module
+%% provides basic querying capability and avoids contacting down nodes.
+%%
+%% It is used to monitor nodes, queue mirrors, and by
+%% the queue collector, among other things.
+
-export([new/0, new/1, monitor/2, monitor_all/2, demonitor/2,
is_monitored/2, erase/2, monitored/1, is_empty/1]).
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 7fdb02d09e..b55c129a91 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -118,6 +118,13 @@
{requires, [rabbit_alarm, guid_generator]},
{enables, core_initialized}]}).
+-rabbit_boot_step({rabbit_epmd_monitor,
+ [{description, "epmd monitor"},
+ {mfa, {rabbit_sup, start_restartable_child,
+ [rabbit_epmd_monitor]}},
+ {requires, kernel_ready},
+ {enables, core_initialized}]}).
+
-rabbit_boot_step({core_initialized,
[{description, "core initialized"},
{requires, kernel_ready}]}).
@@ -243,15 +250,19 @@ maybe_hipe_compile() ->
{ok, Want} = application:get_env(rabbit, hipe_compile),
Can = code:which(hipe) =/= non_existing,
case {Want, Can} of
- {true, true} -> hipe_compile(),
- true;
+ {true, true} -> hipe_compile();
{true, false} -> false;
- {false, _} -> true
+ {false, _} -> {ok, disabled}
end.
-warn_if_hipe_compilation_failed(true) ->
+log_hipe_result({ok, disabled}) ->
ok;
-warn_if_hipe_compilation_failed(false) ->
+log_hipe_result({ok, Count, Duration}) ->
+ rabbit_log:info(
+ "HiPE in use: compiled ~B modules in ~Bs.~n", [Count, Duration]);
+log_hipe_result(false) ->
+ io:format(
+ "~nNot HiPE compiling: HiPE not found in this Erlang installation.~n"),
rabbit_log:warning(
"Not HiPE compiling: HiPE not found in this Erlang installation.~n").
@@ -276,8 +287,9 @@ hipe_compile() ->
{'DOWN', MRef, process, _, Reason} -> exit(Reason)
end || {_Pid, MRef} <- PidMRefs],
T2 = erlang:now(),
- io:format("|~n~nCompiled ~B modules in ~Bs~n",
- [Count, timer:now_diff(T2, T1) div 1000000]).
+ Duration = timer:now_diff(T2, T1) div 1000000,
+ io:format("|~n~nCompiled ~B modules in ~Bs~n", [Count, Duration]),
+ {ok, Count, Duration}.
split(L, N) -> split0(L, [[] || _ <- lists:seq(1, N)]).
@@ -307,9 +319,9 @@ start() ->
boot() ->
start_it(fun() ->
ok = ensure_application_loaded(),
- Success = maybe_hipe_compile(),
+ HipeResult = maybe_hipe_compile(),
ok = ensure_working_log_handlers(),
- warn_if_hipe_compilation_failed(Success),
+ log_hipe_result(HipeResult),
rabbit_node_monitor:prepare_cluster_status_files(),
ok = rabbit_upgrade:maybe_upgrade_mnesia(),
%% It's important that the consistency check happens after
@@ -323,6 +335,11 @@ broker_start() ->
Plugins = rabbit_plugins:setup(),
ToBeLoaded = Plugins ++ ?APPS,
start_apps(ToBeLoaded),
+ case code:load_file(sd_notify) of
+ {module, sd_notify} -> SDNotify = sd_notify,
+ SDNotify:sd_notify(0, "READY=1");
+ {error, _} -> ok
+ end,
ok = log_broker_started(rabbit_plugins:active()).
start_it(StartFun) ->
@@ -590,13 +607,19 @@ sort_boot_steps(UnsortedSteps) ->
boot_error({could_not_start, rabbit, {{timeout_waiting_for_tables, _}, _}},
_Stacktrace) ->
AllNodes = rabbit_mnesia:cluster_nodes(all),
+ Suffix = "~nBACKGROUND~n==========~n~n"
+ "This cluster node was shut down while other nodes were still running.~n"
+ "To avoid losing data, you should start the other nodes first, then~n"
+ "start this one. To force this node to start, first invoke~n"
+ "\"rabbitmqctl force_boot\". If you do so, any changes made on other~n"
+ "cluster nodes after this one was shut down may be lost.~n",
{Err, Nodes} =
case AllNodes -- [node()] of
[] -> {"Timeout contacting cluster nodes. Since RabbitMQ was"
" shut down forcefully~nit cannot determine which nodes"
- " are timing out.~n", []};
+ " are timing out.~n" ++ Suffix, []};
Ns -> {rabbit_misc:format(
- "Timeout contacting cluster nodes: ~p.~n", [Ns]),
+ "Timeout contacting cluster nodes: ~p.~n" ++ Suffix, [Ns]),
Ns}
end,
log_boot_error_and_exit(
diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl
index b0a9c0d807..41c54b07a2 100644
--- a/src/rabbit_access_control.erl
+++ b/src/rabbit_access_control.erl
@@ -19,7 +19,7 @@
-include("rabbit.hrl").
-export([check_user_pass_login/2, check_user_login/2, check_user_loopback/2,
- check_vhost_access/2, check_resource_access/3]).
+ check_vhost_access/3, check_resource_access/3]).
%%----------------------------------------------------------------------------
@@ -31,15 +31,17 @@
-spec(check_user_pass_login/2 ::
(rabbit_types:username(), rabbit_types:password())
- -> {'ok', rabbit_types:user()} | {'refused', string(), [any()]}).
+ -> {'ok', rabbit_types:user()} |
+ {'refused', rabbit_types:username(), string(), [any()]}).
-spec(check_user_login/2 ::
(rabbit_types:username(), [{atom(), any()}])
- -> {'ok', rabbit_types:user()} | {'refused', string(), [any()]}).
+ -> {'ok', rabbit_types:user()} |
+ {'refused', rabbit_types:username(), string(), [any()]}).
-spec(check_user_loopback/2 :: (rabbit_types:username(),
rabbit_net:socket() | inet:ip_address())
-> 'ok' | 'not_allowed').
--spec(check_vhost_access/2 ::
- (rabbit_types:user(), rabbit_types:vhost())
+-spec(check_vhost_access/3 ::
+ (rabbit_types:user(), rabbit_types:vhost(), rabbit_net:socket())
-> 'ok' | rabbit_types:channel_exit()).
-spec(check_resource_access/3 ::
(rabbit_types:user(), rabbit_types:r(atom()), permission_atom())
@@ -55,36 +57,71 @@ check_user_pass_login(Username, Password) ->
check_user_login(Username, AuthProps) ->
{ok, Modules} = application:get_env(rabbit, auth_backends),
R = lists:foldl(
- fun ({ModN, ModZ}, {refused, _, _}) ->
+ fun ({ModN, ModZs0}, {refused, _, _, _}) ->
+ ModZs = case ModZs0 of
+ A when is_atom(A) -> [A];
+ L when is_list(L) -> L
+ end,
%% Different modules for authN vs authZ. So authenticate
%% with authN module, then if that succeeds do
- %% passwordless (i.e pre-authenticated) login with authZ
- %% module, and use the #user{} the latter gives us.
- case try_login(ModN, Username, AuthProps) of
- {ok, _} -> try_login(ModZ, Username, []);
- Else -> Else
+ %% passwordless (i.e pre-authenticated) login with authZ.
+ case try_authenticate(ModN, Username, AuthProps) of
+ {ok, ModNUser = #auth_user{username = Username2}} ->
+ user(ModNUser, try_authorize(ModZs, Username2));
+ Else ->
+ Else
end;
- (Mod, {refused, _, _}) ->
+ (Mod, {refused, _, _, _}) ->
%% Same module for authN and authZ. Just take the result
%% it gives us
- try_login(Mod, Username, AuthProps);
+ case try_authenticate(Mod, Username, AuthProps) of
+ {ok, ModNUser = #auth_user{impl = Impl}} ->
+ user(ModNUser, {ok, [{Mod, Impl}]});
+ Else ->
+ Else
+ end;
(_, {ok, User}) ->
%% We've successfully authenticated. Skip to the end...
{ok, User}
- end, {refused, "No modules checked '~s'", [Username]}, Modules),
- rabbit_event:notify(case R of
- {ok, _User} -> user_authentication_success;
- _ -> user_authentication_failure
- end, [{name, Username}]),
+ end,
+ {refused, Username, "No modules checked '~s'", [Username]}, Modules),
R.
-try_login(Module, Username, AuthProps) ->
- case Module:check_user_login(Username, AuthProps) of
- {error, E} -> {refused, "~s failed authenticating ~s: ~p~n",
- [Module, Username, E]};
- Else -> Else
+try_authenticate(Module, Username, AuthProps) ->
+ case Module:user_login_authentication(Username, AuthProps) of
+ {ok, AuthUser} -> {ok, AuthUser};
+ {error, E} -> {refused, Username,
+ "~s failed authenticating ~s: ~p~n",
+ [Module, Username, E]};
+ {refused, F, A} -> {refused, Username, F, A}
end.
+try_authorize(Modules, Username) ->
+ lists:foldr(
+ fun (Module, {ok, ModsImpls}) ->
+ case Module:user_login_authorization(Username) of
+ {ok, Impl} -> {ok, [{Module, Impl} | ModsImpls]};
+ {error, E} -> {refused, Username,
+ "~s failed authorizing ~s: ~p~n",
+ [Module, Username, E]};
+ {refused, F, A} -> {refused, Username, F, A}
+ end;
+ (_, {refused, F, A}) ->
+ {refused, Username, F, A}
+ end, {ok, []}, Modules).
+
+user(#auth_user{username = Username, tags = Tags}, {ok, ModZImpls}) ->
+ {ok, #user{username = Username,
+ tags = Tags,
+ authz_backends = ModZImpls}};
+user(_AuthUser, Error) ->
+ Error.
+
+auth_user(#user{username = Username, tags = Tags}, Impl) ->
+ #auth_user{username = Username,
+ tags = Tags,
+ impl = Impl}.
+
check_user_loopback(Username, SockOrAddr) ->
{ok, Users} = application:get_env(rabbit, loopback_users),
case rabbit_net:is_loopback(SockOrAddr)
@@ -93,29 +130,38 @@ check_user_loopback(Username, SockOrAddr) ->
false -> not_allowed
end.
-check_vhost_access(User = #user{ username = Username,
- auth_backend = Module }, VHostPath) ->
- check_access(
- fun() ->
- %% TODO this could be an andalso shortcut under >R13A
- case rabbit_vhost:exists(VHostPath) of
- false -> false;
- true -> Module:check_vhost_access(User, VHostPath)
- end
- end,
- Module, "access to vhost '~s' refused for user '~s'",
- [VHostPath, Username]).
+check_vhost_access(User = #user{username = Username,
+ authz_backends = Modules}, VHostPath, Sock) ->
+ lists:foldl(
+ fun({Mod, Impl}, ok) ->
+ check_access(
+ fun() ->
+ rabbit_vhost:exists(VHostPath) andalso
+ Mod:check_vhost_access(
+ auth_user(User, Impl), VHostPath, Sock)
+ end,
+ Mod, "access to vhost '~s' refused for user '~s'",
+ [VHostPath, Username]);
+ (_, Else) ->
+ Else
+ end, ok, Modules).
check_resource_access(User, R = #resource{kind = exchange, name = <<"">>},
Permission) ->
check_resource_access(User, R#resource{name = <<"amq.default">>},
Permission);
-check_resource_access(User = #user{username = Username, auth_backend = Module},
+check_resource_access(User = #user{username = Username,
+ authz_backends = Modules},
Resource, Permission) ->
- check_access(
- fun() -> Module:check_resource_access(User, Resource, Permission) end,
- Module, "access to ~s refused for user '~s'",
- [rabbit_misc:rs(Resource), Username]).
+ lists:foldl(
+ fun({Module, Impl}, ok) ->
+ check_access(
+ fun() -> Module:check_resource_access(
+ auth_user(User, Impl), Resource, Permission) end,
+ Module, "access to ~s refused for user '~s'",
+ [rabbit_misc:rs(Resource), Username]);
+ (_, Else) -> Else
+ end, ok, Modules).
check_access(Fun, Module, ErrStr, ErrArgs) ->
Allow = case Fun() of
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 0dfca854af..2ab03ccec4 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -23,7 +23,7 @@
-export([lookup/1, not_found_or_absent/1, with/2, with/3, with_or_die/2,
assert_equivalence/5,
check_exclusive_access/2, with_exclusive_access_or_die/3,
- stat/1, deliver/2, deliver_flow/2, requeue/3, ack/3, reject/4]).
+ stat/1, deliver/2, requeue/3, ack/3, reject/4]).
-export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]).
-export([list_down/1]).
-export([force_event_refresh/1, notify_policy_changed/1]).
@@ -149,8 +149,6 @@
-spec(forget_all_durable/1 :: (node()) -> 'ok').
-spec(deliver/2 :: ([rabbit_types:amqqueue()], rabbit_types:delivery()) ->
qpids()).
--spec(deliver_flow/2 :: ([rabbit_types:amqqueue()], rabbit_types:delivery()) ->
- qpids()).
-spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok').
-spec(ack/3 :: (pid(), [msg_id()], pid()) -> 'ok').
-spec(reject/4 :: (pid(), [msg_id()], boolean(), pid()) -> 'ok').
@@ -265,17 +263,17 @@ declare(QueueName, Durable, AutoDelete, Args, Owner) ->
declare(QueueName, Durable, AutoDelete, Args, Owner, Node) ->
ok = check_declare_arguments(QueueName, Args),
Q = rabbit_queue_decorator:set(
- rabbit_policy:set(#amqqueue{name = QueueName,
- durable = Durable,
- auto_delete = AutoDelete,
- arguments = Args,
- exclusive_owner = Owner,
- pid = none,
- slave_pids = [],
- sync_slave_pids = [],
- down_slave_nodes = [],
- gm_pids = [],
- state = live})),
+ rabbit_policy:set(#amqqueue{name = QueueName,
+ durable = Durable,
+ auto_delete = AutoDelete,
+ arguments = Args,
+ exclusive_owner = Owner,
+ pid = none,
+ slave_pids = [],
+ sync_slave_pids = [],
+ recoverable_slaves = [],
+ gm_pids = [],
+ state = live})),
Node = rabbit_mirror_queue_misc:initial_queue_node(Q, Node),
gen_server2:call(
rabbit_amqqueue_sup_sup:start_queue_process(Node, Q, declare),
@@ -469,7 +467,8 @@ declare_args() ->
{<<"x-dead-letter-exchange">>, fun check_dlxname_arg/2},
{<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2},
{<<"x-max-length">>, fun check_non_neg_int_arg/2},
- {<<"x-max-length-bytes">>, fun check_non_neg_int_arg/2}].
+ {<<"x-max-length-bytes">>, fun check_non_neg_int_arg/2},
+ {<<"x-max-priority">>, fun check_non_neg_int_arg/2}].
consume_args() -> [{<<"x-priority">>, fun check_int_arg/2},
{<<"x-cancel-on-ha-failover">>, fun check_bool_arg/2}].
@@ -560,12 +559,12 @@ info_down(Q, DownReason) ->
info_down(Q, Items, DownReason) ->
[{Item, i_down(Item, Q, DownReason)} || Item <- Items].
-i_down(name, #amqqueue{name = Name}, _) -> Name;
-i_down(durable, #amqqueue{durable = Durable},_) -> Durable;
-i_down(auto_delete, #amqqueue{auto_delete = AD}, _) -> AD;
-i_down(arguments, #amqqueue{arguments = Args}, _) -> Args;
-i_down(pid, #amqqueue{pid = QPid}, _) -> QPid;
-i_down(down_slave_nodes, #amqqueue{down_slave_nodes = DSN}, _) -> DSN;
+i_down(name, #amqqueue{name = Name}, _) -> Name;
+i_down(durable, #amqqueue{durable = Dur}, _) -> Dur;
+i_down(auto_delete, #amqqueue{auto_delete = AD}, _) -> AD;
+i_down(arguments, #amqqueue{arguments = Args}, _) -> Args;
+i_down(pid, #amqqueue{pid = QPid}, _) -> QPid;
+i_down(recoverable_slaves, #amqqueue{recoverable_slaves = RS}, _) -> RS;
i_down(state, _Q, DownReason) -> DownReason;
i_down(K, _Q, _DownReason) ->
case lists:member(K, rabbit_amqqueue_process:info_keys()) of
@@ -623,10 +622,6 @@ delete_crashed_internal(Q = #amqqueue{ name = QName }) ->
purge(#amqqueue{ pid = QPid }) -> delegate:call(QPid, purge).
-deliver(Qs, Delivery) -> deliver(Qs, Delivery, noflow).
-
-deliver_flow(Qs, Delivery) -> deliver(Qs, Delivery, flow).
-
requeue(QPid, MsgIds, ChPid) -> delegate:call(QPid, {requeue, MsgIds, ChPid}).
ack(QPid, MsgIds, ChPid) -> delegate:cast(QPid, {ack, MsgIds, ChPid}).
@@ -724,24 +719,48 @@ forget_all_durable(Node) ->
fun () ->
Qs = mnesia:match_object(rabbit_durable_queue,
#amqqueue{_ = '_'}, write),
- [forget_node_for_queue(Q) || #amqqueue{pid = Pid} = Q <- Qs,
+ [forget_node_for_queue(Node, Q) ||
+ #amqqueue{pid = Pid} = Q <- Qs,
node(Pid) =:= Node],
ok
end),
ok.
-forget_node_for_queue(#amqqueue{name = Name,
- down_slave_nodes = []}) ->
+%% Try to promote a slave while down - it should recover as a
+%% master. We try to take the oldest slave here for best chance of
+%% recovery.
+forget_node_for_queue(DeadNode, Q = #amqqueue{recoverable_slaves = RS}) ->
+ forget_node_for_queue(DeadNode, RS, Q).
+
+forget_node_for_queue(_DeadNode, [], #amqqueue{name = Name}) ->
%% No slaves to recover from, queue is gone.
%% Don't process_deletions since that just calls callbacks and we
%% are not really up.
internal_delete1(Name, true);
-forget_node_for_queue(Q = #amqqueue{down_slave_nodes = [H|T]}) ->
- %% Promote a slave while down - it'll happily recover as a master
- Q1 = Q#amqqueue{pid = rabbit_misc:node_to_fake_pid(H),
- down_slave_nodes = T},
- ok = mnesia:write(rabbit_durable_queue, Q1, write).
+%% Should not happen, but let's be conservative.
+forget_node_for_queue(DeadNode, [DeadNode | T], Q) ->
+ forget_node_for_queue(DeadNode, T, Q);
+
+forget_node_for_queue(DeadNode, [H|T], Q) ->
+ case node_permits_offline_promotion(H) of
+ false -> forget_node_for_queue(DeadNode, T, Q);
+ true -> Q1 = Q#amqqueue{pid = rabbit_misc:node_to_fake_pid(H)},
+ ok = mnesia:write(rabbit_durable_queue, Q1, write)
+ end.
+
+node_permits_offline_promotion(Node) ->
+ case node() of
+ Node -> not rabbit:is_running(); %% [1]
+ _ -> Running = rabbit_mnesia:cluster_nodes(running),
+ not lists:member(Node, Running) %% [2]
+ end.
+%% [1] In this case if we are a real running node (i.e. rabbitmqctl
+%% has RPCed into us) then we cannot allow promotion. If on the other
+%% hand we *are* rabbitmqctl impersonating the node for offline
+%% node-forgetting then we can.
+%%
+%% [2] This is simpler; as long as it's down that's OK
run_backing_queue(QPid, Mod, Fun) ->
gen_server2:cast(QPid, {run_backing_queue, Mod, Fun}).
@@ -763,12 +782,12 @@ on_node_up(Node) ->
fun () ->
Qs = mnesia:match_object(rabbit_queue,
#amqqueue{_ = '_'}, write),
- [case lists:member(Node, DSNs) of
- true -> DSNs1 = DSNs -- [Node],
+ [case lists:member(Node, RSs) of
+ true -> RSs1 = RSs -- [Node],
store_queue(
- Q#amqqueue{down_slave_nodes = DSNs1});
+ Q#amqqueue{recoverable_slaves = RSs1});
false -> ok
- end || #amqqueue{down_slave_nodes = DSNs} = Q <- Qs],
+ end || #amqqueue{recoverable_slaves = RSs} = Q <- Qs],
ok
end).
@@ -807,24 +826,29 @@ pseudo_queue(QueueName, Pid) ->
pid = Pid,
slave_pids = []}.
-immutable(Q) -> Q#amqqueue{pid = none,
- slave_pids = none,
- sync_slave_pids = none,
- down_slave_nodes = none,
- gm_pids = none,
- policy = none,
- decorators = none,
- state = none}.
+immutable(Q) -> Q#amqqueue{pid = none,
+ slave_pids = none,
+ sync_slave_pids = none,
+ recoverable_slaves = none,
+ gm_pids = none,
+ policy = none,
+ decorators = none,
+ state = none}.
-deliver([], _Delivery, _Flow) ->
+deliver([], _Delivery) ->
%% /dev/null optimisation
[];
-deliver(Qs, Delivery, Flow) ->
+deliver(Qs, Delivery = #delivery{flow = Flow}) ->
{MPids, SPids} = qpids(Qs),
QPids = MPids ++ SPids,
+ %% We use up two credits to send to a slave since the message
+ %% arrives at the slave from two directions. We will ack one when
+ %% the slave receives the message direct from the channel, and the
+ %% other when it receives it via GM.
case Flow of
- flow -> [credit_flow:send(QPid) || QPid <- QPids];
+ flow -> [credit_flow:send(QPid) || QPid <- QPids],
+ [credit_flow:send(QPid) || QPid <- SPids];
noflow -> ok
end,
@@ -833,8 +857,8 @@ deliver(Qs, Delivery, Flow) ->
%% after they have become master they should mark the message as
%% 'delivered' since they do not know what the master may have
%% done with it.
- MMsg = {deliver, Delivery, false, Flow},
- SMsg = {deliver, Delivery, true, Flow},
+ MMsg = {deliver, Delivery, false},
+ SMsg = {deliver, Delivery, true},
delegate:cast(MPids, MMsg),
delegate:cast(SPids, SMsg),
QPids.
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 16ead64af9..c6030a090e 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -83,7 +83,7 @@
memory,
slave_pids,
synchronised_slave_pids,
- down_slave_nodes,
+ recoverable_slaves,
state
]).
@@ -498,12 +498,13 @@ send_mandatory(#delivery{mandatory = true,
discard(#delivery{confirm = Confirm,
sender = SenderPid,
+ flow = Flow,
message = #basic_message{id = MsgId}}, BQ, BQS, MTC) ->
MTC1 = case Confirm of
true -> confirm_messages([MsgId], MTC);
false -> MTC
end,
- BQS1 = BQ:discard(MsgId, SenderPid, BQS),
+ BQS1 = BQ:discard(MsgId, SenderPid, Flow, BQS),
{BQS1, MTC1}.
run_message_queue(State) -> run_message_queue(false, State).
@@ -525,14 +526,17 @@ run_message_queue(ActiveConsumersChanged, State) ->
end
end.
-attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message},
+attempt_delivery(Delivery = #delivery{sender = SenderPid,
+ flow = Flow,
+ message = Message},
Props, Delivered, State = #q{backing_queue = BQ,
backing_queue_state = BQS,
msg_id_to_channel = MTC}) ->
case rabbit_queue_consumers:deliver(
fun (true) -> true = BQ:is_empty(BQS),
- {AckTag, BQS1} = BQ:publish_delivered(
- Message, Props, SenderPid, BQS),
+ {AckTag, BQS1} =
+ BQ:publish_delivered(
+ Message, Props, SenderPid, Flow, BQS),
{{Message, Delivered, AckTag}, {BQS1, MTC}};
(false) -> {{Message, Delivered, undefined},
discard(Delivery, BQ, BQS, MTC)}
@@ -549,7 +553,9 @@ attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message},
State#q{consumers = Consumers})}
end.
-deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid},
+deliver_or_enqueue(Delivery = #delivery{message = Message,
+ sender = SenderPid,
+ flow = Flow},
Delivered, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
send_mandatory(Delivery), %% must do this before confirms
@@ -570,7 +576,7 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid},
{BQS3, MTC1} = discard(Delivery, BQ, BQS2, MTC),
State3#q{backing_queue_state = BQS3, msg_id_to_channel = MTC1};
{undelivered, State3 = #q{backing_queue_state = BQS2}} ->
- BQS3 = BQ:publish(Message, Props, Delivered, SenderPid, BQS2),
+ BQS3 = BQ:publish(Message, Props, Delivered, SenderPid, Flow, BQS2),
{Dropped, State4 = #q{backing_queue_state = BQS4}} =
maybe_drop_head(State3#q{backing_queue_state = BQS3}),
QLen = BQ:len(BQS4),
@@ -855,9 +861,9 @@ i(synchronised_slave_pids, #q{q = #amqqueue{name = Name}}) ->
false -> '';
true -> SSPids
end;
-i(down_slave_nodes, #q{q = #amqqueue{name = Name,
- durable = Durable}}) ->
- {ok, Q = #amqqueue{down_slave_nodes = Nodes}} =
+i(recoverable_slaves, #q{q = #amqqueue{name = Name,
+ durable = Durable}}) ->
+ {ok, Q = #amqqueue{recoverable_slaves = Nodes}} =
rabbit_amqqueue:lookup(Name),
case Durable andalso rabbit_mirror_queue_misc:is_mirrored(Q) of
false -> '';
@@ -1100,15 +1106,23 @@ handle_cast({run_backing_queue, Mod, Fun},
State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
noreply(State#q{backing_queue_state = BQ:invoke(Mod, Fun, BQS)});
-handle_cast({deliver, Delivery = #delivery{sender = Sender}, Delivered, Flow},
+handle_cast({deliver, Delivery = #delivery{sender = Sender,
+ flow = Flow}, SlaveWhenPublished},
State = #q{senders = Senders}) ->
Senders1 = case Flow of
flow -> credit_flow:ack(Sender),
+ case SlaveWhenPublished of
+ true -> credit_flow:ack(Sender); %% [0]
+ false -> ok
+ end,
pmon:monitor(Sender, Senders);
noflow -> Senders
end,
State1 = State#q{senders = Senders1},
- noreply(deliver_or_enqueue(Delivery, Delivered, State1));
+ noreply(deliver_or_enqueue(Delivery, SlaveWhenPublished, State1));
+%% [0] The second ack is since the channel thought we were a slave at
+%% the time it published this message, so it used two credits (see
+%% rabbit_amqqueue:deliver/2).
handle_cast({ack, AckTags, ChPid}, State) ->
noreply(ack(AckTags, ChPid, State));
diff --git a/src/rabbit_auth_backend_dummy.erl b/src/rabbit_auth_backend_dummy.erl
index 5daca368eb..d2f07c1d57 100644
--- a/src/rabbit_auth_backend_dummy.erl
+++ b/src/rabbit_auth_backend_dummy.erl
@@ -17,11 +17,12 @@
-module(rabbit_auth_backend_dummy).
-include("rabbit.hrl").
--behaviour(rabbit_auth_backend).
+-behaviour(rabbit_authn_backend).
+-behaviour(rabbit_authz_backend).
--export([description/0]).
-export([user/0]).
--export([check_user_login/2, check_vhost_access/2, check_resource_access/3]).
+-export([user_login_authentication/2, user_login_authorization/1,
+ check_vhost_access/3, check_resource_access/3]).
-ifdef(use_specs).
@@ -31,19 +32,17 @@
%% A user to be used by the direct client when permission checks are
%% not needed. This user can do anything AMQPish.
-user() -> #user{username = <<"none">>,
- tags = [],
- auth_backend = ?MODULE,
- impl = none}.
+user() -> #user{username = <<"none">>,
+ tags = [],
+ authz_backends = [{?MODULE, none}]}.
%% Implementation of rabbit_auth_backend
-description() ->
- [{name, <<"Dummy">>},
- {description, <<"Database for the dummy user">>}].
+user_login_authentication(_, _) ->
+ {refused, "cannot log in conventionally as dummy user", []}.
-check_user_login(_, _) ->
+user_login_authorization(_) ->
{refused, "cannot log in conventionally as dummy user", []}.
-check_vhost_access(#user{}, _VHostPath) -> true.
-check_resource_access(#user{}, #resource{}, _Permission) -> true.
+check_vhost_access(#auth_user{}, _VHostPath, _Sock) -> true.
+check_resource_access(#auth_user{}, #resource{}, _Permission) -> true.
diff --git a/src/rabbit_auth_backend_internal.erl b/src/rabbit_auth_backend_internal.erl
index fd1c4e8ee6..20a5766d20 100644
--- a/src/rabbit_auth_backend_internal.erl
+++ b/src/rabbit_auth_backend_internal.erl
@@ -17,10 +17,11 @@
-module(rabbit_auth_backend_internal).
-include("rabbit.hrl").
--behaviour(rabbit_auth_backend).
+-behaviour(rabbit_authn_backend).
+-behaviour(rabbit_authz_backend).
--export([description/0]).
--export([check_user_login/2, check_vhost_access/2, check_resource_access/3]).
+-export([user_login_authentication/2, user_login_authorization/1,
+ check_vhost_access/3, check_resource_access/3]).
-export([add_user/2, delete_user/1, lookup_user/1,
change_password/2, clear_password/1,
@@ -76,13 +77,9 @@
%%----------------------------------------------------------------------------
%% Implementation of rabbit_auth_backend
-description() ->
- [{name, <<"Internal">>},
- {description, <<"Internal user / password database">>}].
-
-check_user_login(Username, []) ->
+user_login_authentication(Username, []) ->
internal_check_user_login(Username, fun(_) -> true end);
-check_user_login(Username, [{password, Cleartext}]) ->
+user_login_authentication(Username, [{password, Cleartext}]) ->
internal_check_user_login(
Username,
fun (#internal_user{password_hash = <<Salt:4/binary, Hash/binary>>}) ->
@@ -90,25 +87,30 @@ check_user_login(Username, [{password, Cleartext}]) ->
(#internal_user{}) ->
false
end);
-check_user_login(Username, AuthProps) ->
+user_login_authentication(Username, AuthProps) ->
exit({unknown_auth_props, Username, AuthProps}).
+user_login_authorization(Username) ->
+ case user_login_authentication(Username, []) of
+ {ok, #auth_user{impl = Impl}} -> {ok, Impl};
+ Else -> Else
+ end.
+
internal_check_user_login(Username, Fun) ->
Refused = {refused, "user '~s' - invalid credentials", [Username]},
case lookup_user(Username) of
{ok, User = #internal_user{tags = Tags}} ->
case Fun(User) of
- true -> {ok, #user{username = Username,
- tags = Tags,
- auth_backend = ?MODULE,
- impl = User}};
+ true -> {ok, #auth_user{username = Username,
+ tags = Tags,
+ impl = none}};
_ -> Refused
end;
{error, not_found} ->
Refused
end.
-check_vhost_access(#user{username = Username}, VHostPath) ->
+check_vhost_access(#auth_user{username = Username}, VHostPath, _Sock) ->
case mnesia:dirty_read({rabbit_user_permission,
#user_vhost{username = Username,
virtual_host = VHostPath}}) of
@@ -116,7 +118,7 @@ check_vhost_access(#user{username = Username}, VHostPath) ->
[_R] -> true
end.
-check_resource_access(#user{username = Username},
+check_resource_access(#auth_user{username = Username},
#resource{virtual_host = VHostPath, name = Name},
Permission) ->
case mnesia:dirty_read({rabbit_user_permission,
diff --git a/src/rabbit_auth_mechanism.erl b/src/rabbit_auth_mechanism.erl
index d11af09552..c8e23a75d5 100644
--- a/src/rabbit_auth_mechanism.erl
+++ b/src/rabbit_auth_mechanism.erl
@@ -36,13 +36,13 @@
%% Another round is needed. Here's the state I want next time.
%% {protocol_error, Msg, Args}
%% Client got the protocol wrong. Log and die.
-%% {refused, Msg, Args}
+%% {refused, Username, Msg, Args}
%% Client failed authentication. Log and die.
-callback handle_response(binary(), any()) ->
{'ok', rabbit_types:user()} |
{'challenge', binary(), any()} |
{'protocol_error', string(), [any()]} |
- {'refused', string(), [any()]}.
+ {'refused', rabbit_types:username() | none, string(), [any()]}.
-else.
diff --git a/src/rabbit_authn_backend.erl b/src/rabbit_authn_backend.erl
new file mode 100644
index 0000000000..cfc3f5db51
--- /dev/null
+++ b/src/rabbit_authn_backend.erl
@@ -0,0 +1,49 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2014 GoPivotal, Inc. All rights reserved.
+%%
+
+-module(rabbit_authn_backend).
+
+-include("rabbit.hrl").
+
+-ifdef(use_specs).
+
+%% Check a user can log in, given a username and a proplist of
+%% authentication information (e.g. [{password, Password}]). If your
+%% backend is not to be used for authentication, this should always
+%% refuse access.
+%%
+%% Possible responses:
+%% {ok, User}
+%% Authentication succeeded, and here's the user record.
+%% {error, Error}
+%% Something went wrong. Log and die.
+%% {refused, Msg, Args}
+%% Client failed authentication. Log and die.
+-callback user_login_authentication(rabbit_types:username(), [term()]) ->
+ {'ok', rabbit_types:auth_user()} |
+ {'refused', string(), [any()]} |
+ {'error', any()}.
+
+-else.
+
+-export([behaviour_info/1]).
+
+behaviour_info(callbacks) ->
+ [{user_login_authentication, 2}];
+behaviour_info(_Other) ->
+ undefined.
+
+-endif.
diff --git a/src/rabbit_auth_backend.erl b/src/rabbit_authz_backend.erl
index a7dd6494b1..ff5f014e37 100644
--- a/src/rabbit_auth_backend.erl
+++ b/src/rabbit_authz_backend.erl
@@ -14,47 +14,49 @@
%% Copyright (c) 2007-2014 GoPivotal, Inc. All rights reserved.
%%
--module(rabbit_auth_backend).
+-module(rabbit_authz_backend).
--ifdef(use_specs).
+-include("rabbit.hrl").
-%% A description proplist as with auth mechanisms,
-%% exchanges. Currently unused.
--callback description() -> [proplists:property()].
+-ifdef(use_specs).
-%% Check a user can log in, given a username and a proplist of
-%% authentication information (e.g. [{password, Password}]).
+%% Check a user can log in, when this backend is being used for
+%% authorisation only. Authentication has already taken place
+%% successfully, but we need to check that the user exists in this
+%% backend, and initialise any impl field we will want to have passed
+%% back in future calls to check_vhost_access/3 and
+%% check_resource_access/3.
%%
%% Possible responses:
-%% {ok, User}
-%% Authentication succeeded, and here's the user record.
+%% {ok, Impl}
+%% User authorisation succeeded, and here's the impl field.
%% {error, Error}
%% Something went wrong. Log and die.
%% {refused, Msg, Args}
-%% Client failed authentication. Log and die.
--callback check_user_login(rabbit_types:username(), [term()]) ->
- {'ok', rabbit_types:user()} |
+%% User authorisation failed. Log and die.
+-callback user_login_authorization(rabbit_types:username()) ->
+ {'ok', any()} |
{'refused', string(), [any()]} |
{'error', any()}.
-%% Given #user and vhost, can a user log in to a vhost?
+%% Given #auth_user and vhost, can a user log in to a vhost?
%% Possible responses:
%% true
%% false
%% {error, Error}
%% Something went wrong. Log and die.
--callback check_vhost_access(rabbit_types:user(), rabbit_types:vhost()) ->
+-callback check_vhost_access(rabbit_types:auth_user(),
+ rabbit_types:vhost(), rabbit_net:socket()) ->
boolean() | {'error', any()}.
-
-%% Given #user, resource and permission, can a user access a resource?
+%% Given #auth_user, resource and permission, can a user access a resource?
%%
%% Possible responses:
%% true
%% false
%% {error, Error}
%% Something went wrong. Log and die.
--callback check_resource_access(rabbit_types:user(),
+-callback check_resource_access(rabbit_types:auth_user(),
rabbit_types:r(atom()),
rabbit_access_control:permission_atom()) ->
boolean() | {'error', any()}.
@@ -64,8 +66,8 @@
-export([behaviour_info/1]).
behaviour_info(callbacks) ->
- [{description, 0}, {check_user_login, 2}, {check_vhost_access, 2},
- {check_resource_access, 3}];
+ [{user_login_authorization, 1},
+ {check_vhost_access, 3}, {check_resource_access, 3}];
behaviour_info(_Other) ->
undefined.
diff --git a/src/rabbit_autoheal.erl b/src/rabbit_autoheal.erl
index 09e9aa6ae9..566c8bb836 100644
--- a/src/rabbit_autoheal.erl
+++ b/src/rabbit_autoheal.erl
@@ -16,13 +16,16 @@
-module(rabbit_autoheal).
--export([init/0, maybe_start/1, rabbit_down/2, node_down/2, handle_msg/3]).
+-export([init/0, enabled/0, maybe_start/1, rabbit_down/2, node_down/2,
+ handle_msg/3]).
%% The named process we are running in.
-define(SERVER, rabbit_node_monitor).
-define(MNESIA_STOPPED_PING_INTERNAL, 200).
+-define(AUTOHEAL_STATE_AFTER_RESTART, rabbit_autoheal_state_after_restart).
+
%%----------------------------------------------------------------------------
%% In order to autoheal we want to:
@@ -45,9 +48,20 @@
%% stops - if a node stops for any other reason it just gets a message
%% it will ignore, and otherwise we carry on.
%%
+%% Meanwhile, the leader may continue to receive new autoheal requests:
+%% all of them are ignored. The winner notifies the leader when the
+%% current autoheal process is finished (ie. when all losers stopped and
+%% were asked to start again) or was aborted. When the leader receives
+%% the notification or if it looses contact with the winner, it can
+%% accept new autoheal requests.
+%%
%% The winner and the leader are not necessarily the same node.
%%
-%% Possible states:
+%% The leader can be a loser and will restart in this case. It remembers
+%% there is an autoheal in progress by temporarily saving the autoheal
+%% state to the application environment.
+%%
+%% == Possible states ==
%%
%% not_healing
%% - the default
@@ -56,31 +70,73 @@
%% - we are the winner and are waiting for all losing nodes to stop
%% before telling them they can restart
%%
-%% about_to_heal
-%% - we are the leader, and have already assigned the winner and
-%% losers. We are part of the losers and we wait for the winner_is
-%% announcement. This leader-specific state differs from not_healing
-%% (the state other losers are in), because the leader could still
-%% receive request_start messages: those subsequent requests must be
-%% ignored.
-%%
-%% {leader_waiting, OutstandingStops}
+%% {leader_waiting, Winner, Notify}
%% - we are the leader, and have already assigned the winner and losers.
-%% We are neither but need to ignore further requests to autoheal.
+%% We are waiting for a confirmation from the winner that the autoheal
+%% process has ended. Meanwhile we can ignore autoheal requests.
+%% Because we may be a loser too, this state is saved to the application
+%% environment and restored on startup.
%%
%% restarting
%% - we are restarting. Of course the node monitor immediately dies
%% then so this state does not last long. We therefore send the
%% autoheal_safe_to_start message to the rabbit_outside_app_process
%% instead.
+%%
+%% == Message flow ==
+%%
+%% 1. Any node (leader included) >> {request_start, node()} >> Leader
+%% When Mnesia detects it is running partitioned or
+%% when a remote node starts, rabbit_node_monitor calls
+%% rabbit_autoheal:maybe_start/1. The message above is sent to the
+%% leader so the leader can take a decision.
+%%
+%% 2. Leader >> {become_winner, Losers} >> Winner
+%% The leader notifies the winner so the latter can proceed with
+%% the autoheal.
+%%
+%% 3. Winner >> {winner_is, Winner} >> All losers
+%% The winner notifies losers they must stop.
+%%
+%% 4. Winner >> autoheal_safe_to_start >> All losers
+%% When either all losers stopped or the autoheal process was
+%% aborted, the winner notifies losers they can start again.
+%%
+%% 5. Leader >> report_autoheal_status >> Winner
+%% The leader asks the autoheal status to the winner. This only
+%% happens when the leader is a loser too. If this is not the case,
+%% this message is never sent.
+%%
+%% 6. Winner >> {autoheal_finished, Winner} >> Leader
+%% The winner notifies the leader that the autoheal process was
+%% either finished or aborted (ie. autoheal_safe_to_start was sent
+%% to losers).
%%----------------------------------------------------------------------------
-init() -> not_healing.
+init() ->
+ %% We check the application environment for a saved autoheal state
+ %% saved during a restart. If this node is a leader, it is used
+ %% to determine if it needs to ask the winner to report about the
+ %% autoheal progress.
+ State = case application:get_env(rabbit, ?AUTOHEAL_STATE_AFTER_RESTART) of
+ {ok, S} -> S;
+ undefined -> not_healing
+ end,
+ ok = application:unset_env(rabbit, ?AUTOHEAL_STATE_AFTER_RESTART),
+ case State of
+ {leader_waiting, Winner, _} ->
+ rabbit_log:info(
+ "Autoheal: in progress, requesting report from ~p~n", [Winner]),
+ send(Winner, report_autoheal_status);
+ _ ->
+ ok
+ end,
+ State.
maybe_start(not_healing) ->
case enabled() of
- true -> [Leader | _] = lists:usort(rabbit_mnesia:cluster_nodes(all)),
+ true -> Leader = leader(),
send(Leader, {request_start, node()}),
rabbit_log:info("Autoheal request sent to ~p~n", [Leader]),
not_healing;
@@ -90,8 +146,15 @@ maybe_start(State) ->
State.
enabled() ->
- {ok, autoheal} =:= application:get_env(rabbit, cluster_partition_handling).
+ case application:get_env(rabbit, cluster_partition_handling) of
+ {ok, autoheal} -> true;
+ {ok, {pause_if_all_down, _, autoheal}} -> true;
+ _ -> false
+ end.
+leader() ->
+ [Leader | _] = lists:usort(rabbit_mnesia:cluster_nodes(all)),
+ Leader.
%% This is the winner receiving its last notification that a node has
%% stopped - all nodes can now start again
@@ -102,14 +165,13 @@ rabbit_down(Node, {winner_waiting, [Node], Notify}) ->
rabbit_down(Node, {winner_waiting, WaitFor, Notify}) ->
{winner_waiting, WaitFor -- [Node], Notify};
-rabbit_down(Node, {leader_waiting, [Node]}) ->
- not_healing;
-
-rabbit_down(Node, {leader_waiting, WaitFor}) ->
- {leader_waiting, WaitFor -- [Node]};
+rabbit_down(Winner, {leader_waiting, Winner, Losers}) ->
+ abort([Winner], Losers);
rabbit_down(_Node, State) ->
- %% ignore, we already cancelled the autoheal process
+ %% Ignore. Either:
+ %% o we already cancelled the autoheal process;
+ %% o we are still waiting the winner's report.
State.
node_down(_Node, not_healing) ->
@@ -141,15 +203,10 @@ handle_msg({request_start, Node},
case node() =:= Winner of
true -> handle_msg({become_winner, Losers},
not_healing, Partitions);
- false -> send(Winner, {become_winner, Losers}), %% [0]
- case lists:member(node(), Losers) of
- true -> about_to_heal;
- false -> {leader_waiting, Losers}
- end
+ false -> send(Winner, {become_winner, Losers}),
+ {leader_waiting, Winner, Losers}
end
end;
-%% [0] If we are a loser we will never receive this message - but it
-%% won't stick in the mailbox as we are restarting anyway
handle_msg({request_start, Node},
State, _Partitions) ->
@@ -170,27 +227,49 @@ handle_msg({become_winner, Losers},
_ -> abort(Down, Losers)
end;
-handle_msg({winner_is, Winner},
- State, _Partitions)
- when State =:= not_healing orelse State =:= about_to_heal ->
- rabbit_log:warning(
- "Autoheal: we were selected to restart; winner is ~p~n", [Winner]),
- rabbit_node_monitor:run_outside_applications(
- fun () ->
- MRef = erlang:monitor(process, {?SERVER, Winner}),
- rabbit:stop(),
- receive
- {'DOWN', MRef, process, {?SERVER, Winner}, _Reason} -> ok;
- autoheal_safe_to_start -> ok
- end,
- erlang:demonitor(MRef, [flush]),
- rabbit:start()
- end),
+handle_msg({winner_is, Winner}, State = not_healing,
+ _Partitions) ->
+ %% This node is a loser, nothing else.
+ restart_loser(State, Winner),
+ restarting;
+handle_msg({winner_is, Winner}, State = {leader_waiting, Winner, _},
+ _Partitions) ->
+ %% This node is the leader and a loser at the same time.
+ restart_loser(State, Winner),
restarting;
handle_msg(_, restarting, _Partitions) ->
%% ignore, we can contribute no further
- restarting.
+ restarting;
+
+handle_msg(report_autoheal_status, not_healing, _Partitions) ->
+ %% The leader is asking about the autoheal status to us (the
+ %% winner). This happens when the leader is a loser and it just
+ %% restarted. We are in the "not_healing" state, so the previous
+ %% autoheal process ended: let's tell this to the leader.
+ send(leader(), {autoheal_finished, node()}),
+ not_healing;
+
+handle_msg(report_autoheal_status, State, _Partitions) ->
+ %% Like above, the leader is asking about the autoheal status. We
+ %% are not finished with it. There is no need to send anything yet
+ %% to the leader: we will send the notification when it is over.
+ State;
+
+handle_msg({autoheal_finished, Winner},
+ {leader_waiting, Winner, _}, _Partitions) ->
+ %% The winner is finished with the autoheal process and notified us
+ %% (the leader). We can transition to the "not_healing" state and
+ %% accept new requests.
+ rabbit_log:info("Autoheal finished according to winner ~p~n", [Winner]),
+ not_healing;
+
+handle_msg({autoheal_finished, Winner}, not_healing, _Partitions)
+ when Winner =:= node() ->
+ %% We are the leader and the winner. The state already transitioned
+ %% to "not_healing" at the end of the autoheal process.
+ rabbit_log:info("Autoheal finished according to winner ~p~n", [node()]),
+ not_healing.
%%----------------------------------------------------------------------------
@@ -215,6 +294,7 @@ winner_finish(Notify) ->
%% losing nodes before sending the "autoheal_safe_to_start" signal.
wait_for_mnesia_shutdown(Notify),
[{rabbit_outside_app_process, N} ! autoheal_safe_to_start || N <- Notify],
+ send(leader(), {autoheal_finished, node()}),
not_healing.
wait_for_mnesia_shutdown([Node | Rest] = AllNodes) ->
@@ -233,6 +313,35 @@ wait_for_mnesia_shutdown([Node | Rest] = AllNodes) ->
wait_for_mnesia_shutdown([]) ->
ok.
+restart_loser(State, Winner) ->
+ rabbit_log:warning(
+ "Autoheal: we were selected to restart; winner is ~p~n", [Winner]),
+ rabbit_node_monitor:run_outside_applications(
+ fun () ->
+ MRef = erlang:monitor(process, {?SERVER, Winner}),
+ rabbit:stop(),
+ NextState = receive
+ {'DOWN', MRef, process, {?SERVER, Winner}, _Reason} ->
+ not_healing;
+ autoheal_safe_to_start ->
+ State
+ end,
+ erlang:demonitor(MRef, [flush]),
+ %% During the restart, the autoheal state is lost so we
+ %% store it in the application environment temporarily so
+ %% init/0 can pick it up.
+ %%
+ %% This is useful to the leader which is a loser at the
+ %% same time: because the leader is restarting, there
+ %% is a great chance it misses the "autoheal finished!"
+ %% notification from the winner. Thanks to the saved
+ %% state, it knows it needs to ask the winner if the
+ %% autoheal process is finished or not.
+ application:set_env(rabbit,
+ ?AUTOHEAL_STATE_AFTER_RESTART, NextState),
+ rabbit:start()
+ end, true).
+
make_decision(AllPartitions) ->
Sorted = lists:sort([{partition_value(P), P} || P <- AllPartitions]),
[[Winner | _] | Rest] = lists:reverse([P || {_, P} <- Sorted]),
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 4ce133c3e3..55c8c971a0 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -22,7 +22,8 @@
messages_unacknowledged_ram, messages_persistent,
message_bytes, message_bytes_ready,
message_bytes_unacknowledged, message_bytes_ram,
- message_bytes_persistent, backing_queue_status]).
+ message_bytes_persistent,
+ disk_reads, disk_writes, backing_queue_status]).
-ifdef(use_specs).
@@ -30,6 +31,7 @@
-type(ack() :: any()).
-type(state() :: any()).
+-type(flow() :: 'flow' | 'noflow').
-type(msg_ids() :: [rabbit_types:msg_id()]).
-type(fetch_result(Ack) ::
('empty' | {rabbit_types:basic_message(), boolean(), Ack})).
@@ -99,19 +101,20 @@
%% Publish a message.
-callback publish(rabbit_types:basic_message(),
- rabbit_types:message_properties(), boolean(), pid(),
+ rabbit_types:message_properties(), boolean(), pid(), flow(),
state()) -> state().
%% Called for messages which have already been passed straight
%% out to a client. The queue will be empty for these calls
%% (i.e. saves the round trip through the backing queue).
-callback publish_delivered(rabbit_types:basic_message(),
- rabbit_types:message_properties(), pid(), state())
+ rabbit_types:message_properties(), pid(), flow(),
+ state())
-> {ack(), state()}.
%% Called to inform the BQ about messages which have reached the
%% queue, but are not going to be further passed to BQ.
--callback discard(rabbit_types:msg_id(), pid(), state()) -> state().
+-callback discard(rabbit_types:msg_id(), pid(), flow(), state()) -> state().
%% Return ids of messages which have been confirmed since the last
%% invocation of this function (or initialisation).
@@ -249,8 +252,8 @@
behaviour_info(callbacks) ->
[{start, 1}, {stop, 0}, {init, 3}, {terminate, 2},
- {delete_and_terminate, 2}, {purge, 1}, {purge_acks, 1}, {publish, 5},
- {publish_delivered, 4}, {discard, 3}, {drain_confirmed, 1},
+ {delete_and_terminate, 2}, {purge, 1}, {purge_acks, 1}, {publish, 6},
+ {publish_delivered, 5}, {discard, 4}, {drain_confirmed, 1},
{dropwhile, 2}, {fetchwhile, 4},
{fetch, 2}, {ack, 2}, {requeue, 2}, {ackfold, 4}, {fold, 3}, {len, 1},
{is_empty, 1}, {depth, 1}, {set_ram_duration_target, 2},
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index 67109e7d5a..cd2846c00b 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -114,7 +114,7 @@ publish(X, Delivery) ->
delivery(Mandatory, Confirm, Message, MsgSeqNo) ->
#delivery{mandatory = Mandatory, confirm = Confirm, sender = self(),
- message = Message, msg_seq_no = MsgSeqNo}.
+ message = Message, msg_seq_no = MsgSeqNo, flow = noflow}.
build_content(Properties, BodyBin) when is_binary(BodyBin) ->
build_content(Properties, [BodyBin]);
diff --git a/src/rabbit_binary_parser.erl b/src/rabbit_binary_parser.erl
index 3ab82cad78..ee8147f45f 100644
--- a/src/rabbit_binary_parser.erl
+++ b/src/rabbit_binary_parser.erl
@@ -41,48 +41,90 @@
%% parse_table supports the AMQP 0-8/0-9 standard types, S, I, D, T
%% and F, as well as the QPid extensions b, d, f, l, s, t, x, and V.
+-define(SIMPLE_PARSE_TABLE(BType, Pattern, RType),
+ parse_table(<<NLen:8/unsigned, NameString:NLen/binary,
+ BType, Pattern, Rest/binary>>) ->
+ [{NameString, RType, Value} | parse_table(Rest)]).
+
+%% Note that we try to put these in approximately the order we expect
+%% to hit them, that's why the empty binary is half way through.
+
+parse_table(<<NLen:8/unsigned, NameString:NLen/binary,
+ $S, VLen:32/unsigned, Value:VLen/binary, Rest/binary>>) ->
+ [{NameString, longstr, Value} | parse_table(Rest)];
+
+?SIMPLE_PARSE_TABLE($I, Value:32/signed, signedint);
+?SIMPLE_PARSE_TABLE($T, Value:64/unsigned, timestamp);
+
parse_table(<<>>) ->
[];
-parse_table(<<NLen:8/unsigned, NameString:NLen/binary, ValueAndRest/binary>>) ->
- {Type, Value, Rest} = parse_field_value(ValueAndRest),
- [{NameString, Type, Value} | parse_table(Rest)].
-parse_array(<<>>) ->
- [];
-parse_array(<<ValueAndRest/binary>>) ->
- {Type, Value, Rest} = parse_field_value(ValueAndRest),
- [{Type, Value} | parse_array(Rest)].
+?SIMPLE_PARSE_TABLE($b, Value:8/signed, byte);
+?SIMPLE_PARSE_TABLE($d, Value:64/float, double);
+?SIMPLE_PARSE_TABLE($f, Value:32/float, float);
+?SIMPLE_PARSE_TABLE($l, Value:64/signed, long);
+?SIMPLE_PARSE_TABLE($s, Value:16/signed, short);
+
+parse_table(<<NLen:8/unsigned, NameString:NLen/binary,
+ $t, Value:8/unsigned, Rest/binary>>) ->
+ [{NameString, bool, (Value /= 0)} | parse_table(Rest)];
+
+parse_table(<<NLen:8/unsigned, NameString:NLen/binary,
+ $D, Before:8/unsigned, After:32/unsigned, Rest/binary>>) ->
+ [{NameString, decimal, {Before, After}} | parse_table(Rest)];
-parse_field_value(<<$S, VLen:32/unsigned, V:VLen/binary, R/binary>>) ->
- {longstr, V, R};
+parse_table(<<NLen:8/unsigned, NameString:NLen/binary,
+ $F, VLen:32/unsigned, Value:VLen/binary, Rest/binary>>) ->
+ [{NameString, table, parse_table(Value)} | parse_table(Rest)];
-parse_field_value(<<$I, V:32/signed, R/binary>>) ->
- {signedint, V, R};
+parse_table(<<NLen:8/unsigned, NameString:NLen/binary,
+ $A, VLen:32/unsigned, Value:VLen/binary, Rest/binary>>) ->
+ [{NameString, array, parse_array(Value)} | parse_table(Rest)];
+
+parse_table(<<NLen:8/unsigned, NameString:NLen/binary,
+ $x, VLen:32/unsigned, Value:VLen/binary, Rest/binary>>) ->
+ [{NameString, binary, Value} | parse_table(Rest)];
+
+parse_table(<<NLen:8/unsigned, NameString:NLen/binary,
+ $V, Rest/binary>>) ->
+ [{NameString, void, undefined} | parse_table(Rest)].
+
+-define(SIMPLE_PARSE_ARRAY(BType, Pattern, RType),
+ parse_array(<<BType, Pattern, Rest/binary>>) ->
+ [{RType, Value} | parse_array(Rest)]).
+
+parse_array(<<$S, VLen:32/unsigned, Value:VLen/binary, Rest/binary>>) ->
+ [{longstr, Value} | parse_array(Rest)];
+
+?SIMPLE_PARSE_ARRAY($I, Value:32/signed, signedint);
+?SIMPLE_PARSE_ARRAY($T, Value:64/unsigned, timestamp);
+
+parse_array(<<>>) ->
+ [];
-parse_field_value(<<$D, Before:8/unsigned, After:32/unsigned, R/binary>>) ->
- {decimal, {Before, After}, R};
+?SIMPLE_PARSE_ARRAY($b, Value:8/signed, byte);
+?SIMPLE_PARSE_ARRAY($d, Value:64/float, double);
+?SIMPLE_PARSE_ARRAY($f, Value:32/float, float);
+?SIMPLE_PARSE_ARRAY($l, Value:64/signed, long);
+?SIMPLE_PARSE_ARRAY($s, Value:16/signed, short);
-parse_field_value(<<$T, V:64/unsigned, R/binary>>) ->
- {timestamp, V, R};
+parse_array(<<$t, Value:8/unsigned, Rest/binary>>) ->
+ [{bool, (Value /= 0)} | parse_array(Rest)];
-parse_field_value(<<$F, VLen:32/unsigned, Table:VLen/binary, R/binary>>) ->
- {table, parse_table(Table), R};
+parse_array(<<$D, Before:8/unsigned, After:32/unsigned, Rest/binary>>) ->
+ [{decimal, {Before, After}} | parse_array(Rest)];
-parse_field_value(<<$A, VLen:32/unsigned, Array:VLen/binary, R/binary>>) ->
- {array, parse_array(Array), R};
+parse_array(<<$F, VLen:32/unsigned, Value:VLen/binary, Rest/binary>>) ->
+ [{table, parse_table(Value)} | parse_array(Rest)];
-parse_field_value(<<$b, V:8/signed, R/binary>>) -> {byte, V, R};
-parse_field_value(<<$d, V:64/float, R/binary>>) -> {double, V, R};
-parse_field_value(<<$f, V:32/float, R/binary>>) -> {float, V, R};
-parse_field_value(<<$l, V:64/signed, R/binary>>) -> {long, V, R};
-parse_field_value(<<$s, V:16/signed, R/binary>>) -> {short, V, R};
-parse_field_value(<<$t, V:8/unsigned, R/binary>>) -> {bool, (V /= 0), R};
+parse_array(<<$A, VLen:32/unsigned, Value:VLen/binary, Rest/binary>>) ->
+ [{array, parse_array(Value)} | parse_array(Rest)];
-parse_field_value(<<$x, VLen:32/unsigned, V:VLen/binary, R/binary>>) ->
- {binary, V, R};
+parse_array(<<$x, VLen:32/unsigned, Value:VLen/binary, Rest/binary>>) ->
+ [{binary, Value} | parse_array(Rest)];
-parse_field_value(<<$V, R/binary>>) ->
- {void, undefined, R}.
+parse_array(<<$V, Rest/binary>>) ->
+ [{void, undefined} | parse_array(Rest)].
ensure_content_decoded(Content = #content{properties = Props})
when Props =/= none ->
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 8632e1b3f7..63a5eb7e79 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -581,7 +581,8 @@ check_user_id_header(#'P_basic'{user_id = Username},
#ch{user = #user{username = Username}}) ->
ok;
check_user_id_header(
- #'P_basic'{}, #ch{user = #user{auth_backend = rabbit_auth_backend_dummy}}) ->
+ #'P_basic'{}, #ch{user = #user{authz_backends =
+ [{rabbit_auth_backend_dummy, _}]}}) ->
ok;
check_user_id_header(#'P_basic'{user_id = Claimed},
#ch{user = #user{username = Actual,
@@ -660,7 +661,7 @@ check_not_default_exchange(#resource{kind = exchange, name = <<"">>}) ->
check_not_default_exchange(_) ->
ok.
-check_exchange_deletion(XName = #resource{name = <<"amq.rabbitmq.", _/binary>>,
+check_exchange_deletion(XName = #resource{name = <<"amq.", _/binary>>,
kind = exchange}) ->
rabbit_misc:protocol_error(
access_refused, "deletion of system ~s not allowed",
@@ -789,12 +790,12 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
end,
case rabbit_basic:message(ExchangeName, RoutingKey, DecodedContent) of
{ok, Message} ->
- rabbit_trace:tap_in(Message, ConnName, ChannelNum,
- Username, TraceState),
Delivery = rabbit_basic:delivery(
Mandatory, DoConfirm, Message, MsgSeqNo),
QNames = rabbit_exchange:route(Exchange, Delivery),
- DQ = {Delivery, QNames},
+ rabbit_trace:tap_in(Message, QNames, ConnName, ChannelNum,
+ Username, TraceState),
+ DQ = {Delivery#delivery{flow = flow}, QNames},
{noreply, case Tx of
none -> deliver_to_queues(DQ, State1);
{Msgs, Acks} -> Msgs1 = queue:in(DQ, Msgs),
@@ -1665,7 +1666,7 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
DelQNames}, State = #ch{queue_names = QNames,
queue_monitors = QMons}) ->
Qs = rabbit_amqqueue:lookup(DelQNames),
- DeliveredQPids = rabbit_amqqueue:deliver_flow(Qs, Delivery),
+ DeliveredQPids = rabbit_amqqueue:deliver(Qs, Delivery),
%% The pmon:monitor_all/2 monitors all queues to which we
%% delivered. But we want to monitor even queues we didn't deliver
%% to, since we need their 'DOWN' messages to clean
@@ -1735,7 +1736,7 @@ send_nacks(_, State) ->
send_confirms(State = #ch{tx = none, confirmed = []}) ->
State;
send_confirms(State = #ch{tx = none, confirmed = C}) ->
- case rabbit_node_monitor:pause_minority_guard() of
+ case rabbit_node_monitor:pause_partition_guard() of
ok -> MsgSeqNos =
lists:foldl(
fun ({MsgSeqNo, XName}, MSNs) ->
@@ -1747,7 +1748,7 @@ send_confirms(State = #ch{tx = none, confirmed = C}) ->
pausing -> State
end;
send_confirms(State) ->
- case rabbit_node_monitor:pause_minority_guard() of
+ case rabbit_node_monitor:pause_partition_guard() of
ok -> maybe_complete_tx(State);
pausing -> State
end.
diff --git a/src/rabbit_cli.erl b/src/rabbit_cli.erl
index 47505b3df3..cf7b608399 100644
--- a/src/rabbit_cli.erl
+++ b/src/rabbit_cli.erl
@@ -17,7 +17,8 @@
-module(rabbit_cli).
-include("rabbit_cli.hrl").
--export([main/3, parse_arguments/4, rpc_call/4]).
+-export([main/3, start_distribution/0, start_distribution/1,
+ parse_arguments/4, rpc_call/4]).
%%----------------------------------------------------------------------------
@@ -31,6 +32,8 @@
-spec(main/3 :: (fun (([string()], string()) -> parse_result()),
fun ((atom(), atom(), [any()], [any()]) -> any()),
atom()) -> no_return()).
+-spec(start_distribution/0 :: () -> {'ok', pid()} | {'error', any()}).
+-spec(start_distribution/1 :: (string()) -> {'ok', pid()} | {'error', any()}).
-spec(usage/1 :: (atom()) -> no_return()).
-spec(parse_arguments/4 ::
([{atom(), [{string(), optdef()}]} | atom()],
@@ -42,6 +45,8 @@
%%----------------------------------------------------------------------------
main(ParseFun, DoFun, UsageMod) ->
+ error_logger:tty(false),
+ start_distribution(),
{ok, [[NodeStr|_]|_]} = init:get_argument(nodename),
{Command, Opts, Args} =
case ParseFun(init:get_plain_arguments(), NodeStr) of
@@ -101,6 +106,20 @@ main(ParseFun, DoFun, UsageMod) ->
rabbit_misc:quit(2)
end.
+start_distribution() ->
+ start_distribution(list_to_atom(
+ rabbit_misc:format("rabbitmq-cli-~s", [os:getpid()]))).
+
+start_distribution(Name) ->
+ rabbit_nodes:ensure_epmd(),
+ net_kernel:start([Name, name_type()]).
+
+name_type() ->
+ case os:getenv("RABBITMQ_USE_LONGNAME") of
+ "true" -> longnames;
+ _ -> shortnames
+ end.
+
usage(Mod) ->
io:format("~s", [Mod:usage()]),
rabbit_misc:quit(1).
diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl
index a931eef009..810f5f6f33 100644
--- a/src/rabbit_control_main.erl
+++ b/src/rabbit_control_main.erl
@@ -19,7 +19,7 @@
-include("rabbit_cli.hrl").
-export([start/0, stop/0, parse_arguments/2, action/5,
- sync_queue/1, cancel_sync_queue/1]).
+ sync_queue/1, cancel_sync_queue/1, become/1]).
-import(rabbit_cli, [rpc_call/4]).
@@ -40,6 +40,7 @@
change_cluster_node_type,
update_cluster_nodes,
{forget_cluster_node, [?OFFLINE_DEF]},
+ rename_cluster_node,
force_boot,
cluster_status,
{sync_queue, [?VHOST_DEF]},
@@ -104,8 +105,8 @@
-define(COMMANDS_NOT_REQUIRING_APP,
[stop, stop_app, start_app, wait, reset, force_reset, rotate_logs,
join_cluster, change_cluster_node_type, update_cluster_nodes,
- forget_cluster_node, cluster_status, status, environment, eval,
- force_boot]).
+ forget_cluster_node, rename_cluster_node, cluster_status, status,
+ environment, eval, force_boot]).
%%----------------------------------------------------------------------------
@@ -123,7 +124,6 @@
%%----------------------------------------------------------------------------
start() ->
- start_distribution(),
rabbit_cli:main(
fun (Args, NodeStr) ->
parse_arguments(Args, NodeStr)
@@ -234,6 +234,13 @@ action(forget_cluster_node, Node, [ClusterNodeS], Opts, Inform) ->
[ClusterNode, false])
end;
+action(rename_cluster_node, Node, NodesS, _Opts, Inform) ->
+ Nodes = split_list([list_to_atom(N) || N <- NodesS]),
+ Inform("Renaming cluster nodes:~n~s~n",
+ [lists:flatten([rabbit_misc:format(" ~s -> ~s~n", [F, T]) ||
+ {F, T} <- Nodes])]),
+ rabbit_mnesia_rename:rename(Node, Nodes);
+
action(force_boot, Node, [], _Opts, Inform) ->
Inform("Forcing boot for Mnesia dir ~s", [mnesia:system_info(directory)]),
case rabbit:is_running(Node) of
@@ -518,7 +525,7 @@ wait_for_startup(Node, Pid) ->
Node, Pid, fun() -> rpc:call(Node, rabbit, await_startup, []) =:= ok end).
while_process_is_alive(Node, Pid, Activity) ->
- case process_up(Pid) of
+ case rabbit_misc:is_os_process_alive(Pid) of
true -> case Activity() of
true -> ok;
false -> timer:sleep(?EXTERNAL_CHECK_INTERVAL),
@@ -528,7 +535,7 @@ while_process_is_alive(Node, Pid, Activity) ->
end.
wait_for_process_death(Pid) ->
- case process_up(Pid) of
+ case rabbit_misc:is_os_process_alive(Pid) of
true -> timer:sleep(?EXTERNAL_CHECK_INTERVAL),
wait_for_process_death(Pid);
false -> ok
@@ -552,62 +559,18 @@ read_pid_file(PidFile, Wait) ->
exit({error, {could_not_read_pid, E}})
end.
-% Test using some OS clunkiness since we shouldn't trust
-% rpc:call(os, getpid, []) at this point
-process_up(Pid) ->
- with_os([{unix, fun () ->
- run_ps(Pid) =:= 0
- end},
- {win32, fun () ->
- Cmd = "tasklist /nh /fi \"pid eq " ++ Pid ++ "\" ",
- Res = rabbit_misc:os_cmd(Cmd ++ "2>&1"),
- case re:run(Res, "erl\\.exe", [{capture, none}]) of
- match -> true;
- _ -> false
- end
- end}]).
-
-with_os(Handlers) ->
- {OsFamily, _} = os:type(),
- case proplists:get_value(OsFamily, Handlers) of
- undefined -> throw({unsupported_os, OsFamily});
- Handler -> Handler()
- end.
-
-run_ps(Pid) ->
- Port = erlang:open_port({spawn, "ps -p " ++ Pid},
- [exit_status, {line, 16384},
- use_stdio, stderr_to_stdout]),
- exit_loop(Port).
-
-exit_loop(Port) ->
- receive
- {Port, {exit_status, Rc}} -> Rc;
- {Port, _} -> exit_loop(Port)
- end.
-
-start_distribution() ->
- CtlNodeName = rabbit_misc:format("rabbitmqctl-~s", [os:getpid()]),
- {ok, _} = net_kernel:start([list_to_atom(CtlNodeName), name_type()]).
-
become(BecomeNode) ->
+ error_logger:tty(false),
+ ok = net_kernel:stop(),
case net_adm:ping(BecomeNode) of
pong -> exit({node_running, BecomeNode});
pang -> io:format(" * Impersonating node: ~s...", [BecomeNode]),
- error_logger:tty(false),
- ok = net_kernel:stop(),
- {ok, _} = net_kernel:start([BecomeNode, name_type()]),
+ {ok, _} = rabbit_cli:start_distribution(BecomeNode),
io:format(" done~n", []),
Dir = mnesia:system_info(directory),
io:format(" * Mnesia directory : ~s~n", [Dir])
end.
-name_type() ->
- case os:getenv("RABBITMQ_USE_LONGNAME") of
- "true" -> longnames;
- _ -> shortnames
- end.
-
%%----------------------------------------------------------------------------
default_if_empty(List, Default) when is_list(List) ->
@@ -720,3 +683,7 @@ prettify_typed_amqp_value(table, Value) -> prettify_amqp_table(Value);
prettify_typed_amqp_value(array, Value) -> [prettify_typed_amqp_value(T, V) ||
{T, V} <- Value];
prettify_typed_amqp_value(_Type, Value) -> Value.
+
+split_list([]) -> [];
+split_list([_]) -> exit(even_list_needed);
+split_list([A, B | T]) -> [{A, B} | split_list(T)].
diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl
index 749a67b1d5..11233e7eb8 100644
--- a/src/rabbit_direct.erl
+++ b/src/rabbit_direct.erl
@@ -83,16 +83,27 @@ connect({Username, Password}, VHost, Protocol, Pid, Infos) ->
connect0(AuthFun, VHost, Protocol, Pid, Infos) ->
case rabbit:is_running() of
true -> case AuthFun() of
- {ok, User} ->
+ {ok, User = #user{username = Username}} ->
+ notify_auth_result(Username,
+ user_authentication_success, []),
connect1(User, VHost, Protocol, Pid, Infos);
- {refused, _M, _A} ->
+ {refused, Username, Msg, Args} ->
+ notify_auth_result(Username,
+ user_authentication_failure,
+ [{error, rabbit_misc:format(Msg, Args)}]),
{error, {auth_failure, "Refused"}}
end;
false -> {error, broker_not_found_on_node}
end.
+notify_auth_result(Username, AuthResult, ExtraProps) ->
+ EventProps = [{connection_type, direct},
+ {name, case Username of none -> ''; _ -> Username end}] ++
+ ExtraProps,
+ rabbit_event:notify(AuthResult, [P || {_, V} = P <- EventProps, V =/= '']).
+
connect1(User, VHost, Protocol, Pid, Infos) ->
- try rabbit_access_control:check_vhost_access(User, VHost) of
+ try rabbit_access_control:check_vhost_access(User, VHost, undefined) of
ok -> ok = pg_local:join(rabbit_direct, Pid),
rabbit_event:notify(connection_created, Infos),
{ok, {User, rabbit_reader:server_properties(Protocol)}}
diff --git a/src/rabbit_epmd_monitor.erl b/src/rabbit_epmd_monitor.erl
new file mode 100644
index 0000000000..261554921a
--- /dev/null
+++ b/src/rabbit_epmd_monitor.erl
@@ -0,0 +1,101 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2014 GoPivotal, Inc. All rights reserved.
+%%
+
+-module(rabbit_epmd_monitor).
+
+-behaviour(gen_server).
+
+-export([start_link/0]).
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+ code_change/3]).
+
+-record(state, {timer, mod, me, host, port}).
+
+-define(SERVER, ?MODULE).
+-define(CHECK_FREQUENCY, 60000).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+%% It's possible for epmd to be killed out from underneath us. If that
+%% happens, then obviously clustering and rabbitmqctl stop
+%% working. This process checks up on epmd and restarts it /
+%% re-registers us with it if it has gone away.
+%%
+%% How could epmd be killed?
+%%
+%% 1) The most popular way for this to happen is when running as a
+%% Windows service. The user starts rabbitmqctl first, and this starts
+%% epmd under the user's account. When they log out epmd is killed.
+%%
+%% 2) Some packagings of (non-RabbitMQ?) Erlang apps might do "killall
+%% epmd" as a shutdown or uninstall step.
+%% ----------------------------------------------------------------------------
+
+start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
+
+init([]) ->
+ {Me, Host} = rabbit_nodes:parts(node()),
+ Mod = net_kernel:epmd_module(),
+ {port, Port, _Version} = Mod:port_please(Me, Host),
+ {ok, ensure_timer(#state{mod = Mod,
+ me = Me,
+ host = Host,
+ port = Port})}.
+
+handle_call(_Request, _From, State) ->
+ {noreply, State}.
+
+handle_cast(_Msg, State) ->
+ {noreply, State}.
+
+handle_info(check, State) ->
+ check_epmd(State),
+ {noreply, ensure_timer(State#state{timer = undefined})};
+
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%%----------------------------------------------------------------------------
+
+ensure_timer(State) ->
+ rabbit_misc:ensure_timer(State, #state.timer, ?CHECK_FREQUENCY, check).
+
+check_epmd(#state{mod = Mod,
+ me = Me,
+ host = Host,
+ port = Port}) ->
+ case Mod:port_please(Me, Host) of
+ noport -> rabbit_log:warning(
+ "epmd does not know us, re-registering ~s at port ~b~n",
+ [Me, Port]),
+ rabbit_nodes:ensure_epmd(),
+ erl_epmd:register_node(Me, Port);
+ _ -> ok
+ end.
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index f32a187d32..82f4275a3e 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -22,7 +22,7 @@
%%
%% Each channel has an associated limiter process, created with
%% start_link/1, which it passes to queues on consumer creation with
-%% rabbit_amqqueue:basic_consume/9, and rabbit_amqqueue:basic_get/4.
+%% rabbit_amqqueue:basic_consume/10, and rabbit_amqqueue:basic_get/4.
%% The latter isn't strictly necessary, since basic.get is not
%% subject to limiting, but it means that whenever a queue knows about
%% a channel, it also knows about its limiter, which is less fiddly.
diff --git a/src/rabbit_log.erl b/src/rabbit_log.erl
index e05ef05af4..6f0865b300 100644
--- a/src/rabbit_log.erl
+++ b/src/rabbit_log.erl
@@ -16,7 +16,8 @@
-module(rabbit_log).
--export([log/3, log/4, info/1, info/2, warning/1, warning/2, error/1, error/2]).
+-export([log/3, log/4, debug/1, debug/2, info/1, info/2, warning/1,
+ warning/2, error/1, error/2]).
-export([with_local_io/1]).
%%----------------------------------------------------------------------------
@@ -26,11 +27,13 @@
-export_type([level/0]).
-type(category() :: atom()).
--type(level() :: 'info' | 'warning' | 'error').
+-type(level() :: 'debug' | 'info' | 'warning' | 'error').
-spec(log/3 :: (category(), level(), string()) -> 'ok').
-spec(log/4 :: (category(), level(), string(), [any()]) -> 'ok').
+-spec(debug/1 :: (string()) -> 'ok').
+-spec(debug/2 :: (string(), [any()]) -> 'ok').
-spec(info/1 :: (string()) -> 'ok').
-spec(info/2 :: (string(), [any()]) -> 'ok').
-spec(warning/1 :: (string()) -> 'ok').
@@ -50,6 +53,7 @@ log(Category, Level, Fmt, Args) when is_list(Args) ->
case level(Level) =< catlevel(Category) of
false -> ok;
true -> F = case Level of
+ debug -> fun error_logger:info_msg/2;
info -> fun error_logger:info_msg/2;
warning -> fun error_logger:warning_msg/2;
error -> fun error_logger:error_msg/2
@@ -57,6 +61,8 @@ log(Category, Level, Fmt, Args) when is_list(Args) ->
with_local_io(fun () -> F(Fmt, Args) end)
end.
+debug(Fmt) -> log(default, debug, Fmt).
+debug(Fmt, Args) -> log(default, debug, Fmt, Args).
info(Fmt) -> log(default, info, Fmt).
info(Fmt, Args) -> log(default, info, Fmt, Args).
warning(Fmt) -> log(default, warning, Fmt).
@@ -75,6 +81,7 @@ catlevel(Category) ->
%%--------------------------------------------------------------------
+level(debug) -> 4;
level(info) -> 3;
level(warning) -> 2;
level(error) -> 1;
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl
index d328488298..38273e1c28 100644
--- a/src/rabbit_mirror_queue_coordinator.erl
+++ b/src/rabbit_mirror_queue_coordinator.erl
@@ -353,9 +353,10 @@ handle_cast({gm_deaths, DeadGMPids},
when node(MPid) =:= node() ->
case rabbit_mirror_queue_misc:remove_from_queue(
QueueName, MPid, DeadGMPids) of
- {ok, MPid, DeadPids} ->
+ {ok, MPid, DeadPids, ExtraNodes} ->
rabbit_mirror_queue_misc:report_deaths(MPid, true, QueueName,
DeadPids),
+ rabbit_mirror_queue_misc:add_mirrors(QueueName, ExtraNodes, async),
noreply(State);
{error, not_found} ->
{stop, normal, State}
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index aa1e1ab9ee..0c05729292 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -17,8 +17,8 @@
-module(rabbit_mirror_queue_master).
-export([init/3, terminate/2, delete_and_terminate/2,
- purge/1, purge_acks/1, publish/5, publish_delivered/4,
- discard/3, fetch/2, drop/2, ack/2, requeue/2, ackfold/4, fold/3,
+ purge/1, purge_acks/1, publish/6, publish_delivered/5,
+ discard/4, fetch/2, drop/2, ack/2, requeue/2, ackfold/4, fold/3,
len/1, is_empty/1, depth/1, drain_confirmed/1,
dropwhile/2, fetchwhile/4, set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1, resume/1,
@@ -230,37 +230,38 @@ purge(State = #state { gm = GM,
purge_acks(_State) -> exit({not_implemented, {?MODULE, purge_acks}}).
-publish(Msg = #basic_message { id = MsgId }, MsgProps, IsDelivered, ChPid,
+publish(Msg = #basic_message { id = MsgId }, MsgProps, IsDelivered, ChPid, Flow,
State = #state { gm = GM,
seen_status = SS,
backing_queue = BQ,
backing_queue_state = BQS }) ->
false = dict:is_key(MsgId, SS), %% ASSERTION
- ok = gm:broadcast(GM, {publish, ChPid, MsgProps, Msg},
+ ok = gm:broadcast(GM, {publish, ChPid, Flow, MsgProps, Msg},
rabbit_basic:msg_size(Msg)),
- BQS1 = BQ:publish(Msg, MsgProps, IsDelivered, ChPid, BQS),
+ BQS1 = BQ:publish(Msg, MsgProps, IsDelivered, ChPid, Flow, BQS),
ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1 }).
publish_delivered(Msg = #basic_message { id = MsgId }, MsgProps,
- ChPid, State = #state { gm = GM,
- seen_status = SS,
- backing_queue = BQ,
- backing_queue_state = BQS }) ->
+ ChPid, Flow, State = #state { gm = GM,
+ seen_status = SS,
+ backing_queue = BQ,
+ backing_queue_state = BQS }) ->
false = dict:is_key(MsgId, SS), %% ASSERTION
- ok = gm:broadcast(GM, {publish_delivered, ChPid, MsgProps, Msg},
+ ok = gm:broadcast(GM, {publish_delivered, ChPid, Flow, MsgProps, Msg},
rabbit_basic:msg_size(Msg)),
- {AckTag, BQS1} = BQ:publish_delivered(Msg, MsgProps, ChPid, BQS),
+ {AckTag, BQS1} = BQ:publish_delivered(Msg, MsgProps, ChPid, Flow, BQS),
State1 = State #state { backing_queue_state = BQS1 },
{AckTag, ensure_monitoring(ChPid, State1)}.
-discard(MsgId, ChPid, State = #state { gm = GM,
- backing_queue = BQ,
- backing_queue_state = BQS,
- seen_status = SS }) ->
+discard(MsgId, ChPid, Flow, State = #state { gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ seen_status = SS }) ->
false = dict:is_key(MsgId, SS), %% ASSERTION
- ok = gm:broadcast(GM, {discard, ChPid, MsgId}),
- ensure_monitoring(ChPid, State #state { backing_queue_state =
- BQ:discard(MsgId, ChPid, BQS) }).
+ ok = gm:broadcast(GM, {discard, ChPid, Flow, MsgId}),
+ ensure_monitoring(ChPid,
+ State #state { backing_queue_state =
+ BQ:discard(MsgId, ChPid, Flow, BQS) }).
dropwhile(Pred, State = #state{backing_queue = BQ,
backing_queue_state = BQS }) ->
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index 826b6927a1..ce63f7af1d 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -49,7 +49,7 @@
-spec(remove_from_queue/3 ::
(rabbit_amqqueue:name(), pid(), [pid()])
- -> {'ok', pid(), [pid()]} | {'error', 'not_found'}).
+ -> {'ok', pid(), [pid()], [node()]} | {'error', 'not_found'}).
-spec(on_node_up/0 :: () -> 'ok').
-spec(add_mirrors/3 :: (rabbit_amqqueue:name(), [node()], 'sync' | 'async')
-> 'ok').
@@ -70,7 +70,7 @@
%%----------------------------------------------------------------------------
-%% Returns {ok, NewMPid, DeadPids}
+%% Returns {ok, NewMPid, DeadPids, ExtraNodes}
remove_from_queue(QueueName, Self, DeadGMPids) ->
rabbit_misc:execute_mnesia_transaction(
fun () ->
@@ -78,10 +78,9 @@ remove_from_queue(QueueName, Self, DeadGMPids) ->
%% get here.
case mnesia:read({rabbit_queue, QueueName}) of
[] -> {error, not_found};
- [Q = #amqqueue { pid = QPid,
- slave_pids = SPids,
- gm_pids = GMPids,
- down_slave_nodes = DSNs}] ->
+ [Q = #amqqueue { pid = QPid,
+ slave_pids = SPids,
+ gm_pids = GMPids }] ->
{DeadGM, AliveGM} = lists:partition(
fun ({GM, _}) ->
lists:member(GM, DeadGMPids)
@@ -90,36 +89,35 @@ remove_from_queue(QueueName, Self, DeadGMPids) ->
AlivePids = [Pid || {_GM, Pid} <- AliveGM],
Alive = [Pid || Pid <- [QPid | SPids],
lists:member(Pid, AlivePids)],
- DSNs1 = [node(Pid) ||
- Pid <- SPids,
- not lists:member(Pid, AlivePids)] ++ DSNs,
{QPid1, SPids1} = promote_slave(Alive),
- case {{QPid, SPids}, {QPid1, SPids1}} of
- {Same, Same} ->
- ok;
- _ when QPid =:= QPid1 orelse QPid1 =:= Self ->
- %% Either master hasn't changed, so
- %% we're ok to update mnesia; or we have
- %% become the master.
- Q1 = Q#amqqueue{pid = QPid1,
- slave_pids = SPids1,
- gm_pids = AliveGM,
- down_slave_nodes = DSNs1},
- store_updated_slaves(Q1),
- %% If we add and remove nodes at the same time we
- %% might tell the old master we need to sync and
- %% then shut it down. So let's check if the new
- %% master needs to sync.
- maybe_auto_sync(Q1);
+ Extra =
+ case {{QPid, SPids}, {QPid1, SPids1}} of
+ {Same, Same} ->
+ [];
+ _ when QPid =:= QPid1 orelse QPid1 =:= Self ->
+ %% Either master hasn't changed, so
+ %% we're ok to update mnesia; or we have
+ %% become the master.
+ Q1 = Q#amqqueue{pid = QPid1,
+ slave_pids = SPids1,
+ gm_pids = AliveGM},
+ store_updated_slaves(Q1),
+ %% If we add and remove nodes at the
+ %% same time we might tell the old
+ %% master we need to sync and then
+ %% shut it down. So let's check if
+ %% the new master needs to sync.
+ maybe_auto_sync(Q1),
+ slaves_to_start_on_failure(Q1, DeadGMPids);
_ ->
- %% Master has changed, and we're not it.
- %% [1].
- Q1 = Q#amqqueue{slave_pids = Alive,
- gm_pids = AliveGM,
- down_slave_nodes = DSNs1},
- store_updated_slaves(Q1)
- end,
- {ok, QPid1, DeadPids}
+ %% Master has changed, and we're not it.
+ %% [1].
+ Q1 = Q#amqqueue{slave_pids = Alive,
+ gm_pids = AliveGM},
+ store_updated_slaves(Q1),
+ []
+ end,
+ {ok, QPid1, DeadPids, Extra}
end
end).
%% [1] We still update mnesia here in case the slave that is supposed
@@ -145,6 +143,17 @@ remove_from_queue(QueueName, Self, DeadGMPids) ->
%% aforementioned restriction on updating the master pid, that pid may
%% not be present in gm_pids, but only if said master has died.
+%% Sometimes a slave dying means we need to start more on other
+%% nodes - "exactly" mode can cause this to happen.
+slaves_to_start_on_failure(Q, DeadGMPids) ->
+ %% In case Mnesia has not caught up yet, filter out nodes we know
+ %% to be dead..
+ ClusterNodes = rabbit_mnesia:cluster_nodes(running) --
+ [node(P) || P <- DeadGMPids],
+ {_, OldNodes, _} = actual_queue_nodes(Q),
+ {_, NewNodes} = suggested_queue_nodes(Q, ClusterNodes),
+ NewNodes -- OldNodes.
+
on_node_up() ->
QNames =
rabbit_misc:execute_mnesia_transaction(
@@ -234,22 +243,39 @@ log(Level, QName, Fmt, Args) ->
rabbit_log:log(mirroring, Level, "Mirrored ~s: " ++ Fmt,
[rabbit_misc:rs(QName) | Args]).
-store_updated_slaves(Q = #amqqueue{pid = MPid,
- slave_pids = SPids,
- sync_slave_pids = SSPids,
- down_slave_nodes = DSNs}) ->
+store_updated_slaves(Q = #amqqueue{slave_pids = SPids,
+ sync_slave_pids = SSPids,
+ recoverable_slaves = RS}) ->
%% TODO now that we clear sync_slave_pids in rabbit_durable_queue,
%% do we still need this filtering?
SSPids1 = [SSPid || SSPid <- SSPids, lists:member(SSPid, SPids)],
- DSNs1 = DSNs -- [node(P) || P <- [MPid | SPids]],
- Q1 = Q#amqqueue{sync_slave_pids = SSPids1,
- down_slave_nodes = DSNs1,
- state = live},
+ Q1 = Q#amqqueue{sync_slave_pids = SSPids1,
+ recoverable_slaves = update_recoverable(SPids, RS),
+ state = live},
ok = rabbit_amqqueue:store_queue(Q1),
%% Wake it up so that we emit a stats event
rabbit_amqqueue:notify_policy_changed(Q1),
Q1.
+%% Recoverable nodes are those which we could promote if the whole
+%% cluster were to suddenly stop and we then lose the master; i.e. all
+%% nodes with running slaves, and all stopped nodes which had running
+%% slaves when they were up.
+%%
+%% Therefore we aim here to add new nodes with slaves, and remove
+%% running nodes without slaves, We also try to keep the order
+%% constant, and similar to the live SPids field (i.e. oldest
+%% first). That's not necessarily optimal if nodes spend a long time
+%% down, but we don't have a good way to predict what the optimal is
+%% in that case anyway, and we assume nodes will not just be down for
+%% a long time without being removed.
+update_recoverable(SPids, RS) ->
+ SNodes = [node(SPid) || SPid <- SPids],
+ RunningNodes = rabbit_mnesia:cluster_nodes(running),
+ AddNodes = SNodes -- RS,
+ DelNodes = RunningNodes -- SNodes, %% i.e. running with no slave
+ (RS -- DelNodes) ++ AddNodes.
+
%%----------------------------------------------------------------------------
promote_slave([SPid | SPids]) ->
@@ -344,6 +370,13 @@ update_mirrors0(OldQ = #amqqueue{name = QName},
{NewMNode, NewSNodes} = suggested_queue_nodes(NewQ),
OldNodes = [OldMNode | OldSNodes],
NewNodes = [NewMNode | NewSNodes],
+ %% When a mirror dies, remove_from_queue/2 might have to add new
+ %% slaves (in "exactly" mode). It will check mnesia to see which
+ %% slaves there currently are. If drop_mirror/2 is invoked first
+ %% then when we end up in remove_from_queue/2 it will not see the
+ %% slaves that add_mirror/2 will add, and also want to add them
+ %% (even though we are not responding to the death of a
+ %% mirror). Breakage ensues.
add_mirrors (QName, NewNodes -- OldNodes, async),
drop_mirrors(QName, OldNodes -- NewNodes),
%% This is for the case where no extra nodes were added but we changed to
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index c64e35599c..96515f5c35 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -206,21 +206,28 @@ handle_call({gm_deaths, DeadGMPids}, From,
{error, not_found} ->
gen_server2:reply(From, ok),
{stop, normal, State};
- {ok, Pid, DeadPids} ->
+ {ok, Pid, DeadPids, ExtraNodes} ->
rabbit_mirror_queue_misc:report_deaths(Self, false, QName,
DeadPids),
case Pid of
MPid ->
%% master hasn't changed
gen_server2:reply(From, ok),
+ rabbit_mirror_queue_misc:add_mirrors(
+ QName, ExtraNodes, async),
noreply(State);
Self ->
%% we've become master
QueueState = promote_me(From, State),
+ rabbit_mirror_queue_misc:add_mirrors(
+ QName, ExtraNodes, async),
{become, rabbit_amqqueue_process, QueueState, hibernate};
_ ->
%% master has changed to not us
gen_server2:reply(From, ok),
+ %% assertion, we don't need to add_mirrors/2 in this
+ %% branch, see last clause in remove_from_queue/2
+ [] = ExtraNodes,
%% Since GM is by nature lazy we need to make sure
%% there is some traffic when a master dies, to
%% make sure all slaves get informed of the
@@ -246,7 +253,7 @@ handle_cast({run_backing_queue, Mod, Fun}, State) ->
handle_cast({gm, Instruction}, State) ->
handle_process_result(process_instruction(Instruction, State));
-handle_cast({deliver, Delivery = #delivery{sender = Sender}, true, Flow},
+handle_cast({deliver, Delivery = #delivery{sender = Sender, flow = Flow}, true},
State) ->
%% Asynchronous, non-"mandatory", deliver mode.
case Flow of
@@ -631,7 +638,7 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
(_Msgid, _Status, MTC0) ->
MTC0
end, gb_trees:empty(), MS),
- Deliveries = [Delivery#delivery{mandatory = false} || %% [0]
+ Deliveries = [promote_delivery(Delivery) ||
{_ChPid, {PubQ, _PendCh, _ChState}} <- dict:to_list(SQ),
Delivery <- queue:to_list(PubQ)],
AwaitGmDown = [ChPid || {ChPid, {_, _, down_from_ch}} <- dict:to_list(SQ)],
@@ -643,8 +650,16 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
Q1, rabbit_mirror_queue_master, MasterState, RateTRef, Deliveries, KS1,
MTC).
-%% [0] We reset mandatory to false here because we will have sent the
-%% mandatory_received already as soon as we got the message
+%% We reset mandatory to false here because we will have sent the
+%% mandatory_received already as soon as we got the message. We also
+%% need to send an ack for these messages since the channel is waiting
+%% for one for the via-GM case and we will not now receive one.
+promote_delivery(Delivery = #delivery{sender = Sender, flow = Flow}) ->
+ case Flow of
+ flow -> credit_flow:ack(Sender);
+ noflow -> ok
+ end,
+ Delivery#delivery{mandatory = false}.
noreply(State) ->
{NewState, Timeout} = next_state(State),
@@ -826,24 +841,27 @@ publish_or_discard(Status, ChPid, MsgId,
State1 #state { sender_queues = SQ1, msg_id_status = MS1 }.
-process_instruction({publish, ChPid, MsgProps,
+process_instruction({publish, ChPid, Flow, MsgProps,
Msg = #basic_message { id = MsgId }}, State) ->
+ maybe_flow_ack(ChPid, Flow),
State1 = #state { backing_queue = BQ, backing_queue_state = BQS } =
publish_or_discard(published, ChPid, MsgId, State),
- BQS1 = BQ:publish(Msg, MsgProps, true, ChPid, BQS),
+ BQS1 = BQ:publish(Msg, MsgProps, true, ChPid, Flow, BQS),
{ok, State1 #state { backing_queue_state = BQS1 }};
-process_instruction({publish_delivered, ChPid, MsgProps,
+process_instruction({publish_delivered, ChPid, Flow, MsgProps,
Msg = #basic_message { id = MsgId }}, State) ->
+ maybe_flow_ack(ChPid, Flow),
State1 = #state { backing_queue = BQ, backing_queue_state = BQS } =
publish_or_discard(published, ChPid, MsgId, State),
true = BQ:is_empty(BQS),
- {AckTag, BQS1} = BQ:publish_delivered(Msg, MsgProps, ChPid, BQS),
+ {AckTag, BQS1} = BQ:publish_delivered(Msg, MsgProps, ChPid, Flow, BQS),
{ok, maybe_store_ack(true, MsgId, AckTag,
State1 #state { backing_queue_state = BQS1 })};
-process_instruction({discard, ChPid, MsgId}, State) ->
+process_instruction({discard, ChPid, Flow, MsgId}, State) ->
+ maybe_flow_ack(ChPid, Flow),
State1 = #state { backing_queue = BQ, backing_queue_state = BQS } =
publish_or_discard(discarded, ChPid, MsgId, State),
- BQS1 = BQ:discard(MsgId, ChPid, BQS),
+ BQS1 = BQ:discard(MsgId, ChPid, Flow, BQS),
{ok, State1 #state { backing_queue_state = BQS1 }};
process_instruction({drop, Length, Dropped, AckRequired},
State = #state { backing_queue = BQ,
@@ -902,6 +920,9 @@ process_instruction({delete_and_terminate, Reason},
BQ:delete_and_terminate(Reason, BQS),
{stop, State #state { backing_queue_state = undefined }}.
+maybe_flow_ack(ChPid, flow) -> credit_flow:ack(ChPid);
+maybe_flow_ack(_ChPid, noflow) -> ok.
+
msg_ids_to_acktags(MsgIds, MA) ->
{AckTags, MA1} =
lists:foldl(
diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl
index ee1d210556..9a8d55f94b 100644
--- a/src/rabbit_mirror_queue_sync.erl
+++ b/src/rabbit_mirror_queue_sync.erl
@@ -263,9 +263,10 @@ slave_sync_loop(Args = {Ref, MRef, Syncer, BQ, UpdateRamDuration, Parent},
Props1 = Props#message_properties{needs_confirming = false},
{MA1, BQS1} =
case Unacked of
- false -> {MA, BQ:publish(Msg, Props1, true, none, BQS)};
+ false -> {MA,
+ BQ:publish(Msg, Props1, true, none, noflow, BQS)};
true -> {AckTag, BQS2} = BQ:publish_delivered(
- Msg, Props1, none, BQS),
+ Msg, Props1, none, noflow, BQS),
{[{Msg#basic_message.id, AckTag} | MA], BQS2}
end,
slave_sync_loop(Args, {MA1, TRef, BQS1});
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 3e2c88ee34..5e9c7ceb40 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -44,7 +44,8 @@
-export([format/2, format_many/1, format_stderr/2]).
-export([unfold/2, ceil/1, queue_fold/3]).
-export([sort_field_table/1]).
--export([pid_to_string/1, string_to_pid/1, node_to_fake_pid/1]).
+-export([pid_to_string/1, string_to_pid/1,
+ pid_change_node/2, node_to_fake_pid/1]).
-export([version_compare/2, version_compare/3]).
-export([version_minor_equivalent/2]).
-export([dict_cons/3, orddict_cons/3, gb_trees_cons/3]).
@@ -58,6 +59,7 @@
-export([format_message_queue/2]).
-export([append_rpc_all_nodes/4]).
-export([os_cmd/1]).
+-export([is_os_process_alive/1]).
-export([gb_sets_difference/2]).
-export([version/0, otp_release/0, which_applications/0]).
-export([sequence_error/1]).
@@ -196,6 +198,7 @@
(rabbit_framing:amqp_table()) -> rabbit_framing:amqp_table()).
-spec(pid_to_string/1 :: (pid()) -> string()).
-spec(string_to_pid/1 :: (string()) -> pid()).
+-spec(pid_change_node/2 :: (pid(), node()) -> pid()).
-spec(node_to_fake_pid/1 :: (atom()) -> pid()).
-spec(version_compare/2 :: (string(), string()) -> 'lt' | 'eq' | 'gt').
-spec(version_compare/3 ::
@@ -230,6 +233,7 @@
-spec(format_message_queue/2 :: (any(), priority_queue:q()) -> term()).
-spec(append_rpc_all_nodes/4 :: ([node()], atom(), atom(), [any()]) -> [any()]).
-spec(os_cmd/1 :: (string()) -> string()).
+-spec(is_os_process_alive/1 :: (non_neg_integer()) -> boolean()).
-spec(gb_sets_difference/2 :: (gb_sets:set(), gb_sets:set()) -> gb_sets:set()).
-spec(version/0 :: () -> string()).
-spec(otp_release/0 :: () -> string()).
@@ -520,8 +524,12 @@ execute_mnesia_transaction(TxFun) ->
Res = mnesia:sync_transaction(TxFun),
DiskLogAfter = mnesia_dumper:get_log_writes(),
case DiskLogAfter == DiskLogBefore of
- true -> Res;
- false -> {sync, Res}
+ true -> file_handle_cache_stats:update(
+ mnesia_ram_tx),
+ Res;
+ false -> file_handle_cache_stats:update(
+ mnesia_disk_tx),
+ {sync, Res}
end;
true -> mnesia:sync_transaction(TxFun)
end
@@ -686,11 +694,7 @@ sort_field_table(Arguments) ->
%% regardless of what node we are running on. The representation also
%% permits easy identification of the pid's node.
pid_to_string(Pid) when is_pid(Pid) ->
- %% see http://erlang.org/doc/apps/erts/erl_ext_dist.html (8.10 and
- %% 8.7)
- <<131,103,100,NodeLen:16,NodeBin:NodeLen/binary,Id:32,Ser:32,Cre:8>>
- = term_to_binary(Pid),
- Node = binary_to_term(<<131,100,NodeLen:16,NodeBin:NodeLen/binary>>),
+ {Node, Cre, Id, Ser} = decompose_pid(Pid),
format("<~s.~B.~B.~B>", [Node, Cre, Id, Ser]).
%% inverse of above
@@ -701,17 +705,32 @@ string_to_pid(Str) ->
case re:run(Str, "^<(.*)\\.(\\d+)\\.(\\d+)\\.(\\d+)>\$",
[{capture,all_but_first,list}]) of
{match, [NodeStr, CreStr, IdStr, SerStr]} ->
- <<131,NodeEnc/binary>> = term_to_binary(list_to_atom(NodeStr)),
[Cre, Id, Ser] = lists:map(fun list_to_integer/1,
[CreStr, IdStr, SerStr]),
- binary_to_term(<<131,103,NodeEnc/binary,Id:32,Ser:32,Cre:8>>);
+ compose_pid(list_to_atom(NodeStr), Cre, Id, Ser);
nomatch ->
throw(Err)
end.
+pid_change_node(Pid, NewNode) ->
+ {_OldNode, Cre, Id, Ser} = decompose_pid(Pid),
+ compose_pid(NewNode, Cre, Id, Ser).
+
%% node(node_to_fake_pid(Node)) =:= Node.
node_to_fake_pid(Node) ->
- string_to_pid(format("<~s.0.0.0>", [Node])).
+ compose_pid(Node, 0, 0, 0).
+
+decompose_pid(Pid) when is_pid(Pid) ->
+ %% see http://erlang.org/doc/apps/erts/erl_ext_dist.html (8.10 and
+ %% 8.7)
+ <<131,103,100,NodeLen:16,NodeBin:NodeLen/binary,Id:32,Ser:32,Cre:8>>
+ = term_to_binary(Pid),
+ Node = binary_to_term(<<131,100,NodeLen:16,NodeBin:NodeLen/binary>>),
+ {Node, Cre, Id, Ser}.
+
+compose_pid(Node, Cre, Id, Ser) ->
+ <<131,NodeEnc/binary>> = term_to_binary(Node),
+ binary_to_term(<<131,103,NodeEnc/binary,Id:32,Ser:32,Cre:8>>).
version_compare(A, B, lte) ->
case version_compare(A, B) of
@@ -915,6 +934,38 @@ os_cmd(Command) ->
end
end.
+is_os_process_alive(Pid) ->
+ with_os([{unix, fun () ->
+ run_ps(Pid) =:= 0
+ end},
+ {win32, fun () ->
+ Cmd = "tasklist /nh /fi \"pid eq " ++ Pid ++ "\" ",
+ Res = os_cmd(Cmd ++ "2>&1"),
+ case re:run(Res, "erl\\.exe", [{capture, none}]) of
+ match -> true;
+ _ -> false
+ end
+ end}]).
+
+with_os(Handlers) ->
+ {OsFamily, _} = os:type(),
+ case proplists:get_value(OsFamily, Handlers) of
+ undefined -> throw({unsupported_os, OsFamily});
+ Handler -> Handler()
+ end.
+
+run_ps(Pid) ->
+ Port = erlang:open_port({spawn, "ps -p " ++ Pid},
+ [exit_status, {line, 16384},
+ use_stdio, stderr_to_stdout]),
+ exit_loop(Port).
+
+exit_loop(Port) ->
+ receive
+ {Port, {exit_status, Rc}} -> Rc;
+ {Port, _} -> exit_loop(Port)
+ end.
+
gb_sets_difference(S1, S2) ->
gb_sets:fold(fun gb_sets:delete_any/2, S1, S2).
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 80fbcd6835..bde4221f0c 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -109,27 +109,33 @@ init() ->
%% We intuitively expect the global name server to be synced when
%% Mnesia is up. In fact that's not guaranteed to be the case -
%% let's make it so.
- ok = global:sync(),
+ ok = rabbit_node_monitor:global_sync(),
ok.
init_from_config() ->
+ FindBadNodeNames = fun
+ (Name, BadNames) when is_atom(Name) -> BadNames;
+ (Name, BadNames) -> [Name | BadNames]
+ end,
{TryNodes, NodeType} =
case application:get_env(rabbit, cluster_nodes) of
+ {ok, {Nodes, Type} = Config}
+ when is_list(Nodes) andalso (Type == disc orelse Type == ram) ->
+ case lists:foldr(FindBadNodeNames, [], Nodes) of
+ [] -> Config;
+ BadNames -> e({invalid_cluster_node_names, BadNames})
+ end;
+ {ok, {_, BadType}} when BadType /= disc andalso BadType /= ram ->
+ e({invalid_cluster_node_type, BadType});
{ok, Nodes} when is_list(Nodes) ->
- Config = {Nodes -- [node()], case lists:member(node(), Nodes) of
- true -> disc;
- false -> ram
- end},
- rabbit_log:warning(
- "Converting legacy 'cluster_nodes' configuration~n ~w~n"
- "to~n ~w.~n~n"
- "Please update the configuration to the new format "
- "{Nodes, NodeType}, where Nodes contains the nodes that the "
- "node will try to cluster with, and NodeType is either "
- "'disc' or 'ram'~n", [Nodes, Config]),
- Config;
- {ok, Config} ->
- Config
+ %% The legacy syntax (a nodes list without the node
+ %% type) is unsupported.
+ case lists:foldr(FindBadNodeNames, [], Nodes) of
+ [] -> e(cluster_node_type_mandatory);
+ _ -> e(invalid_cluster_nodes_conf)
+ end;
+ {ok, _} ->
+ e(invalid_cluster_nodes_conf)
end,
case TryNodes of
[] -> init_db_and_upgrade([node()], disc, false);
@@ -850,6 +856,20 @@ nodes_excl_me(Nodes) -> Nodes -- [node()].
e(Tag) -> throw({error, {Tag, error_description(Tag)}}).
+error_description({invalid_cluster_node_names, BadNames}) ->
+ "In the 'cluster_nodes' configuration key, the following node names "
+ "are invalid: " ++ lists:flatten(io_lib:format("~p", [BadNames]));
+error_description({invalid_cluster_node_type, BadType}) ->
+ "In the 'cluster_nodes' configuration key, the node type is invalid "
+ "(expected 'disc' or 'ram'): " ++
+ lists:flatten(io_lib:format("~p", [BadType]));
+error_description(cluster_node_type_mandatory) ->
+ "The 'cluster_nodes' configuration key must indicate the node type: "
+ "either {[...], disc} or {[...], ram}";
+error_description(invalid_cluster_nodes_conf) ->
+ "The 'cluster_nodes' configuration key is invalid, it must be of the "
+ "form {[Nodes], Type}, where Nodes is a list of node names and "
+ "Type is either 'disc' or 'ram'";
error_description(clustering_only_disc_node) ->
"You cannot cluster a node if it is the only disc node in its existing "
" cluster. If new nodes joined while this node was offline, use "
diff --git a/src/rabbit_mnesia_rename.erl b/src/rabbit_mnesia_rename.erl
new file mode 100644
index 0000000000..2787cb743c
--- /dev/null
+++ b/src/rabbit_mnesia_rename.erl
@@ -0,0 +1,267 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2014 GoPivotal, Inc. All rights reserved.
+%%
+
+-module(rabbit_mnesia_rename).
+-include("rabbit.hrl").
+
+-export([rename/2]).
+-export([maybe_finish/1]).
+
+-define(CONVERT_TABLES, [schema, rabbit_durable_queue]).
+
+%% Supports renaming the nodes in the Mnesia database. In order to do
+%% this, we take a backup of the database, traverse the backup
+%% changing node names and pids as we go, then restore it.
+%%
+%% That's enough for a standalone node, for clusters the story is more
+%% complex. We can take pairs of nodes From and To, but backing up and
+%% restoring the database changes schema cookies, so if we just do
+%% this on all nodes the cluster will refuse to re-form with
+%% "Incompatible schema cookies.". Therefore we do something similar
+%% to what we do for upgrades - the first node in the cluster to
+%% restart becomes the authority, and other nodes wipe their own
+%% Mnesia state and rejoin. They also need to tell Mnesia the old node
+%% is not coming back.
+%%
+%% If we are renaming nodes one at a time then the running cluster
+%% might not be aware that a rename has taken place, so after we wipe
+%% and rejoin we then update any tables (in practice just
+%% rabbit_durable_queue) which should be aware that we have changed.
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(rename/2 :: (node(), [{node(), node()}]) -> 'ok').
+-spec(maybe_finish/1 :: ([node()]) -> 'ok').
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+rename(Node, NodeMapList) ->
+ try
+ %% Check everything is correct and figure out what we are
+ %% changing from and to.
+ {FromNode, ToNode, NodeMap} = prepare(Node, NodeMapList),
+
+ %% We backup and restore Mnesia even if other nodes are
+ %% running at the time, and defer the final decision about
+ %% whether to use our mutated copy or rejoin the cluster until
+ %% we restart. That means we might be mutating our copy of the
+ %% database while the cluster is running. *Do not* contact the
+ %% cluster while this is happening, we are likely to get
+ %% confused.
+ application:set_env(kernel, dist_auto_connect, never),
+
+ %% Take a copy we can restore from if we abandon the
+ %% rename. We don't restore from the "backup" since restoring
+ %% that changes schema cookies and might stop us rejoining the
+ %% cluster.
+ ok = rabbit_mnesia:copy_db(mnesia_copy_dir()),
+
+ %% And make the actual changes
+ rabbit_control_main:become(FromNode),
+ take_backup(before_backup_name()),
+ convert_backup(NodeMap, before_backup_name(), after_backup_name()),
+ ok = rabbit_file:write_term_file(rename_config_name(),
+ [{FromNode, ToNode}]),
+ convert_config_files(NodeMap),
+ rabbit_control_main:become(ToNode),
+ restore_backup(after_backup_name()),
+ ok
+ after
+ stop_mnesia()
+ end.
+
+prepare(Node, NodeMapList) ->
+ %% If we have a previous rename and haven't started since, give up.
+ case rabbit_file:is_dir(dir()) of
+ true -> exit({rename_in_progress,
+ "Restart node under old name to roll back"});
+ false -> ok = rabbit_file:ensure_dir(mnesia_copy_dir())
+ end,
+
+ %% Check we don't have two nodes mapped to the same node
+ {FromNodes, ToNodes} = lists:unzip(NodeMapList),
+ case length(FromNodes) - length(lists:usort(ToNodes)) of
+ 0 -> ok;
+ _ -> exit({duplicate_node, ToNodes})
+ end,
+
+ %% Figure out which node we are before and after the change
+ FromNode = case [From || {From, To} <- NodeMapList,
+ To =:= Node] of
+ [N] -> N;
+ [] -> Node
+ end,
+ NodeMap = dict:from_list(NodeMapList),
+ ToNode = case dict:find(FromNode, NodeMap) of
+ {ok, N2} -> N2;
+ error -> FromNode
+ end,
+
+ %% Check that we are in the cluster, all old nodes are in the
+ %% cluster, and no new nodes are.
+ Nodes = rabbit_mnesia:cluster_nodes(all),
+ case {FromNodes -- Nodes, ToNodes -- (ToNodes -- Nodes),
+ lists:member(Node, Nodes ++ ToNodes)} of
+ {[], [], true} -> ok;
+ {[], [], false} -> exit({i_am_not_involved, Node});
+ {F, [], _} -> exit({nodes_not_in_cluster, F});
+ {_, T, _} -> exit({nodes_already_in_cluster, T})
+ end,
+ {FromNode, ToNode, NodeMap}.
+
+take_backup(Backup) ->
+ start_mnesia(),
+ ok = mnesia:backup(Backup),
+ stop_mnesia().
+
+restore_backup(Backup) ->
+ ok = mnesia:install_fallback(Backup, [{scope, local}]),
+ start_mnesia(),
+ stop_mnesia(),
+ rabbit_mnesia:force_load_next_boot().
+
+maybe_finish(AllNodes) ->
+ case rabbit_file:read_term_file(rename_config_name()) of
+ {ok, [{FromNode, ToNode}]} -> finish(FromNode, ToNode, AllNodes);
+ _ -> ok
+ end.
+
+finish(FromNode, ToNode, AllNodes) ->
+ case node() of
+ ToNode ->
+ case rabbit_upgrade:nodes_running(AllNodes) of
+ [] -> finish_primary(FromNode, ToNode);
+ _ -> finish_secondary(FromNode, ToNode, AllNodes)
+ end;
+ FromNode ->
+ rabbit_log:info(
+ "Abandoning rename from ~s to ~s since we are still ~s~n",
+ [FromNode, ToNode, FromNode]),
+ [{ok, _} = file:copy(backup_of_conf(F), F) || F <- config_files()],
+ ok = rabbit_file:recursive_delete([rabbit_mnesia:dir()]),
+ ok = rabbit_file:recursive_copy(
+ mnesia_copy_dir(), rabbit_mnesia:dir()),
+ delete_rename_files();
+ _ ->
+ %% Boot will almost certainly fail but we might as
+ %% well just log this
+ rabbit_log:info(
+ "Rename attempted from ~s to ~s but we are ~s - ignoring.~n",
+ [FromNode, ToNode, node()])
+ end.
+
+finish_primary(FromNode, ToNode) ->
+ rabbit_log:info("Restarting as primary after rename from ~s to ~s~n",
+ [FromNode, ToNode]),
+ delete_rename_files(),
+ ok.
+
+finish_secondary(FromNode, ToNode, AllNodes) ->
+ rabbit_log:info("Restarting as secondary after rename from ~s to ~s~n",
+ [FromNode, ToNode]),
+ rabbit_upgrade:secondary_upgrade(AllNodes),
+ rename_in_running_mnesia(FromNode, ToNode),
+ delete_rename_files(),
+ ok.
+
+dir() -> rabbit_mnesia:dir() ++ "-rename".
+before_backup_name() -> dir() ++ "/backup-before".
+after_backup_name() -> dir() ++ "/backup-after".
+rename_config_name() -> dir() ++ "/pending.config".
+mnesia_copy_dir() -> dir() ++ "/mnesia-copy".
+
+delete_rename_files() -> ok = rabbit_file:recursive_delete([dir()]).
+
+start_mnesia() -> rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia),
+ rabbit_table:force_load(),
+ rabbit_table:wait_for_replicated().
+stop_mnesia() -> stopped = mnesia:stop().
+
+convert_backup(NodeMap, FromBackup, ToBackup) ->
+ mnesia:traverse_backup(
+ FromBackup, ToBackup,
+ fun
+ (Row, Acc) ->
+ case lists:member(element(1, Row), ?CONVERT_TABLES) of
+ true -> {[update_term(NodeMap, Row)], Acc};
+ false -> {[Row], Acc}
+ end
+ end, switched).
+
+config_files() ->
+ [rabbit_node_monitor:running_nodes_filename(),
+ rabbit_node_monitor:cluster_status_filename()].
+
+backup_of_conf(Path) ->
+ filename:join([dir(), filename:basename(Path)]).
+
+convert_config_files(NodeMap) ->
+ [convert_config_file(NodeMap, Path) || Path <- config_files()].
+
+convert_config_file(NodeMap, Path) ->
+ {ok, Term} = rabbit_file:read_term_file(Path),
+ {ok, _} = file:copy(Path, backup_of_conf(Path)),
+ ok = rabbit_file:write_term_file(Path, update_term(NodeMap, Term)).
+
+lookup_node(OldNode, NodeMap) ->
+ case dict:find(OldNode, NodeMap) of
+ {ok, NewNode} -> NewNode;
+ error -> OldNode
+ end.
+
+mini_map(FromNode, ToNode) -> dict:from_list([{FromNode, ToNode}]).
+
+update_term(NodeMap, L) when is_list(L) ->
+ [update_term(NodeMap, I) || I <- L];
+update_term(NodeMap, T) when is_tuple(T) ->
+ list_to_tuple(update_term(NodeMap, tuple_to_list(T)));
+update_term(NodeMap, Node) when is_atom(Node) ->
+ lookup_node(Node, NodeMap);
+update_term(NodeMap, Pid) when is_pid(Pid) ->
+ rabbit_misc:pid_change_node(Pid, lookup_node(node(Pid), NodeMap));
+update_term(_NodeMap, Term) ->
+ Term.
+
+rename_in_running_mnesia(FromNode, ToNode) ->
+ All = rabbit_mnesia:cluster_nodes(all),
+ Running = rabbit_mnesia:cluster_nodes(running),
+ case {lists:member(FromNode, Running), lists:member(ToNode, All)} of
+ {false, true} -> ok;
+ {true, _} -> exit({old_node_running, FromNode});
+ {_, false} -> exit({new_node_not_in_cluster, ToNode})
+ end,
+ {atomic, ok} = mnesia:del_table_copy(schema, FromNode),
+ Map = mini_map(FromNode, ToNode),
+ {atomic, _} = transform_table(rabbit_durable_queue, Map),
+ ok.
+
+transform_table(Table, Map) ->
+ mnesia:sync_transaction(
+ fun () ->
+ mnesia:lock({table, Table}, write),
+ transform_table(Table, Map, mnesia:first(Table))
+ end).
+
+transform_table(_Table, _Map, '$end_of_table') ->
+ ok;
+transform_table(Table, Map, Key) ->
+ [Term] = mnesia:read(Table, Key, write),
+ ok = mnesia:write(Table, update_term(Map, Term), write),
+ transform_table(Table, Map, mnesia:next(Table, Key)).
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index b829ae9476..9e3dd1e7c4 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -473,6 +473,7 @@ write(MsgId, Msg, CState) -> client_write(MsgId, Msg, noflow, CState).
read(MsgId,
CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts }) ->
+ file_handle_cache_stats:update(msg_store_read),
%% Check the cur file cache
case ets:lookup(CurFileCacheEts, MsgId) of
[] ->
@@ -507,6 +508,7 @@ server_cast(#client_msstate { server = Server }, Msg) ->
client_write(MsgId, Msg, Flow,
CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts,
client_ref = CRef }) ->
+ file_handle_cache_stats:update(msg_store_write),
ok = client_update_flying(+1, MsgId, CState),
ok = update_msg_cache(CurFileCacheEts, MsgId, Msg),
ok = server_cast(CState, {write, CRef, MsgId, Flow}).
@@ -1299,7 +1301,8 @@ should_mask_action(CRef, MsgId,
open_file(Dir, FileName, Mode) ->
file_handle_cache:open(form_filename(Dir, FileName), ?BINARY_MODE ++ Mode,
- [{write_buffer, ?HANDLE_CACHE_BUFFER_SIZE}]).
+ [{write_buffer, ?HANDLE_CACHE_BUFFER_SIZE},
+ {read_buffer, ?HANDLE_CACHE_BUFFER_SIZE}]).
close_handle(Key, CState = #client_msstate { file_handle_cache = FHC }) ->
CState #client_msstate { file_handle_cache = close_handle(Key, FHC) };
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 1a2883748f..f5deaef388 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -394,7 +394,11 @@ node_listeners(Node) ->
mnesia:dirty_read(rabbit_listener, Node).
on_node_down(Node) ->
- ok = mnesia:dirty_delete(rabbit_listener, Node).
+ case lists:member(Node, nodes()) of
+ false -> ok = mnesia:dirty_delete(rabbit_listener, Node);
+ true -> rabbit_log:info(
+ "Keep ~s listeners: the node is already back~n", [Node])
+ end.
start_client(Sock, SockTransform) ->
{ok, _Child, Reader} = supervisor:start_child(rabbit_tcp_client_sup, []),
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index 2fab29996b..0f00e66e55 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -25,14 +25,15 @@
update_cluster_status/0, reset_cluster_status/0]).
-export([notify_node_up/0, notify_joined_cluster/0, notify_left_cluster/1]).
-export([partitions/0, partitions/1, status/1, subscribe/1]).
--export([pause_minority_guard/0]).
+-export([pause_partition_guard/0]).
+-export([global_sync/0]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
%% Utils
--export([all_rabbit_nodes_up/0, run_outside_applications/1, ping_all/0,
+-export([all_rabbit_nodes_up/0, run_outside_applications/2, ping_all/0,
alive_nodes/1, alive_rabbit_nodes/1]).
-define(SERVER, ?MODULE).
@@ -64,10 +65,10 @@
-spec(partitions/1 :: ([node()]) -> [{node(), [node()]}]).
-spec(status/1 :: ([node()]) -> {[{node(), [node()]}], [node()]}).
-spec(subscribe/1 :: (pid()) -> 'ok').
--spec(pause_minority_guard/0 :: () -> 'ok' | 'pausing').
+-spec(pause_partition_guard/0 :: () -> 'ok' | 'pausing').
-spec(all_rabbit_nodes_up/0 :: () -> boolean()).
--spec(run_outside_applications/1 :: (fun (() -> any())) -> pid()).
+-spec(run_outside_applications/2 :: (fun (() -> any()), boolean()) -> pid()).
-spec(ping_all/0 :: () -> 'ok').
-spec(alive_nodes/1 :: ([node()]) -> [node()]).
-spec(alive_rabbit_nodes/1 :: ([node()]) -> [node()]).
@@ -194,34 +195,43 @@ subscribe(Pid) ->
gen_server:cast(?SERVER, {subscribe, Pid}).
%%----------------------------------------------------------------------------
-%% pause_minority safety
+%% pause_minority/pause_if_all_down safety
%%----------------------------------------------------------------------------
%% If we are in a minority and pause_minority mode then a) we are
%% going to shut down imminently and b) we should not confirm anything
%% until then, since anything we confirm is likely to be lost.
%%
-%% We could confirm something by having an HA queue see the minority
+%% The same principles apply to a node which isn't part of the preferred
+%% partition when we are in pause_if_all_down mode.
+%%
+%% We could confirm something by having an HA queue see the pausing
%% state (and fail over into it) before the node monitor stops us, or
%% by using unmirrored queues and just having them vanish (and
%% confiming messages as thrown away).
%%
%% So we have channels call in here before issuing confirms, to do a
-%% lightweight check that we have not entered a minority state.
+%% lightweight check that we have not entered a pausing state.
-pause_minority_guard() ->
- case get(pause_minority_guard) of
- not_minority_mode ->
+pause_partition_guard() ->
+ case get(pause_partition_guard) of
+ not_pause_mode ->
ok;
undefined ->
{ok, M} = application:get_env(rabbit, cluster_partition_handling),
case M of
- pause_minority -> pause_minority_guard([], ok);
- _ -> put(pause_minority_guard, not_minority_mode),
- ok
+ pause_minority ->
+ pause_minority_guard([], ok);
+ {pause_if_all_down, PreferredNodes, _} ->
+ pause_if_all_down_guard(PreferredNodes, [], ok);
+ _ ->
+ put(pause_partition_guard, not_pause_mode),
+ ok
end;
{minority_mode, Nodes, LastState} ->
- pause_minority_guard(Nodes, LastState)
+ pause_minority_guard(Nodes, LastState);
+ {pause_if_all_down_mode, PreferredNodes, Nodes, LastState} ->
+ pause_if_all_down_guard(PreferredNodes, Nodes, LastState)
end.
pause_minority_guard(LastNodes, LastState) ->
@@ -231,11 +241,89 @@ pause_minority_guard(LastNodes, LastState) ->
false -> pausing;
true -> ok
end,
- put(pause_minority_guard,
+ put(pause_partition_guard,
{minority_mode, nodes(), NewState}),
NewState
end.
+pause_if_all_down_guard(PreferredNodes, LastNodes, LastState) ->
+ case nodes() of
+ LastNodes -> LastState;
+ _ -> NewState = case in_preferred_partition(PreferredNodes) of
+ false -> pausing;
+ true -> ok
+ end,
+ put(pause_partition_guard,
+ {pause_if_all_down_mode, PreferredNodes, nodes(),
+ NewState}),
+ NewState
+ end.
+
+%%----------------------------------------------------------------------------
+%% "global" hang workaround.
+%%----------------------------------------------------------------------------
+
+%% This code works around a possible inconsistency in the "global"
+%% state, causing global:sync/0 to never return.
+%%
+%% 1. A process is spawned.
+%% 2. If after 15", global:sync() didn't return, the "global"
+%% state is parsed.
+%% 3. If it detects that a sync is blocked for more than 10",
+%% the process sends fake nodedown/nodeup events to the two
+%% nodes involved (one local, one remote).
+%% 4. Both "global" instances restart their synchronisation.
+%% 5. globao:sync() finally returns.
+%%
+%% FIXME: Remove this workaround, once we got rid of the change to
+%% "dist_auto_connect" and fixed the bugs uncovered.
+
+global_sync() ->
+ Pid = spawn(fun workaround_global_hang/0),
+ ok = global:sync(),
+ Pid ! global_sync_done,
+ ok.
+
+workaround_global_hang() ->
+ receive
+ global_sync_done ->
+ ok
+ after 15000 ->
+ find_blocked_global_peers()
+ end.
+
+find_blocked_global_peers() ->
+ {status, _, _, [Dict | _]} = sys:get_status(global_name_server),
+ find_blocked_global_peers1(Dict).
+
+find_blocked_global_peers1([{{sync_tag_his, Peer}, Timestamp} | Rest]) ->
+ Diff = timer:now_diff(erlang:now(), Timestamp),
+ if
+ Diff >= 10000 -> unblock_global_peer(Peer);
+ true -> ok
+ end,
+ find_blocked_global_peers1(Rest);
+find_blocked_global_peers1([_ | Rest]) ->
+ find_blocked_global_peers1(Rest);
+find_blocked_global_peers1([]) ->
+ ok.
+
+unblock_global_peer(PeerNode) ->
+ ThisNode = node(),
+ PeerState = rpc:call(PeerNode, sys, get_status, [global_name_server]),
+ error_logger:info_msg(
+ "Global hang workaround: global state on ~s seems broken~n"
+ " * Peer global state: ~p~n"
+ " * Local global state: ~p~n"
+ "Faking nodedown/nodeup between ~s and ~s~n",
+ [PeerNode, PeerState, sys:get_status(global_name_server),
+ PeerNode, ThisNode]),
+ {global_name_server, ThisNode} ! {nodedown, PeerNode},
+ {global_name_server, PeerNode} ! {nodedown, ThisNode},
+ {global_name_server, ThisNode} ! {nodeup, PeerNode},
+ {global_name_server, PeerNode} ! {nodeup, ThisNode},
+ ok.
+
%%----------------------------------------------------------------------------
%% gen_server callbacks
%%----------------------------------------------------------------------------
@@ -291,8 +379,9 @@ handle_cast(notify_node_up, State = #state{guid = GUID}) ->
%% 'check_partial_partition' to all the nodes it still thinks are
%% alive. If any of those (intermediate) nodes still see the "down"
%% node as up, they inform it that this has happened. The original
-%% node (in 'ignore' or 'autoheal' mode) will then disconnect from the
-%% intermediate node to "upgrade" to a full partition.
+%% node (in 'ignore', 'pause_if_all_down' or 'autoheal' mode) will then
+%% disconnect from the intermediate node to "upgrade" to a full
+%% partition.
%%
%% In pause_minority mode it will instead immediately pause until all
%% nodes come back. This is because the contract for pause_minority is
@@ -357,12 +446,22 @@ handle_cast({partial_partition, NotReallyDown, Proxy, MyGUID},
ArgsBase),
await_cluster_recovery(fun all_nodes_up/0),
{noreply, State};
+ {ok, {pause_if_all_down, PreferredNodes, _}} ->
+ case in_preferred_partition(PreferredNodes) of
+ true -> rabbit_log:error(
+ FmtBase ++ "We will therefore intentionally "
+ "disconnect from ~s~n", ArgsBase ++ [Proxy]),
+ upgrade_to_full_partition(Proxy);
+ false -> rabbit_log:info(
+ FmtBase ++ "We are about to pause, no need "
+ "for further actions~n", ArgsBase)
+ end,
+ {noreply, State};
{ok, _} ->
rabbit_log:error(
FmtBase ++ "We will therefore intentionally disconnect from ~s~n",
ArgsBase ++ [Proxy]),
- cast(Proxy, {partial_partition_disconnect, node()}),
- disconnect(Proxy),
+ upgrade_to_full_partition(Proxy),
{noreply, State}
end;
@@ -527,17 +626,29 @@ handle_dead_node(Node, State = #state{autoheal = Autoheal}) ->
%% that we can respond in the same way to "rabbitmqctl stop_app"
%% and "rabbitmqctl stop" as much as possible.
%%
- %% However, for pause_minority mode we can't do this, since we
- %% depend on looking at whether other nodes are up to decide
- %% whether to come back up ourselves - if we decide that based on
- %% the rabbit application we would go down and never come back.
+ %% However, for pause_minority and pause_if_all_down modes we can't do
+ %% this, since we depend on looking at whether other nodes are up
+ %% to decide whether to come back up ourselves - if we decide that
+ %% based on the rabbit application we would go down and never come
+ %% back.
case application:get_env(rabbit, cluster_partition_handling) of
{ok, pause_minority} ->
- case majority() of
+ case majority([Node]) of
true -> ok;
false -> await_cluster_recovery(fun majority/0)
end,
State;
+ {ok, {pause_if_all_down, PreferredNodes, HowToRecover}} ->
+ case in_preferred_partition(PreferredNodes, [Node]) of
+ true -> ok;
+ false -> await_cluster_recovery(
+ fun in_preferred_partition/0)
+ end,
+ case HowToRecover of
+ autoheal -> State#state{autoheal =
+ rabbit_autoheal:node_down(Node, Autoheal)};
+ _ -> State
+ end;
{ok, ignore} ->
State;
{ok, autoheal} ->
@@ -549,35 +660,56 @@ handle_dead_node(Node, State = #state{autoheal = Autoheal}) ->
end.
await_cluster_recovery(Condition) ->
- rabbit_log:warning("Cluster minority status detected - awaiting recovery~n",
- []),
+ rabbit_log:warning("Cluster minority/secondary status detected - "
+ "awaiting recovery~n", []),
run_outside_applications(fun () ->
rabbit:stop(),
wait_for_cluster_recovery(Condition)
- end),
+ end, false),
ok.
-run_outside_applications(Fun) ->
+run_outside_applications(Fun, WaitForExistingProcess) ->
spawn(fun () ->
%% If our group leader is inside an application we are about
%% to stop, application:stop/1 does not return.
group_leader(whereis(init), self()),
- %% Ensure only one such process at a time, the
- %% exit(badarg) is harmless if one is already running
- try register(rabbit_outside_app_process, self()) of
- true ->
- try
- Fun()
- catch _:E ->
- rabbit_log:error(
- "rabbit_outside_app_process:~n~p~n~p~n",
- [E, erlang:get_stacktrace()])
- end
- catch error:badarg ->
- ok
- end
+ register_outside_app_process(Fun, WaitForExistingProcess)
end).
+register_outside_app_process(Fun, WaitForExistingProcess) ->
+ %% Ensure only one such process at a time, the exit(badarg) is
+ %% harmless if one is already running.
+ %%
+ %% If WaitForExistingProcess is false, the given fun is simply not
+ %% executed at all and the process exits.
+ %%
+ %% If WaitForExistingProcess is true, we wait for the end of the
+ %% currently running process before executing the given function.
+ try register(rabbit_outside_app_process, self()) of
+ true ->
+ do_run_outside_app_fun(Fun)
+ catch
+ error:badarg when WaitForExistingProcess ->
+ MRef = erlang:monitor(process, rabbit_outside_app_process),
+ receive
+ {'DOWN', MRef, _, _, _} ->
+ %% The existing process exited, let's try to
+ %% register again.
+ register_outside_app_process(Fun, WaitForExistingProcess)
+ end;
+ error:badarg ->
+ ok
+ end.
+
+do_run_outside_app_fun(Fun) ->
+ try
+ Fun()
+ catch _:E ->
+ rabbit_log:error(
+ "rabbit_outside_app_process:~n~p~n~p~n",
+ [E, erlang:get_stacktrace()])
+ end.
+
wait_for_cluster_recovery(Condition) ->
ping_all(),
case Condition() of
@@ -600,7 +732,9 @@ handle_dead_rabbit(Node, State = #state{partitions = Partitions,
%% that we do not attempt to deal with individual (other) partitions
%% going away. It's only safe to forget anything about partitions when
%% there are no partitions.
- Partitions1 = case Partitions -- (Partitions -- alive_rabbit_nodes()) of
+ Down = Partitions -- alive_rabbit_nodes(),
+ NoLongerPartitioned = rabbit_mnesia:cluster_nodes(running),
+ Partitions1 = case Partitions -- Down -- NoLongerPartitioned of
[] -> [];
_ -> Partitions
end,
@@ -661,6 +795,10 @@ del_node(Node, Nodes) -> Nodes -- [Node].
cast(Node, Msg) -> gen_server:cast({?SERVER, Node}, Msg).
+upgrade_to_full_partition(Proxy) ->
+ cast(Proxy, {partial_partition_disconnect, node()}),
+ disconnect(Proxy).
+
%% When we call this, it's because we want to force Mnesia to detect a
%% partition. But if we just disconnect_node/1 then Mnesia won't
%% detect a very short partition. So we want to force a slightly
@@ -683,14 +821,32 @@ disconnect(Node) ->
%% here. "rabbit" in a function's name implies we test if the rabbit
%% application is up, not just the node.
-%% As we use these functions to decide what to do in pause_minority
-%% state, they *must* be fast, even in the case where TCP connections
-%% are timing out. So that means we should be careful about whether we
-%% connect to nodes which are currently disconnected.
+%% As we use these functions to decide what to do in pause_minority or
+%% pause_if_all_down states, they *must* be fast, even in the case where
+%% TCP connections are timing out. So that means we should be careful
+%% about whether we connect to nodes which are currently disconnected.
majority() ->
+ majority([]).
+
+majority(NodesDown) ->
+ Nodes = rabbit_mnesia:cluster_nodes(all),
+ AliveNodes = alive_nodes(Nodes) -- NodesDown,
+ length(AliveNodes) / length(Nodes) > 0.5.
+
+in_preferred_partition() ->
+ {ok, {pause_if_all_down, PreferredNodes, _}} =
+ application:get_env(rabbit, cluster_partition_handling),
+ in_preferred_partition(PreferredNodes).
+
+in_preferred_partition(PreferredNodes) ->
+ in_preferred_partition(PreferredNodes, []).
+
+in_preferred_partition(PreferredNodes, NodesDown) ->
Nodes = rabbit_mnesia:cluster_nodes(all),
- length(alive_nodes(Nodes)) / length(Nodes) > 0.5.
+ RealPreferredNodes = [N || N <- PreferredNodes, lists:member(N, Nodes)],
+ AliveNodes = alive_nodes(RealPreferredNodes) -- NodesDown,
+ RealPreferredNodes =:= [] orelse AliveNodes =/= [].
all_nodes_up() ->
Nodes = rabbit_mnesia:cluster_nodes(all),
diff --git a/src/rabbit_nodes.erl b/src/rabbit_nodes.erl
index 7f7fcc3126..bbe0d35719 100644
--- a/src/rabbit_nodes.erl
+++ b/src/rabbit_nodes.erl
@@ -18,7 +18,7 @@
-export([names/1, diagnostics/1, make/1, parts/1, cookie_hash/0,
is_running/2, is_process_running/2,
- cluster_name/0, set_cluster_name/1]).
+ cluster_name/0, set_cluster_name/1, ensure_epmd/0]).
-include_lib("kernel/include/inet.hrl").
@@ -41,6 +41,7 @@
-spec(is_process_running/2 :: (node(), atom()) -> boolean()).
-spec(cluster_name/0 :: () -> binary()).
-spec(set_cluster_name/1 :: (binary()) -> 'ok').
+-spec(ensure_epmd/0 :: () -> 'ok').
-endif.
@@ -197,3 +198,19 @@ cluster_name_default() ->
set_cluster_name(Name) ->
rabbit_runtime_parameters:set_global(cluster_name, Name).
+
+ensure_epmd() ->
+ {ok, Prog} = init:get_argument(progname),
+ ID = random:uniform(1000000000),
+ Port = open_port(
+ {spawn_executable, os:find_executable(Prog)},
+ [{args, ["-sname", rabbit_misc:format("epmd-starter-~b", [ID]),
+ "-noshell", "-eval", "halt()."]},
+ exit_status, stderr_to_stdout, use_stdio]),
+ port_shutdown_loop(Port).
+
+port_shutdown_loop(Port) ->
+ receive
+ {Port, {exit_status, _Rc}} -> ok;
+ {Port, _} -> port_shutdown_loop(Port)
+ end.
diff --git a/src/rabbit_priority_queue.erl b/src/rabbit_priority_queue.erl
new file mode 100644
index 0000000000..1d9522f613
--- /dev/null
+++ b/src/rabbit_priority_queue.erl
@@ -0,0 +1,574 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2014 GoPivotal, Inc. All rights reserved.
+%%
+
+-module(rabbit_priority_queue).
+
+-include_lib("rabbit.hrl").
+-include_lib("rabbit_framing.hrl").
+-behaviour(rabbit_backing_queue).
+
+%% enabled unconditionally. Disabling priority queueing after
+%% it has been enabled is dangerous.
+-rabbit_boot_step({?MODULE,
+ [{description, "enable priority queue"},
+ {mfa, {?MODULE, enable, []}},
+ {requires, pre_boot},
+ {enables, kernel_ready}]}).
+
+-export([enable/0]).
+
+-export([start/1, stop/0]).
+
+-export([init/3, terminate/2, delete_and_terminate/2, delete_crashed/1,
+ purge/1, purge_acks/1,
+ publish/6, publish_delivered/5, discard/4, drain_confirmed/1,
+ dropwhile/2, fetchwhile/4, fetch/2, drop/2, ack/2, requeue/2,
+ ackfold/4, fold/3, len/1, is_empty/1, depth/1,
+ set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1,
+ handle_pre_hibernate/1, resume/1, msg_rates/1,
+ info/2, invoke/3, is_duplicate/2]).
+
+-record(state, {bq, bqss}).
+-record(passthrough, {bq, bqs}).
+
+%% See 'note on suffixes' below
+-define(passthrough1(F), State#passthrough{bqs = BQ:F}).
+-define(passthrough2(F),
+ {Res, BQS1} = BQ:F, {Res, State#passthrough{bqs = BQS1}}).
+-define(passthrough3(F),
+ {Res1, Res2, BQS1} = BQ:F, {Res1, Res2, State#passthrough{bqs = BQS1}}).
+
+%% This module adds suport for priority queues.
+%%
+%% Priority queues have one backing queue per priority. Backing queue functions
+%% then produce a list of results for each BQ and fold over them, sorting
+%% by priority.
+%%
+%%For queues that do not
+%% have priorities enabled, the functions in this module delegate to
+%% their "regular" backing queue module counterparts. See the `passthrough`
+%% record and passthrough{1,2,3} macros.
+%%
+%% Delivery to consumers happens by first "running" the queue with
+%% the highest priority until there are no more messages to deliver,
+%% then the next one, and so on. This offers good prioritisation
+%% but may result in lower priority messages not being delivered
+%% when there's a high ingress rate of messages with higher priority.
+
+enable() ->
+ {ok, RealBQ} = application:get_env(rabbit, backing_queue_module),
+ case RealBQ of
+ ?MODULE -> ok;
+ _ -> rabbit_log:info("Priority queues enabled, real BQ is ~s~n",
+ [RealBQ]),
+ application:set_env(
+ rabbitmq_priority_queue, backing_queue_module, RealBQ),
+ application:set_env(rabbit, backing_queue_module, ?MODULE)
+ end.
+
+%%----------------------------------------------------------------------------
+
+start(QNames) ->
+ BQ = bq(),
+ %% TODO this expand-collapse dance is a bit ridiculous but it's what
+ %% rabbit_amqqueue:recover/0 expects. We could probably simplify
+ %% this if we rejigged recovery a bit.
+ {DupNames, ExpNames} = expand_queues(QNames),
+ case BQ:start(ExpNames) of
+ {ok, ExpRecovery} ->
+ {ok, collapse_recovery(QNames, DupNames, ExpRecovery)};
+ Else ->
+ Else
+ end.
+
+stop() ->
+ BQ = bq(),
+ BQ:stop().
+
+%%----------------------------------------------------------------------------
+
+mutate_name(P, Q = #amqqueue{name = QName = #resource{name = QNameBin}}) ->
+ Q#amqqueue{name = QName#resource{name = mutate_name_bin(P, QNameBin)}}.
+
+mutate_name_bin(P, NameBin) -> <<NameBin/binary, 0, P:8>>.
+
+expand_queues(QNames) ->
+ lists:unzip(
+ lists:append([expand_queue(QName) || QName <- QNames])).
+
+expand_queue(QName = #resource{name = QNameBin}) ->
+ {ok, Q} = rabbit_misc:dirty_read({rabbit_durable_queue, QName}),
+ case priorities(Q) of
+ none -> [{QName, QName}];
+ Ps -> [{QName, QName#resource{name = mutate_name_bin(P, QNameBin)}}
+ || P <- Ps]
+ end.
+
+collapse_recovery(QNames, DupNames, Recovery) ->
+ NameToTerms = lists:foldl(fun({Name, RecTerm}, Dict) ->
+ dict:append(Name, RecTerm, Dict)
+ end, dict:new(), lists:zip(DupNames, Recovery)),
+ [dict:fetch(Name, NameToTerms) || Name <- QNames].
+
+priorities(#amqqueue{arguments = Args}) ->
+ Ints = [long, short, signedint, byte],
+ case rabbit_misc:table_lookup(Args, <<"x-max-priority">>) of
+ {Type, Max} -> case lists:member(Type, Ints) of
+ false -> none;
+ true -> lists:reverse(lists:seq(0, Max))
+ end;
+ _ -> none
+ end.
+
+%%----------------------------------------------------------------------------
+
+init(Q, Recover, AsyncCallback) ->
+ BQ = bq(),
+ case priorities(Q) of
+ none -> RealRecover = case Recover of
+ [R] -> R; %% [0]
+ R -> R
+ end,
+ #passthrough{bq = BQ,
+ bqs = BQ:init(Q, RealRecover, AsyncCallback)};
+ Ps -> Init = fun (P, Term) ->
+ BQ:init(
+ mutate_name(P, Q), Term,
+ fun (M, F) -> AsyncCallback(M, {P, F}) end)
+ end,
+ BQSs = case have_recovery_terms(Recover) of
+ false -> [{P, Init(P, Recover)} || P <- Ps];
+ _ -> PsTerms = lists:zip(Ps, Recover),
+ [{P, Init(P, Term)} || {P, Term} <- PsTerms]
+ end,
+ #state{bq = BQ,
+ bqss = BQSs}
+ end.
+%% [0] collapse_recovery has the effect of making a list of recovery
+%% terms in priority order, even for non priority queues. It's easier
+%% to do that and "unwrap" in init/3 than to have collapse_recovery be
+%% aware of non-priority queues.
+
+have_recovery_terms(new) -> false;
+have_recovery_terms(non_clean_shutdown) -> false;
+have_recovery_terms(_) -> true.
+
+terminate(Reason, State = #state{bq = BQ}) ->
+ foreach1(fun (_P, BQSN) -> BQ:terminate(Reason, BQSN) end, State);
+terminate(Reason, State = #passthrough{bq = BQ, bqs = BQS}) ->
+ ?passthrough1(terminate(Reason, BQS)).
+
+delete_and_terminate(Reason, State = #state{bq = BQ}) ->
+ foreach1(fun (_P, BQSN) ->
+ BQ:delete_and_terminate(Reason, BQSN)
+ end, State);
+delete_and_terminate(Reason, State = #passthrough{bq = BQ, bqs = BQS}) ->
+ ?passthrough1(delete_and_terminate(Reason, BQS)).
+
+delete_crashed(Q) ->
+ BQ = bq(),
+ case priorities(Q) of
+ none -> BQ:delete_crashed(Q);
+ Ps -> [BQ:delete_crashed(mutate_name(P, Q)) || P <- Ps]
+ end.
+
+purge(State = #state{bq = BQ}) ->
+ fold_add2(fun (_P, BQSN) -> BQ:purge(BQSN) end, State);
+purge(State = #passthrough{bq = BQ, bqs = BQS}) ->
+ ?passthrough2(purge(BQS)).
+
+purge_acks(State = #state{bq = BQ}) ->
+ foreach1(fun (_P, BQSN) -> BQ:purge_acks(BQSN) end, State);
+purge_acks(State = #passthrough{bq = BQ, bqs = BQS}) ->
+ ?passthrough1(purge_acks(BQS)).
+
+publish(Msg, MsgProps, IsDelivered, ChPid, Flow, State = #state{bq = BQ}) ->
+ pick1(fun (_P, BQSN) ->
+ BQ:publish(Msg, MsgProps, IsDelivered, ChPid, Flow, BQSN)
+ end, Msg, State);
+publish(Msg, MsgProps, IsDelivered, ChPid, Flow,
+ State = #passthrough{bq = BQ, bqs = BQS}) ->
+ ?passthrough1(publish(Msg, MsgProps, IsDelivered, ChPid, Flow, BQS)).
+
+publish_delivered(Msg, MsgProps, ChPid, Flow, State = #state{bq = BQ}) ->
+ pick2(fun (P, BQSN) ->
+ {AckTag, BQSN1} = BQ:publish_delivered(
+ Msg, MsgProps, ChPid, Flow, BQSN),
+ {{P, AckTag}, BQSN1}
+ end, Msg, State);
+publish_delivered(Msg, MsgProps, ChPid, Flow,
+ State = #passthrough{bq = BQ, bqs = BQS}) ->
+ ?passthrough2(publish_delivered(Msg, MsgProps, ChPid, Flow, BQS)).
+
+%% TODO this is a hack. The BQ api does not give us enough information
+%% here - if we had the Msg we could look at its priority and forward
+%% to the appropriate sub-BQ. But we don't so we are stuck.
+%%
+%% But fortunately VQ ignores discard/4, so we can too, *assuming we
+%% are talking to VQ*. discard/4 is used by HA, but that's "above" us
+%% (if in use) so we don't break that either, just some hypothetical
+%% alternate BQ implementation.
+discard(_MsgId, _ChPid, _Flow, State = #state{}) ->
+ State;
+ %% We should have something a bit like this here:
+ %% pick1(fun (_P, BQSN) ->
+ %% BQ:discard(MsgId, ChPid, Flow, BQSN)
+ %% end, Msg, State);
+discard(MsgId, ChPid, Flow, State = #passthrough{bq = BQ, bqs = BQS}) ->
+ ?passthrough1(discard(MsgId, ChPid, Flow, BQS)).
+
+drain_confirmed(State = #state{bq = BQ}) ->
+ fold_append2(fun (_P, BQSN) -> BQ:drain_confirmed(BQSN) end, State);
+drain_confirmed(State = #passthrough{bq = BQ, bqs = BQS}) ->
+ ?passthrough2(drain_confirmed(BQS)).
+
+dropwhile(Pred, State = #state{bq = BQ}) ->
+ find2(fun (_P, BQSN) -> BQ:dropwhile(Pred, BQSN) end, undefined, State);
+dropwhile(Pred, State = #passthrough{bq = BQ, bqs = BQS}) ->
+ ?passthrough2(dropwhile(Pred, BQS)).
+
+%% TODO this is a bit nasty. In the one place where fetchwhile/4 is
+%% actually used the accumulator is a list of acktags, which of course
+%% we need to mutate - so we do that although we are encoding an
+%% assumption here.
+fetchwhile(Pred, Fun, Acc, State = #state{bq = BQ}) ->
+ findfold3(
+ fun (P, BQSN, AccN) ->
+ {Res, AccN1, BQSN1} = BQ:fetchwhile(Pred, Fun, AccN, BQSN),
+ {Res, priority_on_acktags(P, AccN1), BQSN1}
+ end, Acc, undefined, State);
+fetchwhile(Pred, Fun, Acc, State = #passthrough{bq = BQ, bqs = BQS}) ->
+ ?passthrough3(fetchwhile(Pred, Fun, Acc, BQS)).
+
+fetch(AckRequired, State = #state{bq = BQ}) ->
+ find2(
+ fun (P, BQSN) ->
+ case BQ:fetch(AckRequired, BQSN) of
+ {empty, BQSN1} -> {empty, BQSN1};
+ {{Msg, Del, ATag}, BQSN1} -> {{Msg, Del, {P, ATag}}, BQSN1}
+ end
+ end, empty, State);
+fetch(AckRequired, State = #passthrough{bq = BQ, bqs = BQS}) ->
+ ?passthrough2(fetch(AckRequired, BQS)).
+
+drop(AckRequired, State = #state{bq = BQ}) ->
+ find2(fun (P, BQSN) ->
+ case BQ:drop(AckRequired, BQSN) of
+ {empty, BQSN1} -> {empty, BQSN1};
+ {{MsgId, AckTag}, BQSN1} -> {{MsgId, {P, AckTag}}, BQSN1}
+ end
+ end, empty, State);
+drop(AckRequired, State = #passthrough{bq = BQ, bqs = BQS}) ->
+ ?passthrough2(drop(AckRequired, BQS)).
+
+ack(AckTags, State = #state{bq = BQ}) ->
+ fold_by_acktags2(fun (AckTagsN, BQSN) ->
+ BQ:ack(AckTagsN, BQSN)
+ end, AckTags, State);
+ack(AckTags, State = #passthrough{bq = BQ, bqs = BQS}) ->
+ ?passthrough2(ack(AckTags, BQS)).
+
+requeue(AckTags, State = #state{bq = BQ}) ->
+ fold_by_acktags2(fun (AckTagsN, BQSN) ->
+ BQ:requeue(AckTagsN, BQSN)
+ end, AckTags, State);
+requeue(AckTags, State = #passthrough{bq = BQ, bqs = BQS}) ->
+ ?passthrough2(requeue(AckTags, BQS)).
+
+%% Similar problem to fetchwhile/4
+ackfold(MsgFun, Acc, State = #state{bq = BQ}, AckTags) ->
+ AckTagsByPriority = partition_acktags(AckTags),
+ fold2(
+ fun (P, BQSN, AccN) ->
+ case orddict:find(P, AckTagsByPriority) of
+ {ok, ATagsN} -> {AccN1, BQSN1} =
+ BQ:ackfold(MsgFun, AccN, BQSN, ATagsN),
+ {priority_on_acktags(P, AccN1), BQSN1};
+ error -> {AccN, BQSN}
+ end
+ end, Acc, State);
+ackfold(MsgFun, Acc, State = #passthrough{bq = BQ, bqs = BQS}, AckTags) ->
+ ?passthrough2(ackfold(MsgFun, Acc, BQS, AckTags)).
+
+fold(Fun, Acc, State = #state{bq = BQ}) ->
+ fold2(fun (_P, BQSN, AccN) -> BQ:fold(Fun, AccN, BQSN) end, Acc, State);
+fold(Fun, Acc, State = #passthrough{bq = BQ, bqs = BQS}) ->
+ ?passthrough2(fold(Fun, Acc, BQS)).
+
+len(#state{bq = BQ, bqss = BQSs}) ->
+ add0(fun (_P, BQSN) -> BQ:len(BQSN) end, BQSs);
+len(#passthrough{bq = BQ, bqs = BQS}) ->
+ BQ:len(BQS).
+
+is_empty(#state{bq = BQ, bqss = BQSs}) ->
+ all0(fun (_P, BQSN) -> BQ:is_empty(BQSN) end, BQSs);
+is_empty(#passthrough{bq = BQ, bqs = BQS}) ->
+ BQ:is_empty(BQS).
+
+depth(#state{bq = BQ, bqss = BQSs}) ->
+ add0(fun (_P, BQSN) -> BQ:depth(BQSN) end, BQSs);
+depth(#passthrough{bq = BQ, bqs = BQS}) ->
+ BQ:depth(BQS).
+
+set_ram_duration_target(DurationTarget, State = #state{bq = BQ}) ->
+ foreach1(fun (_P, BQSN) ->
+ BQ:set_ram_duration_target(DurationTarget, BQSN)
+ end, State);
+set_ram_duration_target(DurationTarget,
+ State = #passthrough{bq = BQ, bqs = BQS}) ->
+ ?passthrough1(set_ram_duration_target(DurationTarget, BQS)).
+
+ram_duration(State = #state{bq = BQ}) ->
+ fold_add2(fun (_P, BQSN) -> BQ:ram_duration(BQSN) end, State);
+ram_duration(State = #passthrough{bq = BQ, bqs = BQS}) ->
+ ?passthrough2(ram_duration(BQS)).
+
+needs_timeout(#state{bq = BQ, bqss = BQSs}) ->
+ fold0(fun (_P, _BQSN, timed) -> timed;
+ (_P, BQSN, idle) -> case BQ:needs_timeout(BQSN) of
+ timed -> timed;
+ _ -> idle
+ end;
+ (_P, BQSN, false) -> BQ:needs_timeout(BQSN)
+ end, false, BQSs);
+needs_timeout(#passthrough{bq = BQ, bqs = BQS}) ->
+ BQ:needs_timeout(BQS).
+
+timeout(State = #state{bq = BQ}) ->
+ foreach1(fun (_P, BQSN) -> BQ:timeout(BQSN) end, State);
+timeout(State = #passthrough{bq = BQ, bqs = BQS}) ->
+ ?passthrough1(timeout(BQS)).
+
+handle_pre_hibernate(State = #state{bq = BQ}) ->
+ foreach1(fun (_P, BQSN) ->
+ BQ:handle_pre_hibernate(BQSN)
+ end, State);
+handle_pre_hibernate(State = #passthrough{bq = BQ, bqs = BQS}) ->
+ ?passthrough1(handle_pre_hibernate(BQS)).
+
+resume(State = #state{bq = BQ}) ->
+ foreach1(fun (_P, BQSN) -> BQ:resume(BQSN) end, State);
+resume(State = #passthrough{bq = BQ, bqs = BQS}) ->
+ ?passthrough1(resume(BQS)).
+
+msg_rates(#state{bq = BQ, bqss = BQSs}) ->
+ fold0(fun(_P, BQSN, {InN, OutN}) ->
+ {In, Out} = BQ:msg_rates(BQSN),
+ {InN + In, OutN + Out}
+ end, {0.0, 0.0}, BQSs);
+msg_rates(#passthrough{bq = BQ, bqs = BQS}) ->
+ BQ:msg_rates(BQS).
+
+info(backing_queue_status, #state{bq = BQ, bqss = BQSs}) ->
+ fold0(fun (P, BQSN, Acc) ->
+ combine_status(P, BQ:info(backing_queue_status, BQSN), Acc)
+ end, nothing, BQSs);
+info(Item, #state{bq = BQ, bqss = BQSs}) ->
+ fold0(fun (_P, BQSN, Acc) ->
+ Acc + BQ:info(Item, BQSN)
+ end, 0, BQSs);
+info(Item, #passthrough{bq = BQ, bqs = BQS}) ->
+ BQ:info(Item, BQS).
+
+invoke(Mod, {P, Fun}, State = #state{bq = BQ}) ->
+ pick1(fun (_P, BQSN) -> BQ:invoke(Mod, Fun, BQSN) end, P, State);
+invoke(Mod, Fun, State = #passthrough{bq = BQ, bqs = BQS}) ->
+ ?passthrough1(invoke(Mod, Fun, BQS)).
+
+is_duplicate(Msg, State = #state{bq = BQ}) ->
+ pick2(fun (_P, BQSN) -> BQ:is_duplicate(Msg, BQSN) end, Msg, State);
+is_duplicate(Msg, State = #passthrough{bq = BQ, bqs = BQS}) ->
+ ?passthrough2(is_duplicate(Msg, BQS)).
+
+%%----------------------------------------------------------------------------
+
+bq() ->
+ {ok, RealBQ} = application:get_env(
+ rabbitmq_priority_queue, backing_queue_module),
+ RealBQ.
+
+%% Note on suffixes: Many utility functions here have suffixes telling
+%% you the arity of the return type of the BQ function they are
+%% designed to work with.
+%%
+%% 0 - BQ function returns a value and does not modify state
+%% 1 - BQ function just returns a new state
+%% 2 - BQ function returns a 2-tuple of {Result, NewState}
+%% 3 - BQ function returns a 3-tuple of {Result1, Result2, NewState}
+
+%% Fold over results
+fold0(Fun, Acc, [{P, BQSN} | Rest]) -> fold0(Fun, Fun(P, BQSN, Acc), Rest);
+fold0(_Fun, Acc, []) -> Acc.
+
+%% Do all BQs match?
+all0(Pred, BQSs) -> fold0(fun (_P, _BQSN, false) -> false;
+ (P, BQSN, true) -> Pred(P, BQSN)
+ end, true, BQSs).
+
+%% Sum results
+add0(Fun, BQSs) -> fold0(fun (P, BQSN, Acc) -> Acc + Fun(P, BQSN) end, 0, BQSs).
+
+%% Apply for all states
+foreach1(Fun, State = #state{bqss = BQSs}) ->
+ a(State#state{bqss = foreach1(Fun, BQSs, [])}).
+foreach1(Fun, [{P, BQSN} | Rest], BQSAcc) ->
+ BQSN1 = Fun(P, BQSN),
+ foreach1(Fun, Rest, [{P, BQSN1} | BQSAcc]);
+foreach1(_Fun, [], BQSAcc) ->
+ lists:reverse(BQSAcc).
+
+%% For a given thing, just go to its BQ
+pick1(Fun, Prioritisable, #state{bqss = BQSs} = State) ->
+ {P, BQSN} = priority(Prioritisable, BQSs),
+ a(State#state{bqss = bq_store(P, Fun(P, BQSN), BQSs)}).
+
+%% Fold over results
+fold2(Fun, Acc, State = #state{bqss = BQSs}) ->
+ {Res, BQSs1} = fold2(Fun, Acc, BQSs, []),
+ {Res, a(State#state{bqss = BQSs1})}.
+fold2(Fun, Acc, [{P, BQSN} | Rest], BQSAcc) ->
+ {Acc1, BQSN1} = Fun(P, BQSN, Acc),
+ fold2(Fun, Acc1, Rest, [{P, BQSN1} | BQSAcc]);
+fold2(_Fun, Acc, [], BQSAcc) ->
+ {Acc, lists:reverse(BQSAcc)}.
+
+%% Fold over results assuming results are lists and we want to append them
+fold_append2(Fun, State) ->
+ fold2(fun (P, BQSN, Acc) ->
+ {Res, BQSN1} = Fun(P, BQSN),
+ {Res ++ Acc, BQSN1}
+ end, [], State).
+
+%% Fold over results assuming results are numbers and we want to sum them
+fold_add2(Fun, State) ->
+ fold2(fun (P, BQSN, Acc) ->
+ {Res, BQSN1} = Fun(P, BQSN),
+ {add_maybe_infinity(Res, Acc), BQSN1}
+ end, 0, State).
+
+%% Fold over results assuming results are lists and we want to append
+%% them, and also that we have some AckTags we want to pass in to each
+%% invocation.
+fold_by_acktags2(Fun, AckTags, State) ->
+ AckTagsByPriority = partition_acktags(AckTags),
+ fold_append2(fun (P, BQSN) ->
+ case orddict:find(P, AckTagsByPriority) of
+ {ok, AckTagsN} -> Fun(AckTagsN, BQSN);
+ error -> {[], BQSN}
+ end
+ end, State).
+
+%% For a given thing, just go to its BQ
+pick2(Fun, Prioritisable, #state{bqss = BQSs} = State) ->
+ {P, BQSN} = priority(Prioritisable, BQSs),
+ {Res, BQSN1} = Fun(P, BQSN),
+ {Res, a(State#state{bqss = bq_store(P, BQSN1, BQSs)})}.
+
+%% Run through BQs in priority order until one does not return
+%% {NotFound, NewState} or we have gone through them all.
+find2(Fun, NotFound, State = #state{bqss = BQSs}) ->
+ {Res, BQSs1} = find2(Fun, NotFound, BQSs, []),
+ {Res, a(State#state{bqss = BQSs1})}.
+find2(Fun, NotFound, [{P, BQSN} | Rest], BQSAcc) ->
+ case Fun(P, BQSN) of
+ {NotFound, BQSN1} -> find2(Fun, NotFound, Rest, [{P, BQSN1} | BQSAcc]);
+ {Res, BQSN1} -> {Res, lists:reverse([{P, BQSN1} | BQSAcc]) ++ Rest}
+ end;
+find2(_Fun, NotFound, [], BQSAcc) ->
+ {NotFound, lists:reverse(BQSAcc)}.
+
+%% Run through BQs in priority order like find2 but also folding as we go.
+findfold3(Fun, Acc, NotFound, State = #state{bqss = BQSs}) ->
+ {Res, Acc1, BQSs1} = findfold3(Fun, Acc, NotFound, BQSs, []),
+ {Res, Acc1, a(State#state{bqss = BQSs1})}.
+findfold3(Fun, Acc, NotFound, [{P, BQSN} | Rest], BQSAcc) ->
+ case Fun(P, BQSN, Acc) of
+ {NotFound, Acc1, BQSN1} ->
+ findfold3(Fun, Acc1, NotFound, Rest, [{P, BQSN1} | BQSAcc]);
+ {Res, Acc1, BQSN1} ->
+ {Res, Acc1, lists:reverse([{P, BQSN1} | BQSAcc]) ++ Rest}
+ end;
+findfold3(_Fun, Acc, NotFound, [], BQSAcc) ->
+ {NotFound, Acc, lists:reverse(BQSAcc)}.
+
+bq_fetch(P, []) -> exit({not_found, P});
+bq_fetch(P, [{P, BQSN} | _]) -> BQSN;
+bq_fetch(P, [{_, _BQSN} | T]) -> bq_fetch(P, T).
+
+bq_store(P, BQS, BQSs) ->
+ [{PN, case PN of
+ P -> BQS;
+ _ -> BQSN
+ end} || {PN, BQSN} <- BQSs].
+
+%%
+a(State = #state{bqss = BQSs}) ->
+ Ps = [P || {P, _} <- BQSs],
+ case lists:reverse(lists:usort(Ps)) of
+ Ps -> State;
+ _ -> exit({bad_order, Ps})
+ end.
+
+%%----------------------------------------------------------------------------
+
+priority(P, BQSs) when is_integer(P) ->
+ {P, bq_fetch(P, BQSs)};
+priority(_Msg, [{P, BQSN}]) ->
+ {P, BQSN};
+priority(Msg = #basic_message{content = #content{properties = Props}},
+ [{P, BQSN} | Rest]) ->
+ #'P_basic'{priority = Priority0} = Props,
+ Priority = case Priority0 of
+ undefined -> 0;
+ _ when is_integer(Priority0) -> Priority0
+ end,
+ case Priority >= P of
+ true -> {P, BQSN};
+ false -> priority(Msg, Rest)
+ end.
+
+add_maybe_infinity(infinity, _) -> infinity;
+add_maybe_infinity(_, infinity) -> infinity;
+add_maybe_infinity(A, B) -> A + B.
+
+partition_acktags(AckTags) -> partition_acktags(AckTags, orddict:new()).
+
+partition_acktags([], Partitioned) ->
+ orddict:map(fun (_P, RevAckTags) ->
+ lists:reverse(RevAckTags)
+ end, Partitioned);
+partition_acktags([{P, AckTag} | Rest], Partitioned) ->
+ partition_acktags(Rest, rabbit_misc:orddict_cons(P, AckTag, Partitioned)).
+
+priority_on_acktags(P, AckTags) ->
+ [case Tag of
+ _ when is_integer(Tag) -> {P, Tag};
+ _ -> Tag
+ end || Tag <- AckTags].
+
+combine_status(P, New, nothing) ->
+ [{priority_lengths, [{P, proplists:get_value(len, New)}]} | New];
+combine_status(P, New, Old) ->
+ Combined = [{K, cse(V, proplists:get_value(K, Old))} || {K, V} <- New],
+ Lens = [{P, proplists:get_value(len, New)} |
+ proplists:get_value(priority_lengths, Old)],
+ [{priority_lengths, Lens} | Combined].
+
+cse(infinity, _) -> infinity;
+cse(_, infinity) -> infinity;
+cse(A, B) when is_number(A) -> A + B;
+cse({delta, _, _, _}, _) -> {delta, todo, todo, todo};
+cse(A, B) -> exit({A, B}).
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 0a2c88d441..24dcd23cc8 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -16,19 +16,27 @@
-module(rabbit_queue_index).
--export([erase/1, init/2, recover/5,
+-export([erase/1, init/3, recover/6,
terminate/2, delete_and_terminate/1,
- publish/5, deliver/2, ack/2, sync/1, needs_sync/1, flush/1,
+ publish/6, deliver/2, ack/2, sync/1, needs_sync/1, flush/1,
read/3, next_segment_boundary/1, bounds/1, start/1, stop/0]).
--export([add_queue_ttl/0, avoid_zeroes/0, store_msg_size/0]).
+-export([add_queue_ttl/0, avoid_zeroes/0, store_msg_size/0, store_msg/0]).
-define(CLEAN_FILENAME, "clean.dot").
%%----------------------------------------------------------------------------
%% The queue index is responsible for recording the order of messages
-%% within a queue on disk.
+%% within a queue on disk. As such it contains records of messages
+%% being published, delivered and acknowledged. The publish record
+%% includes the sequence ID, message ID and a small quantity of
+%% metadata about the message; the delivery and acknowledgement
+%% records just contain the sequence ID. A publish record may also
+%% contain the complete message if provided to publish/5; this allows
+%% the message store to be avoided altogether for small messages. In
+%% either case the publish record is stored in memory in the same
+%% serialised format it will take on disk.
%%
%% Because of the fact that the queue can decide at any point to send
%% a queue entry to disk, you can not rely on publishes appearing in
@@ -36,7 +44,7 @@
%% then delivered, then ack'd.
%%
%% In order to be able to clean up ack'd messages, we write to segment
-%% files. These files have a fixed maximum size: ?SEGMENT_ENTRY_COUNT
+%% files. These files have a fixed number of entries: ?SEGMENT_ENTRY_COUNT
%% publishes, delivers and acknowledgements. They are numbered, and so
%% it is known that the 0th segment contains messages 0 ->
%% ?SEGMENT_ENTRY_COUNT - 1, the 1st segment contains messages
@@ -85,7 +93,7 @@
%% and seeding the message store on start up.
%%
%% Note that in general, the representation of a message's state as
-%% the tuple: {('no_pub'|{MsgId, MsgProps, IsPersistent}),
+%% the tuple: {('no_pub'|{IsPersistent, Bin, MsgBin}),
%% ('del'|'no_del'), ('ack'|'no_ack')} is richer than strictly
%% necessary for most operations. However, for startup, and to ensure
%% the safe and correct combination of journal entries with entries
@@ -128,8 +136,8 @@
-define(REL_SEQ_ONLY_RECORD_BYTES, 2).
%% publish record is binary 1 followed by a bit for is_persistent,
-%% then 14 bits of rel seq id, 64 bits for message expiry and 128 bits
-%% of md5sum msg id
+%% then 14 bits of rel seq id, 64 bits for message expiry, 32 bits of
+%% size and then 128 bits of md5sum msg id.
-define(PUB_PREFIX, 1).
-define(PUB_PREFIX_BITS, 1).
@@ -140,32 +148,37 @@
-define(MSG_ID_BYTES, 16). %% md5sum is 128 bit or 16 bytes
-define(MSG_ID_BITS, (?MSG_ID_BYTES * 8)).
+%% This is the size of the message body content, for stats
-define(SIZE_BYTES, 4).
-define(SIZE_BITS, (?SIZE_BYTES * 8)).
-%% 16 bytes for md5sum + 8 for expiry + 4 for size
+%% This is the size of the message record embedded in the queue
+%% index. If 0, the message can be found in the message store.
+-define(EMBEDDED_SIZE_BYTES, 4).
+-define(EMBEDDED_SIZE_BITS, (?EMBEDDED_SIZE_BYTES * 8)).
+
+%% 16 bytes for md5sum + 8 for expiry
-define(PUB_RECORD_BODY_BYTES, (?MSG_ID_BYTES + ?EXPIRY_BYTES + ?SIZE_BYTES)).
-%% + 2 for seq, bits and prefix
--define(PUB_RECORD_BYTES, (?PUB_RECORD_BODY_BYTES + 2)).
+%% + 4 for size
+-define(PUB_RECORD_SIZE_BYTES, (?PUB_RECORD_BODY_BYTES + ?EMBEDDED_SIZE_BYTES)).
-%% 1 publish, 1 deliver, 1 ack per msg
--define(SEGMENT_TOTAL_SIZE, ?SEGMENT_ENTRY_COUNT *
- (?PUB_RECORD_BYTES + (2 * ?REL_SEQ_ONLY_RECORD_BYTES))).
+%% + 2 for seq, bits and prefix
+-define(PUB_RECORD_PREFIX_BYTES, 2).
%% ---- misc ----
--define(PUB, {_, _, _}). %% {MsgId, MsgProps, IsPersistent}
+-define(PUB, {_, _, _}). %% {IsPersistent, Bin, MsgBin}
-define(READ_MODE, [binary, raw, read]).
--define(READ_AHEAD_MODE, [{read_ahead, ?SEGMENT_TOTAL_SIZE} | ?READ_MODE]).
-define(WRITE_MODE, [write | ?READ_MODE]).
%%----------------------------------------------------------------------------
--record(qistate, { dir, segments, journal_handle, dirty_count,
- max_journal_entries, on_sync, unconfirmed }).
+-record(qistate, {dir, segments, journal_handle, dirty_count,
+ max_journal_entries, on_sync, on_sync_msg,
+ unconfirmed, unconfirmed_msg}).
--record(segment, { num, path, journal_entries, unacked }).
+-record(segment, {num, path, journal_entries, unacked}).
-include("rabbit.hrl").
@@ -174,6 +187,7 @@
-rabbit_upgrade({add_queue_ttl, local, []}).
-rabbit_upgrade({avoid_zeroes, local, [add_queue_ttl]}).
-rabbit_upgrade({store_msg_size, local, [avoid_zeroes]}).
+-rabbit_upgrade({store_msg, local, [store_msg_size]}).
-ifdef(use_specs).
@@ -193,7 +207,9 @@
dirty_count :: integer(),
max_journal_entries :: non_neg_integer(),
on_sync :: on_sync_fun(),
- unconfirmed :: gb_sets:set()
+ on_sync_msg :: on_sync_fun(),
+ unconfirmed :: gb_sets:set(),
+ unconfirmed_msg :: gb_sets:set()
}).
-type(contains_predicate() :: fun ((rabbit_types:msg_id()) -> boolean())).
-type(walker(A) :: fun ((A) -> 'finished' |
@@ -201,16 +217,18 @@
-type(shutdown_terms() :: [term()] | 'non_clean_shutdown').
-spec(erase/1 :: (rabbit_amqqueue:name()) -> 'ok').
--spec(init/2 :: (rabbit_amqqueue:name(), on_sync_fun()) -> qistate()).
--spec(recover/5 :: (rabbit_amqqueue:name(), shutdown_terms(), boolean(),
- contains_predicate(), on_sync_fun()) ->
+-spec(init/3 :: (rabbit_amqqueue:name(),
+ on_sync_fun(), on_sync_fun()) -> qistate()).
+-spec(recover/6 :: (rabbit_amqqueue:name(), shutdown_terms(), boolean(),
+ contains_predicate(),
+ on_sync_fun(), on_sync_fun()) ->
{'undefined' | non_neg_integer(),
'undefined' | non_neg_integer(), qistate()}).
-spec(terminate/2 :: ([any()], qistate()) -> qistate()).
-spec(delete_and_terminate/1 :: (qistate()) -> qistate()).
--spec(publish/5 :: (rabbit_types:msg_id(), seq_id(),
- rabbit_types:message_properties(), boolean(), qistate())
- -> qistate()).
+-spec(publish/6 :: (rabbit_types:msg_id(), seq_id(),
+ rabbit_types:message_properties(), boolean(),
+ non_neg_integer(), qistate()) -> qistate()).
-spec(deliver/2 :: ([seq_id()], qistate()) -> qistate()).
-spec(ack/2 :: ([seq_id()], qistate()) -> qistate()).
-spec(sync/1 :: (qistate()) -> qistate()).
@@ -241,14 +259,17 @@ erase(Name) ->
false -> ok
end.
-init(Name, OnSyncFun) ->
+init(Name, OnSyncFun, OnSyncMsgFun) ->
State = #qistate { dir = Dir } = blank_state(Name),
false = rabbit_file:is_file(Dir), %% is_file == is file or dir
- State #qistate { on_sync = OnSyncFun }.
+ State#qistate{on_sync = OnSyncFun,
+ on_sync_msg = OnSyncMsgFun}.
-recover(Name, Terms, MsgStoreRecovered, ContainsCheckFun, OnSyncFun) ->
+recover(Name, Terms, MsgStoreRecovered, ContainsCheckFun,
+ OnSyncFun, OnSyncMsgFun) ->
State = blank_state(Name),
- State1 = State #qistate { on_sync = OnSyncFun },
+ State1 = State #qistate{on_sync = OnSyncFun,
+ on_sync_msg = OnSyncMsgFun},
CleanShutdown = Terms /= non_clean_shutdown,
case CleanShutdown andalso MsgStoreRecovered of
true -> RecoveredCounts = proplists:get_value(segments, Terms, []),
@@ -267,26 +288,35 @@ delete_and_terminate(State) ->
ok = rabbit_file:recursive_delete([Dir]),
State1.
-publish(MsgId, SeqId, MsgProps, IsPersistent,
- State = #qistate { unconfirmed = Unconfirmed })
- when is_binary(MsgId) ->
+publish(MsgOrId, SeqId, MsgProps, IsPersistent, JournalSizeHint,
+ State = #qistate{unconfirmed = UC,
+ unconfirmed_msg = UCM}) ->
+ MsgId = case MsgOrId of
+ #basic_message{id = Id} -> Id;
+ Id when is_binary(Id) -> Id
+ end,
?MSG_ID_BYTES = size(MsgId),
{JournalHdl, State1} =
get_journal_handle(
- case MsgProps#message_properties.needs_confirming of
- true -> Unconfirmed1 = gb_sets:add_element(MsgId, Unconfirmed),
- State #qistate { unconfirmed = Unconfirmed1 };
- false -> State
+ case {MsgProps#message_properties.needs_confirming, MsgOrId} of
+ {true, MsgId} -> UC1 = gb_sets:add_element(MsgId, UC),
+ State#qistate{unconfirmed = UC1};
+ {true, _} -> UCM1 = gb_sets:add_element(MsgId, UCM),
+ State#qistate{unconfirmed_msg = UCM1};
+ {false, _} -> State
end),
+ file_handle_cache_stats:update(queue_index_journal_write),
+ {Bin, MsgBin} = create_pub_record_body(MsgOrId, MsgProps),
ok = file_handle_cache:append(
JournalHdl, [<<(case IsPersistent of
true -> ?PUB_PERSIST_JPREFIX;
false -> ?PUB_TRANS_JPREFIX
end):?JPREFIX_BITS,
- SeqId:?SEQ_BITS>>,
- create_pub_record_body(MsgId, MsgProps)]),
+ SeqId:?SEQ_BITS, Bin/binary,
+ (size(MsgBin)):?EMBEDDED_SIZE_BITS>>, MsgBin]),
maybe_flush_journal(
- add_to_journal(SeqId, {MsgId, MsgProps, IsPersistent}, State1)).
+ JournalSizeHint,
+ add_to_journal(SeqId, {IsPersistent, Bin, MsgBin}, State1)).
deliver(SeqIds, State) ->
deliver_or_ack(del, SeqIds, State).
@@ -302,10 +332,12 @@ sync(State = #qistate { journal_handle = JournalHdl }) ->
ok = file_handle_cache:sync(JournalHdl),
notify_sync(State).
-needs_sync(#qistate { journal_handle = undefined }) ->
+needs_sync(#qistate{journal_handle = undefined}) ->
false;
-needs_sync(#qistate { journal_handle = JournalHdl, unconfirmed = UC }) ->
- case gb_sets:is_empty(UC) of
+needs_sync(#qistate{journal_handle = JournalHdl,
+ unconfirmed = UC,
+ unconfirmed_msg = UCM}) ->
+ case gb_sets:is_empty(UC) andalso gb_sets:is_empty(UCM) of
true -> case file_handle_cache:needs_sync(JournalHdl) of
true -> other;
false -> false
@@ -409,7 +441,9 @@ blank_state_dir(Dir) ->
dirty_count = 0,
max_journal_entries = MaxJournal,
on_sync = fun (_) -> ok end,
- unconfirmed = gb_sets:new() }.
+ on_sync_msg = fun (_) -> ok end,
+ unconfirmed = gb_sets:new(),
+ unconfirmed_msg = gb_sets:new() }.
init_clean(RecoveredCounts, State) ->
%% Load the journal. Since this is a clean recovery this (almost)
@@ -479,9 +513,10 @@ recover_segment(ContainsCheckFun, CleanShutdown,
{SegEntries1, UnackedCountDelta} =
segment_plus_journal(SegEntries, JEntries),
array:sparse_foldl(
- fun (RelSeq, {{MsgId, MsgProps, IsPersistent}, Del, no_ack},
+ fun (RelSeq, {{IsPersistent, Bin, MsgBin}, Del, no_ack},
{SegmentAndDirtyCount, Bytes}) ->
- {recover_message(ContainsCheckFun(MsgId), CleanShutdown,
+ {MsgOrId, MsgProps} = parse_pub_record_body(Bin, MsgBin),
+ {recover_message(ContainsCheckFun(MsgOrId), CleanShutdown,
Del, RelSeq, SegmentAndDirtyCount),
Bytes + case IsPersistent of
true -> MsgProps#message_properties.size;
@@ -541,7 +576,8 @@ queue_index_walker({next, Gatherer}) when is_pid(Gatherer) ->
queue_index_walker_reader(QueueName, Gatherer) ->
State = blank_state(QueueName),
ok = scan_segments(
- fun (_SeqId, MsgId, _MsgProps, true, _IsDelivered, no_ack, ok) ->
+ fun (_SeqId, MsgId, _MsgProps, true, _IsDelivered, no_ack, ok)
+ when is_binary(MsgId) ->
gatherer:sync_in(Gatherer, {MsgId, 1});
(_SeqId, _MsgId, _MsgProps, _IsPersistent, _IsDelivered,
_IsAcked, Acc) ->
@@ -555,9 +591,9 @@ scan_segments(Fun, Acc, State) ->
Result = lists:foldr(
fun (Seg, AccN) ->
segment_entries_foldr(
- fun (RelSeq, {{MsgId, MsgProps, IsPersistent},
+ fun (RelSeq, {{MsgOrId, MsgProps, IsPersistent},
IsDelivered, IsAcked}, AccM) ->
- Fun(reconstruct_seq_id(Seg, RelSeq), MsgId, MsgProps,
+ Fun(reconstruct_seq_id(Seg, RelSeq), MsgOrId, MsgProps,
IsPersistent, IsDelivered, IsAcked, AccM)
end, AccN, segment_find_or_new(Seg, Dir, Segments))
end, Acc, all_segment_nums(State1)),
@@ -568,24 +604,35 @@ scan_segments(Fun, Acc, State) ->
%% expiry/binary manipulation
%%----------------------------------------------------------------------------
-create_pub_record_body(MsgId, #message_properties { expiry = Expiry,
- size = Size }) ->
- [MsgId, expiry_to_binary(Expiry), <<Size:?SIZE_BITS>>].
+create_pub_record_body(MsgOrId, #message_properties { expiry = Expiry,
+ size = Size }) ->
+ ExpiryBin = expiry_to_binary(Expiry),
+ case MsgOrId of
+ MsgId when is_binary(MsgId) ->
+ {<<MsgId/binary, ExpiryBin/binary, Size:?SIZE_BITS>>, <<>>};
+ #basic_message{id = MsgId} ->
+ MsgBin = term_to_binary(MsgOrId),
+ {<<MsgId/binary, ExpiryBin/binary, Size:?SIZE_BITS>>, MsgBin}
+ end.
expiry_to_binary(undefined) -> <<?NO_EXPIRY:?EXPIRY_BITS>>;
expiry_to_binary(Expiry) -> <<Expiry:?EXPIRY_BITS>>.
parse_pub_record_body(<<MsgIdNum:?MSG_ID_BITS, Expiry:?EXPIRY_BITS,
- Size:?SIZE_BITS>>) ->
+ Size:?SIZE_BITS>>, MsgBin) ->
%% work around for binary data fragmentation. See
%% rabbit_msg_file:read_next/2
<<MsgId:?MSG_ID_BYTES/binary>> = <<MsgIdNum:?MSG_ID_BITS>>,
- Exp = case Expiry of
- ?NO_EXPIRY -> undefined;
- X -> X
- end,
- {MsgId, #message_properties { expiry = Exp,
- size = Size }}.
+ Props = #message_properties{expiry = case Expiry of
+ ?NO_EXPIRY -> undefined;
+ X -> X
+ end,
+ size = Size},
+ case MsgBin of
+ <<>> -> {MsgId, Props};
+ _ -> Msg = #basic_message{id = MsgId} = binary_to_term(MsgBin),
+ {Msg, Props}
+ end.
%%----------------------------------------------------------------------------
%% journal manipulation
@@ -628,11 +675,14 @@ add_to_journal(RelSeq, Action, JEntries) ->
array:reset(RelSeq, JEntries)
end.
-maybe_flush_journal(State = #qistate { dirty_count = DCount,
- max_journal_entries = MaxJournal })
- when DCount > MaxJournal ->
- flush_journal(State);
maybe_flush_journal(State) ->
+ maybe_flush_journal(infinity, State).
+
+maybe_flush_journal(Hint, State = #qistate { dirty_count = DCount,
+ max_journal_entries = MaxJournal })
+ when DCount > MaxJournal orelse (Hint =/= infinity andalso DCount > Hint) ->
+ flush_journal(State);
+maybe_flush_journal(_Hint, State) ->
State.
flush_journal(State = #qistate { segments = Segments }) ->
@@ -656,9 +706,13 @@ append_journal_to_segment(#segment { journal_entries = JEntries,
path = Path } = Segment) ->
case array:sparse_size(JEntries) of
0 -> Segment;
- _ -> {ok, Hdl} = file_handle_cache:open(Path, ?WRITE_MODE,
+ _ -> Seg = array:sparse_foldr(
+ fun entry_to_segment/3, [], JEntries),
+ file_handle_cache_stats:update(queue_index_write),
+
+ {ok, Hdl} = file_handle_cache:open(Path, ?WRITE_MODE,
[{write_buffer, infinity}]),
- array:sparse_foldl(fun write_entry_to_segment/3, Hdl, JEntries),
+ file_handle_cache:append(Hdl, Seg),
ok = file_handle_cache:close(Hdl),
Segment #segment { journal_entries = array_new() }
end.
@@ -677,10 +731,13 @@ get_journal_handle(State = #qistate { journal_handle = Hdl }) ->
%% if you call it more than once on the same state. Assumes the counts
%% are 0 to start with.
load_journal(State = #qistate { dir = Dir }) ->
- case rabbit_file:is_file(filename:join(Dir, ?JOURNAL_FILENAME)) of
+ Path = filename:join(Dir, ?JOURNAL_FILENAME),
+ case rabbit_file:is_file(Path) of
true -> {JournalHdl, State1} = get_journal_handle(State),
+ Size = rabbit_file:file_size(Path),
{ok, 0} = file_handle_cache:position(JournalHdl, 0),
- load_journal_entries(State1);
+ {ok, JournalBin} = file_handle_cache:read(JournalHdl, Size),
+ parse_journal_entries(JournalBin, State1);
false -> State
end.
@@ -704,41 +761,37 @@ recover_journal(State) ->
end, Segments),
State1 #qistate { segments = Segments1 }.
-load_journal_entries(State = #qistate { journal_handle = Hdl }) ->
- case file_handle_cache:read(Hdl, ?SEQ_BYTES) of
- {ok, <<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS>>} ->
- case Prefix of
- ?DEL_JPREFIX ->
- load_journal_entries(add_to_journal(SeqId, del, State));
- ?ACK_JPREFIX ->
- load_journal_entries(add_to_journal(SeqId, ack, State));
- _ ->
- case file_handle_cache:read(Hdl, ?PUB_RECORD_BODY_BYTES) of
- %% Journal entry composed only of zeroes was probably
- %% produced during a dirty shutdown so stop reading
- {ok, <<0:?PUB_RECORD_BODY_BYTES/unit:8>>} ->
- State;
- {ok, <<Bin:?PUB_RECORD_BODY_BYTES/binary>>} ->
- {MsgId, MsgProps} = parse_pub_record_body(Bin),
- IsPersistent = case Prefix of
- ?PUB_PERSIST_JPREFIX -> true;
- ?PUB_TRANS_JPREFIX -> false
- end,
- load_journal_entries(
- add_to_journal(
- SeqId, {MsgId, MsgProps, IsPersistent}, State));
- _ErrOrEoF -> %% err, we've lost at least a publish
- State
- end
- end;
- _ErrOrEoF -> State
- end.
+parse_journal_entries(<<?DEL_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS,
+ Rest/binary>>, State) ->
+ parse_journal_entries(Rest, add_to_journal(SeqId, del, State));
+
+parse_journal_entries(<<?ACK_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS,
+ Rest/binary>>, State) ->
+ parse_journal_entries(Rest, add_to_journal(SeqId, ack, State));
+parse_journal_entries(<<0:?JPREFIX_BITS, 0:?SEQ_BITS,
+ 0:?PUB_RECORD_SIZE_BYTES/unit:8, _/binary>>, State) ->
+ %% Journal entry composed only of zeroes was probably
+ %% produced during a dirty shutdown so stop reading
+ State;
+parse_journal_entries(<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS,
+ Bin:?PUB_RECORD_BODY_BYTES/binary,
+ MsgSize:?EMBEDDED_SIZE_BITS, MsgBin:MsgSize/binary,
+ Rest/binary>>, State) ->
+ IsPersistent = case Prefix of
+ ?PUB_PERSIST_JPREFIX -> true;
+ ?PUB_TRANS_JPREFIX -> false
+ end,
+ parse_journal_entries(
+ Rest, add_to_journal(SeqId, {IsPersistent, Bin, MsgBin}, State));
+parse_journal_entries(_ErrOrEoF, State) ->
+ State.
deliver_or_ack(_Kind, [], State) ->
State;
deliver_or_ack(Kind, SeqIds, State) ->
JPrefix = case Kind of ack -> ?ACK_JPREFIX; del -> ?DEL_JPREFIX end,
{JournalHdl, State1} = get_journal_handle(State),
+ file_handle_cache_stats:update(queue_index_journal_write),
ok = file_handle_cache:append(
JournalHdl,
[<<JPrefix:?JPREFIX_BITS, SeqId:?SEQ_BITS>> || SeqId <- SeqIds]),
@@ -746,11 +799,19 @@ deliver_or_ack(Kind, SeqIds, State) ->
add_to_journal(SeqId, Kind, StateN)
end, State1, SeqIds)).
-notify_sync(State = #qistate { unconfirmed = UC, on_sync = OnSyncFun }) ->
- case gb_sets:is_empty(UC) of
- true -> State;
- false -> OnSyncFun(UC),
- State #qistate { unconfirmed = gb_sets:new() }
+notify_sync(State = #qistate{unconfirmed = UC,
+ unconfirmed_msg = UCM,
+ on_sync = OnSyncFun,
+ on_sync_msg = OnSyncMsgFun}) ->
+ State1 = case gb_sets:is_empty(UC) of
+ true -> State;
+ false -> OnSyncFun(UC),
+ State#qistate{unconfirmed = gb_sets:new()}
+ end,
+ case gb_sets:is_empty(UCM) of
+ true -> State1;
+ false -> OnSyncMsgFun(UCM),
+ State1#qistate{unconfirmed_msg = gb_sets:new()}
end.
%%----------------------------------------------------------------------------
@@ -823,42 +884,42 @@ segment_nums({Segments, CachedSegments}) ->
segments_new() ->
{dict:new(), []}.
-write_entry_to_segment(_RelSeq, {?PUB, del, ack}, Hdl) ->
- Hdl;
-write_entry_to_segment(RelSeq, {Pub, Del, Ack}, Hdl) ->
- ok = case Pub of
- no_pub ->
- ok;
- {MsgId, MsgProps, IsPersistent} ->
- file_handle_cache:append(
- Hdl, [<<?PUB_PREFIX:?PUB_PREFIX_BITS,
- (bool_to_int(IsPersistent)):1,
- RelSeq:?REL_SEQ_BITS>>,
- create_pub_record_body(MsgId, MsgProps)])
- end,
- ok = case {Del, Ack} of
- {no_del, no_ack} ->
- ok;
- _ ->
- Binary = <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
- RelSeq:?REL_SEQ_BITS>>,
- file_handle_cache:append(
- Hdl, case {Del, Ack} of
- {del, ack} -> [Binary, Binary];
- _ -> Binary
- end)
- end,
- Hdl.
+entry_to_segment(_RelSeq, {?PUB, del, ack}, Buf) ->
+ Buf;
+entry_to_segment(RelSeq, {Pub, Del, Ack}, Buf) ->
+ %% NB: we are assembling the segment in reverse order here, so
+ %% del/ack comes first.
+ Buf1 = case {Del, Ack} of
+ {no_del, no_ack} ->
+ Buf;
+ _ ->
+ Binary = <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
+ RelSeq:?REL_SEQ_BITS>>,
+ case {Del, Ack} of
+ {del, ack} -> [[Binary, Binary] | Buf];
+ _ -> [Binary | Buf]
+ end
+ end,
+ case Pub of
+ no_pub ->
+ Buf1;
+ {IsPersistent, Bin, MsgBin} ->
+ [[<<?PUB_PREFIX:?PUB_PREFIX_BITS,
+ (bool_to_int(IsPersistent)):1,
+ RelSeq:?REL_SEQ_BITS, Bin/binary,
+ (size(MsgBin)):?EMBEDDED_SIZE_BITS>>, MsgBin] | Buf1]
+ end.
read_bounded_segment(Seg, {StartSeg, StartRelSeq}, {EndSeg, EndRelSeq},
{Messages, Segments}, Dir) ->
Segment = segment_find_or_new(Seg, Dir, Segments),
{segment_entries_foldr(
- fun (RelSeq, {{MsgId, MsgProps, IsPersistent}, IsDelivered, no_ack}, Acc)
+ fun (RelSeq, {{MsgOrId, MsgProps, IsPersistent}, IsDelivered, no_ack},
+ Acc)
when (Seg > StartSeg orelse StartRelSeq =< RelSeq) andalso
(Seg < EndSeg orelse EndRelSeq >= RelSeq) ->
- [ {MsgId, reconstruct_seq_id(StartSeg, RelSeq), MsgProps,
- IsPersistent, IsDelivered == del} | Acc ];
+ [{MsgOrId, reconstruct_seq_id(StartSeg, RelSeq), MsgProps,
+ IsPersistent, IsDelivered == del} | Acc];
(_RelSeq, _Value, Acc) ->
Acc
end, Messages, Segment),
@@ -868,7 +929,11 @@ segment_entries_foldr(Fun, Init,
Segment = #segment { journal_entries = JEntries }) ->
{SegEntries, _UnackedCount} = load_segment(false, Segment),
{SegEntries1, _UnackedCountD} = segment_plus_journal(SegEntries, JEntries),
- array:sparse_foldr(Fun, Init, SegEntries1).
+ array:sparse_foldr(
+ fun (RelSeq, {{IsPersistent, Bin, MsgBin}, Del, Ack}, Acc) ->
+ {MsgOrId, MsgProps} = parse_pub_record_body(Bin, MsgBin),
+ Fun(RelSeq, {{MsgOrId, MsgProps, IsPersistent}, Del, Ack}, Acc)
+ end, Init, SegEntries1).
%% Loading segments
%%
@@ -877,44 +942,48 @@ load_segment(KeepAcked, #segment { path = Path }) ->
Empty = {array_new(), 0},
case rabbit_file:is_file(Path) of
false -> Empty;
- true -> {ok, Hdl} = file_handle_cache:open(Path, ?READ_AHEAD_MODE, []),
+ true -> Size = rabbit_file:file_size(Path),
+ file_handle_cache_stats:update(queue_index_read),
+ {ok, Hdl} = file_handle_cache:open(Path, ?READ_MODE, []),
{ok, 0} = file_handle_cache:position(Hdl, bof),
- Res = case file_handle_cache:read(Hdl, ?SEGMENT_TOTAL_SIZE) of
- {ok, SegData} -> load_segment_entries(
- KeepAcked, SegData, Empty);
- eof -> Empty
- end,
+ {ok, SegBin} = file_handle_cache:read(Hdl, Size),
ok = file_handle_cache:close(Hdl),
+ Res = parse_segment_entries(SegBin, KeepAcked, Empty),
Res
end.
-load_segment_entries(KeepAcked,
- <<?PUB_PREFIX:?PUB_PREFIX_BITS,
- IsPersistentNum:1, RelSeq:?REL_SEQ_BITS,
- PubRecordBody:?PUB_RECORD_BODY_BYTES/binary,
- SegData/binary>>,
- {SegEntries, UnackedCount}) ->
- {MsgId, MsgProps} = parse_pub_record_body(PubRecordBody),
- Obj = {{MsgId, MsgProps, 1 == IsPersistentNum}, no_del, no_ack},
+parse_segment_entries(<<?PUB_PREFIX:?PUB_PREFIX_BITS,
+ IsPersistNum:1, RelSeq:?REL_SEQ_BITS, Rest/binary>>,
+ KeepAcked, Acc) ->
+ parse_segment_publish_entry(
+ Rest, 1 == IsPersistNum, RelSeq, KeepAcked, Acc);
+parse_segment_entries(<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
+ RelSeq:?REL_SEQ_BITS, Rest/binary>>, KeepAcked, Acc) ->
+ parse_segment_entries(
+ Rest, KeepAcked, add_segment_relseq_entry(KeepAcked, RelSeq, Acc));
+parse_segment_entries(<<>>, _KeepAcked, Acc) ->
+ Acc.
+
+parse_segment_publish_entry(<<Bin:?PUB_RECORD_BODY_BYTES/binary,
+ MsgSize:?EMBEDDED_SIZE_BITS,
+ MsgBin:MsgSize/binary, Rest/binary>>,
+ IsPersistent, RelSeq, KeepAcked,
+ {SegEntries, Unacked}) ->
+ Obj = {{IsPersistent, Bin, MsgBin}, no_del, no_ack},
SegEntries1 = array:set(RelSeq, Obj, SegEntries),
- load_segment_entries(KeepAcked, SegData, {SegEntries1, UnackedCount + 1});
-load_segment_entries(KeepAcked,
- <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
- RelSeq:?REL_SEQ_BITS, SegData/binary>>,
- {SegEntries, UnackedCount}) ->
- {UnackedCountDelta, SegEntries1} =
- case array:get(RelSeq, SegEntries) of
- {Pub, no_del, no_ack} ->
- { 0, array:set(RelSeq, {Pub, del, no_ack}, SegEntries)};
- {Pub, del, no_ack} when KeepAcked ->
- {-1, array:set(RelSeq, {Pub, del, ack}, SegEntries)};
- {_Pub, del, no_ack} ->
- {-1, array:reset(RelSeq, SegEntries)}
- end,
- load_segment_entries(KeepAcked, SegData,
- {SegEntries1, UnackedCount + UnackedCountDelta});
-load_segment_entries(_KeepAcked, _SegData, Res) ->
- Res.
+ parse_segment_entries(Rest, KeepAcked, {SegEntries1, Unacked + 1});
+parse_segment_publish_entry(Rest, _IsPersistent, _RelSeq, KeepAcked, Acc) ->
+ parse_segment_entries(Rest, KeepAcked, Acc).
+
+add_segment_relseq_entry(KeepAcked, RelSeq, {SegEntries, Unacked}) ->
+ case array:get(RelSeq, SegEntries) of
+ {Pub, no_del, no_ack} ->
+ {array:set(RelSeq, {Pub, del, no_ack}, SegEntries), Unacked};
+ {Pub, del, no_ack} when KeepAcked ->
+ {array:set(RelSeq, {Pub, del, ack}, SegEntries), Unacked - 1};
+ {_Pub, del, no_ack} ->
+ {array:reset(RelSeq, SegEntries), Unacked - 1}
+ end.
array_new() ->
array:new([{default, undefined}, fixed, {size, ?SEGMENT_ENTRY_COUNT}]).
@@ -1121,6 +1190,40 @@ store_msg_size_segment(<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
store_msg_size_segment(_) ->
stop.
+store_msg() ->
+ foreach_queue_index({fun store_msg_journal/1,
+ fun store_msg_segment/1}).
+
+store_msg_journal(<<?DEL_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS,
+ Rest/binary>>) ->
+ {<<?DEL_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Rest};
+store_msg_journal(<<?ACK_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS,
+ Rest/binary>>) ->
+ {<<?ACK_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Rest};
+store_msg_journal(<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS,
+ MsgId:?MSG_ID_BITS, Expiry:?EXPIRY_BITS, Size:?SIZE_BITS,
+ Rest/binary>>) ->
+ {<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS, MsgId:?MSG_ID_BITS,
+ Expiry:?EXPIRY_BITS, Size:?SIZE_BITS,
+ 0:?EMBEDDED_SIZE_BITS>>, Rest};
+store_msg_journal(_) ->
+ stop.
+
+store_msg_segment(<<?PUB_PREFIX:?PUB_PREFIX_BITS, IsPersistentNum:1,
+ RelSeq:?REL_SEQ_BITS, MsgId:?MSG_ID_BITS,
+ Expiry:?EXPIRY_BITS, Size:?SIZE_BITS, Rest/binary>>) ->
+ {<<?PUB_PREFIX:?PUB_PREFIX_BITS, IsPersistentNum:1, RelSeq:?REL_SEQ_BITS,
+ MsgId:?MSG_ID_BITS, Expiry:?EXPIRY_BITS, Size:?SIZE_BITS,
+ 0:?EMBEDDED_SIZE_BITS>>, Rest};
+store_msg_segment(<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
+ RelSeq:?REL_SEQ_BITS, Rest/binary>>) ->
+ {<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, RelSeq:?REL_SEQ_BITS>>,
+ Rest};
+store_msg_segment(_) ->
+ stop.
+
+
+
%%----------------------------------------------------------------------------
@@ -1157,7 +1260,7 @@ transform_file(Path, Fun) when is_function(Fun)->
[{write_buffer, infinity}]),
{ok, PathHdl} = file_handle_cache:open(
- Path, [{read_ahead, Size} | ?READ_MODE], []),
+ Path, ?READ_MODE, [{read_buffer, Size}]),
{ok, Content} = file_handle_cache:read(PathHdl, Size),
ok = file_handle_cache:close(PathHdl),
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index eb50eeb163..e3247439ef 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -58,6 +58,11 @@
-define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]).
+-define(AUTH_NOTIFICATION_INFO_KEYS,
+ [host, vhost, name, peer_host, peer_port, protocol, auth_mechanism,
+ ssl, ssl_protocol, ssl_cipher, peer_cert_issuer, peer_cert_subject,
+ peer_cert_validity]).
+
-define(IS_RUNNING(State),
(State#v1.connection_state =:= running orelse
State#v1.connection_state =:= blocking orelse
@@ -214,7 +219,6 @@ start_connection(Parent, HelperSup, Deb, Sock, SockTransform) ->
rabbit_net:fast_close(Sock),
exit(normal)
end,
- log(info, "accepting AMQP connection ~p (~s)~n", [self(), Name]),
{ok, HandshakeTimeout} = application:get_env(rabbit, handshake_timeout),
ClientSock = socket_op(Sock, SockTransform),
erlang:send_after(HandshakeTimeout, self(), handshake_timeout),
@@ -260,8 +264,9 @@ start_connection(Parent, HelperSup, Deb, Sock, SockTransform) ->
log(info, "closing AMQP connection ~p (~s)~n", [self(), Name])
catch
Ex -> log(case Ex of
- connection_closed_abruptly -> warning;
- _ -> error
+ connection_closed_with_no_data_received -> debug;
+ connection_closed_abruptly -> warning;
+ _ -> error
end, "closing AMQP connection ~p (~s):~n~p~n",
[self(), Name, Ex])
after
@@ -313,8 +318,28 @@ binlist_split(Len, L, [Acc0|Acc]) when Len < 0 ->
binlist_split(Len, [H|T], Acc) ->
binlist_split(Len - size(H), T, [H|Acc]).
-mainloop(Deb, Buf, BufLen, State = #v1{sock = Sock}) ->
- case rabbit_net:recv(Sock) of
+mainloop(Deb, Buf, BufLen, State = #v1{sock = Sock,
+ connection_state = CS,
+ connection = #connection{
+ name = ConnName}}) ->
+ Recv = rabbit_net:recv(Sock),
+ case CS of
+ pre_init when Buf =:= [] ->
+ %% We only log incoming connections when either the
+ %% first byte was received or there was an error (eg. a
+ %% timeout).
+ %%
+ %% The goal is to not log TCP healthchecks (a connection
+ %% with no data received) unless specified otherwise.
+ log(case Recv of
+ closed -> debug;
+ _ -> info
+ end, "accepting AMQP connection ~p (~s)~n",
+ [self(), ConnName]);
+ _ ->
+ ok
+ end,
+ case Recv of
{data, Data} ->
recvloop(Deb, [Data | Buf], BufLen + size(Data),
State#v1{pending_recv = false});
@@ -334,10 +359,18 @@ mainloop(Deb, Buf, BufLen, State = #v1{sock = Sock}) ->
end
end.
-stop(closed, State) -> maybe_emit_stats(State),
- throw(connection_closed_abruptly);
-stop(Reason, State) -> maybe_emit_stats(State),
- throw({inet_error, Reason}).
+stop(closed, #v1{connection_state = pre_init} = State) ->
+ %% The connection was closed before any packet was received. It's
+ %% probably a load-balancer healthcheck: don't consider this a
+ %% failure.
+ maybe_emit_stats(State),
+ throw(connection_closed_with_no_data_received);
+stop(closed, State) ->
+ maybe_emit_stats(State),
+ throw(connection_closed_abruptly);
+stop(Reason, State) ->
+ maybe_emit_stats(State),
+ throw({inet_error, Reason}).
handle_other({conserve_resources, Source, Conserve},
State = #v1{throttle = Throttle = #throttle{alarmed_by = CR}}) ->
@@ -944,7 +977,7 @@ handle_method0(#'connection.open'{virtual_host = VHostPath},
helper_sup = SupPid,
sock = Sock,
throttle = Throttle}) ->
- ok = rabbit_access_control:check_vhost_access(User, VHostPath),
+ ok = rabbit_access_control:check_vhost_access(User, VHostPath, Sock),
NewConnection = Connection#connection{vhost = VHostPath},
ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol),
Conserve = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}),
@@ -1046,9 +1079,12 @@ auth_phase(Response,
auth_state = AuthState},
sock = Sock}) ->
case AuthMechanism:handle_response(Response, AuthState) of
- {refused, Msg, Args} ->
- auth_fail(Msg, Args, Name, State);
+ {refused, Username, Msg, Args} ->
+ auth_fail(Username, Msg, Args, Name, State);
{protocol_error, Msg, Args} ->
+ notify_auth_result(none, user_authentication_failure,
+ [{error, rabbit_misc:format(Msg, Args)}],
+ State),
rabbit_misc:protocol_error(syntax_error, Msg, Args);
{challenge, Challenge, AuthState1} ->
Secure = #'connection.secure'{challenge = Challenge},
@@ -1057,9 +1093,12 @@ auth_phase(Response,
auth_state = AuthState1}};
{ok, User = #user{username = Username}} ->
case rabbit_access_control:check_user_loopback(Username, Sock) of
- ok -> ok;
- not_allowed -> auth_fail("user '~s' can only connect via "
- "localhost", [Username], Name, State)
+ ok ->
+ notify_auth_result(Username, user_authentication_success,
+ [], State);
+ not_allowed ->
+ auth_fail(Username, "user '~s' can only connect via "
+ "localhost", [Username], Name, State)
end,
Tune = #'connection.tune'{frame_max = get_env(frame_max),
channel_max = get_env(channel_max),
@@ -1071,11 +1110,15 @@ auth_phase(Response,
end.
-ifdef(use_specs).
--spec(auth_fail/4 :: (string(), [any()], binary(), #v1{}) -> no_return()).
+-spec(auth_fail/5 ::
+ (rabbit_types:username() | none, string(), [any()], binary(), #v1{}) ->
+ no_return()).
-endif.
-auth_fail(Msg, Args, AuthName,
+auth_fail(Username, Msg, Args, AuthName,
State = #v1{connection = #connection{protocol = Protocol,
capabilities = Capabilities}}) ->
+ notify_auth_result(Username, user_authentication_failure,
+ [{error, rabbit_misc:format(Msg, Args)}], State),
AmqpError = rabbit_misc:amqp_error(
access_refused, "~s login refused: ~s",
[AuthName, io_lib:format(Msg, Args)], none),
@@ -1094,6 +1137,16 @@ auth_fail(Msg, Args, AuthName,
end,
rabbit_misc:protocol_error(AmqpError).
+notify_auth_result(Username, AuthResult, ExtraProps, State) ->
+ EventProps = [{connection_type, network},
+ {name, case Username of none -> ''; _ -> Username end}] ++
+ [case Item of
+ name -> {connection_name, i(name, State)};
+ _ -> {Item, i(Item, State)}
+ end || Item <- ?AUTH_NOTIFICATION_INFO_KEYS] ++
+ ExtraProps,
+ rabbit_event:notify(AuthResult, [P || {_, V} = P <- EventProps, V =/= '']).
+
%%--------------------------------------------------------------------------
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
diff --git a/src/rabbit_trace.erl b/src/rabbit_trace.erl
index dbc2856d0c..9292068cea 100644
--- a/src/rabbit_trace.erl
+++ b/src/rabbit_trace.erl
@@ -16,7 +16,7 @@
-module(rabbit_trace).
--export([init/1, enabled/1, tap_in/5, tap_out/5, start/1, stop/1]).
+-export([init/1, enabled/1, tap_in/6, tap_out/5, start/1, stop/1]).
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
@@ -32,8 +32,8 @@
-spec(init/1 :: (rabbit_types:vhost()) -> state()).
-spec(enabled/1 :: (rabbit_types:vhost()) -> boolean()).
--spec(tap_in/5 :: (rabbit_types:basic_message(), binary(),
- rabbit_channel:channel_number(),
+-spec(tap_in/6 :: (rabbit_types:basic_message(), [rabbit_amqqueue:name()],
+ binary(), rabbit_channel:channel_number(),
rabbit_types:username(), state()) -> 'ok').
-spec(tap_out/5 :: (rabbit_amqqueue:qmsg(), binary(),
rabbit_channel:channel_number(),
@@ -58,15 +58,17 @@ enabled(VHost) ->
{ok, VHosts} = application:get_env(rabbit, ?TRACE_VHOSTS),
lists:member(VHost, VHosts).
-tap_in(_Msg, _ConnName, _ChannelNum, _Username, none) -> ok;
+tap_in(_Msg, _QNames, _ConnName, _ChannelNum, _Username, none) -> ok;
tap_in(Msg = #basic_message{exchange_name = #resource{name = XName,
virtual_host = VHost}},
- ConnName, ChannelNum, Username, TraceX) ->
+ QNames, ConnName, ChannelNum, Username, TraceX) ->
trace(TraceX, Msg, <<"publish">>, XName,
- [{<<"vhost">>, longstr, VHost},
- {<<"connection">>, longstr, ConnName},
- {<<"channel">>, signedint, ChannelNum},
- {<<"user">>, longstr, Username}]).
+ [{<<"vhost">>, longstr, VHost},
+ {<<"connection">>, longstr, ConnName},
+ {<<"channel">>, signedint, ChannelNum},
+ {<<"user">>, longstr, Username},
+ {<<"routed_queues">>, array,
+ [{longstr, QName#resource.name} || QName <- QNames]}]).
tap_out(_Msg, _ConnName, _ChannelNum, _Username, none) -> ok;
tap_out({#resource{name = QName, virtual_host = VHost},
diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl
index ba48867ad0..039568df8e 100644
--- a/src/rabbit_types.erl
+++ b/src/rabbit_types.erl
@@ -27,7 +27,7 @@
vhost/0, ctag/0, amqp_error/0, r/1, r2/2, r3/3, listener/0,
binding/0, binding_source/0, binding_destination/0,
amqqueue/0, exchange/0,
- connection/0, protocol/0, user/0, internal_user/0,
+ connection/0, protocol/0, auth_user/0, user/0, internal_user/0,
username/0, password/0, password_hash/0,
ok/1, error/1, ok_or_error/1, ok_or_error2/2, ok_pid_or_error/0,
channel_exit/0, connection_exit/0, mfargs/0, proc_name/0,
@@ -131,11 +131,15 @@
-type(protocol() :: rabbit_framing:protocol()).
+-type(auth_user() ::
+ #auth_user{username :: username(),
+ tags :: [atom()],
+ impl :: any()}).
+
-type(user() ::
- #user{username :: username(),
- tags :: [atom()],
- auth_backend :: atom(),
- impl :: any()}).
+ #user{username :: username(),
+ tags :: [atom()],
+ authz_backends :: [{atom(), any()}]}).
-type(internal_user() ::
#internal_user{username :: username(),
diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl
index 72bf7855c4..2ab6545911 100644
--- a/src/rabbit_upgrade.erl
+++ b/src/rabbit_upgrade.erl
@@ -16,7 +16,8 @@
-module(rabbit_upgrade).
--export([maybe_upgrade_mnesia/0, maybe_upgrade_local/0]).
+-export([maybe_upgrade_mnesia/0, maybe_upgrade_local/0,
+ nodes_running/1, secondary_upgrade/1]).
-include("rabbit.hrl").
@@ -122,6 +123,7 @@ remove_backup() ->
maybe_upgrade_mnesia() ->
AllNodes = rabbit_mnesia:cluster_nodes(all),
+ ok = rabbit_mnesia_rename:maybe_finish(AllNodes),
case rabbit_version:upgrades_required(mnesia) of
{error, starting_from_scratch} ->
ok;
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index 9f6dc21aa3..16f0b21b1b 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -50,6 +50,7 @@
-rabbit_upgrade({cluster_name, mnesia, [runtime_parameters]}).
-rabbit_upgrade({down_slave_nodes, mnesia, [queue_decorators]}).
-rabbit_upgrade({queue_state, mnesia, [down_slave_nodes]}).
+-rabbit_upgrade({recoverable_slaves, mnesia, [queue_state]}).
%% -------------------------------------------------------------------
@@ -82,6 +83,7 @@
-spec(cluster_name/0 :: () -> 'ok').
-spec(down_slave_nodes/0 :: () -> 'ok').
-spec(queue_state/0 :: () -> 'ok').
+-spec(recoverable_slaves/0 :: () -> 'ok').
-endif.
@@ -418,6 +420,18 @@ queue_state(Table) ->
[name, durable, auto_delete, exclusive_owner, arguments, pid, slave_pids,
sync_slave_pids, down_slave_nodes, policy, gm_pids, decorators, state]).
+recoverable_slaves() ->
+ ok = recoverable_slaves(rabbit_queue),
+ ok = recoverable_slaves(rabbit_durable_queue).
+
+recoverable_slaves(Table) ->
+ transform(
+ Table, fun (Q) -> Q end, %% Don't change shape of record
+ [name, durable, auto_delete, exclusive_owner, arguments, pid, slave_pids,
+ sync_slave_pids, recoverable_slaves, policy, gm_pids, decorators,
+ state]).
+
+
%%--------------------------------------------------------------------
transform(TableName, Fun, FieldList) ->
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 1da3de2671..d4e0a04f39 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -18,7 +18,7 @@
-export([init/3, terminate/2, delete_and_terminate/2, delete_crashed/1,
purge/1, purge_acks/1,
- publish/5, publish_delivered/4, discard/3, drain_confirmed/1,
+ publish/6, publish_delivered/5, discard/4, drain_confirmed/1,
dropwhile/2, fetchwhile/4, fetch/2, drop/2, ack/2, requeue/2,
ackfold/4, fold/3, len/1, is_empty/1, depth/1,
set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1,
@@ -28,16 +28,34 @@
-export([start/1, stop/0]).
%% exported for testing only
--export([start_msg_store/2, stop_msg_store/0, init/5]).
+-export([start_msg_store/2, stop_msg_store/0, init/6]).
%%----------------------------------------------------------------------------
+%% Messages, and their position in the queue, can be in memory or on
+%% disk, or both. Persistent messages will have both message and
+%% position pushed to disk as soon as they arrive; transient messages
+%% can be written to disk (and thus both types can be evicted from
+%% memory) under memory pressure. The question of whether a message is
+%% in RAM and whether it is persistent are orthogonal.
+%%
+%% Messages are persisted using the queue index and the message
+%% store. Normally the queue index holds the position of the message
+%% *within this queue* along with a couple of small bits of metadata,
+%% while the message store holds the message itself (including headers
+%% and other properties).
+%%
+%% However, as an optimisation, small messages can be embedded
+%% directly in the queue index and bypass the message store
+%% altogether.
+%%
%% Definitions:
-
+%%
%% alpha: this is a message where both the message itself, and its
%% position within the queue are held in RAM
%%
-%% beta: this is a message where the message itself is only held on
-%% disk, but its position within the queue is held in RAM.
+%% beta: this is a message where the message itself is only held on
+%% disk (if persisted to the message store) but its position
+%% within the queue is held in RAM.
%%
%% gamma: this is a message where the message itself is only held on
%% disk, but its position is both in RAM and on disk.
@@ -248,8 +266,9 @@
q3,
q4,
next_seq_id,
- ram_pending_ack,
- disk_pending_ack,
+ ram_pending_ack, %% msgs using store, still in RAM
+ disk_pending_ack, %% msgs in store, paged out
+ qi_pending_ack, %% msgs using qi, *can't* be paged out
index_state,
msg_store_clients,
durable,
@@ -274,7 +293,11 @@
unconfirmed,
confirmed,
ack_out_counter,
- ack_in_counter
+ ack_in_counter,
+ %% Unlike the other counters these two do not feed into
+ %% #rates{} and get reset
+ disk_read_count,
+ disk_write_count
}).
-record(rates, { in, out, ack_in, ack_out, timestamp }).
@@ -285,8 +308,9 @@
msg,
is_persistent,
is_delivered,
- msg_on_disk,
+ msg_in_store,
index_on_disk,
+ persist_to,
msg_props
}).
@@ -300,11 +324,13 @@
%% betas, the IO_BATCH_SIZE sets the number of betas that we must be
%% due to write indices for before we do any work at all.
-define(IO_BATCH_SIZE, 2048). %% next power-of-2 after ?CREDIT_DISC_BOUND
+-define(HEADER_GUESS_SIZE, 100). %% see determine_persist_to/2
-define(PERSISTENT_MSG_STORE, msg_store_persistent).
-define(TRANSIENT_MSG_STORE, msg_store_transient).
-define(QUEUE, lqueue).
-include("rabbit.hrl").
+-include("rabbit_framing.hrl").
%%----------------------------------------------------------------------------
@@ -341,6 +367,7 @@
next_seq_id :: seq_id(),
ram_pending_ack :: gb_trees:tree(),
disk_pending_ack :: gb_trees:tree(),
+ qi_pending_ack :: gb_trees:tree(),
index_state :: any(),
msg_store_clients :: 'undefined' | {{any(), binary()},
{any(), binary()}},
@@ -367,7 +394,9 @@
unconfirmed :: gb_sets:set(),
confirmed :: gb_sets:set(),
ack_out_counter :: non_neg_integer(),
- ack_in_counter :: non_neg_integer() }).
+ ack_in_counter :: non_neg_integer(),
+ disk_read_count :: non_neg_integer(),
+ disk_write_count :: non_neg_integer() }).
%% Duplicated from rabbit_backing_queue
-spec(ack/2 :: ([ack()], state()) -> {[rabbit_guid:guid()], state()}).
@@ -426,16 +455,19 @@ stop_msg_store() ->
ok = rabbit_sup:stop_child(?PERSISTENT_MSG_STORE),
ok = rabbit_sup:stop_child(?TRANSIENT_MSG_STORE).
-init(Queue, Recover, AsyncCallback) ->
- init(Queue, Recover, AsyncCallback,
- fun (MsgIds, ActionTaken) ->
- msgs_written_to_disk(AsyncCallback, MsgIds, ActionTaken)
- end,
- fun (MsgIds) -> msg_indices_written_to_disk(AsyncCallback, MsgIds) end).
+init(Queue, Recover, Callback) ->
+ init(
+ Queue, Recover, Callback,
+ fun (MsgIds, ActionTaken) ->
+ msgs_written_to_disk(Callback, MsgIds, ActionTaken)
+ end,
+ fun (MsgIds) -> msg_indices_written_to_disk(Callback, MsgIds) end,
+ fun (MsgIds) -> msgs_and_indices_written_to_disk(Callback, MsgIds) end).
init(#amqqueue { name = QueueName, durable = IsDurable }, new,
- AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) ->
- IndexState = rabbit_queue_index:init(QueueName, MsgIdxOnDiskFun),
+ AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun) ->
+ IndexState = rabbit_queue_index:init(QueueName,
+ MsgIdxOnDiskFun, MsgAndIdxOnDiskFun),
init(IsDurable, IndexState, 0, 0, [],
case IsDurable of
true -> msg_store_client_init(?PERSISTENT_MSG_STORE,
@@ -446,13 +478,17 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, new,
%% We can be recovering a transient queue if it crashed
init(#amqqueue { name = QueueName, durable = IsDurable }, Terms,
- AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) ->
+ AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun) ->
{PRef, RecoveryTerms} = process_recovery_terms(Terms),
{PersistentClient, ContainsCheckFun} =
case IsDurable of
true -> C = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef,
MsgOnDiskFun, AsyncCallback),
- {C, fun (MId) -> rabbit_msg_store:contains(MId, C) end};
+ {C, fun (MsgId) when is_binary(MsgId) ->
+ rabbit_msg_store:contains(MsgId, C);
+ (#basic_message{is_persistent = Persistent}) ->
+ Persistent
+ end};
false -> {undefined, fun(_MsgId) -> false end}
end,
TransientClient = msg_store_client_init(?TRANSIENT_MSG_STORE,
@@ -461,7 +497,7 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, Terms,
rabbit_queue_index:recover(
QueueName, RecoveryTerms,
rabbit_msg_store:successfully_recovered_state(?PERSISTENT_MSG_STORE),
- ContainsCheckFun, MsgIdxOnDiskFun),
+ ContainsCheckFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun),
init(IsDurable, IndexState, DeltaCount, DeltaBytes, RecoveryTerms,
PersistentClient, TransientClient).
@@ -514,51 +550,30 @@ delete_and_terminate(_Reason, State) ->
delete_crashed(#amqqueue{name = QName}) ->
ok = rabbit_queue_index:erase(QName).
-purge(State = #vqstate { q4 = Q4,
- index_state = IndexState,
- msg_store_clients = MSCState,
- len = Len,
- ram_bytes = RamBytes,
- persistent_count = PCount,
- persistent_bytes = PBytes }) ->
+purge(State = #vqstate { q4 = Q4,
+ len = Len }) ->
%% TODO: when there are no pending acks, which is a common case,
%% we could simply wipe the qi instead of issuing delivers and
%% acks for all the messages.
- Stats = {RamBytes, PCount, PBytes},
- {Stats1, IndexState1} =
- remove_queue_entries(Q4, Stats, IndexState, MSCState),
-
- {Stats2, State1 = #vqstate { q1 = Q1,
- index_state = IndexState2,
- msg_store_clients = MSCState1 }} =
-
- purge_betas_and_deltas(
- Stats1, State #vqstate { q4 = ?QUEUE:new(),
- index_state = IndexState1 }),
-
- {{RamBytes3, PCount3, PBytes3}, IndexState3} =
- remove_queue_entries(Q1, Stats2, IndexState2, MSCState1),
-
- {Len, a(State1 #vqstate { q1 = ?QUEUE:new(),
- index_state = IndexState3,
- len = 0,
- bytes = 0,
- ram_msg_count = 0,
- ram_bytes = RamBytes3,
- persistent_count = PCount3,
- persistent_bytes = PBytes3 })}.
+ State1 = remove_queue_entries(Q4, State),
+
+ State2 = #vqstate { q1 = Q1 } =
+ purge_betas_and_deltas(State1 #vqstate { q4 = ?QUEUE:new() }),
+
+ State3 = remove_queue_entries(Q1, State2),
+
+ {Len, a(State3 #vqstate { q1 = ?QUEUE:new() })}.
purge_acks(State) -> a(purge_pending_ack(false, State)).
publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
MsgProps = #message_properties { needs_confirming = NeedsConfirming },
- IsDelivered, _ChPid, State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4,
- next_seq_id = SeqId,
- len = Len,
- in_counter = InCount,
- persistent_count = PCount,
- durable = IsDurable,
- unconfirmed = UC }) ->
+ IsDelivered, _ChPid, _Flow,
+ State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4,
+ next_seq_id = SeqId,
+ in_counter = InCount,
+ durable = IsDurable,
+ unconfirmed = UC }) ->
IsPersistent1 = IsDurable andalso IsPersistent,
MsgStatus = msg_status(IsPersistent1, IsDelivered, SeqId, Msg, MsgProps),
{MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
@@ -567,42 +582,36 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
true -> State1 #vqstate { q4 = ?QUEUE:in(m(MsgStatus1), Q4) }
end,
InCount1 = InCount + 1,
- PCount1 = PCount + one_if(IsPersistent1),
UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
- State3 = upd_bytes(
- 1, 0, MsgStatus1,
- inc_ram_msg_count(State2 #vqstate { next_seq_id = SeqId + 1,
- len = Len + 1,
- in_counter = InCount1,
- persistent_count = PCount1,
- unconfirmed = UC1 })),
+ State3 = stats({1, 0}, {none, MsgStatus1},
+ State2#vqstate{ next_seq_id = SeqId + 1,
+ in_counter = InCount1,
+ unconfirmed = UC1 }),
a(reduce_memory_use(maybe_update_rates(State3))).
publish_delivered(Msg = #basic_message { is_persistent = IsPersistent,
id = MsgId },
MsgProps = #message_properties {
needs_confirming = NeedsConfirming },
- _ChPid, State = #vqstate { next_seq_id = SeqId,
- out_counter = OutCount,
- in_counter = InCount,
- persistent_count = PCount,
- durable = IsDurable,
- unconfirmed = UC }) ->
+ _ChPid, _Flow,
+ State = #vqstate { next_seq_id = SeqId,
+ out_counter = OutCount,
+ in_counter = InCount,
+ durable = IsDurable,
+ unconfirmed = UC }) ->
IsPersistent1 = IsDurable andalso IsPersistent,
MsgStatus = msg_status(IsPersistent1, true, SeqId, Msg, MsgProps),
{MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
State2 = record_pending_ack(m(MsgStatus1), State1),
- PCount1 = PCount + one_if(IsPersistent1),
UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
- State3 = upd_bytes(0, 1, MsgStatus,
- State2 #vqstate { next_seq_id = SeqId + 1,
- out_counter = OutCount + 1,
- in_counter = InCount + 1,
- persistent_count = PCount1,
- unconfirmed = UC1 }),
+ State3 = stats({0, 1}, {none, MsgStatus1},
+ State2 #vqstate { next_seq_id = SeqId + 1,
+ out_counter = OutCount + 1,
+ in_counter = InCount + 1,
+ unconfirmed = UC1 }),
{SeqId, a(reduce_memory_use(maybe_update_rates(State3)))}.
-discard(_MsgId, _ChPid, State) -> State.
+discard(_MsgId, _ChPid, _Flow, State) -> State.
drain_confirmed(State = #vqstate { confirmed = C }) ->
case gb_sets:is_empty(C) of
@@ -664,7 +673,7 @@ ack([], State) ->
ack([SeqId], State) ->
{#msg_status { msg_id = MsgId,
is_persistent = IsPersistent,
- msg_on_disk = MsgOnDisk,
+ msg_in_store = MsgInStore,
index_on_disk = IndexOnDisk },
State1 = #vqstate { index_state = IndexState,
msg_store_clients = MSCState,
@@ -674,7 +683,7 @@ ack([SeqId], State) ->
true -> rabbit_queue_index:ack([SeqId], IndexState);
false -> IndexState
end,
- case MsgOnDisk of
+ case MsgInStore of
true -> ok = msg_store_remove(MSCState, IsPersistent, [MsgId]);
false -> ok
end,
@@ -733,15 +742,18 @@ fold(Fun, Acc, State = #vqstate{index_state = IndexState}) ->
{Its, IndexState1} = lists:foldl(fun inext/2, {[], IndexState},
[msg_iterator(State),
disk_ack_iterator(State),
- ram_ack_iterator(State)]),
+ ram_ack_iterator(State),
+ qi_ack_iterator(State)]),
ifold(Fun, Acc, Its, State#vqstate{index_state = IndexState1}).
len(#vqstate { len = Len }) -> Len.
is_empty(State) -> 0 == len(State).
-depth(State = #vqstate { ram_pending_ack = RPA, disk_pending_ack = DPA }) ->
- len(State) + gb_trees:size(RPA) + gb_trees:size(DPA).
+depth(State = #vqstate { ram_pending_ack = RPA,
+ disk_pending_ack = DPA,
+ qi_pending_ack = QPA }) ->
+ len(State) + gb_trees:size(RPA) + gb_trees:size(DPA) + gb_trees:size(QPA).
set_ram_duration_target(
DurationTarget, State = #vqstate {
@@ -807,10 +819,11 @@ ram_duration(State) ->
ram_msg_count = RamMsgCount,
ram_msg_count_prev = RamMsgCountPrev,
ram_pending_ack = RPA,
+ qi_pending_ack = QPA,
ram_ack_count_prev = RamAckCountPrev } =
update_rates(State),
- RamAckCount = gb_trees:size(RPA),
+ RamAckCount = gb_trees:size(RPA) + gb_trees:size(QPA),
Duration = %% msgs+acks / (msgs+acks/sec) == sec
case lists:all(fun (X) -> X < 0.01 end,
@@ -846,8 +859,9 @@ msg_rates(#vqstate { rates = #rates { in = AvgIngressRate,
info(messages_ready_ram, #vqstate{ram_msg_count = RamMsgCount}) ->
RamMsgCount;
-info(messages_unacknowledged_ram, #vqstate{ram_pending_ack = RPA}) ->
- gb_trees:size(RPA);
+info(messages_unacknowledged_ram, #vqstate{ram_pending_ack = RPA,
+ qi_pending_ack = QPA}) ->
+ gb_trees:size(RPA) + gb_trees:size(QPA);
info(messages_ram, State) ->
info(messages_ready_ram, State) + info(messages_unacknowledged_ram, State);
info(messages_persistent, #vqstate{persistent_count = PersistentCount}) ->
@@ -863,6 +877,10 @@ info(message_bytes_ram, #vqstate{ram_bytes = RamBytes}) ->
RamBytes;
info(message_bytes_persistent, #vqstate{persistent_bytes = PersistentBytes}) ->
PersistentBytes;
+info(disk_reads, #vqstate{disk_read_count = Count}) ->
+ Count;
+info(disk_writes, #vqstate{disk_write_count = Count}) ->
+ Count;
info(backing_queue_status, #vqstate {
q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
len = Len,
@@ -933,14 +951,11 @@ d(Delta = #delta { start_seq_id = Start, count = Count, end_seq_id = End })
when Start + Count =< End ->
Delta.
-m(MsgStatus = #msg_status { msg = Msg,
- is_persistent = IsPersistent,
- msg_on_disk = MsgOnDisk,
+m(MsgStatus = #msg_status { is_persistent = IsPersistent,
+ msg_in_store = MsgInStore,
index_on_disk = IndexOnDisk }) ->
true = (not IsPersistent) or IndexOnDisk,
- true = (not IndexOnDisk) or MsgOnDisk,
- true = (Msg =/= undefined) or MsgOnDisk,
-
+ true = msg_in_ram(MsgStatus) or MsgInStore,
MsgStatus.
one_if(true ) -> 1;
@@ -959,21 +974,39 @@ msg_status(IsPersistent, IsDelivered, SeqId,
msg = Msg,
is_persistent = IsPersistent,
is_delivered = IsDelivered,
- msg_on_disk = false,
+ msg_in_store = false,
index_on_disk = false,
+ persist_to = determine_persist_to(Msg, MsgProps),
msg_props = MsgProps}.
+beta_msg_status({Msg = #basic_message{id = MsgId},
+ SeqId, MsgProps, IsPersistent, IsDelivered}) ->
+ MS0 = beta_msg_status0(SeqId, MsgProps, IsPersistent, IsDelivered),
+ MS0#msg_status{msg_id = MsgId,
+ msg = Msg,
+ persist_to = queue_index,
+ msg_in_store = false};
+
beta_msg_status({MsgId, SeqId, MsgProps, IsPersistent, IsDelivered}) ->
+ MS0 = beta_msg_status0(SeqId, MsgProps, IsPersistent, IsDelivered),
+ MS0#msg_status{msg_id = MsgId,
+ msg = undefined,
+ persist_to = msg_store,
+ msg_in_store = true}.
+
+beta_msg_status0(SeqId, MsgProps, IsPersistent, IsDelivered) ->
#msg_status{seq_id = SeqId,
- msg_id = MsgId,
msg = undefined,
is_persistent = IsPersistent,
is_delivered = IsDelivered,
- msg_on_disk = true,
index_on_disk = true,
msg_props = MsgProps}.
-trim_msg_status(MsgStatus) -> MsgStatus #msg_status { msg = undefined }.
+trim_msg_status(MsgStatus) ->
+ case persist_to(MsgStatus) of
+ msg_store -> MsgStatus#msg_status{msg = undefined};
+ queue_index -> MsgStatus
+ end.
with_msg_store_state({MSCStateP, MSCStateT}, true, Fun) ->
{Result, MSCStateP1} = Fun(MSCStateP),
@@ -1035,26 +1068,36 @@ maybe_write_delivered(false, _SeqId, IndexState) ->
maybe_write_delivered(true, SeqId, IndexState) ->
rabbit_queue_index:deliver([SeqId], IndexState).
-betas_from_index_entries(List, TransientThreshold, RPA, DPA, IndexState) ->
- {Filtered, Delivers, Acks} =
+betas_from_index_entries(List, TransientThreshold, RPA, DPA, QPA, IndexState) ->
+ {Filtered, Delivers, Acks, RamReadyCount, RamBytes} =
lists:foldr(
- fun ({_MsgId, SeqId, _MsgProps, IsPersistent, IsDelivered} = M,
- {Filtered1, Delivers1, Acks1} = Acc) ->
+ fun ({_MsgOrId, SeqId, _MsgProps, IsPersistent, IsDelivered} = M,
+ {Filtered1, Delivers1, Acks1, RRC, RB} = Acc) ->
case SeqId < TransientThreshold andalso not IsPersistent of
true -> {Filtered1,
cons_if(not IsDelivered, SeqId, Delivers1),
- [SeqId | Acks1]};
- false -> case (gb_trees:is_defined(SeqId, RPA) orelse
- gb_trees:is_defined(SeqId, DPA)) of
- false -> {?QUEUE:in_r(m(beta_msg_status(M)),
- Filtered1),
- Delivers1, Acks1};
- true -> Acc
- end
+ [SeqId | Acks1], RRC, RB};
+ false -> MsgStatus = m(beta_msg_status(M)),
+ HaveMsg = msg_in_ram(MsgStatus),
+ Size = msg_size(MsgStatus),
+ case (gb_trees:is_defined(SeqId, RPA) orelse
+ gb_trees:is_defined(SeqId, DPA) orelse
+ gb_trees:is_defined(SeqId, QPA)) of
+ false -> {?QUEUE:in_r(MsgStatus, Filtered1),
+ Delivers1, Acks1,
+ RRC + one_if(HaveMsg),
+ RB + one_if(HaveMsg) * Size};
+ true -> Acc %% [0]
+ end
end
- end, {?QUEUE:new(), [], []}, List),
- {Filtered, rabbit_queue_index:ack(
- Acks, rabbit_queue_index:deliver(Delivers, IndexState))}.
+ end, {?QUEUE:new(), [], [], 0, 0}, List),
+ {Filtered, RamReadyCount, RamBytes,
+ rabbit_queue_index:ack(
+ Acks, rabbit_queue_index:deliver(Delivers, IndexState))}.
+%% [0] We don't increase RamBytes here, even though it pertains to
+%% unacked messages too, since if HaveMsg then the message must have
+%% been stored in the QI, thus the message must have been in
+%% qi_pending_ack, thus it must already have been in RAM.
expand_delta(SeqId, ?BLANK_DELTA_PATTERN(X)) ->
d(#delta { start_seq_id = SeqId, count = 1, end_seq_id = SeqId + 1 });
@@ -1101,6 +1144,7 @@ init(IsDurable, IndexState, DeltaCount, DeltaBytes, Terms,
next_seq_id = NextSeqId,
ram_pending_ack = gb_trees:empty(),
disk_pending_ack = gb_trees:empty(),
+ qi_pending_ack = gb_trees:empty(),
index_state = IndexState1,
msg_store_clients = {PersistentClient, TransientClient},
durable = IsDurable,
@@ -1125,7 +1169,9 @@ init(IsDurable, IndexState, DeltaCount, DeltaBytes, Terms,
unconfirmed = gb_sets:new(),
confirmed = gb_sets:new(),
ack_out_counter = 0,
- ack_in_counter = 0 },
+ ack_in_counter = 0,
+ disk_read_count = 0,
+ disk_write_count = 0 },
a(maybe_deltas_to_betas(State)).
blank_rates(Now) ->
@@ -1141,11 +1187,9 @@ in_r(MsgStatus = #msg_status { msg = undefined },
true -> State #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3) };
false -> {Msg, State1 = #vqstate { q4 = Q4a }} =
read_msg(MsgStatus, State),
- upd_ram_bytes(
- 1, MsgStatus,
- inc_ram_msg_count(
- State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus#msg_status {
- msg = Msg }, Q4a) }))
+ MsgStatus1 = MsgStatus#msg_status{msg = Msg},
+ stats(ready0, {MsgStatus, MsgStatus1},
+ State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus1, Q4a) })
end;
in_r(MsgStatus, State = #vqstate { q4 = Q4 }) ->
State #vqstate { q4 = ?QUEUE:in_r(MsgStatus, Q4) }.
@@ -1168,33 +1212,57 @@ read_msg(#msg_status{msg = undefined,
read_msg(#msg_status{msg = Msg}, State) ->
{Msg, State}.
-read_msg(MsgId, IsPersistent, State = #vqstate{msg_store_clients = MSCState}) ->
+read_msg(MsgId, IsPersistent, State = #vqstate{msg_store_clients = MSCState,
+ disk_read_count = Count}) ->
{{ok, Msg = #basic_message {}}, MSCState1} =
msg_store_read(MSCState, IsPersistent, MsgId),
- {Msg, State #vqstate {msg_store_clients = MSCState1}}.
-
-inc_ram_msg_count(State = #vqstate{ram_msg_count = RamMsgCount}) ->
- State#vqstate{ram_msg_count = RamMsgCount + 1}.
-
-upd_bytes(SignReady, SignUnacked,
- MsgStatus = #msg_status{msg = undefined}, State) ->
- upd_bytes0(SignReady, SignUnacked, MsgStatus, State);
-upd_bytes(SignReady, SignUnacked, MsgStatus = #msg_status{msg = _}, State) ->
- upd_ram_bytes(SignReady + SignUnacked, MsgStatus,
- upd_bytes0(SignReady, SignUnacked, MsgStatus, State)).
-
-upd_bytes0(SignReady, SignUnacked, MsgStatus = #msg_status{is_persistent = IsP},
- State = #vqstate{bytes = Bytes,
- unacked_bytes = UBytes,
- persistent_bytes = PBytes}) ->
+ {Msg, State #vqstate {msg_store_clients = MSCState1,
+ disk_read_count = Count + 1}}.
+
+stats(Signs, Statuses, State) ->
+ stats0(expand_signs(Signs), expand_statuses(Statuses), State).
+
+expand_signs(ready0) -> {0, 0, true};
+expand_signs({A, B}) -> {A, B, false}.
+
+expand_statuses({none, A}) -> {false, msg_in_ram(A), A};
+expand_statuses({B, none}) -> {msg_in_ram(B), false, B};
+expand_statuses({B, A}) -> {msg_in_ram(B), msg_in_ram(A), B}.
+
+%% In this function at least, we are religious: the variable name
+%% contains "Ready" or "Unacked" iff that is what it counts. If
+%% neither is present it counts both.
+stats0({DeltaReady, DeltaUnacked, ReadyMsgPaged},
+ {InRamBefore, InRamAfter, MsgStatus},
+ State = #vqstate{len = ReadyCount,
+ bytes = ReadyBytes,
+ ram_msg_count = RamReadyCount,
+ persistent_count = PersistentCount,
+ unacked_bytes = UnackedBytes,
+ ram_bytes = RamBytes,
+ persistent_bytes = PersistentBytes}) ->
S = msg_size(MsgStatus),
- SignTotal = SignReady + SignUnacked,
- State#vqstate{bytes = Bytes + SignReady * S,
- unacked_bytes = UBytes + SignUnacked * S,
- persistent_bytes = PBytes + one_if(IsP) * S * SignTotal}.
-
-upd_ram_bytes(Sign, MsgStatus, State = #vqstate{ram_bytes = RamBytes}) ->
- State#vqstate{ram_bytes = RamBytes + Sign * msg_size(MsgStatus)}.
+ DeltaTotal = DeltaReady + DeltaUnacked,
+ DeltaRam = case {InRamBefore, InRamAfter} of
+ {false, false} -> 0;
+ {false, true} -> 1;
+ {true, false} -> -1;
+ {true, true} -> 0
+ end,
+ DeltaRamReady = case DeltaReady of
+ 1 -> one_if(InRamAfter);
+ -1 -> -one_if(InRamBefore);
+ 0 when ReadyMsgPaged -> DeltaRam;
+ 0 -> 0
+ end,
+ DeltaPersistent = DeltaTotal * one_if(MsgStatus#msg_status.is_persistent),
+ State#vqstate{len = ReadyCount + DeltaReady,
+ ram_msg_count = RamReadyCount + DeltaRamReady,
+ persistent_count = PersistentCount + DeltaPersistent,
+ bytes = ReadyBytes + DeltaReady * S,
+ unacked_bytes = UnackedBytes + DeltaUnacked * S,
+ ram_bytes = RamBytes + DeltaRam * S,
+ persistent_bytes = PersistentBytes + DeltaPersistent * S}.
msg_size(#msg_status{msg_props = #message_properties{size = Size}}) -> Size.
@@ -1203,17 +1271,13 @@ msg_in_ram(#msg_status{msg = Msg}) -> Msg =/= undefined.
remove(AckRequired, MsgStatus = #msg_status {
seq_id = SeqId,
msg_id = MsgId,
- msg = Msg,
is_persistent = IsPersistent,
is_delivered = IsDelivered,
- msg_on_disk = MsgOnDisk,
+ msg_in_store = MsgInStore,
index_on_disk = IndexOnDisk },
- State = #vqstate {ram_msg_count = RamMsgCount,
- out_counter = OutCount,
+ State = #vqstate {out_counter = OutCount,
index_state = IndexState,
- msg_store_clients = MSCState,
- len = Len,
- persistent_count = PCount}) ->
+ msg_store_clients = MSCState}) ->
%% 1. Mark it delivered if necessary
IndexState1 = maybe_write_delivered(
IndexOnDisk andalso not IsDelivered,
@@ -1224,10 +1288,11 @@ remove(AckRequired, MsgStatus = #msg_status {
ok = msg_store_remove(MSCState, IsPersistent, [MsgId])
end,
Ack = fun () -> rabbit_queue_index:ack([SeqId], IndexState1) end,
- IndexState2 = case {AckRequired, MsgOnDisk, IndexOnDisk} of
- {false, true, false} -> Rem(), IndexState1;
- {false, true, true} -> Rem(), Ack();
- _ -> IndexState1
+ IndexState2 = case {AckRequired, MsgInStore, IndexOnDisk} of
+ {false, true, false} -> Rem(), IndexState1;
+ {false, true, true} -> Rem(), Ack();
+ {false, false, true} -> Ack();
+ _ -> IndexState1
end,
%% 3. If an ack is required, add something sensible to PA
@@ -1238,166 +1303,215 @@ remove(AckRequired, MsgStatus = #msg_status {
{SeqId, StateN};
false -> {undefined, State}
end,
-
- PCount1 = PCount - one_if(IsPersistent andalso not AckRequired),
- RamMsgCount1 = RamMsgCount - one_if(Msg =/= undefined),
State2 = case AckRequired of
- false -> upd_bytes(-1, 0, MsgStatus, State1);
- true -> upd_bytes(-1, 1, MsgStatus, State1)
+ false -> stats({-1, 0}, {MsgStatus, none}, State1);
+ true -> stats({-1, 1}, {MsgStatus, MsgStatus}, State1)
end,
{AckTag, maybe_update_rates(
- State2 #vqstate {ram_msg_count = RamMsgCount1,
- out_counter = OutCount + 1,
- index_state = IndexState2,
- len = Len - 1,
- persistent_count = PCount1})}.
-
-purge_betas_and_deltas(Stats,
- State = #vqstate { q3 = Q3,
- index_state = IndexState,
- msg_store_clients = MSCState }) ->
+ State2 #vqstate {out_counter = OutCount + 1,
+ index_state = IndexState2})}.
+
+purge_betas_and_deltas(State = #vqstate { q3 = Q3 }) ->
case ?QUEUE:is_empty(Q3) of
- true -> {Stats, State};
- false -> {Stats1, IndexState1} = remove_queue_entries(
- Q3, Stats, IndexState, MSCState),
- purge_betas_and_deltas(Stats1,
- maybe_deltas_to_betas(
- State #vqstate {
- q3 = ?QUEUE:new(),
- index_state = IndexState1 }))
+ true -> State;
+ false -> State1 = remove_queue_entries(Q3, State),
+ purge_betas_and_deltas(maybe_deltas_to_betas(
+ State1#vqstate{q3 = ?QUEUE:new()}))
end.
-remove_queue_entries(Q, {RamBytes, PCount, PBytes},
- IndexState, MSCState) ->
- {MsgIdsByStore, RamBytes1, PBytes1, Delivers, Acks} =
+remove_queue_entries(Q, State = #vqstate{index_state = IndexState,
+ msg_store_clients = MSCState}) ->
+ {MsgIdsByStore, Delivers, Acks, State1} =
?QUEUE:foldl(fun remove_queue_entries1/2,
- {orddict:new(), RamBytes, PBytes, [], []}, Q),
+ {orddict:new(), [], [], State}, Q),
ok = orddict:fold(fun (IsPersistent, MsgIds, ok) ->
msg_store_remove(MSCState, IsPersistent, MsgIds)
end, ok, MsgIdsByStore),
- {{RamBytes1,
- PCount - case orddict:find(true, MsgIdsByStore) of
- error -> 0;
- {ok, Ids} -> length(Ids)
- end,
- PBytes1},
- rabbit_queue_index:ack(Acks,
- rabbit_queue_index:deliver(Delivers, IndexState))}.
+ IndexState1 = rabbit_queue_index:ack(
+ Acks, rabbit_queue_index:deliver(Delivers, IndexState)),
+ State1#vqstate{index_state = IndexState1}.
remove_queue_entries1(
- #msg_status { msg_id = MsgId, seq_id = SeqId, msg = Msg,
- is_delivered = IsDelivered, msg_on_disk = MsgOnDisk,
- index_on_disk = IndexOnDisk, is_persistent = IsPersistent,
- msg_props = #message_properties { size = Size } },
- {MsgIdsByStore, RamBytes, PBytes, Delivers, Acks}) ->
- {case MsgOnDisk of
+ #msg_status { msg_id = MsgId, seq_id = SeqId, is_delivered = IsDelivered,
+ msg_in_store = MsgInStore, index_on_disk = IndexOnDisk,
+ is_persistent = IsPersistent} = MsgStatus,
+ {MsgIdsByStore, Delivers, Acks, State}) ->
+ {case MsgInStore of
true -> rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore);
false -> MsgIdsByStore
end,
- RamBytes - Size * one_if(Msg =/= undefined),
- PBytes - Size * one_if(IsPersistent),
cons_if(IndexOnDisk andalso not IsDelivered, SeqId, Delivers),
- cons_if(IndexOnDisk, SeqId, Acks)}.
+ cons_if(IndexOnDisk, SeqId, Acks),
+ stats({-1, 0}, {MsgStatus, none}, State)}.
%%----------------------------------------------------------------------------
%% Internal gubbins for publishing
%%----------------------------------------------------------------------------
maybe_write_msg_to_disk(_Force, MsgStatus = #msg_status {
- msg_on_disk = true }, _MSCState) ->
- MsgStatus;
+ msg_in_store = true }, State) ->
+ {MsgStatus, State};
maybe_write_msg_to_disk(Force, MsgStatus = #msg_status {
msg = Msg, msg_id = MsgId,
- is_persistent = IsPersistent }, MSCState)
+ is_persistent = IsPersistent },
+ State = #vqstate{ msg_store_clients = MSCState,
+ disk_write_count = Count})
when Force orelse IsPersistent ->
- Msg1 = Msg #basic_message {
- %% don't persist any recoverable decoded properties
- content = rabbit_binary_parser:clear_decoded_content(
- Msg #basic_message.content)},
- ok = msg_store_write(MSCState, IsPersistent, MsgId, Msg1),
- MsgStatus #msg_status { msg_on_disk = true };
-maybe_write_msg_to_disk(_Force, MsgStatus, _MSCState) ->
- MsgStatus.
+ case persist_to(MsgStatus) of
+ msg_store -> ok = msg_store_write(MSCState, IsPersistent, MsgId,
+ prepare_to_store(Msg)),
+ {MsgStatus#msg_status{msg_in_store = true},
+ State#vqstate{disk_write_count = Count + 1}};
+ queue_index -> {MsgStatus, State}
+ end;
+maybe_write_msg_to_disk(_Force, MsgStatus, State) ->
+ {MsgStatus, State}.
maybe_write_index_to_disk(_Force, MsgStatus = #msg_status {
- index_on_disk = true }, IndexState) ->
- true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION
- {MsgStatus, IndexState};
+ index_on_disk = true }, State) ->
+ {MsgStatus, State};
maybe_write_index_to_disk(Force, MsgStatus = #msg_status {
+ msg = Msg,
msg_id = MsgId,
seq_id = SeqId,
is_persistent = IsPersistent,
is_delivered = IsDelivered,
- msg_props = MsgProps}, IndexState)
+ msg_props = MsgProps},
+ State = #vqstate{target_ram_count = TargetRamCount,
+ disk_write_count = DiskWriteCount,
+ index_state = IndexState})
when Force orelse IsPersistent ->
- true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION
+ {MsgOrId, DiskWriteCount1} =
+ case persist_to(MsgStatus) of
+ msg_store -> {MsgId, DiskWriteCount};
+ queue_index -> {prepare_to_store(Msg), DiskWriteCount + 1}
+ end,
IndexState1 = rabbit_queue_index:publish(
- MsgId, SeqId, MsgProps, IsPersistent, IndexState),
- {MsgStatus #msg_status { index_on_disk = true },
- maybe_write_delivered(IsDelivered, SeqId, IndexState1)};
-maybe_write_index_to_disk(_Force, MsgStatus, IndexState) ->
- {MsgStatus, IndexState}.
-
-maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus,
- State = #vqstate { index_state = IndexState,
- msg_store_clients = MSCState }) ->
- MsgStatus1 = maybe_write_msg_to_disk(ForceMsg, MsgStatus, MSCState),
- {MsgStatus2, IndexState1} =
- maybe_write_index_to_disk(ForceIndex, MsgStatus1, IndexState),
- {MsgStatus2, State #vqstate { index_state = IndexState1 }}.
+ MsgOrId, SeqId, MsgProps, IsPersistent, TargetRamCount,
+ IndexState),
+ IndexState2 = maybe_write_delivered(IsDelivered, SeqId, IndexState1),
+ {MsgStatus#msg_status{index_on_disk = true},
+ State#vqstate{index_state = IndexState2,
+ disk_write_count = DiskWriteCount1}};
+
+maybe_write_index_to_disk(_Force, MsgStatus, State) ->
+ {MsgStatus, State}.
+
+maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus, State) ->
+ {MsgStatus1, State1} = maybe_write_msg_to_disk(ForceMsg, MsgStatus, State),
+ maybe_write_index_to_disk(ForceIndex, MsgStatus1, State1).
+
+determine_persist_to(#basic_message{
+ content = #content{properties = Props,
+ properties_bin = PropsBin}},
+ #message_properties{size = BodySize}) ->
+ {ok, IndexMaxSize} = application:get_env(
+ rabbit, queue_index_embed_msgs_below),
+ %% The >= is so that you can set the env to 0 and never persist
+ %% to the index.
+ %%
+ %% We want this to be fast, so we avoid size(term_to_binary())
+ %% here, or using the term size estimation from truncate.erl, both
+ %% of which are too slow. So instead, if the message body size
+ %% goes over the limit then we avoid any other checks.
+ %%
+ %% If it doesn't we need to decide if the properties will push
+ %% it past the limit. If we have the encoded properties (usual
+ %% case) we can just check their size. If we don't (message came
+ %% via the direct client), we make a guess based on the number of
+ %% headers.
+ case BodySize >= IndexMaxSize of
+ true -> msg_store;
+ false -> Est = case is_binary(PropsBin) of
+ true -> BodySize + size(PropsBin);
+ false -> #'P_basic'{headers = Hs} = Props,
+ case Hs of
+ undefined -> 0;
+ _ -> length(Hs)
+ end * ?HEADER_GUESS_SIZE + BodySize
+ end,
+ case Est >= IndexMaxSize of
+ true -> msg_store;
+ false -> queue_index
+ end
+ end.
+
+persist_to(#msg_status{persist_to = To}) -> To.
+
+prepare_to_store(Msg) ->
+ Msg#basic_message{
+ %% don't persist any recoverable decoded properties
+ content = rabbit_binary_parser:clear_decoded_content(
+ Msg #basic_message.content)}.
%%----------------------------------------------------------------------------
%% Internal gubbins for acks
%%----------------------------------------------------------------------------
-record_pending_ack(#msg_status { seq_id = SeqId, msg = Msg } = MsgStatus,
+record_pending_ack(#msg_status { seq_id = SeqId } = MsgStatus,
State = #vqstate { ram_pending_ack = RPA,
disk_pending_ack = DPA,
+ qi_pending_ack = QPA,
ack_in_counter = AckInCount}) ->
- {RPA1, DPA1} =
- case Msg of
- undefined -> {RPA, gb_trees:insert(SeqId, MsgStatus, DPA)};
- _ -> {gb_trees:insert(SeqId, MsgStatus, RPA), DPA}
+ Insert = fun (Tree) -> gb_trees:insert(SeqId, MsgStatus, Tree) end,
+ {RPA1, DPA1, QPA1} =
+ case {msg_in_ram(MsgStatus), persist_to(MsgStatus)} of
+ {false, _} -> {RPA, Insert(DPA), QPA};
+ {_, queue_index} -> {RPA, DPA, Insert(QPA)};
+ {_, msg_store} -> {Insert(RPA), DPA, QPA}
end,
State #vqstate { ram_pending_ack = RPA1,
disk_pending_ack = DPA1,
+ qi_pending_ack = QPA1,
ack_in_counter = AckInCount + 1}.
lookup_pending_ack(SeqId, #vqstate { ram_pending_ack = RPA,
- disk_pending_ack = DPA }) ->
+ disk_pending_ack = DPA,
+ qi_pending_ack = QPA}) ->
case gb_trees:lookup(SeqId, RPA) of
{value, V} -> V;
- none -> gb_trees:get(SeqId, DPA)
+ none -> case gb_trees:lookup(SeqId, DPA) of
+ {value, V} -> V;
+ none -> gb_trees:get(SeqId, QPA)
+ end
end.
-%% First parameter = UpdatePersistentCount
+%% First parameter = UpdateStats
remove_pending_ack(true, SeqId, State) ->
- {MsgStatus, State1 = #vqstate { persistent_count = PCount }} =
- remove_pending_ack(false, SeqId, State),
- PCount1 = PCount - one_if(MsgStatus#msg_status.is_persistent),
- {MsgStatus, upd_bytes(0, -1, MsgStatus,
- State1 # vqstate{ persistent_count = PCount1 })};
-remove_pending_ack(false, SeqId, State = #vqstate { ram_pending_ack = RPA,
- disk_pending_ack = DPA }) ->
+ {MsgStatus, State1} = remove_pending_ack(false, SeqId, State),
+ {MsgStatus, stats({0, -1}, {MsgStatus, none}, State1)};
+remove_pending_ack(false, SeqId, State = #vqstate{ram_pending_ack = RPA,
+ disk_pending_ack = DPA,
+ qi_pending_ack = QPA}) ->
case gb_trees:lookup(SeqId, RPA) of
{value, V} -> RPA1 = gb_trees:delete(SeqId, RPA),
{V, State #vqstate { ram_pending_ack = RPA1 }};
- none -> DPA1 = gb_trees:delete(SeqId, DPA),
- {gb_trees:get(SeqId, DPA),
- State #vqstate { disk_pending_ack = DPA1 }}
+ none -> case gb_trees:lookup(SeqId, DPA) of
+ {value, V} ->
+ DPA1 = gb_trees:delete(SeqId, DPA),
+ {V, State#vqstate{disk_pending_ack = DPA1}};
+ none ->
+ QPA1 = gb_trees:delete(SeqId, QPA),
+ {gb_trees:get(SeqId, QPA),
+ State#vqstate{qi_pending_ack = QPA1}}
+ end
end.
purge_pending_ack(KeepPersistent,
State = #vqstate { ram_pending_ack = RPA,
disk_pending_ack = DPA,
+ qi_pending_ack = QPA,
index_state = IndexState,
msg_store_clients = MSCState }) ->
F = fun (_SeqId, MsgStatus, Acc) -> accumulate_ack(MsgStatus, Acc) end,
{IndexOnDiskSeqIds, MsgIdsByStore, _AllMsgIds} =
rabbit_misc:gb_trees_fold(
- F, rabbit_misc:gb_trees_fold(F, accumulate_ack_init(), RPA), DPA),
+ F, rabbit_misc:gb_trees_fold(
+ F, rabbit_misc:gb_trees_fold(
+ F, accumulate_ack_init(), RPA), DPA), QPA),
State1 = State #vqstate { ram_pending_ack = gb_trees:empty(),
- disk_pending_ack = gb_trees:empty() },
+ disk_pending_ack = gb_trees:empty(),
+ qi_pending_ack = gb_trees:empty()},
case KeepPersistent of
true -> case orddict:find(false, MsgIdsByStore) of
@@ -1418,11 +1532,11 @@ accumulate_ack_init() -> {[], orddict:new(), []}.
accumulate_ack(#msg_status { seq_id = SeqId,
msg_id = MsgId,
is_persistent = IsPersistent,
- msg_on_disk = MsgOnDisk,
+ msg_in_store = MsgInStore,
index_on_disk = IndexOnDisk },
{IndexOnDiskSeqIdsAcc, MsgIdsByStore, AllMsgIds}) ->
{cons_if(IndexOnDisk, SeqId, IndexOnDiskSeqIdsAcc),
- case MsgOnDisk of
+ case MsgInStore of
true -> rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore);
false -> MsgIdsByStore
end,
@@ -1469,29 +1583,25 @@ msg_indices_written_to_disk(Callback, MsgIdSet) ->
gb_sets:union(MIOD, Confirmed) })
end).
+msgs_and_indices_written_to_disk(Callback, MsgIdSet) ->
+ Callback(?MODULE,
+ fun (?MODULE, State) -> record_confirms(MsgIdSet, State) end).
+
%%----------------------------------------------------------------------------
%% Internal plumbing for requeue
%%----------------------------------------------------------------------------
publish_alpha(#msg_status { msg = undefined } = MsgStatus, State) ->
{Msg, State1} = read_msg(MsgStatus, State),
- {MsgStatus#msg_status { msg = Msg },
- upd_ram_bytes(1, MsgStatus, inc_ram_msg_count(State1))}; %% [1]
+ MsgStatus1 = MsgStatus#msg_status { msg = Msg },
+ {MsgStatus1, stats({1, -1}, {MsgStatus, MsgStatus1}, State1)};
publish_alpha(MsgStatus, State) ->
- {MsgStatus, inc_ram_msg_count(State)}.
-%% [1] We increase the ram_bytes here because we paged the message in
-%% to requeue it, not purely because we requeued it. Hence in the
-%% second head it's already accounted for as already in memory. OTOH
-%% ram_msg_count does not include unacked messages, so it needs
-%% incrementing in both heads.
+ {MsgStatus, stats({1, -1}, {MsgStatus, MsgStatus}, State)}.
publish_beta(MsgStatus, State) ->
{MsgStatus1, State1} = maybe_write_to_disk(true, false, MsgStatus, State),
MsgStatus2 = m(trim_msg_status(MsgStatus1)),
- case msg_in_ram(MsgStatus1) andalso not msg_in_ram(MsgStatus2) of
- true -> {MsgStatus2, upd_ram_bytes(-1, MsgStatus, State1)};
- _ -> {MsgStatus2, State1}
- end.
+ {MsgStatus2, stats({1, -1}, {MsgStatus, MsgStatus2}, State1)}.
%% Rebuild queue, inserting sequence ids to maintain ordering
queue_merge(SeqIds, Q, MsgIds, Limit, PubFun, State) ->
@@ -1513,7 +1623,7 @@ queue_merge([SeqId | Rest] = SeqIds, Q, Front, MsgIds,
{#msg_status { msg_id = MsgId } = MsgStatus1, State2} =
PubFun(MsgStatus, State1),
queue_merge(Rest, Q, ?QUEUE:in(MsgStatus1, Front), [MsgId | MsgIds],
- Limit, PubFun, upd_bytes(1, -1, MsgStatus, State2))
+ Limit, PubFun, State2)
end;
queue_merge(SeqIds, Q, Front, MsgIds,
_Limit, _PubFun, State) ->
@@ -1527,13 +1637,8 @@ delta_merge(SeqIds, Delta, MsgIds, State) ->
msg_from_pending_ack(SeqId, State0),
{_MsgStatus, State2} =
maybe_write_to_disk(true, true, MsgStatus, State1),
- State3 =
- case msg_in_ram(MsgStatus) of
- false -> State2;
- true -> upd_ram_bytes(-1, MsgStatus, State2)
- end,
{expand_delta(SeqId, Delta0), [MsgId | MsgIds0],
- upd_bytes(1, -1, MsgStatus, State3)}
+ stats({1, -1}, {MsgStatus, none}, State2)}
end, {Delta, MsgIds, State}, SeqIds).
%% Mostly opposite of record_pending_ack/2
@@ -1563,6 +1668,9 @@ ram_ack_iterator(State) ->
disk_ack_iterator(State) ->
{ack, gb_trees:iterator(State#vqstate.disk_pending_ack)}.
+qi_ack_iterator(State) ->
+ {ack, gb_trees:iterator(State#vqstate.qi_pending_ack)}.
+
msg_iterator(State) -> istate(start, State).
istate(start, State) -> {q4, State#vqstate.q4, State};
@@ -1592,7 +1700,8 @@ next({delta, Delta, [], State}, IndexState) ->
next({delta, Delta, State}, IndexState);
next({delta, Delta, [{_, SeqId, _, _, _} = M | Rest], State}, IndexState) ->
case (gb_trees:is_defined(SeqId, State#vqstate.ram_pending_ack) orelse
- gb_trees:is_defined(SeqId, State#vqstate.disk_pending_ack)) of
+ gb_trees:is_defined(SeqId, State#vqstate.disk_pending_ack) orelse
+ gb_trees:is_defined(SeqId, State#vqstate.qi_pending_ack)) of
false -> Next = {delta, Delta, Rest, State},
{value, beta_msg_status(M), false, Next, IndexState};
true -> next({delta, Delta, Rest, State}, IndexState)
@@ -1689,12 +1798,12 @@ limit_ram_acks(Quota, State = #vqstate { ram_pending_ack = RPA,
{SeqId, MsgStatus, RPA1} = gb_trees:take_largest(RPA),
{MsgStatus1, State1} =
maybe_write_to_disk(true, false, MsgStatus, State),
- DPA1 = gb_trees:insert(SeqId, m(trim_msg_status(MsgStatus1)), DPA),
+ MsgStatus2 = m(trim_msg_status(MsgStatus1)),
+ DPA1 = gb_trees:insert(SeqId, MsgStatus2, DPA),
limit_ram_acks(Quota - 1,
- upd_ram_bytes(
- -1, MsgStatus1,
- State1 #vqstate { ram_pending_ack = RPA1,
- disk_pending_ack = DPA1 }))
+ stats({0, 0}, {MsgStatus, MsgStatus2},
+ State1 #vqstate { ram_pending_ack = RPA1,
+ disk_pending_ack = DPA1 }))
end.
permitted_beta_count(#vqstate { len = 0 }) ->
@@ -1755,8 +1864,12 @@ maybe_deltas_to_betas(State = #vqstate {
delta = Delta,
q3 = Q3,
index_state = IndexState,
+ ram_msg_count = RamMsgCount,
+ ram_bytes = RamBytes,
ram_pending_ack = RPA,
disk_pending_ack = DPA,
+ qi_pending_ack = QPA,
+ disk_read_count = DiskReadCount,
transient_threshold = TransientThreshold }) ->
#delta { start_seq_id = DeltaSeqId,
count = DeltaCount,
@@ -1766,9 +1879,13 @@ maybe_deltas_to_betas(State = #vqstate {
DeltaSeqIdEnd]),
{List, IndexState1} = rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1,
IndexState),
- {Q3a, IndexState2} = betas_from_index_entries(List, TransientThreshold,
- RPA, DPA, IndexState1),
- State1 = State #vqstate { index_state = IndexState2 },
+ {Q3a, RamCountsInc, RamBytesInc, IndexState2} =
+ betas_from_index_entries(List, TransientThreshold,
+ RPA, DPA, QPA, IndexState1),
+ State1 = State #vqstate { index_state = IndexState2,
+ ram_msg_count = RamMsgCount + RamCountsInc,
+ ram_bytes = RamBytes + RamBytesInc,
+ disk_read_count = DiskReadCount + RamCountsInc},
case ?QUEUE:len(Q3a) of
0 ->
%% we ignored every message in the segment due to it being
@@ -1826,26 +1943,21 @@ push_alphas_to_betas(Generator, Consumer, Quota, Q, State) ->
{empty, _Q} ->
{Quota, State};
{{value, MsgStatus}, Qa} ->
- {MsgStatus1 = #msg_status { msg_on_disk = true },
- State1 = #vqstate { ram_msg_count = RamMsgCount }} =
+ {MsgStatus1, State1} =
maybe_write_to_disk(true, false, MsgStatus, State),
MsgStatus2 = m(trim_msg_status(MsgStatus1)),
- State2 = Consumer(
- MsgStatus2, Qa,
- upd_ram_bytes(
- -1, MsgStatus2,
- State1 #vqstate {
- ram_msg_count = RamMsgCount - 1})),
+ State2 = stats(
+ ready0, {MsgStatus, MsgStatus2}, State1),
+ State3 = Consumer(MsgStatus2, Qa, State2),
push_alphas_to_betas(Generator, Consumer, Quota - 1,
- Qa, State2)
+ Qa, State3)
end
end.
-push_betas_to_deltas(Quota, State = #vqstate { q2 = Q2,
- delta = Delta,
- q3 = Q3,
- index_state = IndexState }) ->
- PushState = {Quota, Delta, IndexState},
+push_betas_to_deltas(Quota, State = #vqstate { q2 = Q2,
+ delta = Delta,
+ q3 = Q3}) ->
+ PushState = {Quota, Delta, State},
{Q3a, PushState1} = push_betas_to_deltas(
fun ?QUEUE:out_r/1,
fun rabbit_queue_index:next_segment_boundary/1,
@@ -1854,11 +1966,10 @@ push_betas_to_deltas(Quota, State = #vqstate { q2 = Q2,
fun ?QUEUE:out/1,
fun (Q2MinSeqId) -> Q2MinSeqId end,
Q2, PushState1),
- {_, Delta1, IndexState1} = PushState2,
- State #vqstate { q2 = Q2a,
- delta = Delta1,
- q3 = Q3a,
- index_state = IndexState1 }.
+ {_, Delta1, State1} = PushState2,
+ State1 #vqstate { q2 = Q2a,
+ delta = Delta1,
+ q3 = Q3a }.
push_betas_to_deltas(Generator, LimitFun, Q, PushState) ->
case ?QUEUE:is_empty(Q) of
@@ -1874,11 +1985,9 @@ push_betas_to_deltas(Generator, LimitFun, Q, PushState) ->
end
end.
-push_betas_to_deltas1(_Generator, _Limit, Q,
- {0, _Delta, _IndexState} = PushState) ->
+push_betas_to_deltas1(_Generator, _Limit, Q, {0, _Delta, _State} = PushState) ->
{Q, PushState};
-push_betas_to_deltas1(Generator, Limit, Q,
- {Quota, Delta, IndexState} = PushState) ->
+push_betas_to_deltas1(Generator, Limit, Q, {Quota, Delta, State} = PushState) ->
case Generator(Q) of
{empty, _Q} ->
{Q, PushState};
@@ -1886,11 +1995,12 @@ push_betas_to_deltas1(Generator, Limit, Q,
when SeqId < Limit ->
{Q, PushState};
{{value, MsgStatus = #msg_status { seq_id = SeqId }}, Qa} ->
- {#msg_status { index_on_disk = true }, IndexState1} =
- maybe_write_index_to_disk(true, MsgStatus, IndexState),
+ {#msg_status { index_on_disk = true }, State1} =
+ maybe_write_index_to_disk(true, MsgStatus, State),
+ State2 = stats(ready0, {MsgStatus, none}, State1),
Delta1 = expand_delta(SeqId, Delta),
push_betas_to_deltas1(Generator, Limit, Qa,
- {Quota - 1, Delta1, IndexState1})
+ {Quota - 1, Delta1, State2})
end.
%%----------------------------------------------------------------------------
diff --git a/src/truncate.erl b/src/truncate.erl
index 820af1bf86..1b4d957d0b 100644
--- a/src/truncate.erl
+++ b/src/truncate.erl
@@ -45,7 +45,7 @@ report(List, Params) when is_list(List) -> [case Item of
report(Other, Params) -> term(Other, Params).
term(Thing, {Max, {Content, Struct, ContentDec, StructDec}}) ->
- case term_limit(Thing, Max) of
+ case exceeds_size(Thing, Max) of
true -> term(Thing, true, #params{content = Content,
struct = Struct,
content_dec = ContentDec,
@@ -93,7 +93,7 @@ shrink_list([H|T], #params{content = Content,
%% sizes. This is all going to be rather approximate though, these
%% sizes are probably not very "fair" but we are just trying to see if
%% we reach a fairly arbitrary limit anyway though.
-term_limit(Thing, Max) ->
+exceeds_size(Thing, Max) ->
case term_size(Thing, Max, erlang:system_info(wordsize)) of
limit_exceeded -> true;
_ -> false
diff --git a/src/worker_pool.erl b/src/worker_pool.erl
index 608cea9166..71359aa5ed 100644
--- a/src/worker_pool.erl
+++ b/src/worker_pool.erl
@@ -18,13 +18,34 @@
%% Generic worker pool manager.
%%
-%% Supports nested submission of jobs (nested jobs always run
-%% immediately in current worker process).
+%% Submitted jobs are functions. They can be executed asynchronously
+%% (using worker_pool:submit/1, worker_pool:submit/2) or synchronously
+%% (using worker_pool:submit_async/1).
%%
-%% Possible future enhancements:
+%% We typically use the worker pool if we want to limit the maximum
+%% parallelism of some job. We are not trying to dodge the cost of
+%% creating Erlang processes.
%%
-%% 1. Allow priorities (basically, change the pending queue to a
-%% priority_queue).
+%% Supports nested submission of jobs and two execution modes:
+%% 'single' and 'reuse'. Jobs executed in 'single' mode are invoked in
+%% a one-off process. Those executed in 'reuse' mode are invoked in a
+%% worker process out of the pool. Nested jobs are always executed
+%% immediately in current worker process.
+%%
+%% 'single' mode is offered to work around a bug in Mnesia: after
+%% network partitions reply messages for prior failed requests can be
+%% sent to Mnesia clients - a reused worker pool process can crash on
+%% receiving one.
+%%
+%% Caller submissions are enqueued internally. When the next worker
+%% process is available, it communicates it to the pool and is
+%% assigned a job to execute. If job execution fails with an error, no
+%% response is returned to the caller.
+%%
+%% Worker processes prioritise certain command-and-control messages
+%% from the pool.
+%%
+%% Future improvement points: job prioritisation.
-behaviour(gen_server2).
diff --git a/src/worker_pool_worker.erl b/src/worker_pool_worker.erl
index 819a6ae8ce..c2d058923d 100644
--- a/src/worker_pool_worker.erl
+++ b/src/worker_pool_worker.erl
@@ -16,6 +16,11 @@
-module(worker_pool_worker).
+%% Executes jobs (functions) submitted to a worker pool with worker_pool:submit/1,
+%% worker_pool:submit/2 or worker_pool:submit_async/1.
+%%
+%% See worker_pool for an overview.
+
-behaviour(gen_server2).
-export([start_link/0, next_job_from/2, submit/3, submit_async/2, run/1]).