summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/delegate.erl6
-rw-r--r--src/delegate_sup.erl2
-rw-r--r--src/file_handle_cache.erl169
-rw-r--r--src/gatherer.erl2
-rw-r--r--src/gm.erl4
-rw-r--r--src/rabbit.erl29
-rw-r--r--src/rabbit_access_control.erl3
-rw-r--r--src/rabbit_amqqueue.erl9
-rw-r--r--src/rabbit_amqqueue_process.erl454
-rw-r--r--src/rabbit_amqqueue_sup.erl14
-rw-r--r--src/rabbit_auth_backend_internal.erl30
-rw-r--r--src/rabbit_backing_queue_qc.erl215
-rw-r--r--src/rabbit_basic.erl74
-rw-r--r--src/rabbit_binary_generator.erl19
-rw-r--r--src/rabbit_binding.erl31
-rw-r--r--src/rabbit_channel.erl380
-rw-r--r--src/rabbit_client_sup.erl3
-rw-r--r--src/rabbit_command_assembler.erl4
-rw-r--r--src/rabbit_error_logger.erl10
-rw-r--r--src/rabbit_error_logger_file_h.erl38
-rw-r--r--src/rabbit_file.erl282
-rw-r--r--src/rabbit_guid.erl4
-rw-r--r--src/rabbit_limiter.erl1
-rw-r--r--src/rabbit_log.erl2
-rw-r--r--src/rabbit_mirror_queue_master.erl4
-rw-r--r--src/rabbit_mirror_queue_misc.erl20
-rw-r--r--src/rabbit_mirror_queue_slave.erl32
-rw-r--r--src/rabbit_misc.erl201
-rw-r--r--src/rabbit_mnesia.erl31
-rw-r--r--src/rabbit_msg_store.erl10
-rw-r--r--src/rabbit_networking.erl28
-rw-r--r--src/rabbit_node_monitor.erl1
-rw-r--r--src/rabbit_prelaunch.erl41
-rw-r--r--src/rabbit_queue_index.erl34
-rw-r--r--src/rabbit_reader.erl41
-rw-r--r--src/rabbit_restartable_sup.erl10
-rw-r--r--src/rabbit_router.erl16
-rw-r--r--src/rabbit_sasl_report_file_h.erl24
-rw-r--r--src/rabbit_sup.erl15
-rw-r--r--src/rabbit_tests.erl104
-rw-r--r--src/rabbit_trace.erl2
-rw-r--r--src/rabbit_upgrade.erl4
-rw-r--r--src/rabbit_variable_queue.erl388
-rw-r--r--src/rabbit_version.erl4
-rw-r--r--src/rabbit_vhost.erl4
-rw-r--r--src/rabbit_writer.erl3
-rw-r--r--src/tcp_acceptor_sup.erl8
-rw-r--r--src/tcp_listener.erl8
-rw-r--r--src/tcp_listener_sup.erl15
-rw-r--r--src/test_sup.erl12
-rw-r--r--src/vm_memory_monitor.erl4
-rw-r--r--src/worker_pool.erl1
-rw-r--r--src/worker_pool_sup.erl4
53 files changed, 1701 insertions, 1153 deletions
diff --git a/src/delegate.erl b/src/delegate.erl
index 17046201ad..edb4eba4ae 100644
--- a/src/delegate.erl
+++ b/src/delegate.erl
@@ -28,13 +28,13 @@
-ifdef(use_specs).
-spec(start_link/1 ::
- (non_neg_integer()) -> {'ok', pid()} | {'error', any()}).
--spec(invoke_no_result/2 ::
- (pid() | [pid()], fun ((pid()) -> any())) -> 'ok').
+ (non_neg_integer()) -> {'ok', pid()} | ignore | {'error', any()}).
-spec(invoke/2 ::
( pid(), fun ((pid()) -> A)) -> A;
([pid()], fun ((pid()) -> A)) -> {[{pid(), A}],
[{pid(), term()}]}).
+-spec(invoke_no_result/2 ::
+ (pid() | [pid()], fun ((pid()) -> any())) -> 'ok').
-endif.
diff --git a/src/delegate_sup.erl b/src/delegate_sup.erl
index fc693c7d3d..4c131a6c59 100644
--- a/src/delegate_sup.erl
+++ b/src/delegate_sup.erl
@@ -28,7 +28,7 @@
-ifdef(use_specs).
--spec(start_link/1 :: (integer()) -> {'ok', pid()} | {'error', any()}).
+-spec(start_link/1 :: (integer()) -> rabbit_types:ok_pid_or_error()).
-spec(count/1 :: ([node()]) -> integer()).
-endif.
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index 3c2111dc78..6c3f1b5f50 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -120,37 +120,39 @@
%% do not need to worry about their handles being closed by the server
%% - reopening them when necessary is handled transparently.
%%
-%% The server also supports obtain and transfer. obtain/0 blocks until
-%% a file descriptor is available, at which point the requesting
-%% process is considered to 'own' one more descriptor. transfer/1
-%% transfers ownership of a file descriptor between processes. It is
-%% non-blocking. Obtain is used to obtain permission to accept file
-%% descriptors. Obtain has a lower limit, set by the ?OBTAIN_LIMIT/1
-%% macro. File handles can use the entire limit, but will be evicted
-%% by obtain calls up to the point at which no more obtain calls can
-%% be satisfied by the obtains limit. Thus there will always be some
-%% capacity available for file handles. Processes that use obtain are
-%% never asked to return them, and they are not managed in any way by
-%% the server. It is simply a mechanism to ensure that processes that
-%% need file descriptors such as sockets can do so in such a way that
-%% the overall number of open file descriptors is managed.
+%% The server also supports obtain, release and transfer. obtain/0
+%% blocks until a file descriptor is available, at which point the
+%% requesting process is considered to 'own' one more
+%% descriptor. release/0 is the inverse operation and releases a
+%% previously obtained descriptor. transfer/1 transfers ownership of a
+%% file descriptor between processes. It is non-blocking. Obtain is
+%% used to obtain permission to accept file descriptors. Obtain has a
+%% lower limit, set by the ?OBTAIN_LIMIT/1 macro. File handles can use
+%% the entire limit, but will be evicted by obtain calls up to the
+%% point at which no more obtain calls can be satisfied by the obtains
+%% limit. Thus there will always be some capacity available for file
+%% handles. Processes that use obtain are never asked to return them,
+%% and they are not managed in any way by the server. It is simply a
+%% mechanism to ensure that processes that need file descriptors such
+%% as sockets can do so in such a way that the overall number of open
+%% file descriptors is managed.
%%
%% The callers of register_callback/3, obtain/0, and the argument of
%% transfer/1 are monitored, reducing the count of handles in use
%% appropriately when the processes terminate.
--behaviour(gen_server).
+-behaviour(gen_server2).
-export([register_callback/3]).
-export([open/3, close/1, read/2, append/2, sync/1, position/2, truncate/1,
current_virtual_offset/1, current_raw_offset/1, flush/1, copy/3,
set_maximum_since_use/1, delete/1, clear/1]).
--export([obtain/0, transfer/1, set_limit/1, get_limit/0, info_keys/0, info/0,
- info/1]).
+-export([obtain/0, release/0, transfer/1, set_limit/1, get_limit/0, info_keys/0,
+ info/0, info/1]).
-export([ulimit/0]).
-export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2,
- terminate/2, code_change/3]).
+ terminate/2, code_change/3, prioritise_cast/2]).
-define(SERVER, ?MODULE).
-define(RESERVED_FOR_OTHERS, 100).
@@ -159,7 +161,8 @@
-define(FILE_HANDLES_CHECK_INTERVAL, 2000).
-define(OBTAIN_LIMIT(LIMIT), trunc((LIMIT * 0.9) - 2)).
--define(CLIENT_ETS_TABLE, ?MODULE).
+-define(CLIENT_ETS_TABLE, file_handle_cache_client).
+-define(ELDERS_ETS_TABLE, file_handle_cache_elders).
%%----------------------------------------------------------------------------
@@ -228,7 +231,7 @@
-spec(register_callback/3 :: (atom(), atom(), [any()]) -> 'ok').
-spec(open/3 ::
- (string(), [any()],
+ (file:filename(), [any()],
[{'write_buffer', (non_neg_integer() | 'infinity' | 'unbuffered')}])
-> val_or_error(ref())).
-spec(close/1 :: (ref()) -> ok_or_error()).
@@ -243,17 +246,18 @@
-spec(flush/1 :: (ref()) -> ok_or_error()).
-spec(copy/3 :: (ref(), ref(), non_neg_integer()) ->
val_or_error(non_neg_integer())).
--spec(set_maximum_since_use/1 :: (non_neg_integer()) -> 'ok').
-spec(delete/1 :: (ref()) -> ok_or_error()).
-spec(clear/1 :: (ref()) -> ok_or_error()).
+-spec(set_maximum_since_use/1 :: (non_neg_integer()) -> 'ok').
-spec(obtain/0 :: () -> 'ok').
+-spec(release/0 :: () -> 'ok').
-spec(transfer/1 :: (pid()) -> 'ok').
-spec(set_limit/1 :: (non_neg_integer()) -> 'ok').
-spec(get_limit/0 :: () -> non_neg_integer()).
--spec(info_keys/0 :: () -> [atom()]).
--spec(info/0 :: () -> [{atom(), any()}]).
--spec(info/1 :: ([atom()]) -> [{atom(), any()}]).
--spec(ulimit/0 :: () -> 'infinity' | 'unknown' | non_neg_integer()).
+-spec(info_keys/0 :: () -> rabbit_types:info_keys()).
+-spec(info/0 :: () -> rabbit_types:infos()).
+-spec(info/1 :: ([atom()]) -> rabbit_types:infos()).
+-spec(ulimit/0 :: () -> 'unknown' | non_neg_integer()).
-endif.
@@ -265,11 +269,11 @@
%%----------------------------------------------------------------------------
start_link() ->
- gen_server:start_link({local, ?SERVER}, ?MODULE, [], [{timeout, infinity}]).
+ gen_server2:start_link({local, ?SERVER}, ?MODULE, [], [{timeout, infinity}]).
register_callback(M, F, A)
when is_atom(M) andalso is_atom(F) andalso is_list(A) ->
- gen_server:cast(?SERVER, {register_callback, self(), {M, F, A}}).
+ gen_server2:cast(?SERVER, {register_callback, self(), {M, F, A}}).
open(Path, Mode, Options) ->
Path1 = filename:absname(Path),
@@ -317,7 +321,7 @@ read(Ref, Count) ->
fun ([#handle { is_read = false }]) ->
{error, not_open_for_reading};
([Handle = #handle { hdl = Hdl, offset = Offset }]) ->
- case file:read(Hdl, Count) of
+ case prim_file:read(Hdl, Count) of
{ok, Data} = Obj -> Offset1 = Offset + iolist_size(Data),
{Obj,
[Handle #handle { offset = Offset1 }]};
@@ -337,7 +341,7 @@ append(Ref, Data) ->
write_buffer_size_limit = 0,
at_eof = true } = Handle1} ->
Offset1 = Offset + iolist_size(Data),
- {file:write(Hdl, Data),
+ {prim_file:write(Hdl, Data),
[Handle1 #handle { is_dirty = true, offset = Offset1 }]};
{{ok, _Offset}, #handle { write_buffer = WriteBuffer,
write_buffer_size = Size,
@@ -364,7 +368,7 @@ sync(Ref) ->
ok;
([Handle = #handle { hdl = Hdl,
is_dirty = true, write_buffer = [] }]) ->
- case file:sync(Hdl) of
+ case prim_file:sync(Hdl) of
ok -> {ok, [Handle #handle { is_dirty = false }]};
Error -> {Error, [Handle]}
end
@@ -381,7 +385,7 @@ truncate(Ref) ->
with_flushed_handles(
[Ref],
fun ([Handle1 = #handle { hdl = Hdl }]) ->
- case file:truncate(Hdl) of
+ case prim_file:truncate(Hdl) of
ok -> {ok, [Handle1 #handle { at_eof = true }]};
Error -> {Error, [Handle1]}
end
@@ -408,7 +412,7 @@ copy(Src, Dest, Count) ->
fun ([SHandle = #handle { is_read = true, hdl = SHdl, offset = SOffset },
DHandle = #handle { is_write = true, hdl = DHdl, offset = DOffset }]
) ->
- case file:copy(SHdl, DHdl, Count) of
+ case prim_file:copy(SHdl, DHdl, Count) of
{ok, Count1} = Result1 ->
{Result1,
[SHandle #handle { offset = SOffset + Count1 },
@@ -428,7 +432,7 @@ delete(Ref) ->
Handle = #handle { path = Path } ->
case hard_close(Handle #handle { is_dirty = false,
write_buffer = [] }) of
- ok -> file:delete(Path);
+ ok -> prim_file:delete(Path);
{Error, Handle1} -> put_handle(Ref, Handle1),
Error
end
@@ -443,7 +447,7 @@ clear(Ref) ->
case maybe_seek(bof, Handle #handle { write_buffer = [],
write_buffer_size = 0 }) of
{{ok, 0}, Handle1 = #handle { hdl = Hdl }} ->
- case file:truncate(Hdl) of
+ case prim_file:truncate(Hdl) of
ok -> {ok, [Handle1 #handle { at_eof = true }]};
Error -> {Error, [Handle1]}
end;
@@ -470,21 +474,28 @@ set_maximum_since_use(MaximumAge) ->
end.
obtain() ->
- gen_server:call(?SERVER, {obtain, self()}, infinity).
+ %% If the FHC isn't running, obtains succeed immediately.
+ case whereis(?SERVER) of
+ undefined -> ok;
+ _ -> gen_server2:call(?SERVER, {obtain, self()}, infinity)
+ end.
+
+release() ->
+ gen_server2:cast(?SERVER, {release, self()}).
transfer(Pid) ->
- gen_server:cast(?SERVER, {transfer, self(), Pid}).
+ gen_server2:cast(?SERVER, {transfer, self(), Pid}).
set_limit(Limit) ->
- gen_server:call(?SERVER, {set_limit, Limit}, infinity).
+ gen_server2:call(?SERVER, {set_limit, Limit}, infinity).
get_limit() ->
- gen_server:call(?SERVER, get_limit, infinity).
+ gen_server2:call(?SERVER, get_limit, infinity).
info_keys() -> ?INFO_KEYS.
info() -> info(?INFO_KEYS).
-info(Items) -> gen_server:call(?SERVER, {info, Items}, infinity).
+info(Items) -> gen_server2:call(?SERVER, {info, Items}, infinity).
%%----------------------------------------------------------------------------
%% Internal functions
@@ -538,8 +549,8 @@ get_or_reopen(RefNewOrReopens) ->
{ok, [Handle || {_Ref, Handle} <- OpenHdls]};
{OpenHdls, ClosedHdls} ->
Oldest = oldest(get_age_tree(), fun () -> now() end),
- case gen_server:call(?SERVER, {open, self(), length(ClosedHdls),
- Oldest}, infinity) of
+ case gen_server2:call(?SERVER, {open, self(), length(ClosedHdls),
+ Oldest}, infinity) of
ok ->
case reopen(ClosedHdls) of
{ok, RefHdls} -> sort_handles(RefNewOrReopens,
@@ -566,10 +577,10 @@ reopen([{Ref, NewOrReopen, Handle = #handle { hdl = closed,
offset = Offset,
last_used_at = undefined }} |
RefNewOrReopenHdls] = ToOpen, Tree, RefHdls) ->
- case file:open(Path, case NewOrReopen of
- new -> Mode;
- reopen -> [read | Mode]
- end) of
+ case prim_file:open(Path, case NewOrReopen of
+ new -> Mode;
+ reopen -> [read | Mode]
+ end) of
{ok, Hdl} ->
Now = now(),
{{ok, _Offset}, Handle1} =
@@ -582,7 +593,7 @@ reopen([{Ref, NewOrReopen, Handle = #handle { hdl = closed,
Error ->
%% NB: none of the handles in ToOpen are in the age tree
Oldest = oldest(Tree, fun () -> undefined end),
- [gen_server:cast(?SERVER, {close, self(), Oldest}) || _ <- ToOpen],
+ [gen_server2:cast(?SERVER, {close, self(), Oldest}) || _ <- ToOpen],
put_age_tree(Tree),
Error
end.
@@ -631,7 +642,7 @@ age_tree_delete(Then) ->
fun (Tree) ->
Tree1 = gb_trees:delete_any(Then, Tree),
Oldest = oldest(Tree1, fun () -> undefined end),
- gen_server:cast(?SERVER, {close, self(), Oldest}),
+ gen_server2:cast(?SERVER, {close, self(), Oldest}),
Tree1
end).
@@ -641,7 +652,7 @@ age_tree_change() ->
case gb_trees:is_empty(Tree) of
true -> Tree;
false -> {Oldest, _Ref} = gb_trees:smallest(Tree),
- gen_server:cast(?SERVER, {update, self(), Oldest})
+ gen_server2:cast(?SERVER, {update, self(), Oldest})
end,
Tree
end).
@@ -693,10 +704,10 @@ soft_close(Handle) ->
is_dirty = IsDirty,
last_used_at = Then } = Handle1 } ->
ok = case IsDirty of
- true -> file:sync(Hdl);
+ true -> prim_file:sync(Hdl);
false -> ok
end,
- ok = file:close(Hdl),
+ ok = prim_file:close(Hdl),
age_tree_delete(Then),
{ok, Handle1 #handle { hdl = closed,
is_dirty = false,
@@ -731,7 +742,7 @@ maybe_seek(NewOffset, Handle = #handle { hdl = Hdl, offset = Offset,
at_eof = AtEoF }) ->
{AtEoF1, NeedsSeek} = needs_seek(AtEoF, Offset, NewOffset),
case (case NeedsSeek of
- true -> file:position(Hdl, NewOffset);
+ true -> prim_file:position(Hdl, NewOffset);
false -> {ok, Offset}
end) of
{ok, Offset1} = Result ->
@@ -768,7 +779,7 @@ write_buffer(Handle = #handle { hdl = Hdl, offset = Offset,
write_buffer = WriteBuffer,
write_buffer_size = DataSize,
at_eof = true }) ->
- case file:write(Hdl, lists:reverse(WriteBuffer)) of
+ case prim_file:write(Hdl, lists:reverse(WriteBuffer)) of
ok ->
Offset1 = Offset + DataSize,
{ok, Handle #handle { offset = Offset1, is_dirty = true,
@@ -784,7 +795,7 @@ i(obtain_limit, #fhc_state{obtain_limit = Limit}) -> Limit;
i(Item, _) -> throw({bad_argument, Item}).
%%----------------------------------------------------------------------------
-%% gen_server callbacks
+%% gen_server2 callbacks
%%----------------------------------------------------------------------------
init([]) ->
@@ -794,7 +805,6 @@ init([]) ->
Watermark;
_ ->
case ulimit() of
- infinity -> infinity;
unknown -> ?FILE_HANDLES_LIMIT_OTHER;
Lim -> lists:max([2, Lim - ?RESERVED_FOR_OTHERS])
end
@@ -803,7 +813,8 @@ init([]) ->
error_logger:info_msg("Limiting to approx ~p file handles (~p sockets)~n",
[Limit, ObtainLimit]),
Clients = ets:new(?CLIENT_ETS_TABLE, [set, private, {keypos, #cstate.pid}]),
- {ok, #fhc_state { elders = dict:new(),
+ Elders = ets:new(?ELDERS_ETS_TABLE, [set, private]),
+ {ok, #fhc_state { elders = Elders,
limit = Limit,
open_count = 0,
open_pending = pending_new(),
@@ -813,34 +824,39 @@ init([]) ->
clients = Clients,
timer_ref = undefined }}.
+prioritise_cast(Msg, _State) ->
+ case Msg of
+ {release, _} -> 5;
+ _ -> 0
+ end.
+
handle_call({open, Pid, Requested, EldestUnusedSince}, From,
State = #fhc_state { open_count = Count,
open_pending = Pending,
elders = Elders,
clients = Clients })
when EldestUnusedSince =/= undefined ->
- Elders1 = dict:store(Pid, EldestUnusedSince, Elders),
+ true = ets:insert(Elders, {Pid, EldestUnusedSince}),
Item = #pending { kind = open,
pid = Pid,
requested = Requested,
from = From },
ok = track_client(Pid, Clients),
- State1 = State #fhc_state { elders = Elders1 },
- case needs_reduce(State1 #fhc_state { open_count = Count + Requested }) of
+ case needs_reduce(State #fhc_state { open_count = Count + Requested }) of
true -> case ets:lookup(Clients, Pid) of
[#cstate { opened = 0 }] ->
true = ets:update_element(
Clients, Pid, {#cstate.blocked, true}),
{noreply,
- reduce(State1 #fhc_state {
+ reduce(State #fhc_state {
open_pending = pending_in(Item, Pending) })};
[#cstate { opened = Opened }] ->
true = ets:update_element(
Clients, Pid,
{#cstate.pending_closes, Opened}),
- {reply, close, State1}
+ {reply, close, State}
end;
- false -> {noreply, run_pending_item(Item, State1)}
+ false -> {noreply, run_pending_item(Item, State)}
end;
handle_call({obtain, Pid}, From, State = #fhc_state { obtain_count = Count,
@@ -888,21 +904,24 @@ handle_cast({register_callback, Pid, MFA},
handle_cast({update, Pid, EldestUnusedSince},
State = #fhc_state { elders = Elders })
when EldestUnusedSince =/= undefined ->
- Elders1 = dict:store(Pid, EldestUnusedSince, Elders),
+ true = ets:insert(Elders, {Pid, EldestUnusedSince}),
%% don't call maybe_reduce from here otherwise we can create a
%% storm of messages
- {noreply, State #fhc_state { elders = Elders1 }};
+ {noreply, State};
+
+handle_cast({release, Pid}, State) ->
+ {noreply, adjust_alarm(State, process_pending(
+ update_counts(obtain, Pid, -1, State)))};
handle_cast({close, Pid, EldestUnusedSince},
State = #fhc_state { elders = Elders, clients = Clients }) ->
- Elders1 = case EldestUnusedSince of
- undefined -> dict:erase(Pid, Elders);
- _ -> dict:store(Pid, EldestUnusedSince, Elders)
- end,
+ true = case EldestUnusedSince of
+ undefined -> ets:delete(Elders, Pid);
+ _ -> ets:insert(Elders, {Pid, EldestUnusedSince})
+ end,
ets:update_counter(Clients, Pid, {#cstate.pending_closes, -1, 0, 0}),
{noreply, adjust_alarm(State, process_pending(
- update_counts(open, Pid, -1,
- State #fhc_state { elders = Elders1 })))};
+ update_counts(open, Pid, -1, State)))};
handle_cast({transfer, FromPid, ToPid}, State) ->
ok = track_client(ToPid, State#fhc_state.clients),
@@ -923,6 +942,7 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason},
[#cstate { opened = Opened, obtained = Obtained }] =
ets:lookup(Clients, Pid),
true = ets:delete(Clients, Pid),
+ true = ets:delete(Elders, Pid),
FilterFun = fun (#pending { pid = Pid1 }) -> Pid1 =/= Pid end,
{noreply, adjust_alarm(
State,
@@ -931,11 +951,12 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason},
open_count = OpenCount - Opened,
open_pending = filter_pending(FilterFun, OpenPending),
obtain_count = ObtainCount - Obtained,
- obtain_pending = filter_pending(FilterFun, ObtainPending),
- elders = dict:erase(Pid, Elders) }))}.
+ obtain_pending = filter_pending(FilterFun, ObtainPending) }))}.
-terminate(_Reason, State = #fhc_state { clients = Clients }) ->
+terminate(_Reason, State = #fhc_state { clients = Clients,
+ elders = Elders }) ->
ets:delete(Clients),
+ ets:delete(Elders),
State.
code_change(_OldVsn, State, _Extra) ->
@@ -1047,7 +1068,7 @@ run_pending_item(#pending { kind = Kind,
requested = Requested,
from = From },
State = #fhc_state { clients = Clients }) ->
- gen_server:reply(From, ok),
+ gen_server2:reply(From, ok),
true = ets:update_element(Clients, Pid, {#cstate.blocked, false}),
update_counts(Kind, Pid, Requested, State).
@@ -1091,7 +1112,7 @@ reduce(State = #fhc_state { open_pending = OpenPending,
timer_ref = TRef }) ->
Now = now(),
{CStates, Sum, ClientCount} =
- dict:fold(fun (Pid, Eldest, {CStatesAcc, SumAcc, CountAcc} = Accs) ->
+ ets:foldl(fun ({Pid, Eldest}, {CStatesAcc, SumAcc, CountAcc} = Accs) ->
[#cstate { pending_closes = PendingCloses,
opened = Opened,
blocked = Blocked } = CState] =
diff --git a/src/gatherer.erl b/src/gatherer.erl
index aa43e9a980..fe976b50a2 100644
--- a/src/gatherer.erl
+++ b/src/gatherer.erl
@@ -27,7 +27,7 @@
-ifdef(use_specs).
--spec(start_link/0 :: () -> {'ok', pid()} | {'error', any()}).
+-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
-spec(stop/1 :: (pid()) -> 'ok').
-spec(fork/1 :: (pid()) -> 'ok').
-spec(finish/1 :: (pid()) -> 'ok').
diff --git a/src/gm.erl b/src/gm.erl
index 8b4d2776c8..8c838a7056 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -422,9 +422,9 @@
-type(group_name() :: any()).
--spec(create_tables/0 :: () -> 'ok').
+-spec(create_tables/0 :: () -> 'ok' | {'aborted', any()}).
-spec(start_link/3 :: (group_name(), atom(), any()) ->
- {'ok', pid()} | {'error', any()}).
+ rabbit_types:ok_pid_or_error()).
-spec(leave/1 :: (pid()) -> 'ok').
-spec(broadcast/2 :: (pid(), any()) -> 'ok').
-spec(confirmed_broadcast/2 :: (pid(), any()) -> 'ok').
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 8feeb3649d..e98ca9be33 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -206,6 +206,14 @@
-spec(boot_delegate/0 :: () -> 'ok').
-spec(recover/0 :: () -> 'ok').
+-spec(start/2 :: ('normal',[]) ->
+ {'error',
+ {'erlang_version_too_old',
+ {'found',[any()]},
+ {'required',[any(),...]}}} |
+ {'ok',pid()}).
+-spec(stop/1 :: (_) -> 'ok').
+
-endif.
%%----------------------------------------------------------------------------
@@ -224,12 +232,14 @@ start() ->
end.
stop() ->
+ rabbit_log:info("Stopping Rabbit~n"),
ok = rabbit_misc:stop_applications(application_load_order()).
stop_and_halt() ->
try
stop()
after
+ rabbit_misc:local_info_msg("Halting Erlang VM~n", []),
init:stop()
end,
ok.
@@ -262,6 +272,7 @@ environment() ->
rotate_logs(BinarySuffix) ->
Suffix = binary_to_list(BinarySuffix),
+ rabbit_misc:local_info_msg("Rotating logs with suffix '~s'~n", [Suffix]),
log_rotation_result(rotate_logs(log_location(kernel),
Suffix,
rabbit_error_logger_file_h),
@@ -459,20 +470,20 @@ insert_default_data() ->
ensure_working_log_handlers() ->
Handlers = gen_event:which_handlers(error_logger),
- ok = ensure_working_log_handler(error_logger_file_h,
+ ok = ensure_working_log_handler(error_logger_tty_h,
rabbit_error_logger_file_h,
error_logger_tty_h,
log_location(kernel),
Handlers),
- ok = ensure_working_log_handler(sasl_report_file_h,
+ ok = ensure_working_log_handler(sasl_report_tty_h,
rabbit_sasl_report_file_h,
sasl_report_tty_h,
log_location(sasl),
Handlers),
ok.
-ensure_working_log_handler(OldFHandler, NewFHandler, TTYHandler,
+ensure_working_log_handler(OldHandler, NewHandler, TTYHandler,
LogLocation, Handlers) ->
case LogLocation of
undefined -> ok;
@@ -482,10 +493,10 @@ ensure_working_log_handler(OldFHandler, NewFHandler, TTYHandler,
throw({error, {cannot_log_to_tty,
TTYHandler, not_installed}})
end;
- _ -> case lists:member(NewFHandler, Handlers) of
+ _ -> case lists:member(NewHandler, Handlers) of
true -> ok;
false -> case rotate_logs(LogLocation, "",
- OldFHandler, NewFHandler) of
+ OldHandler, NewHandler) of
ok -> ok;
{error, Reason} ->
throw({error, {cannot_log_to_file,
@@ -495,10 +506,10 @@ ensure_working_log_handler(OldFHandler, NewFHandler, TTYHandler,
end.
log_location(Type) ->
- case application:get_env(Type, case Type of
- kernel -> error_logger;
- sasl -> sasl_error_logger
- end) of
+ case application:get_env(rabbit, case Type of
+ kernel -> error_logger;
+ sasl -> sasl_error_logger
+ end) of
{ok, {file, File}} -> File;
{ok, false} -> undefined;
{ok, tty} -> tty;
diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl
index c0ae18c0a1..ca28d68637 100644
--- a/src/rabbit_access_control.erl
+++ b/src/rabbit_access_control.erl
@@ -32,6 +32,9 @@
-spec(check_user_pass_login/2 ::
(rabbit_types:username(), rabbit_types:password())
-> {'ok', rabbit_types:user()} | {'refused', string(), [any()]}).
+-spec(check_user_login/2 ::
+ (rabbit_types:username(), [{atom(), any()}])
+ -> {'ok', rabbit_types:user()} | {'refused', string(), [any()]}).
-spec(check_vhost_access/2 ::
(rabbit_types:user(), rabbit_types:vhost())
-> 'ok' | rabbit_types:channel_exit()).
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 5bd42d9ab2..b3e92b6918 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -49,7 +49,7 @@
-type(name() :: rabbit_types:r('queue')).
-type(qlen() :: rabbit_types:ok(non_neg_integer())).
--type(qfun(A) :: fun ((rabbit_types:amqqueue()) -> A)).
+-type(qfun(A) :: fun ((rabbit_types:amqqueue()) -> A | no_return())).
-type(qmsg() :: {name(), pid(), msg_id(), boolean(), rabbit_types:message()}).
-type(msg_id() :: non_neg_integer()).
-type(ok_or_errors() ::
@@ -64,6 +64,9 @@
rabbit_framing:amqp_table(), rabbit_types:maybe(pid()))
-> {'new' | 'existing', rabbit_types:amqqueue()} |
rabbit_types:channel_exit()).
+-spec(internal_declare/2 ::
+ (rabbit_types:amqqueue(), boolean())
+ -> queue_or_not_found() | rabbit_misc:thunk(queue_or_not_found())).
-spec(lookup/1 ::
(name()) -> rabbit_types:ok(rabbit_types:amqqueue()) |
rabbit_types:error('not_found')).
@@ -132,9 +135,6 @@
-spec(notify_sent/2 :: (pid(), pid()) -> 'ok').
-spec(unblock/2 :: (pid(), pid()) -> 'ok').
-spec(flush_all/2 :: ([pid()], pid()) -> 'ok').
--spec(internal_declare/2 ::
- (rabbit_types:amqqueue(), boolean())
- -> queue_or_not_found() | rabbit_misc:thunk(queue_or_not_found())).
-spec(internal_delete/1 ::
(name()) -> rabbit_types:ok_or_error('not_found') |
rabbit_types:connection_exit() |
@@ -147,6 +147,7 @@
-spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok').
-spec(on_node_down/1 :: (node()) -> 'ok').
-spec(pseudo_queue/2 :: (name(), pid()) -> rabbit_types:amqqueue()).
+-spec(store_queue/1 :: (rabbit_types:amqqueue()) -> 'ok').
-endif.
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index e5038efe39..fe1ddba02b 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -29,12 +29,12 @@
-export([start_link/1, info_keys/0]).
+-export([init_with_backing_queue_state/7]).
+
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2, handle_pre_hibernate/1, prioritise_call/3,
prioritise_cast/2, prioritise_info/2, format_message_queue/2]).
--export([init_with_backing_queue_state/7]).
-
%% Queue's state
-record(q, {q,
exclusive_consumer,
@@ -42,7 +42,6 @@
backing_queue,
backing_queue_state,
active_consumers,
- blocked_consumers,
expires,
sync_timer_ref,
rate_timer_ref,
@@ -56,14 +55,30 @@
-record(consumer, {tag, ack_required}).
%% These are held in our process dictionary
--record(cr, {consumer_count,
- ch_pid,
- limiter,
+-record(cr, {ch_pid,
monitor_ref,
acktags,
+ consumer_count,
+ blocked_consumers,
+ limiter,
is_limit_active,
unsent_message_count}).
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(start_link/1 ::
+ (rabbit_types:amqqueue()) -> rabbit_types:ok_pid_or_error()).
+-spec(info_keys/0 :: () -> rabbit_types:info_keys()).
+-spec(init_with_backing_queue_state/7 ::
+ (rabbit_types:amqqueue(), atom(), tuple(), any(), [any()],
+ [rabbit_types:delivery()], dict()) -> #q{}).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
-define(STATISTICS_KEYS,
[pid,
exclusive_consumer_pid,
@@ -109,7 +124,6 @@ init(Q) ->
backing_queue = backing_queue_module(Q),
backing_queue_state = undefined,
active_consumers = queue:new(),
- blocked_consumers = queue:new(),
expires = undefined,
sync_timer_ref = undefined,
rate_timer_ref = undefined,
@@ -135,7 +149,6 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
backing_queue = BQ,
backing_queue_state = BQS,
active_consumers = queue:new(),
- blocked_consumers = queue:new(),
expires = undefined,
sync_timer_ref = undefined,
rate_timer_ref = RateTRef,
@@ -321,10 +334,11 @@ ch_record(ChPid) ->
Key = {ch, ChPid},
case get(Key) of
undefined -> MonitorRef = erlang:monitor(process, ChPid),
- C = #cr{consumer_count = 0,
- ch_pid = ChPid,
+ C = #cr{ch_pid = ChPid,
monitor_ref = MonitorRef,
acktags = sets:new(),
+ consumer_count = 0,
+ blocked_consumers = queue:new(),
is_limit_active = false,
limiter = rabbit_limiter:make_token(),
unsent_message_count = 0},
@@ -333,18 +347,18 @@ ch_record(ChPid) ->
C = #cr{} -> C
end.
-store_ch_record(C = #cr{ch_pid = ChPid}) ->
- put({ch, ChPid}, C).
-
-maybe_store_ch_record(C = #cr{consumer_count = ConsumerCount,
- acktags = ChAckTags,
- unsent_message_count = UnsentMessageCount}) ->
+update_ch_record(C = #cr{consumer_count = ConsumerCount,
+ acktags = ChAckTags,
+ unsent_message_count = UnsentMessageCount}) ->
case {sets:size(ChAckTags), ConsumerCount, UnsentMessageCount} of
- {0, 0, 0} -> ok = erase_ch_record(C),
- false;
- _ -> store_ch_record(C),
- true
- end.
+ {0, 0, 0} -> ok = erase_ch_record(C);
+ _ -> ok = store_ch_record(C)
+ end,
+ C.
+
+store_ch_record(C = #cr{ch_pid = ChPid}) ->
+ put({ch, ChPid}, C),
+ ok.
erase_ch_record(#cr{ch_pid = ChPid,
limiter = Limiter,
@@ -354,8 +368,21 @@ erase_ch_record(#cr{ch_pid = ChPid,
erase({ch, ChPid}),
ok.
+update_consumer_count(C = #cr{consumer_count = 0, limiter = Limiter}, +1) ->
+ ok = rabbit_limiter:register(Limiter, self()),
+ update_ch_record(C#cr{consumer_count = 1});
+update_consumer_count(C = #cr{consumer_count = 1, limiter = Limiter}, -1) ->
+ ok = rabbit_limiter:unregister(Limiter, self()),
+ update_ch_record(C#cr{consumer_count = 0,
+ limiter = rabbit_limiter:make_token()});
+update_consumer_count(C = #cr{consumer_count = Count}, Delta) ->
+ update_ch_record(C#cr{consumer_count = Count + Delta}).
+
all_ch_record() -> [C || {{ch, _}, C} <- get()].
+block_consumer(C = #cr{blocked_consumers = Blocked}, QEntry) ->
+ update_ch_record(C#cr{blocked_consumers = queue:in(QEntry, Blocked)}).
+
is_ch_blocked(#cr{unsent_message_count = Count, is_limit_active = Limited}) ->
Limited orelse Count >= ?UNSENT_MESSAGE_LIMIT.
@@ -367,67 +394,56 @@ ch_record_state_transition(OldCR, NewCR) ->
end.
deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
- State = #q{q = #amqqueue{name = QName},
- active_consumers = ActiveConsumers,
- blocked_consumers = BlockedConsumers}) ->
- case queue:out(ActiveConsumers) of
- {{value, QEntry = {ChPid, #consumer{tag = ConsumerTag,
- ack_required = AckRequired}}},
- ActiveConsumersTail} ->
- C = #cr{limiter = Limiter,
- unsent_message_count = Count,
- acktags = ChAckTags} = ch_record(ChPid),
- IsMsgReady = PredFun(FunAcc, State),
- case (IsMsgReady andalso
- rabbit_limiter:can_send(Limiter, self(), AckRequired)) of
- true ->
- {{Message, IsDelivered, AckTag}, FunAcc1, State1} =
- DeliverFun(AckRequired, FunAcc, State),
- rabbit_channel:deliver(
- ChPid, ConsumerTag, AckRequired,
- {QName, self(), AckTag, IsDelivered, Message}),
- ChAckTags1 =
- case AckRequired of
- true -> sets:add_element(AckTag, ChAckTags);
- false -> ChAckTags
- end,
- NewC = C#cr{unsent_message_count = Count + 1,
- acktags = ChAckTags1},
- true = maybe_store_ch_record(NewC),
- {NewActiveConsumers, NewBlockedConsumers} =
- case ch_record_state_transition(C, NewC) of
- ok -> {queue:in(QEntry, ActiveConsumersTail),
- BlockedConsumers};
- block -> {ActiveConsumers1, BlockedConsumers1} =
- move_consumers(ChPid,
- ActiveConsumersTail,
- BlockedConsumers),
- {ActiveConsumers1,
- queue:in(QEntry, BlockedConsumers1)}
- end,
- State2 = State1#q{
- active_consumers = NewActiveConsumers,
- blocked_consumers = NewBlockedConsumers},
- deliver_msgs_to_consumers(Funs, FunAcc1, State2);
- %% if IsMsgReady then we've hit the limiter
- false when IsMsgReady ->
- true = maybe_store_ch_record(C#cr{is_limit_active = true}),
- {NewActiveConsumers, NewBlockedConsumers} =
- move_consumers(ChPid,
- ActiveConsumers,
- BlockedConsumers),
- deliver_msgs_to_consumers(
- Funs, FunAcc,
- State#q{active_consumers = NewActiveConsumers,
- blocked_consumers = NewBlockedConsumers});
- false ->
- %% no message was ready, so we don't need to block anyone
- {FunAcc, State}
- end;
- {empty, _} ->
- {FunAcc, State}
+ State = #q{active_consumers = ActiveConsumers}) ->
+ case PredFun(FunAcc, State) of
+ false -> {FunAcc, State};
+ true -> case queue:out(ActiveConsumers) of
+ {empty, _} ->
+ {FunAcc, State};
+ {{value, QEntry}, Tail} ->
+ {FunAcc1, State1} =
+ deliver_msg_to_consumer(
+ DeliverFun, QEntry,
+ FunAcc, State#q{active_consumers = Tail}),
+ deliver_msgs_to_consumers(Funs, FunAcc1, State1)
+ end
end.
+deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer}, FunAcc, State) ->
+ C = ch_record(ChPid),
+ case is_ch_blocked(C) of
+ true -> block_consumer(C, E),
+ {FunAcc, State};
+ false -> case rabbit_limiter:can_send(C#cr.limiter, self(),
+ Consumer#consumer.ack_required) of
+ false -> block_consumer(C#cr{is_limit_active = true}, E),
+ {FunAcc, State};
+ true -> AC1 = queue:in(E, State#q.active_consumers),
+ deliver_msg_to_consumer(
+ DeliverFun, Consumer, C, FunAcc,
+ State#q{active_consumers = AC1})
+ end
+ end.
+
+deliver_msg_to_consumer(DeliverFun,
+ #consumer{tag = ConsumerTag,
+ ack_required = AckRequired},
+ C = #cr{ch_pid = ChPid,
+ acktags = ChAckTags,
+ unsent_message_count = Count},
+ FunAcc, State = #q{q = #amqqueue{name = QName}}) ->
+ {{Message, IsDelivered, AckTag}, FunAcc1, State1} =
+ DeliverFun(AckRequired, FunAcc, State),
+ rabbit_channel:deliver(ChPid, ConsumerTag, AckRequired,
+ {QName, self(), AckTag, IsDelivered, Message}),
+ ChAckTags1 = case AckRequired of
+ true -> sets:add_element(AckTag, ChAckTags);
+ false -> ChAckTags
+ end,
+ update_ch_record(C#cr{acktags = ChAckTags1,
+ unsent_message_count = Count + 1}),
+ {FunAcc1, State1}.
+
deliver_from_queue_pred(IsEmpty, _State) -> not IsEmpty.
deliver_from_queue_deliver(AckRequired, false, State) ->
@@ -438,33 +454,20 @@ deliver_from_queue_deliver(AckRequired, false, State) ->
confirm_messages([], State) ->
State;
confirm_messages(MsgIds, State = #q{msg_id_to_channel = MTC}) ->
- {CMs, MTC1} = lists:foldl(
- fun(MsgId, {CMs, MTC0}) ->
- case dict:find(MsgId, MTC0) of
- {ok, {ChPid, MsgSeqNo}} ->
- {gb_trees_cons(ChPid, MsgSeqNo, CMs),
- dict:erase(MsgId, MTC0)};
- _ ->
- {CMs, MTC0}
- end
- end, {gb_trees:empty(), MTC}, MsgIds),
- gb_trees_foreach(fun rabbit_channel:confirm/2, CMs),
+ {CMs, MTC1} =
+ lists:foldl(
+ fun(MsgId, {CMs, MTC0}) ->
+ case dict:find(MsgId, MTC0) of
+ {ok, {ChPid, MsgSeqNo}} ->
+ {rabbit_misc:gb_trees_cons(ChPid, MsgSeqNo, CMs),
+ dict:erase(MsgId, MTC0)};
+ _ ->
+ {CMs, MTC0}
+ end
+ end, {gb_trees:empty(), MTC}, MsgIds),
+ rabbit_misc:gb_trees_foreach(fun rabbit_channel:confirm/2, CMs),
State#q{msg_id_to_channel = MTC1}.
-gb_trees_foreach(_, none) ->
- ok;
-gb_trees_foreach(Fun, {Key, Val, It}) ->
- Fun(Key, Val),
- gb_trees_foreach(Fun, gb_trees:next(It));
-gb_trees_foreach(Fun, Tree) ->
- gb_trees_foreach(Fun, gb_trees:next(gb_trees:iterator(Tree))).
-
-gb_trees_cons(Key, Value, Tree) ->
- case gb_trees:lookup(Key, Tree) of
- {value, Values} -> gb_trees:update(Key, [Value | Values], Tree);
- none -> gb_trees:insert(Key, [Value], Tree)
- end.
-
should_confirm_message(#delivery{msg_seq_no = undefined}, _State) ->
never;
should_confirm_message(#delivery{sender = ChPid,
@@ -545,11 +548,9 @@ deliver_or_enqueue(Delivery = #delivery{message = Message,
maybe_record_confirm_message(Confirm, State1),
case Delivered of
true -> State2;
- false -> BQS1 =
- BQ:publish(Message,
- (message_properties(State)) #message_properties{
- needs_confirming = needs_confirming(Confirm)},
- ChPid, BQS),
+ false -> Props = (message_properties(State)) #message_properties{
+ needs_confirming = needs_confirming(Confirm)},
+ BQS1 = BQ:publish(Message, Props, ChPid, BQS),
ensure_ttl_timer(State2#q{backing_queue_state = BQS1})
end.
@@ -566,44 +567,34 @@ fetch(AckRequired, State = #q{backing_queue_state = BQS,
{Result, BQS1} = BQ:fetch(AckRequired, BQS),
{Result, State#q{backing_queue_state = BQS1}}.
-add_consumer(ChPid, Consumer, Queue) -> queue:in({ChPid, Consumer}, Queue).
-
remove_consumer(ChPid, ConsumerTag, Queue) ->
- queue:filter(fun ({CP, #consumer{tag = CT}}) ->
- (CP /= ChPid) or (CT /= ConsumerTag)
+ queue:filter(fun ({CP, #consumer{tag = CTag}}) ->
+ (CP /= ChPid) or (CTag /= ConsumerTag)
end, Queue).
remove_consumers(ChPid, Queue) ->
- {Kept, Removed} = split_by_channel(ChPid, Queue),
- [emit_consumer_deleted(Ch, CTag) ||
- {Ch, #consumer{tag = CTag}} <- queue:to_list(Removed)],
- Kept.
-
-move_consumers(ChPid, From, To) ->
- {Kept, Removed} = split_by_channel(ChPid, From),
- {Kept, queue:join(To, Removed)}.
-
-split_by_channel(ChPid, Queue) ->
- {Kept, Removed} = lists:partition(fun ({CP, _}) -> CP /= ChPid end,
- queue:to_list(Queue)),
- {queue:from_list(Kept), queue:from_list(Removed)}.
+ queue:filter(fun ({CP, #consumer{tag = CTag}}) when CP =:= ChPid ->
+ emit_consumer_deleted(ChPid, CTag),
+ false;
+ (_) ->
+ true
+ end, Queue).
possibly_unblock(State, ChPid, Update) ->
case lookup_ch(ChPid) of
not_found ->
State;
C ->
- NewC = Update(C),
- maybe_store_ch_record(NewC),
- case ch_record_state_transition(C, NewC) of
- ok -> State;
- unblock -> {NewBlockedConsumers, NewActiveConsumers} =
- move_consumers(ChPid,
- State#q.blocked_consumers,
- State#q.active_consumers),
- run_message_queue(
- State#q{active_consumers = NewActiveConsumers,
- blocked_consumers = NewBlockedConsumers})
+ C1 = Update(C),
+ case ch_record_state_transition(C, C1) of
+ ok -> update_ch_record(C1),
+ State;
+ unblock -> #cr{blocked_consumers = Consumers} = C1,
+ update_ch_record(
+ C1#cr{blocked_consumers = queue:new()}),
+ AC1 = queue:join(State#q.active_consumers,
+ Consumers),
+ run_message_queue(State#q{active_consumers = AC1})
end
end.
@@ -615,7 +606,10 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) ->
case lookup_ch(DownPid) of
not_found ->
{ok, State};
- C = #cr{ch_pid = ChPid, acktags = ChAckTags} ->
+ C = #cr{ch_pid = ChPid,
+ acktags = ChAckTags,
+ blocked_consumers = Blocked} ->
+ _ = remove_consumers(ChPid, Blocked), %% for stats emission
ok = erase_ch_record(C),
State1 = State#q{
exclusive_consumer = case Holder of
@@ -623,9 +617,7 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) ->
Other -> Other
end,
active_consumers = remove_consumers(
- ChPid, State#q.active_consumers),
- blocked_consumers = remove_consumers(
- ChPid, State#q.blocked_consumers)},
+ ChPid, State#q.active_consumers)},
case should_auto_delete(State1) of
true -> {stop, State1};
false -> {ok, requeue_and_run(sets:to_list(ChAckTags),
@@ -633,11 +625,6 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) ->
end
end.
-cancel_holder(ChPid, ConsumerTag, {ChPid, ConsumerTag}) ->
- none;
-cancel_holder(_ChPid, _ConsumerTag, Holder) ->
- Holder.
-
check_exclusive_access({_ChPid, _ConsumerTag}, _ExclusiveConsume, _State) ->
in_use;
check_exclusive_access(none, false, _State) ->
@@ -648,8 +635,15 @@ check_exclusive_access(none, true, State) ->
false -> in_use
end.
-is_unused(State) -> queue:is_empty(State#q.active_consumers) andalso
- queue:is_empty(State#q.blocked_consumers).
+consumer_count() -> consumer_count(fun (_) -> false end).
+
+active_consumer_count() -> consumer_count(fun is_ch_blocked/1).
+
+consumer_count(Exclude) ->
+ lists:sum([Count || C = #cr{consumer_count = Count} <- all_ch_record(),
+ not Exclude(C)]).
+
+is_unused(_State) -> consumer_count() == 0.
maybe_send_reply(_ChPid, undefined) -> ok;
maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg).
@@ -663,8 +657,15 @@ run_backing_queue(Mod, Fun, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
run_message_queue(State#q{backing_queue_state = BQ:invoke(Mod, Fun, BQS)}).
-subtract_acks(A, B) when is_list(B) ->
- lists:foldl(fun sets:del_element/2, A, B).
+subtract_acks(ChPid, AckTags, State, Fun) ->
+ case lookup_ch(ChPid) of
+ not_found ->
+ State;
+ C = #cr{acktags = ChAckTags} ->
+ update_ch_record(C#cr{acktags = lists:foldl(fun sets:del_element/2,
+ ChAckTags, AckTags)}),
+ Fun(State)
+ end.
discard_delivery(#delivery{sender = ChPid,
message = Message},
@@ -768,8 +769,8 @@ i(messages_unacknowledged, _) ->
i(messages, State) ->
lists:sum([i(Item, State) || Item <- [messages_ready,
messages_unacknowledged]]);
-i(consumers, State) ->
- queue:len(State#q.active_consumers) + queue:len(State#q.blocked_consumers);
+i(consumers, _) ->
+ consumer_count();
i(memory, _) ->
{memory, M} = process_info(self(), memory),
M;
@@ -785,13 +786,15 @@ i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) ->
i(Item, _) ->
throw({bad_argument, Item}).
-consumers(#q{active_consumers = ActiveConsumers,
- blocked_consumers = BlockedConsumers}) ->
+consumers(#q{active_consumers = ActiveConsumers}) ->
+ lists:foldl(fun (C, Acc) -> consumers(C#cr.blocked_consumers, Acc) end,
+ consumers(ActiveConsumers, []), all_ch_record()).
+
+consumers(Consumers, Acc) ->
rabbit_misc:queue_fold(
- fun ({ChPid, #consumer{tag = ConsumerTag,
- ack_required = AckRequired}}, Acc) ->
- [{ChPid, ConsumerTag, AckRequired} | Acc]
- end, [], queue:join(ActiveConsumers, BlockedConsumers)).
+ fun ({ChPid, #consumer{tag = CTag, ack_required = AckRequired}}, Acc1) ->
+ [{ChPid, CTag, AckRequired} | Acc1]
+ end, Acc, Consumers).
emit_stats(State) ->
emit_stats(State, []).
@@ -931,10 +934,8 @@ handle_call({basic_get, ChPid, NoAck}, _From,
State3 =
case AckRequired of
true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid),
- true = maybe_store_ch_record(
- C#cr{acktags =
- sets:add_element(AckTag,
- ChAckTags)}),
+ ChAckTags1 = sets:add_element(AckTag, ChAckTags),
+ update_ch_record(C#cr{acktags = ChAckTags1}),
State2;
false -> State2
end,
@@ -950,33 +951,24 @@ handle_call({basic_consume, NoAck, ChPid, Limiter,
in_use ->
reply({error, exclusive_consume_unavailable}, State);
ok ->
- C = #cr{consumer_count = ConsumerCount} = ch_record(ChPid),
+ C = ch_record(ChPid),
+ C1 = update_consumer_count(C#cr{limiter = Limiter}, +1),
Consumer = #consumer{tag = ConsumerTag,
ack_required = not NoAck},
- true = maybe_store_ch_record(
- C#cr{consumer_count = ConsumerCount +1,
- limiter = Limiter}),
- ok = case ConsumerCount of
- 0 -> rabbit_limiter:register(Limiter, self());
- _ -> ok
- end,
ExclusiveConsumer = if ExclusiveConsume -> {ChPid, ConsumerTag};
true -> ExistingHolder
end,
State1 = State#q{has_had_consumers = true,
exclusive_consumer = ExclusiveConsumer},
ok = maybe_send_reply(ChPid, OkMsg),
+ E = {ChPid, Consumer},
State2 =
- case is_ch_blocked(C) of
- true -> State1#q{
- blocked_consumers =
- add_consumer(ChPid, Consumer,
- State1#q.blocked_consumers)};
- false -> run_message_queue(
- State1#q{
- active_consumers =
- add_consumer(ChPid, Consumer,
- State1#q.active_consumers)})
+ case is_ch_blocked(C1) of
+ true -> block_consumer(C1, E),
+ State1;
+ false -> update_ch_record(C1),
+ AC1 = queue:in(E, State1#q.active_consumers),
+ run_message_queue(State1#q{active_consumers = AC1})
end,
emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
not NoAck),
@@ -985,42 +977,32 @@ handle_call({basic_consume, NoAck, ChPid, Limiter,
handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
State = #q{exclusive_consumer = Holder}) ->
+ ok = maybe_send_reply(ChPid, OkMsg),
case lookup_ch(ChPid) of
not_found ->
- ok = maybe_send_reply(ChPid, OkMsg),
reply(ok, State);
- C = #cr{consumer_count = ConsumerCount,
- limiter = Limiter} ->
- C1 = C#cr{consumer_count = ConsumerCount -1},
- maybe_store_ch_record(
- case ConsumerCount of
- 1 -> ok = rabbit_limiter:unregister(Limiter, self()),
- C1#cr{limiter = rabbit_limiter:make_token()};
- _ -> C1
- end),
+ C = #cr{blocked_consumers = Blocked} ->
emit_consumer_deleted(ChPid, ConsumerTag),
- ok = maybe_send_reply(ChPid, OkMsg),
- NewState =
- State#q{exclusive_consumer = cancel_holder(ChPid,
- ConsumerTag,
- Holder),
- active_consumers = remove_consumer(
- ChPid, ConsumerTag,
- State#q.active_consumers),
- blocked_consumers = remove_consumer(
+ Blocked1 = remove_consumer(ChPid, ConsumerTag, Blocked),
+ update_consumer_count(C#cr{blocked_consumers = Blocked1}, -1),
+ State1 = State#q{
+ exclusive_consumer = case Holder of
+ {ChPid, ConsumerTag} -> none;
+ _ -> Holder
+ end,
+ active_consumers = remove_consumer(
ChPid, ConsumerTag,
- State#q.blocked_consumers)},
- case should_auto_delete(NewState) of
- false -> reply(ok, ensure_expiry_timer(NewState));
- true -> {stop, normal, ok, NewState}
+ State#q.active_consumers)},
+ case should_auto_delete(State1) of
+ false -> reply(ok, ensure_expiry_timer(State1));
+ true -> {stop, normal, ok, State1}
end
end;
handle_call(stat, _From, State) ->
- State1 = #q{backing_queue = BQ, backing_queue_state = BQS,
- active_consumers = ActiveConsumers} =
+ State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
drop_expired_messages(ensure_expiry_timer(State)),
- reply({ok, BQ:len(BQS), queue:len(ActiveConsumers)}, State1);
+ reply({ok, BQ:len(BQS), active_consumer_count()}, State1);
handle_call({delete, IfUnused, IfEmpty}, _From,
State = #q{backing_queue_state = BQS, backing_queue = BQ}) ->
@@ -1042,14 +1024,9 @@ handle_call(purge, _From, State = #q{backing_queue = BQ,
handle_call({requeue, AckTags, ChPid}, From, State) ->
gen_server2:reply(From, ok),
- case lookup_ch(ChPid) of
- not_found ->
- noreply(State);
- C = #cr{acktags = ChAckTags} ->
- ChAckTags1 = subtract_acks(ChAckTags, AckTags),
- maybe_store_ch_record(C#cr{acktags = ChAckTags1}),
- noreply(requeue_and_run(AckTags, State))
- end.
+ noreply(subtract_acks(
+ ChPid, AckTags, State,
+ fun (State1) -> requeue_and_run(AckTags, State1) end)).
handle_cast({run_backing_queue, Mod, Fun}, State) ->
noreply(run_backing_queue(Mod, Fun, State));
@@ -1058,33 +1035,26 @@ handle_cast({deliver, Delivery}, State) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
noreply(deliver_or_enqueue(Delivery, State));
-handle_cast({ack, AckTags, ChPid},
- State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
- case lookup_ch(ChPid) of
- not_found ->
- noreply(State);
- C = #cr{acktags = ChAckTags} ->
- maybe_store_ch_record(C#cr{acktags = subtract_acks(
- ChAckTags, AckTags)}),
- {_Guids, BQS1} = BQ:ack(AckTags, BQS),
- noreply(State#q{backing_queue_state = BQS1})
- end;
-
-handle_cast({reject, AckTags, Requeue, ChPid},
- State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
- case lookup_ch(ChPid) of
- not_found ->
- noreply(State);
- C = #cr{acktags = ChAckTags} ->
- ChAckTags1 = subtract_acks(ChAckTags, AckTags),
- maybe_store_ch_record(C#cr{acktags = ChAckTags1}),
- noreply(case Requeue of
- true -> requeue_and_run(AckTags, State);
- false -> {_Guids, BQS1} = BQ:ack(AckTags, BQS),
- State#q{backing_queue_state = BQS1}
- end)
- end;
+handle_cast({ack, AckTags, ChPid}, State) ->
+ noreply(subtract_acks(
+ ChPid, AckTags, State,
+ fun (State1 = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ {_Guids, BQS1} = BQ:ack(AckTags, BQS),
+ State1#q{backing_queue_state = BQS1}
+ end));
+
+handle_cast({reject, AckTags, Requeue, ChPid}, State) ->
+ noreply(subtract_acks(
+ ChPid, AckTags, State,
+ fun (State1 = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ case Requeue of
+ true -> requeue_and_run(AckTags, State1);
+ false -> {_Guids, BQS1} = BQ:ack(AckTags, BQS),
+ State1#q{backing_queue_state = BQS1}
+ end
+ end));
handle_cast(delete_immediately, State) ->
{stop, normal, State};
diff --git a/src/rabbit_amqqueue_sup.erl b/src/rabbit_amqqueue_sup.erl
index 2c28adcefd..7b3ebcf266 100644
--- a/src/rabbit_amqqueue_sup.erl
+++ b/src/rabbit_amqqueue_sup.erl
@@ -26,6 +26,20 @@
-define(SERVER, ?MODULE).
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
+-spec(start_child/2 ::
+ (node(), [any()]) -> rabbit_types:ok(pid() | undefined) |
+ rabbit_types:ok({pid(), any()}) |
+ rabbit_types:error(any())).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
start_link() ->
supervisor2:start_link({local, ?SERVER}, ?MODULE, []).
diff --git a/src/rabbit_auth_backend_internal.erl b/src/rabbit_auth_backend_internal.erl
index 6a018bd16d..086a90b49f 100644
--- a/src/rabbit_auth_backend_internal.erl
+++ b/src/rabbit_auth_backend_internal.erl
@@ -110,17 +110,13 @@ internal_check_user_login(Username, Fun) ->
Refused
end.
-check_vhost_access(#user{username = Username}, VHost) ->
- %% TODO: use dirty ops instead
- rabbit_misc:execute_mnesia_transaction(
- fun () ->
- case mnesia:read({rabbit_user_permission,
- #user_vhost{username = Username,
- virtual_host = VHost}}) of
- [] -> false;
- [_R] -> true
- end
- end).
+check_vhost_access(#user{username = Username}, VHostPath) ->
+ case mnesia:dirty_read({rabbit_user_permission,
+ #user_vhost{username = Username,
+ virtual_host = VHostPath}}) of
+ [] -> false;
+ [_R] -> true
+ end.
check_resource_access(#user{username = Username},
#resource{virtual_host = VHostPath, name = Name},
@@ -150,6 +146,7 @@ permission_index(read) -> #permission.read.
%% Manipulation of the user database
add_user(Username, Password) ->
+ rabbit_log:info("Creating user '~s'~n", [Username]),
R = rabbit_misc:execute_mnesia_transaction(
fun () ->
case mnesia:wread({rabbit_user, Username}) of
@@ -165,10 +162,10 @@ add_user(Username, Password) ->
mnesia:abort({user_already_exists, Username})
end
end),
- rabbit_log:info("Created user ~p~n", [Username]),
R.
delete_user(Username) ->
+ rabbit_log:info("Deleting user '~s'~n", [Username]),
R = rabbit_misc:execute_mnesia_transaction(
rabbit_misc:with_user(
Username,
@@ -185,13 +182,14 @@ delete_user(Username) ->
write)],
ok
end)),
- rabbit_log:info("Deleted user ~p~n", [Username]),
R.
change_password(Username, Password) ->
+ rabbit_log:info("Changing password for '~s'~n", [Username]),
change_password_hash(Username, hash_password(Password)).
clear_password(Username) ->
+ rabbit_log:info("Clearing password for '~s'~n", [Username]),
change_password_hash(Username, <<"">>).
change_password_hash(Username, PasswordHash) ->
@@ -199,7 +197,6 @@ change_password_hash(Username, PasswordHash) ->
User#internal_user{
password_hash = PasswordHash }
end),
- rabbit_log:info("Changed password for user ~p~n", [Username]),
R.
hash_password(Cleartext) ->
@@ -221,11 +218,10 @@ salted_md5(Salt, Cleartext) ->
erlang:md5(Salted).
set_tags(Username, Tags) ->
+ rabbit_log:info("Setting user tags for user '~s' to ~p~n", [Username, Tags]),
R = update_user(Username, fun(User) ->
User#internal_user{tags = Tags}
end),
- rabbit_log:info("Set user tags for user ~p to ~p~n",
- [Username, Tags]),
R.
update_user(Username, Fun) ->
@@ -255,6 +251,8 @@ validate_regexp(RegexpBin) ->
end.
set_permissions(Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm) ->
+ rabbit_log:info("Setting permissions for '~s' in '~s' to '~s', '~s', '~s'~n",
+ [Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm]),
lists:map(fun validate_regexp/1, [ConfigurePerm, WritePerm, ReadPerm]),
rabbit_misc:execute_mnesia_transaction(
rabbit_misc:with_user_and_vhost(
diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl
index 22691ef9d3..095202dd11 100644
--- a/src/rabbit_backing_queue_qc.erl
+++ b/src/rabbit_backing_queue_qc.erl
@@ -27,8 +27,9 @@
-define(TIMEOUT_LIMIT, 100).
-define(RECORD_INDEX(Key, Record),
- proplists:get_value(Key, lists:zip(
- record_info(fields, Record), lists:seq(2, record_info(size, Record))))).
+ proplists:get_value(
+ Key, lists:zip(record_info(fields, Record),
+ lists:seq(2, record_info(size, Record))))).
-export([initial_state/0, command/1, precondition/2, postcondition/3,
next_state/3]).
@@ -36,25 +37,27 @@
-export([prop_backing_queue_test/0, publish_multiple/4, timeout/2]).
-record(state, {bqstate,
- len, %% int
- messages, %% queue of {msg_props, basic_msg}
- acks, %% dict of acktag => {msg_props, basic_msg}
- confirms}). %% set of msgid
+ len, %% int
+ next_seq_id, %% int
+ messages, %% gb_trees of seqid => {msg_props, basic_msg}
+ acks, %% [{acktag, {seqid, {msg_props, basic_msg}}}]
+ confirms}). %% set of msgid
%% Initialise model
initial_state() ->
- #state{bqstate = qc_variable_queue_init(qc_test_queue()),
- len = 0,
- messages = queue:new(),
- acks = orddict:new(),
- confirms = gb_sets:new()}.
+ #state{bqstate = qc_variable_queue_init(qc_test_queue()),
+ len = 0,
+ next_seq_id = 0,
+ messages = gb_trees:empty(),
+ acks = [],
+ confirms = gb_sets:new()}.
%% Property
prop_backing_queue_test() ->
?FORALL(Cmds, commands(?MODULE, initial_state()),
- backing_queue_test(Cmds)).
+ backing_queue_test(Cmds)).
backing_queue_test(Cmds) ->
{ok, FileSizeLimit} =
@@ -75,8 +78,8 @@ backing_queue_test(Cmds) ->
?BQMOD:delete_and_terminate(shutdown, BQ),
?WHENFAIL(
- io:format("Result: ~p~n", [Res]),
- aggregate(command_names(Cmds), Res =:= ok)).
+ io:format("Result: ~p~n", [Res]),
+ aggregate(command_names(Cmds), Res =:= ok)).
%% Commands
@@ -103,34 +106,34 @@ command(S) ->
qc_publish(#state{bqstate = BQ}) ->
{call, ?BQMOD, publish,
- [qc_message(),
- #message_properties{needs_confirming = frequency([{1, true},
- {20, false}]),
- expiry = oneof([undefined | lists:seq(1, 10)])},
- self(), BQ]}.
+ [qc_message(),
+ #message_properties{needs_confirming = frequency([{1, true},
+ {20, false}]),
+ expiry = oneof([undefined | lists:seq(1, 10)])},
+ self(), BQ]}.
qc_publish_multiple(#state{bqstate = BQ}) ->
{call, ?MODULE, publish_multiple,
- [qc_message(), #message_properties{}, BQ,
- resize(?QUEUE_MAXLEN, pos_integer())]}.
+ [qc_message(), #message_properties{}, BQ,
+ resize(?QUEUE_MAXLEN, pos_integer())]}.
qc_publish_delivered(#state{bqstate = BQ}) ->
{call, ?BQMOD, publish_delivered,
- [boolean(), qc_message(), #message_properties{}, self(), BQ]}.
+ [boolean(), qc_message(), #message_properties{}, self(), BQ]}.
qc_fetch(#state{bqstate = BQ}) ->
{call, ?BQMOD, fetch, [boolean(), BQ]}.
qc_ack(#state{bqstate = BQ, acks = Acks}) ->
- {call, ?BQMOD, ack, [rand_choice(orddict:fetch_keys(Acks)), BQ]}.
+ {call, ?BQMOD, ack, [rand_choice(proplists:get_keys(Acks)), BQ]}.
qc_requeue(#state{bqstate = BQ, acks = Acks}) ->
{call, ?BQMOD, requeue,
- [rand_choice(orddict:fetch_keys(Acks)), fun(MsgOpts) -> MsgOpts end, BQ]}.
+ [rand_choice(proplists:get_keys(Acks)), fun(MsgOpts) -> MsgOpts end, BQ]}.
qc_set_ram_duration_target(#state{bqstate = BQ}) ->
{call, ?BQMOD, set_ram_duration_target,
- [oneof([0, 1, 2, resize(1000, pos_integer()), infinity]), BQ]}.
+ [oneof([0, 1, 2, resize(1000, pos_integer()), infinity]), BQ]}.
qc_ram_duration(#state{bqstate = BQ}) ->
{call, ?BQMOD, ram_duration, [BQ]}.
@@ -153,11 +156,11 @@ qc_purge(#state{bqstate = BQ}) ->
%% Preconditions
precondition(#state{acks = Acks}, {call, ?BQMOD, Fun, _Arg})
- when Fun =:= ack; Fun =:= requeue ->
- orddict:size(Acks) > 0;
+ when Fun =:= ack; Fun =:= requeue ->
+ length(Acks) > 0;
precondition(#state{messages = Messages},
- {call, ?BQMOD, publish_delivered, _Arg}) ->
- queue:is_empty(Messages);
+ {call, ?BQMOD, publish_delivered, _Arg}) ->
+ gb_trees:is_empty(Messages);
precondition(_S, {call, ?BQMOD, _Fun, _Arg}) ->
true;
precondition(_S, {call, ?MODULE, timeout, _Arg}) ->
@@ -168,14 +171,18 @@ precondition(#state{len = Len}, {call, ?MODULE, publish_multiple, _Arg}) ->
%% Model updates
next_state(S, BQ, {call, ?BQMOD, publish, [Msg, MsgProps, _Pid, _BQ]}) ->
- #state{len = Len, messages = Messages, confirms = Confirms} = S,
+ #state{len = Len,
+ messages = Messages,
+ confirms = Confirms,
+ next_seq_id = NextSeq} = S,
MsgId = {call, erlang, element, [?RECORD_INDEX(id, basic_message), Msg]},
NeedsConfirm =
{call, erlang, element,
- [?RECORD_INDEX(needs_confirming, message_properties), MsgProps]},
+ [?RECORD_INDEX(needs_confirming, message_properties), MsgProps]},
S#state{bqstate = BQ,
len = Len + 1,
- messages = queue:in({MsgProps, Msg}, Messages),
+ next_seq_id = NextSeq + 1,
+ messages = gb_trees:insert(NextSeq, {MsgProps, Msg}, Messages),
confirms = case eval(NeedsConfirm) of
true -> gb_sets:add(MsgId, Confirms);
_ -> Confirms
@@ -183,30 +190,33 @@ next_state(S, BQ, {call, ?BQMOD, publish, [Msg, MsgProps, _Pid, _BQ]}) ->
next_state(S, BQ, {call, _, publish_multiple, [Msg, MsgProps, _BQ, Count]}) ->
#state{len = Len, messages = Messages} = S,
- Messages1 = repeat(Messages, fun(Msgs) ->
- queue:in({MsgProps, Msg}, Msgs)
- end, Count),
- S#state{bqstate = BQ,
- len = Len + Count,
- messages = Messages1};
+ {S1, Msgs1} = repeat({S, Messages},
+ fun ({#state{next_seq_id = NextSeq} = State, Msgs}) ->
+ {State #state { next_seq_id = NextSeq + 1},
+ gb_trees:insert(NextSeq, {MsgProps, Msg}, Msgs)}
+ end, Count),
+ S1#state{bqstate = BQ,
+ len = Len + Count,
+ messages = Msgs1};
next_state(S, Res,
{call, ?BQMOD, publish_delivered,
- [AckReq, Msg, MsgProps, _Pid, _BQ]}) ->
- #state{confirms = Confirms, acks = Acks} = S,
+ [AckReq, Msg, MsgProps, _Pid, _BQ]}) ->
+ #state{confirms = Confirms, acks = Acks, next_seq_id = NextSeq} = S,
AckTag = {call, erlang, element, [1, Res]},
BQ1 = {call, erlang, element, [2, Res]},
MsgId = {call, erlang, element, [?RECORD_INDEX(id, basic_message), Msg]},
NeedsConfirm =
{call, erlang, element,
- [?RECORD_INDEX(needs_confirming, message_properties), MsgProps]},
+ [?RECORD_INDEX(needs_confirming, message_properties), MsgProps]},
S#state{bqstate = BQ1,
+ next_seq_id = NextSeq + 1,
confirms = case eval(NeedsConfirm) of
true -> gb_sets:add(MsgId, Confirms);
_ -> Confirms
end,
acks = case AckReq of
- true -> orddict:append(AckTag, {MsgProps, Msg}, Acks);
+ true -> [{AckTag, {NextSeq, {MsgProps, Msg}}}|Acks];
false -> Acks
end
};
@@ -217,34 +227,36 @@ next_state(S, Res, {call, ?BQMOD, fetch, [AckReq, _BQ]}) ->
BQ1 = {call, erlang, element, [2, Res]},
AckTag = {call, erlang, element, [3, ResultInfo]},
S1 = S#state{bqstate = BQ1},
- case queue:out(Messages) of
- {empty, _M2} ->
- S1;
- {{value, MsgProp_Msg}, M2} ->
- S2 = S1#state{len = Len - 1, messages = M2},
- case AckReq of
- true ->
- S2#state{acks = orddict:append(AckTag, MsgProp_Msg, Acks)};
- false ->
- S2
- end
+ case gb_trees:is_empty(Messages) of
+ true -> S1;
+ false -> {SeqId, MsgProp_Msg, M2} = gb_trees:take_smallest(Messages),
+ S2 = S1#state{len = Len - 1, messages = M2},
+ case AckReq of
+ true ->
+ S2#state{acks = [{AckTag, {SeqId, MsgProp_Msg}}|Acks]};
+ false ->
+ S2
+ end
end;
next_state(S, Res, {call, ?BQMOD, ack, [AcksArg, _BQ]}) ->
#state{acks = AcksState} = S,
BQ1 = {call, erlang, element, [2, Res]},
S#state{bqstate = BQ1,
- acks = lists:foldl(fun orddict:erase/2, AcksState, AcksArg)};
+ acks = lists:foldl(fun proplists:delete/2, AcksState, AcksArg)};
next_state(S, Res, {call, ?BQMOD, requeue, [AcksArg, _F, _V]}) ->
- #state{len = Len, messages = Messages, acks = AcksState} = S,
+ #state{messages = Messages, acks = AcksState} = S,
BQ1 = {call, erlang, element, [2, Res]},
- RequeueMsgs = lists:append([orddict:fetch(Key, AcksState) ||
- Key <- AcksArg]),
+ Messages1 = lists:foldl(fun (AckTag, Msgs) ->
+ {SeqId, MsgPropsMsg} =
+ proplists:get_value(AckTag, AcksState),
+ gb_trees:insert(SeqId, MsgPropsMsg, Msgs)
+ end, Messages, AcksArg),
S#state{bqstate = BQ1,
- len = Len + length(RequeueMsgs),
- messages = queue:join(Messages, queue:from_list(RequeueMsgs)),
- acks = lists:foldl(fun orddict:erase/2, AcksState, AcksArg)};
+ len = gb_trees:size(Messages1),
+ messages = Messages1,
+ acks = lists:foldl(fun proplists:delete/2, AcksState, AcksArg)};
next_state(S, BQ, {call, ?BQMOD, set_ram_duration_target, _Args}) ->
S#state{bqstate = BQ};
@@ -259,8 +271,8 @@ next_state(S, Res, {call, ?BQMOD, drain_confirmed, _Args}) ->
next_state(S, BQ1, {call, ?BQMOD, dropwhile, _Args}) ->
#state{messages = Messages} = S,
- Messages1 = drop_messages(Messages),
- S#state{bqstate = BQ1, len = queue:len(Messages1), messages = Messages1};
+ Msgs1 = drop_messages(Messages),
+ S#state{bqstate = BQ1, len = gb_trees:size(Msgs1), messages = Msgs1};
next_state(S, _Res, {call, ?BQMOD, is_empty, _Args}) ->
S;
@@ -270,7 +282,7 @@ next_state(S, BQ, {call, ?MODULE, timeout, _Args}) ->
next_state(S, Res, {call, ?BQMOD, purge, _Args}) ->
BQ1 = {call, erlang, element, [2, Res]},
- S#state{bqstate = BQ1, len = 0, messages = queue:new()}.
+ S#state{bqstate = BQ1, len = 0, messages = gb_trees:empty()}.
%% Postconditions
@@ -278,49 +290,44 @@ postcondition(S, {call, ?BQMOD, fetch, _Args}, Res) ->
#state{messages = Messages, len = Len, acks = Acks, confirms = Confrms} = S,
case Res of
{{MsgFetched, _IsDelivered, AckTag, RemainingLen}, _BQ} ->
- {_MsgProps, Msg} = queue:head(Messages),
+ {_SeqId, {_MsgProps, Msg}} = gb_trees:smallest(Messages),
MsgFetched =:= Msg andalso
- not orddict:is_key(AckTag, Acks) andalso
- not gb_sets:is_element(AckTag, Confrms) andalso
- RemainingLen =:= Len - 1;
+ not proplists:is_defined(AckTag, Acks) andalso
+ not gb_sets:is_element(AckTag, Confrms) andalso
+ RemainingLen =:= Len - 1;
{empty, _BQ} ->
Len =:= 0
end;
postcondition(S, {call, ?BQMOD, publish_delivered, _Args}, {AckTag, _BQ}) ->
#state{acks = Acks, confirms = Confrms} = S,
- not orddict:is_key(AckTag, Acks) andalso
- not gb_sets:is_element(AckTag, Confrms);
+ not proplists:is_defined(AckTag, Acks) andalso
+ not gb_sets:is_element(AckTag, Confrms);
postcondition(#state{len = Len}, {call, ?BQMOD, purge, _Args}, Res) ->
{PurgeCount, _BQ} = Res,
Len =:= PurgeCount;
-postcondition(#state{len = Len},
- {call, ?BQMOD, is_empty, _Args}, Res) ->
+postcondition(#state{len = Len}, {call, ?BQMOD, is_empty, _Args}, Res) ->
(Len =:= 0) =:= Res;
postcondition(S, {call, ?BQMOD, drain_confirmed, _Args}, Res) ->
#state{confirms = Confirms} = S,
{ReportedConfirmed, _BQ} = Res,
- lists:all(fun (M) ->
- gb_sets:is_element(M, Confirms)
- end, ReportedConfirmed);
+ lists:all(fun (M) -> gb_sets:is_element(M, Confirms) end,
+ ReportedConfirmed);
postcondition(#state{bqstate = BQ, len = Len}, {call, _M, _F, _A}, _Res) ->
?BQMOD:len(BQ) =:= Len.
%% Helpers
-repeat(Result, _Fun, 0) ->
- Result;
-repeat(Result, Fun, Times) ->
- repeat(Fun(Result), Fun, Times - 1).
+repeat(Result, _Fun, 0) -> Result;
+repeat(Result, Fun, Times) -> repeat(Fun(Result), Fun, Times - 1).
publish_multiple(Msg, MsgProps, BQ, Count) ->
- repeat(BQ, fun(BQ1) ->
- ?BQMOD:publish(Msg, MsgProps, self(), BQ1)
- end, Count).
+ repeat(BQ, fun(BQ1) -> ?BQMOD:publish(Msg, MsgProps, self(), BQ1) end,
+ Count).
timeout(BQ, 0) ->
BQ;
@@ -330,37 +337,30 @@ timeout(BQ, AtMost) ->
_ -> timeout(?BQMOD:timeout(BQ), AtMost - 1)
end.
-qc_message_payload() ->
- ?SIZED(Size, resize(Size * Size, binary())).
+qc_message_payload() -> ?SIZED(Size, resize(Size * Size, binary())).
-qc_routing_key() ->
- noshrink(binary(10)).
+qc_routing_key() -> noshrink(binary(10)).
-qc_delivery_mode() ->
- oneof([1, 2]).
+qc_delivery_mode() -> oneof([1, 2]).
-qc_message() ->
- qc_message(qc_delivery_mode()).
+qc_message() -> qc_message(qc_delivery_mode()).
qc_message(DeliveryMode) ->
- {call, rabbit_basic, message, [
- qc_default_exchange(),
- qc_routing_key(),
- #'P_basic'{delivery_mode = DeliveryMode},
- qc_message_payload()]}.
+ {call, rabbit_basic, message, [qc_default_exchange(),
+ qc_routing_key(),
+ #'P_basic'{delivery_mode = DeliveryMode},
+ qc_message_payload()]}.
qc_default_exchange() ->
{call, rabbit_misc, r, [<<>>, exchange, <<>>]}.
qc_variable_queue_init(Q) ->
{call, ?BQMOD, init,
- [Q, false, function(2, ok)]}.
+ [Q, false, function(2, ok)]}.
-qc_test_q() ->
- {call, rabbit_misc, r, [<<"/">>, queue, noshrink(binary(16))]}.
+qc_test_q() -> {call, rabbit_misc, r, [<<"/">>, queue, noshrink(binary(16))]}.
-qc_test_queue() ->
- qc_test_queue(boolean()).
+qc_test_queue() -> qc_test_queue(boolean()).
qc_test_queue(Durable) ->
#amqqueue{name = qc_test_q(),
@@ -370,18 +370,25 @@ qc_test_queue(Durable) ->
pid = self()}.
rand_choice([]) -> [];
-rand_choice(List) -> [lists:nth(random:uniform(length(List)), List)].
+rand_choice(List) -> rand_choice(List, [], random:uniform(length(List))).
+
+rand_choice(_List, Selection, 0) ->
+ Selection;
+rand_choice(List, Selection, N) ->
+ Picked = lists:nth(random:uniform(length(List)), List),
+ rand_choice(List -- [Picked], [Picked | Selection],
+ N - 1).
dropfun(Props) ->
Expiry = eval({call, erlang, element,
- [?RECORD_INDEX(expiry, message_properties), Props]}),
+ [?RECORD_INDEX(expiry, message_properties), Props]}),
Expiry =/= 1.
drop_messages(Messages) ->
- case queue:out(Messages) of
- {empty, _} ->
+ case gb_trees:is_empty(Messages) of
+ true ->
Messages;
- {{value, MsgProps_Msg}, M2} ->
+ false -> {_Seq, MsgProps_Msg, M2} = gb_trees:take_smallest(Messages),
MsgProps = {call, erlang, element, [1, MsgProps_Msg]},
case dropfun(MsgProps) of
true -> drop_messages(M2);
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index 9cc406e718..b266d3664d 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -18,8 +18,8 @@
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
--export([publish/1, message/3, message/4, properties/1, delivery/4]).
--export([publish/4, publish/6]).
+-export([publish/4, publish/6, publish/1,
+ message/3, message/4, properties/1, delivery/4]).
-export([build_content/2, from_content/1]).
%%----------------------------------------------------------------------------
@@ -35,6 +35,12 @@
-type(exchange_input() :: (rabbit_types:exchange() | rabbit_exchange:name())).
-type(body_input() :: (binary() | [binary()])).
+-spec(publish/4 ::
+ (exchange_input(), rabbit_router:routing_key(), properties_input(),
+ body_input()) -> publish_result()).
+-spec(publish/6 ::
+ (exchange_input(), rabbit_router:routing_key(), boolean(), boolean(),
+ properties_input(), body_input()) -> publish_result()).
-spec(publish/1 ::
(rabbit_types:delivery()) -> publish_result()).
-spec(delivery/4 ::
@@ -49,12 +55,6 @@
rabbit_types:ok_or_error2(rabbit_types:message(), any())).
-spec(properties/1 ::
(properties_input()) -> rabbit_framing:amqp_property_record()).
--spec(publish/4 ::
- (exchange_input(), rabbit_router:routing_key(), properties_input(),
- body_input()) -> publish_result()).
--spec(publish/6 ::
- (exchange_input(), rabbit_router:routing_key(), boolean(), boolean(),
- properties_input(), body_input()) -> publish_result()).
-spec(build_content/2 :: (rabbit_framing:amqp_property_record(),
binary() | [binary()]) -> rabbit_types:content()).
-spec(from_content/1 :: (rabbit_types:content()) ->
@@ -64,13 +64,34 @@
%%----------------------------------------------------------------------------
+%% Convenience function, for avoiding round-trips in calls across the
+%% erlang distributed network.
+publish(Exchange, RoutingKeyBin, Properties, Body) ->
+ publish(Exchange, RoutingKeyBin, false, false, Properties, Body).
+
+%% Convenience function, for avoiding round-trips in calls across the
+%% erlang distributed network.
+publish(X = #exchange{name = XName}, RKey, Mandatory, Immediate, Props, Body) ->
+ publish(X, delivery(Mandatory, Immediate,
+ message(XName, RKey, properties(Props), Body),
+ undefined));
+publish(XName, RKey, Mandatory, Immediate, Props, Body) ->
+ publish(delivery(Mandatory, Immediate,
+ message(XName, RKey, properties(Props), Body),
+ undefined)).
+
publish(Delivery = #delivery{
- message = #basic_message{exchange_name = ExchangeName}}) ->
- case rabbit_exchange:lookup(ExchangeName) of
+ message = #basic_message{exchange_name = XName}}) ->
+ case rabbit_exchange:lookup(XName) of
{ok, X} -> publish(X, Delivery);
- Other -> Other
+ Err -> Err
end.
+publish(X, Delivery) ->
+ {RoutingRes, DeliveredQPids} =
+ rabbit_router:deliver(rabbit_exchange:route(X, Delivery), Delivery),
+ {ok, RoutingRes, DeliveredQPids}.
+
delivery(Mandatory, Immediate, Message, MsgSeqNo) ->
#delivery{mandatory = Mandatory, immediate = Immediate, sender = self(),
message = Message, msg_seq_no = MsgSeqNo}.
@@ -113,11 +134,10 @@ strip_header(#content{properties = Props = #'P_basic'{headers = Headers}}
headers = Headers0}})
end.
-message(ExchangeName, RoutingKey,
- #content{properties = Props} = DecodedContent) ->
+message(XName, RoutingKey, #content{properties = Props} = DecodedContent) ->
try
{ok, #basic_message{
- exchange_name = ExchangeName,
+ exchange_name = XName,
content = strip_header(DecodedContent, ?DELETED_HEADER),
id = rabbit_guid:guid(),
is_persistent = is_message_persistent(DecodedContent),
@@ -127,10 +147,10 @@ message(ExchangeName, RoutingKey,
{error, _Reason} = Error -> Error
end.
-message(ExchangeName, RoutingKey, RawProperties, Body) ->
+message(XName, RoutingKey, RawProperties, Body) ->
Properties = properties(RawProperties),
Content = build_content(Properties, Body),
- {ok, Msg} = message(ExchangeName, RoutingKey, Content),
+ {ok, Msg} = message(XName, RoutingKey, Content),
Msg.
properties(P = #'P_basic'{}) ->
@@ -152,28 +172,6 @@ indexof([], _Element, _N) -> 0;
indexof([Element | _Rest], Element, N) -> N;
indexof([_ | Rest], Element, N) -> indexof(Rest, Element, N + 1).
-%% Convenience function, for avoiding round-trips in calls across the
-%% erlang distributed network.
-publish(Exchange, RoutingKeyBin, Properties, Body) ->
- publish(Exchange, RoutingKeyBin, false, false, Properties, Body).
-
-%% Convenience function, for avoiding round-trips in calls across the
-%% erlang distributed network.
-publish(X = #exchange{name = XName}, RKey, Mandatory, Immediate, Props, Body) ->
- publish(X, delivery(Mandatory, Immediate,
- message(XName, RKey, properties(Props), Body),
- undefined));
-publish(XName, RKey, Mandatory, Immediate, Props, Body) ->
- case rabbit_exchange:lookup(XName) of
- {ok, X} -> publish(X, RKey, Mandatory, Immediate, Props, Body);
- Err -> Err
- end.
-
-publish(X, Delivery) ->
- {RoutingRes, DeliveredQPids} =
- rabbit_router:deliver(rabbit_exchange:route(X, Delivery), Delivery),
- {ok, RoutingRes, DeliveredQPids}.
-
is_message_persistent(#content{properties = #'P_basic'{
delivery_mode = Mode}}) ->
case Mode of
diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl
index 68511a326c..494f3203c8 100644
--- a/src/rabbit_binary_generator.erl
+++ b/src/rabbit_binary_generator.erl
@@ -122,10 +122,9 @@ build_content_frames(SizeAcc, FramesAcc, FragSizeRem, FragAcc,
build_heartbeat_frame() ->
create_frame(?FRAME_HEARTBEAT, 0, <<>>).
-create_frame(TypeInt, ChannelInt, PayloadBin) when is_binary(PayloadBin) ->
- [<<TypeInt:8, ChannelInt:16, (size(PayloadBin)):32>>, PayloadBin, <<?FRAME_END>>];
create_frame(TypeInt, ChannelInt, Payload) ->
- create_frame(TypeInt, ChannelInt, list_to_binary(Payload)).
+ [<<TypeInt:8, ChannelInt:16, (iolist_size(Payload)):32>>, Payload,
+ ?FRAME_END].
%% table_field_to_binary supports the AMQP 0-8/0-9 standard types, S,
%% I, D, T and F, as well as the QPid extensions b, d, f, l, s, t, x,
@@ -194,13 +193,13 @@ generate_array(Array) when is_list(Array) ->
short_string_to_binary(String) when is_binary(String) ->
Len = size(String),
- if Len < 256 -> [<<(size(String)):8>>, String];
+ if Len < 256 -> [<<Len:8>>, String];
true -> exit(content_properties_shortstr_overflow)
end;
short_string_to_binary(String) ->
- StringLength = length(String),
- if StringLength < 256 -> [<<StringLength:8>>, String];
- true -> exit(content_properties_shortstr_overflow)
+ Len = length(String),
+ if Len < 256 -> [<<Len:8>>, String];
+ true -> exit(content_properties_shortstr_overflow)
end.
long_string_to_binary(String) when is_binary(String) ->
@@ -240,11 +239,11 @@ encode_properties(Bit, [T | TypeList], [Value | ValueList], FirstShortAcc, Flags
encode_property(shortstr, String) ->
Len = size(String),
- if Len < 256 -> <<Len:8/unsigned, String:Len/binary>>;
+ if Len < 256 -> <<Len:8, String:Len/binary>>;
true -> exit(content_properties_shortstr_overflow)
end;
encode_property(longstr, String) ->
- Len = size(String), <<Len:32/unsigned, String:Len/binary>>;
+ Len = size(String), <<Len:32, String:Len/binary>>;
encode_property(octet, Int) ->
<<Int:8/unsigned>>;
encode_property(shortint, Int) ->
@@ -261,7 +260,7 @@ encode_property(table, Table) ->
check_empty_content_body_frame_size() ->
%% Intended to ensure that EMPTY_CONTENT_BODY_FRAME_SIZE is
%% defined correctly.
- ComputedSize = size(list_to_binary(create_frame(?FRAME_BODY, 0, <<>>))),
+ ComputedSize = iolist_size(create_frame(?FRAME_BODY, 0, <<>>)),
if ComputedSize == ?EMPTY_CONTENT_BODY_FRAME_SIZE ->
ok;
true ->
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index 205d5bbabd..e625a427e4 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -40,7 +40,7 @@
'source_and_destination_not_found')).
-type(bind_ok_or_error() :: 'ok' | bind_errors() |
rabbit_types:error('binding_not_found')).
--type(bind_res() :: bind_ok_or_error() | rabbit_misc:const(bind_ok_or_error())).
+-type(bind_res() :: bind_ok_or_error() | rabbit_misc:thunk(bind_ok_or_error())).
-type(inner_fun() ::
fun((rabbit_types:exchange(),
rabbit_types:exchange() | rabbit_types:amqqueue()) ->
@@ -108,21 +108,34 @@ recover(XNames, QNames) ->
SelectSet = fun (#resource{kind = exchange}) -> XNameSet;
(#resource{kind = queue}) -> QNameSet
end,
- [recover_semi_durable_route(R, SelectSet(Dst)) ||
+ {ok, Gatherer} = gatherer:start_link(),
+ [recover_semi_durable_route(Gatherer, R, SelectSet(Dst)) ||
R = #route{binding = #binding{destination = Dst}} <-
rabbit_misc:dirty_read_all(rabbit_semi_durable_route)],
+ empty = gatherer:out(Gatherer),
+ ok = gatherer:stop(Gatherer),
ok.
-recover_semi_durable_route(R = #route{binding = B}, ToRecover) ->
+recover_semi_durable_route(Gatherer, R = #route{binding = B}, ToRecover) ->
#binding{source = Src, destination = Dst} = B,
- {ok, X} = rabbit_exchange:lookup(Src),
+ case sets:is_element(Dst, ToRecover) of
+ true -> {ok, X} = rabbit_exchange:lookup(Src),
+ ok = gatherer:fork(Gatherer),
+ ok = worker_pool:submit_async(
+ fun () ->
+ recover_semi_durable_route_txn(R, X),
+ gatherer:finish(Gatherer)
+ end);
+ false -> ok
+ end.
+
+recover_semi_durable_route_txn(R = #route{binding = B}, X) ->
rabbit_misc:execute_mnesia_transaction(
fun () ->
- Rs = mnesia:match_object(rabbit_semi_durable_route, R, read),
- case Rs =/= [] andalso sets:is_element(Dst, ToRecover) of
- false -> no_recover;
- true -> ok = sync_transient_route(R, fun mnesia:write/3),
- rabbit_exchange:serial(X)
+ case mnesia:match_object(rabbit_semi_durable_route, R, read) of
+ [] -> no_recover;
+ _ -> ok = sync_transient_route(R, fun mnesia:write/3),
+ rabbit_exchange:serial(X)
end
end,
fun (no_recover, _) -> ok;
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index dfe84644e4..3c61447af2 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -35,8 +35,8 @@
-record(ch, {state, protocol, channel, reader_pid, writer_pid, conn_pid,
limiter, tx_status, next_tag,
unacked_message_q, uncommitted_message_q, uncommitted_ack_q,
- user, virtual_host, most_recently_declared_queue,
- consumer_mapping, blocking, consumer_monitors, queue_collector_pid,
+ user, virtual_host, most_recently_declared_queue, queue_monitors,
+ consumer_mapping, blocking, queue_consumers, queue_collector_pid,
stats_timer, confirm_enabled, publish_seqno, unconfirmed_mq,
unconfirmed_qm, confirmed, capabilities, trace_state}).
@@ -189,9 +189,10 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost,
user = User,
virtual_host = VHost,
most_recently_declared_queue = <<>>,
+ queue_monitors = dict:new(),
consumer_mapping = dict:new(),
- blocking = dict:new(),
- consumer_monitors = dict:new(),
+ blocking = sets:new(),
+ queue_consumers = dict:new(),
queue_collector_pid = CollectorPid,
stats_timer = StatsTimer,
confirm_enabled = false,
@@ -275,7 +276,7 @@ handle_cast(terminate, State) ->
handle_cast({command, #'basic.consume_ok'{consumer_tag = ConsumerTag} = Msg},
State = #ch{writer_pid = WriterPid}) ->
ok = rabbit_writer:send_command(WriterPid, Msg),
- noreply(monitor_consumer(ConsumerTag, State));
+ noreply(consumer_monitor(ConsumerTag, State));
handle_cast({command, Msg}, State = #ch{writer_pid = WriterPid}) ->
ok = rabbit_writer:send_command(WriterPid, Msg),
@@ -299,13 +300,13 @@ handle_cast({deliver, ConsumerTag, AckRequired,
exchange = ExchangeName#resource.name,
routing_key = RoutingKey},
rabbit_writer:send_command_and_notify(WriterPid, QPid, self(), M, Content),
- maybe_incr_stats([{QPid, 1}], case AckRequired of
- true -> deliver;
- false -> deliver_no_ack
- end, State),
- maybe_incr_redeliver_stats(Redelivered, QPid, State),
+ State2 = maybe_incr_stats([{QPid, 1}], case AckRequired of
+ true -> deliver;
+ false -> deliver_no_ack
+ end, State1),
+ State3 = maybe_incr_redeliver_stats(Redelivered, QPid, State2),
rabbit_trace:tap_trace_out(Msg, TraceState),
- noreply(State1#ch{next_tag = DeliveryTag + 1});
+ noreply(State3#ch{next_tag = DeliveryTag + 1});
handle_cast(force_event_refresh, State) ->
@@ -323,15 +324,13 @@ handle_info(emit_stats, State = #ch{stats_timer = StatsTimer}) ->
noreply([ensure_stats_timer],
State#ch{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)});
-handle_info({'DOWN', MRef, process, QPid, Reason},
- State = #ch{consumer_monitors = ConsumerMonitors}) ->
- noreply(
- case dict:find(MRef, ConsumerMonitors) of
- error ->
- handle_publishing_queue_down(QPid, Reason, State);
- {ok, ConsumerTag} ->
- handle_consuming_queue_down(MRef, ConsumerTag, State)
- end);
+handle_info({'DOWN', _MRef, process, QPid, Reason}, State) ->
+ State1 = handle_publishing_queue_down(QPid, Reason, State),
+ State2 = queue_blocked(QPid, State1),
+ State3 = handle_consuming_queue_down(QPid, State2),
+ erase_queue_stats(QPid),
+ noreply(State3#ch{queue_monitors =
+ dict:erase(QPid, State3#ch.queue_monitors)});
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State}.
@@ -375,15 +374,20 @@ noreply(Mask, NewState) -> noreply(Mask, NewState, hibernate).
noreply(Mask, NewState, Timeout) ->
{noreply, next_state(Mask, NewState), Timeout}.
+-define(MASKED_CALL(Fun, Mask, State),
+ case lists:member(Fun, Mask) of
+ true -> State;
+ false -> Fun(State)
+ end).
+
next_state(Mask, State) ->
- lists:foldl(fun (ensure_stats_timer, State1) -> ensure_stats_timer(State1);
- (send_confirms, State1) -> send_confirms(State1)
- end, State, [ensure_stats_timer, send_confirms] -- Mask).
+ State1 = ?MASKED_CALL(ensure_stats_timer, Mask, State),
+ State2 = ?MASKED_CALL(send_confirms, Mask, State1),
+ State2.
ensure_stats_timer(State = #ch{stats_timer = StatsTimer}) ->
- ChPid = self(),
State#ch{stats_timer = rabbit_event:ensure_stats_timer(
- StatsTimer, ChPid, emit_stats)}.
+ StatsTimer, self(), emit_stats)}.
return_ok(State, true, _Msg) -> {noreply, State};
return_ok(State, false, Msg) -> {reply, Msg, State}.
@@ -516,17 +520,16 @@ check_name(_Kind, NameBin) ->
NameBin.
queue_blocked(QPid, State = #ch{blocking = Blocking}) ->
- case dict:find(QPid, Blocking) of
- error -> State;
- {ok, MRef} -> true = erlang:demonitor(MRef),
- Blocking1 = dict:erase(QPid, Blocking),
- ok = case dict:size(Blocking1) of
- 0 -> rabbit_writer:send_command(
- State#ch.writer_pid,
- #'channel.flow_ok'{active = false});
- _ -> ok
- end,
- State#ch{blocking = Blocking1}
+ case sets:is_element(QPid, Blocking) of
+ false -> State;
+ true -> Blocking1 = sets:del_element(QPid, Blocking),
+ ok = case sets:size(Blocking1) of
+ 0 -> rabbit_writer:send_command(
+ State#ch.writer_pid,
+ #'channel.flow_ok'{active = false});
+ _ -> ok
+ end,
+ demonitor_queue(QPid, State#ch{blocking = Blocking1})
end.
record_confirm(undefined, _, State) ->
@@ -545,38 +548,41 @@ confirm(MsgSeqNos, QPid, State) ->
{MXs, State1} = process_confirms(MsgSeqNos, QPid, false, State),
record_confirms(MXs, State1).
-process_confirms(MsgSeqNos, QPid, Nack, State = #ch{unconfirmed_mq = UMQ,
- unconfirmed_qm = UQM}) ->
- {MXs, UMQ1, UQM1} =
- lists:foldl(
- fun(MsgSeqNo, {_MXs, UMQ0, _UQM} = Acc) ->
- case gb_trees:lookup(MsgSeqNo, UMQ0) of
- {value, XQ} -> remove_unconfirmed(MsgSeqNo, QPid, XQ,
- Acc, Nack);
- none -> Acc
- end
- end, {[], UMQ, UQM}, MsgSeqNos),
- {MXs, State#ch{unconfirmed_mq = UMQ1, unconfirmed_qm = UQM1}}.
-
-remove_unconfirmed(MsgSeqNo, QPid, {XName, Qs}, {MXs, UMQ, UQM}, Nack) ->
- UQM1 = case gb_trees:lookup(QPid, UQM) of
- {value, MsgSeqNos} ->
- MsgSeqNos1 = gb_sets:delete(MsgSeqNo, MsgSeqNos),
- case gb_sets:is_empty(MsgSeqNos1) of
- true -> gb_trees:delete(QPid, UQM);
- false -> gb_trees:update(QPid, MsgSeqNos1, UQM)
- end;
- none ->
- UQM
- end,
+process_confirms(MsgSeqNos, QPid, Nack, State) ->
+ lists:foldl(
+ fun(MsgSeqNo, {_MXs, _State = #ch{unconfirmed_mq = UMQ0}} = Acc) ->
+ case gb_trees:lookup(MsgSeqNo, UMQ0) of
+ {value, XQ} -> remove_unconfirmed(MsgSeqNo, QPid, XQ,
+ Acc, Nack);
+ none -> Acc
+ end
+ end, {[], State}, MsgSeqNos).
+
+remove_unconfirmed(MsgSeqNo, QPid, {XName, Qs},
+ {MXs, State = #ch{unconfirmed_mq = UMQ,
+ unconfirmed_qm = UQM}},
+ Nack) ->
+ State1 = case gb_trees:lookup(QPid, UQM) of
+ {value, MsgSeqNos} ->
+ MsgSeqNos1 = gb_sets:delete(MsgSeqNo, MsgSeqNos),
+ case gb_sets:is_empty(MsgSeqNos1) of
+ true -> UQM1 = gb_trees:delete(QPid, UQM),
+ demonitor_queue(
+ QPid, State#ch{unconfirmed_qm = UQM1});
+ false -> UQM1 = gb_trees:update(QPid, MsgSeqNos1, UQM),
+ State#ch{unconfirmed_qm = UQM1}
+ end;
+ none ->
+ State
+ end,
Qs1 = gb_sets:del_element(QPid, Qs),
%% If QPid somehow died initiating a nack, clear the message from
%% internal data-structures. Also, cleanup empty entries.
case (Nack orelse gb_sets:is_empty(Qs1)) of
- true ->
- {[{MsgSeqNo, XName} | MXs], gb_trees:delete(MsgSeqNo, UMQ), UQM1};
- false ->
- {MXs, gb_trees:update(MsgSeqNo, {XName, Qs1}, UMQ), UQM1}
+ true -> UMQ1 = gb_trees:delete(MsgSeqNo, UMQ),
+ {[{MsgSeqNo, XName} | MXs], State1#ch{unconfirmed_mq = UMQ1}};
+ false -> UMQ1 = gb_trees:update(MsgSeqNo, {XName, Qs1}, UMQ),
+ {MXs, State1#ch{unconfirmed_mq = UMQ1}}
end.
handle_method(#'channel.open'{}, _, State = #ch{state = starting}) ->
@@ -693,11 +699,11 @@ handle_method(#'basic.get'{queue = QueueNameBin,
State1 = lock_message(not(NoAck),
ack_record(DeliveryTag, none, Msg),
State),
- maybe_incr_stats([{QPid, 1}], case NoAck of
- true -> get_no_ack;
- false -> get
- end, State),
- maybe_incr_redeliver_stats(Redelivered, QPid, State),
+ State2 = maybe_incr_stats([{QPid, 1}], case NoAck of
+ true -> get_no_ack;
+ false -> get
+ end, State1),
+ State3 = maybe_incr_redeliver_stats(Redelivered, QPid, State2),
rabbit_trace:tap_trace_out(Msg, TraceState),
ok = rabbit_writer:send_command(
WriterPid,
@@ -707,7 +713,7 @@ handle_method(#'basic.get'{queue = QueueNameBin,
routing_key = RoutingKey,
message_count = MessageCount},
Content),
- {noreply, State1#ch{next_tag = DeliveryTag + 1}};
+ {noreply, State3#ch{next_tag = DeliveryTag + 1}};
empty ->
{reply, #'basic.get_empty'{}, State}
end;
@@ -746,12 +752,11 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
end) of
{ok, Q} ->
State1 = State#ch{consumer_mapping =
- dict:store(ActualConsumerTag,
- {Q, undefined},
+ dict:store(ActualConsumerTag, Q,
ConsumerMapping)},
{noreply,
case NoWait of
- true -> monitor_consumer(ActualConsumerTag, State1);
+ true -> consumer_monitor(ActualConsumerTag, State1);
false -> State1
end};
{{error, exclusive_consume_unavailable}, _Q} ->
@@ -768,22 +773,26 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
handle_method(#'basic.cancel'{consumer_tag = ConsumerTag,
nowait = NoWait},
_, State = #ch{consumer_mapping = ConsumerMapping,
- consumer_monitors = ConsumerMonitors}) ->
+ queue_consumers = QCons}) ->
OkMsg = #'basic.cancel_ok'{consumer_tag = ConsumerTag},
case dict:find(ConsumerTag, ConsumerMapping) of
error ->
%% Spec requires we ignore this situation.
return_ok(State, NoWait, OkMsg);
- {ok, {Q, MRef}} ->
- ConsumerMonitors1 =
- case MRef of
- undefined -> ConsumerMonitors;
- _ -> true = erlang:demonitor(MRef),
- dict:erase(MRef, ConsumerMonitors)
+ {ok, Q = #amqqueue{pid = QPid}} ->
+ ConsumerMapping1 = dict:erase(ConsumerTag, ConsumerMapping),
+ QCons1 =
+ case dict:find(QPid, QCons) of
+ error -> QCons;
+ {ok, CTags} -> CTags1 = gb_sets:delete(ConsumerTag, CTags),
+ case gb_sets:is_empty(CTags1) of
+ true -> dict:erase(QPid, QCons);
+ false -> dict:store(QPid, CTags1, QCons)
+ end
end,
- NewState = State#ch{consumer_mapping = dict:erase(ConsumerTag,
- ConsumerMapping),
- consumer_monitors = ConsumerMonitors1},
+ NewState = demonitor_queue(
+ Q, State#ch{consumer_mapping = ConsumerMapping1,
+ queue_consumers = QCons1}),
%% In order to ensure that no more messages are sent to
%% the consumer after the cancel_ok has been sent, we get
%% the queue process to send the cancel_ok on our
@@ -832,14 +841,10 @@ handle_method(#'basic.recover_async'{requeue = true},
OkFun = fun () -> ok end,
ok = fold_per_queue(
fun (QPid, MsgIds, ok) ->
- %% The Qpid python test suite incorrectly assumes
- %% that messages will be requeued in their original
- %% order. To keep it happy we reverse the id list
- %% since we are given them in reverse order.
rabbit_misc:with_exit_handler(
OkFun, fun () ->
rabbit_amqqueue:requeue(
- QPid, lists:reverse(MsgIds), self())
+ QPid, MsgIds, self())
end)
end, ok, UAMQ),
ok = notify_limiter(Limiter, UAMQ),
@@ -1108,10 +1113,12 @@ handle_method(#'channel.flow'{active = false}, _,
ok = rabbit_limiter:block(Limiter1),
case consumer_queues(Consumers) of
[] -> {reply, #'channel.flow_ok'{active = false}, State1};
- QPids -> Queues = [{QPid, erlang:monitor(process, QPid)} ||
- QPid <- QPids],
+ QPids -> State2 = lists:foldl(fun monitor_queue/2,
+ State1#ch{blocking =
+ sets:from_list(QPids)},
+ QPids),
ok = rabbit_amqqueue:flush_all(QPids, self()),
- {noreply, State1#ch{blocking = dict:from_list(Queues)}}
+ {noreply, State2}
end;
handle_method(_MethodRecord, _Content, _State) ->
@@ -1120,23 +1127,51 @@ handle_method(_MethodRecord, _Content, _State) ->
%%----------------------------------------------------------------------------
-monitor_consumer(ConsumerTag, State = #ch{consumer_mapping = ConsumerMapping,
- consumer_monitors = ConsumerMonitors,
- capabilities = Capabilities}) ->
+consumer_monitor(ConsumerTag,
+ State = #ch{consumer_mapping = ConsumerMapping,
+ queue_consumers = QCons,
+ capabilities = Capabilities}) ->
case rabbit_misc:table_lookup(
Capabilities, <<"consumer_cancel_notify">>) of
{bool, true} ->
- {#amqqueue{pid = QPid} = Q, undefined} =
- dict:fetch(ConsumerTag, ConsumerMapping),
- MRef = erlang:monitor(process, QPid),
- State#ch{consumer_mapping =
- dict:store(ConsumerTag, {Q, MRef}, ConsumerMapping),
- consumer_monitors =
- dict:store(MRef, ConsumerTag, ConsumerMonitors)};
+ #amqqueue{pid = QPid} = dict:fetch(ConsumerTag, ConsumerMapping),
+ QCons1 = dict:update(QPid,
+ fun (CTags) ->
+ gb_sets:insert(ConsumerTag, CTags)
+ end,
+ gb_sets:singleton(ConsumerTag),
+ QCons),
+ monitor_queue(QPid, State#ch{queue_consumers = QCons1});
_ ->
State
end.
+monitor_queue(QPid, State = #ch{queue_monitors = QMons}) ->
+ case (not dict:is_key(QPid, QMons) andalso
+ queue_monitor_needed(QPid, State)) of
+ true -> MRef = erlang:monitor(process, QPid),
+ State#ch{queue_monitors = dict:store(QPid, MRef, QMons)};
+ false -> State
+ end.
+
+demonitor_queue(QPid, State = #ch{queue_monitors = QMons}) ->
+ case (dict:is_key(QPid, QMons) andalso
+ not queue_monitor_needed(QPid, State)) of
+ true -> true = erlang:demonitor(dict:fetch(QPid, QMons)),
+ State#ch{queue_monitors = dict:erase(QPid, QMons)};
+ false -> State
+ end.
+
+queue_monitor_needed(QPid, #ch{stats_timer = StatsTimer,
+ queue_consumers = QCons,
+ blocking = Blocking,
+ unconfirmed_qm = UQM}) ->
+ StatsEnabled = rabbit_event:stats_level(StatsTimer) =:= fine,
+ ConsumerMonitored = dict:is_key(QPid, QCons),
+ QueueBlocked = sets:is_element(QPid, Blocking),
+ ConfirmMonitored = gb_trees:is_defined(QPid, UQM),
+ StatsEnabled or ConsumerMonitored or QueueBlocked or ConfirmMonitored.
+
handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed_qm = UQM}) ->
MsgSeqNos = case gb_trees:lookup(QPid, UQM) of
{value, MsgSet} -> gb_sets:to_list(MsgSet);
@@ -1157,21 +1192,25 @@ handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed_qm = UQM}) ->
{true, fun send_nacks/2}
end,
{MXs, State2} = process_confirms(MsgSeqNos, QPid, Nack, State1),
- erase_queue_stats(QPid),
- State3 = SendFun(MXs, State2),
- queue_blocked(QPid, State3).
-
-handle_consuming_queue_down(MRef, ConsumerTag,
- State = #ch{consumer_mapping = ConsumerMapping,
- consumer_monitors = ConsumerMonitors,
- writer_pid = WriterPid}) ->
- ConsumerMapping1 = dict:erase(ConsumerTag, ConsumerMapping),
- ConsumerMonitors1 = dict:erase(MRef, ConsumerMonitors),
- Cancel = #'basic.cancel'{consumer_tag = ConsumerTag,
- nowait = true},
- ok = rabbit_writer:send_command(WriterPid, Cancel),
+ SendFun(MXs, State2).
+
+handle_consuming_queue_down(QPid,
+ State = #ch{consumer_mapping = ConsumerMapping,
+ queue_consumers = QCons,
+ writer_pid = WriterPid}) ->
+ ConsumerTags = case dict:find(QPid, QCons) of
+ error -> gb_sets:new();
+ {ok, CTags} -> CTags
+ end,
+ ConsumerMapping1 =
+ gb_sets:fold(fun (CTag, CMap) ->
+ Cancel = #'basic.cancel'{consumer_tag = CTag,
+ nowait = true},
+ ok = rabbit_writer:send_command(WriterPid, Cancel),
+ dict:erase(CTag, CMap)
+ end, ConsumerMapping, ConsumerTags),
State#ch{consumer_mapping = ConsumerMapping1,
- consumer_monitors = ConsumerMonitors1}.
+ queue_consumers = dict:erase(QPid, QCons)}.
binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin,
RoutingKey, Arguments, ReturnMethod, NoWait,
@@ -1271,9 +1310,8 @@ ack(Acked, State) ->
ok = rabbit_amqqueue:ack(QPid, MsgIds, self()),
[{QPid, length(MsgIds)} | L]
end, [], Acked),
- maybe_incr_stats(QIncs, ack, State),
ok = notify_limiter(State#ch.limiter, Acked),
- State.
+ maybe_incr_stats(QIncs, ack, State).
new_tx(State) -> State#ch{uncommitted_message_q = queue:new(),
uncommitted_ack_q = queue:new()}.
@@ -1285,16 +1323,11 @@ notify_queues(State = #ch{consumer_mapping = Consumers}) ->
State#ch{state = closing}}.
fold_per_queue(F, Acc0, UAQ) ->
- D = rabbit_misc:queue_fold(
- fun ({_DTag, _CTag, {QPid, MsgId}}, D) ->
- %% dict:append would avoid the lists:reverse in
- %% handle_message({recover, true}, ...). However, it
- %% is significantly slower when going beyond a few
- %% thousand elements.
- rabbit_misc:dict_cons(QPid, MsgId, D)
- end, dict:new(), UAQ),
- dict:fold(fun (QPid, MsgIds, Acc) -> F(QPid, MsgIds, Acc) end,
- Acc0, D).
+ T = rabbit_misc:queue_fold(
+ fun ({_DTag, _CTag, {QPid, MsgId}}, T) ->
+ rabbit_misc:gb_trees_cons(QPid, MsgId, T)
+ end, gb_trees:empty(), UAQ),
+ rabbit_misc:gb_trees_fold(F, Acc0, T).
enable_limiter(State = #ch{unacked_message_q = UAMQ,
limiter = Limiter}) ->
@@ -1307,8 +1340,7 @@ limit_queues(Limiter, #ch{consumer_mapping = Consumers}) ->
consumer_queues(Consumers) ->
lists:usort([QPid ||
- {_Key, {#amqqueue{pid = QPid}, _MRef}}
- <- dict:to_list(Consumers)]).
+ {_Key, #amqqueue{pid = QPid}} <- dict:to_list(Consumers)]).
%% tell the limiter about the number of acks that have been received
%% for messages delivered to subscribed consumers, but not acks for
@@ -1334,38 +1366,37 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
XName, MsgSeqNo, Message, State),
maybe_incr_stats([{XName, 1} |
[{{QPid, XName}, 1} ||
- QPid <- DeliveredQPids]], publish, State1),
- State1.
+ QPid <- DeliveredQPids]], publish, State1).
process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) ->
ok = basic_return(Msg, State, no_route),
- maybe_incr_stats([{Msg#basic_message.exchange_name, 1}],
- return_unroutable, State),
- record_confirm(MsgSeqNo, XName, State);
+ record_confirm(MsgSeqNo, XName,
+ maybe_incr_stats([{Msg#basic_message.exchange_name, 1}],
+ return_unroutable, State));
process_routing_result(not_delivered, _, XName, MsgSeqNo, Msg, State) ->
ok = basic_return(Msg, State, no_consumers),
- maybe_incr_stats([{XName, 1}], return_not_delivered, State),
- record_confirm(MsgSeqNo, XName, State);
+ record_confirm(MsgSeqNo, XName,
+ maybe_incr_stats([{XName, 1}], return_not_delivered, State));
process_routing_result(routed, [], XName, MsgSeqNo, _, State) ->
record_confirm(MsgSeqNo, XName, State);
process_routing_result(routed, _, _, undefined, _, State) ->
State;
process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) ->
- #ch{unconfirmed_mq = UMQ, unconfirmed_qm = UQM} = State,
+ #ch{unconfirmed_mq = UMQ} = State,
UMQ1 = gb_trees:insert(MsgSeqNo, {XName, gb_sets:from_list(QPids)}, UMQ),
SingletonSet = gb_sets:singleton(MsgSeqNo),
- UQM1 = lists:foldl(
- fun (QPid, UQM2) ->
- maybe_monitor(QPid),
- case gb_trees:lookup(QPid, UQM2) of
- {value, MsgSeqNos} ->
- MsgSeqNos1 = gb_sets:insert(MsgSeqNo, MsgSeqNos),
- gb_trees:update(QPid, MsgSeqNos1, UQM2);
- none ->
- gb_trees:insert(QPid, SingletonSet, UQM2)
- end
- end, UQM, QPids),
- State#ch{unconfirmed_mq = UMQ1, unconfirmed_qm = UQM1}.
+ lists:foldl(
+ fun (QPid, State0 = #ch{unconfirmed_qm = UQM}) ->
+ case gb_trees:lookup(QPid, UQM) of
+ {value, MsgSeqNos} ->
+ MsgSeqNos1 = gb_sets:insert(MsgSeqNo, MsgSeqNos),
+ UQM1 = gb_trees:update(QPid, MsgSeqNos1, UQM),
+ State0#ch{unconfirmed_qm = UQM1};
+ none ->
+ UQM1 = gb_trees:insert(QPid, SingletonSet, UQM),
+ monitor_queue(QPid, State0#ch{unconfirmed_qm = UQM1})
+ end
+ end, State#ch{unconfirmed_mq = UMQ1}, QPids).
lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) ->
State#ch{unacked_message_q = queue:in(MsgStruct, UAMQ)};
@@ -1384,12 +1415,16 @@ send_nacks(MXs, State = #ch{tx_status = none}) ->
send_nacks(_, State) ->
maybe_complete_tx(State#ch{tx_status = failed}).
+send_confirms(State = #ch{tx_status = none, confirmed = []}) ->
+ State;
send_confirms(State = #ch{tx_status = none, confirmed = C}) ->
- C1 = lists:append(C),
- MsgSeqNos = [ begin maybe_incr_stats([{ExchangeName, 1}], confirm, State),
- MsgSeqNo
- end || {MsgSeqNo, ExchangeName} <- C1 ],
- send_confirms(MsgSeqNos, State #ch{confirmed = []});
+ {MsgSeqNos, State1} =
+ lists:foldl(fun ({MsgSeqNo, ExchangeName}, {MSNs, State0}) ->
+ {[MsgSeqNo | MSNs],
+ maybe_incr_stats([{ExchangeName, 1}], confirm,
+ State0)}
+ end, {[], State}, lists:append(C)),
+ send_confirms(MsgSeqNos, State1 #ch{confirmed = []});
send_confirms(State) ->
maybe_complete_tx(State).
@@ -1469,30 +1504,26 @@ i(Item, _) ->
maybe_incr_redeliver_stats(true, QPid, State) ->
maybe_incr_stats([{QPid, 1}], redeliver, State);
-maybe_incr_redeliver_stats(_, _, _) ->
- ok.
+maybe_incr_redeliver_stats(_, _, State) ->
+ State.
-maybe_incr_stats(QXIncs, Measure, #ch{stats_timer = StatsTimer}) ->
+maybe_incr_stats(QXIncs, Measure, State = #ch{stats_timer = StatsTimer}) ->
case rabbit_event:stats_level(StatsTimer) of
- fine -> [incr_stats(QX, Inc, Measure) || {QX, Inc} <- QXIncs];
- _ -> ok
+ fine -> lists:foldl(fun ({QX, Inc}, State0) ->
+ incr_stats(QX, Inc, Measure, State0)
+ end, State, QXIncs);
+ _ -> State
end.
-incr_stats({QPid, _} = QX, Inc, Measure) ->
- maybe_monitor(QPid),
- update_measures(queue_exchange_stats, QX, Inc, Measure);
-incr_stats(QPid, Inc, Measure) when is_pid(QPid) ->
- maybe_monitor(QPid),
- update_measures(queue_stats, QPid, Inc, Measure);
-incr_stats(X, Inc, Measure) ->
- update_measures(exchange_stats, X, Inc, Measure).
-
-maybe_monitor(QPid) ->
- case get({monitoring, QPid}) of
- undefined -> erlang:monitor(process, QPid),
- put({monitoring, QPid}, true);
- _ -> ok
- end.
+incr_stats({QPid, _} = QX, Inc, Measure, State) ->
+ update_measures(queue_exchange_stats, QX, Inc, Measure),
+ monitor_queue(QPid, State);
+incr_stats(QPid, Inc, Measure, State) when is_pid(QPid) ->
+ update_measures(queue_stats, QPid, Inc, Measure),
+ monitor_queue(QPid, State);
+incr_stats(X, Inc, Measure, State) ->
+ update_measures(exchange_stats, X, Inc, Measure),
+ State.
update_measures(Type, QX, Inc, Measure) ->
Measures = case get({Type, QX}) of
@@ -1528,7 +1559,6 @@ emit_stats(State = #ch{stats_timer = StatsTimer}, Extra) ->
end.
erase_queue_stats(QPid) ->
- erase({monitoring, QPid}),
erase({queue_stats, QPid}),
[erase({queue_exchange_stats, QX}) ||
{{queue_exchange_stats, QX = {QPid0, _}}, _} <- get(), QPid =:= QPid0].
diff --git a/src/rabbit_client_sup.erl b/src/rabbit_client_sup.erl
index 15e92542a2..dfb400e37f 100644
--- a/src/rabbit_client_sup.erl
+++ b/src/rabbit_client_sup.erl
@@ -28,8 +28,7 @@
-ifdef(use_specs).
--spec(start_link/1 :: (mfa()) ->
- rabbit_types:ok_pid_or_error()).
+-spec(start_link/1 :: (mfa()) -> rabbit_types:ok_pid_or_error()).
-spec(start_link/2 :: ({'local', atom()}, mfa()) ->
rabbit_types:ok_pid_or_error()).
diff --git a/src/rabbit_command_assembler.erl b/src/rabbit_command_assembler.erl
index 07036ce8d0..a0953eab95 100644
--- a/src/rabbit_command_assembler.erl
+++ b/src/rabbit_command_assembler.erl
@@ -22,8 +22,12 @@
%%----------------------------------------------------------------------------
+%%----------------------------------------------------------------------------
+
-ifdef(use_specs).
+-export_type([frame/0]).
+
-type(frame_type() :: ?FRAME_METHOD | ?FRAME_HEADER | ?FRAME_BODY |
?FRAME_OOB_METHOD | ?FRAME_OOB_HEADER | ?FRAME_OOB_BODY |
?FRAME_TRACE | ?FRAME_HEARTBEAT).
diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl
index 93aad9e354..6e29ace71a 100644
--- a/src/rabbit_error_logger.erl
+++ b/src/rabbit_error_logger.erl
@@ -27,6 +27,16 @@
-export([init/1, terminate/2, code_change/3, handle_call/2, handle_event/2,
handle_info/2]).
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(boot/0 :: () -> 'ok').
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
boot() ->
{ok, DefaultVHost} = application:get_env(default_vhost),
ok = error_logger:add_report_handler(?MODULE, [DefaultVHost]).
diff --git a/src/rabbit_error_logger_file_h.erl b/src/rabbit_error_logger_file_h.erl
index 7e9ebc4fa2..7b6e07c19f 100644
--- a/src/rabbit_error_logger_file_h.erl
+++ b/src/rabbit_error_logger_file_h.erl
@@ -26,11 +26,16 @@
%% with the result of closing the old handler when swapping handlers.
%% The first init/1 additionally allows for simple log rotation
%% when the suffix is not the empty string.
+%% The original init/2 also opened the file in 'write' mode, thus
+%% overwriting old logs. To remedy this, init/2 from
+%% lib/stdlib/src/error_logger_file_h.erl from R14B3 was copied as
+%% init_file/2 and changed so that it opens the file in 'append' mode.
%% Used only when swapping handlers in log rotation
init({{File, Suffix}, []}) ->
- case rabbit_misc:append_file(File, Suffix) of
- ok -> ok;
+ case rabbit_file:append_file(File, Suffix) of
+ ok -> file:delete(File),
+ ok;
{error, Error} ->
rabbit_log:error("Failed to append contents of "
"log file '~s' to '~s':~n~p~n",
@@ -45,12 +50,31 @@ init({{File, _}, error}) ->
%% log rotation
init({File, []}) ->
init(File);
-init({File, _Type} = FileInfo) ->
- rabbit_misc:ensure_parent_dirs_exist(File),
- error_logger_file_h:init(FileInfo);
+%% Used only when taking over from the tty handler
+init({{File, []}, _}) ->
+ init(File);
+init({File, {error_logger, Buf}}) ->
+ rabbit_file:ensure_parent_dirs_exist(File),
+ init_file(File, {error_logger, Buf});
init(File) ->
- rabbit_misc:ensure_parent_dirs_exist(File),
- error_logger_file_h:init(File).
+ rabbit_file:ensure_parent_dirs_exist(File),
+ init_file(File, []).
+
+init_file(File, {error_logger, Buf}) ->
+ case init_file(File, error_logger) of
+ {ok, {Fd, File, PrevHandler}} ->
+ [handle_event(Event, {Fd, File, PrevHandler}) ||
+ {_, Event} <- lists:reverse(Buf)],
+ {ok, {Fd, File, PrevHandler}};
+ Error ->
+ Error
+ end;
+init_file(File, PrevHandler) ->
+ process_flag(trap_exit, true),
+ case file:open(File, [append]) of
+ {ok,Fd} -> {ok, {Fd, File, PrevHandler}};
+ Error -> Error
+ end.
handle_event(Event, State) ->
error_logger_file_h:handle_event(Event, State).
diff --git a/src/rabbit_file.erl b/src/rabbit_file.erl
new file mode 100644
index 0000000000..5cb8e7b69d
--- /dev/null
+++ b/src/rabbit_file.erl
@@ -0,0 +1,282 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2011 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_file).
+
+-include_lib("kernel/include/file.hrl").
+
+-export([is_file/1, is_dir/1, file_size/1, ensure_dir/1, wildcard/2, list_dir/1]).
+-export([read_term_file/1, write_term_file/2, write_file/2, write_file/3]).
+-export([append_file/2, ensure_parent_dirs_exist/1]).
+-export([rename/2, delete/1, recursive_delete/1, recursive_copy/2]).
+-export([lock_file/1]).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-type(ok_or_error() :: rabbit_types:ok_or_error(any())).
+
+-spec(is_file/1 :: ((file:filename())) -> boolean()).
+-spec(is_dir/1 :: ((file:filename())) -> boolean()).
+-spec(file_size/1 :: ((file:filename())) -> non_neg_integer()).
+-spec(ensure_dir/1 :: ((file:filename())) -> ok_or_error()).
+-spec(wildcard/2 :: (string(), file:filename()) -> [file:filename()]).
+-spec(list_dir/1 :: (file:filename()) -> rabbit_types:ok_or_error2(
+ [file:filename()], any())).
+-spec(read_term_file/1 ::
+ (file:filename()) -> {'ok', [any()]} | rabbit_types:error(any())).
+-spec(write_term_file/2 :: (file:filename(), [any()]) -> ok_or_error()).
+-spec(write_file/2 :: (file:filename(), iodata()) -> ok_or_error()).
+-spec(write_file/3 :: (file:filename(), iodata(), [any()]) -> ok_or_error()).
+-spec(append_file/2 :: (file:filename(), string()) -> ok_or_error()).
+-spec(ensure_parent_dirs_exist/1 :: (string()) -> 'ok').
+-spec(rename/2 ::
+ (file:filename(), file:filename()) -> ok_or_error()).
+-spec(delete/1 :: ([file:filename()]) -> ok_or_error()).
+-spec(recursive_delete/1 ::
+ ([file:filename()])
+ -> rabbit_types:ok_or_error({file:filename(), any()})).
+-spec(recursive_copy/2 ::
+ (file:filename(), file:filename())
+ -> rabbit_types:ok_or_error({file:filename(), file:filename(), any()})).
+-spec(lock_file/1 :: (file:filename()) -> rabbit_types:ok_or_error('eexist')).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+is_file(File) ->
+ case read_file_info(File) of
+ {ok, #file_info{type=regular}} -> true;
+ {ok, #file_info{type=directory}} -> true;
+ _ -> false
+ end.
+
+is_dir(Dir) -> is_dir_internal(read_file_info(Dir)).
+
+is_dir_no_handle(Dir) -> is_dir_internal(prim_file:read_file_info(Dir)).
+
+is_dir_internal({ok, #file_info{type=directory}}) -> true;
+is_dir_internal(_) -> false.
+
+file_size(File) ->
+ case read_file_info(File) of
+ {ok, #file_info{size=Size}} -> Size;
+ _ -> 0
+ end.
+
+ensure_dir(File) -> with_fhc_handle(fun () -> ensure_dir_internal(File) end).
+
+ensure_dir_internal("/") ->
+ ok;
+ensure_dir_internal(File) ->
+ Dir = filename:dirname(File),
+ case is_dir_no_handle(Dir) of
+ true -> ok;
+ false -> ensure_dir_internal(Dir),
+ prim_file:make_dir(Dir)
+ end.
+
+wildcard(Pattern, Dir) ->
+ {ok, Files} = list_dir(Dir),
+ {ok, RE} = re:compile(Pattern, [anchored]),
+ [File || File <- Files, match =:= re:run(File, RE, [{capture, none}])].
+
+list_dir(Dir) -> with_fhc_handle(fun () -> prim_file:list_dir(Dir) end).
+
+read_file_info(File) ->
+ with_fhc_handle(fun () -> prim_file:read_file_info(File) end).
+
+with_fhc_handle(Fun) ->
+ ok = file_handle_cache:obtain(),
+ try Fun()
+ after ok = file_handle_cache:release()
+ end.
+
+read_term_file(File) ->
+ try
+ {ok, Data} = with_fhc_handle(fun () -> prim_file:read_file(File) end),
+ {ok, Tokens, _} = erl_scan:string(binary_to_list(Data)),
+ TokenGroups = group_tokens(Tokens),
+ {ok, [begin
+ {ok, Term} = erl_parse:parse_term(Tokens1),
+ Term
+ end || Tokens1 <- TokenGroups]}
+ catch
+ error:{badmatch, Error} -> Error
+ end.
+
+group_tokens(Ts) -> [lists:reverse(G) || G <- group_tokens([], Ts)].
+
+group_tokens([], []) -> [];
+group_tokens(Cur, []) -> [Cur];
+group_tokens(Cur, [T = {dot, _} | Ts]) -> [[T | Cur] | group_tokens([], Ts)];
+group_tokens(Cur, [T | Ts]) -> group_tokens([T | Cur], Ts).
+
+write_term_file(File, Terms) ->
+ write_file(File, list_to_binary([io_lib:format("~w.~n", [Term]) ||
+ Term <- Terms])).
+
+write_file(Path, Data) -> write_file(Path, Data, []).
+
+%% write_file/3 and make_binary/1 are both based on corresponding
+%% functions in the kernel/file.erl module of the Erlang R14B02
+%% release, which is licensed under the EPL. That implementation of
+%% write_file/3 does not do an fsync prior to closing the file, hence
+%% the existence of this version. APIs are otherwise identical.
+write_file(Path, Data, Modes) ->
+ Modes1 = [binary, write | (Modes -- [binary, write])],
+ case make_binary(Data) of
+ Bin when is_binary(Bin) ->
+ with_fhc_handle(
+ fun () -> case prim_file:open(Path, Modes1) of
+ {ok, Hdl} -> try prim_file:write(Hdl, Bin) of
+ ok -> prim_file:sync(Hdl);
+ {error, _} = E -> E
+ after
+ prim_file:close(Hdl)
+ end;
+ {error, _} = E -> E
+ end
+ end);
+ {error, _} = E -> E
+ end.
+
+make_binary(Bin) when is_binary(Bin) ->
+ Bin;
+make_binary(List) ->
+ try
+ iolist_to_binary(List)
+ catch error:Reason ->
+ {error, Reason}
+ end.
+
+
+append_file(File, Suffix) ->
+ case read_file_info(File) of
+ {ok, FInfo} -> append_file(File, FInfo#file_info.size, Suffix);
+ {error, enoent} -> append_file(File, 0, Suffix);
+ Error -> Error
+ end.
+
+append_file(_, _, "") ->
+ ok;
+append_file(File, 0, Suffix) ->
+ with_fhc_handle(fun () ->
+ case prim_file:open([File, Suffix], [append]) of
+ {ok, Fd} -> prim_file:close(Fd);
+ Error -> Error
+ end
+ end);
+append_file(File, _, Suffix) ->
+ case with_fhc_handle(fun () -> prim_file:read_file(File) end) of
+ {ok, Data} -> write_file([File, Suffix], Data, [append]);
+ Error -> Error
+ end.
+
+ensure_parent_dirs_exist(Filename) ->
+ case ensure_dir(Filename) of
+ ok -> ok;
+ {error, Reason} ->
+ throw({error, {cannot_create_parent_dirs, Filename, Reason}})
+ end.
+
+rename(Old, New) -> with_fhc_handle(fun () -> prim_file:rename(Old, New) end).
+
+delete(File) -> with_fhc_handle(fun () -> prim_file:delete(File) end).
+
+recursive_delete(Files) ->
+ with_fhc_handle(
+ fun () -> lists:foldl(fun (Path, ok) -> recursive_delete1(Path);
+ (_Path, {error, _Err} = Error) -> Error
+ end, ok, Files)
+ end).
+
+recursive_delete1(Path) ->
+ case is_dir_no_handle(Path) and not(is_symlink_no_handle(Path)) of
+ false -> case prim_file:delete(Path) of
+ ok -> ok;
+ {error, enoent} -> ok; %% Path doesn't exist anyway
+ {error, Err} -> {error, {Path, Err}}
+ end;
+ true -> case prim_file:list_dir(Path) of
+ {ok, FileNames} ->
+ case lists:foldl(
+ fun (FileName, ok) ->
+ recursive_delete1(
+ filename:join(Path, FileName));
+ (_FileName, Error) ->
+ Error
+ end, ok, FileNames) of
+ ok ->
+ case prim_file:del_dir(Path) of
+ ok -> ok;
+ {error, Err} -> {error, {Path, Err}}
+ end;
+ {error, _Err} = Error ->
+ Error
+ end;
+ {error, Err} ->
+ {error, {Path, Err}}
+ end
+ end.
+
+is_symlink_no_handle(File) ->
+ case prim_file:read_link(File) of
+ {ok, _} -> true;
+ _ -> false
+ end.
+
+recursive_copy(Src, Dest) ->
+ %% Note that this uses the 'file' module and, hence, shouldn't be
+ %% run on many processes at once.
+ case is_dir(Src) of
+ false -> case file:copy(Src, Dest) of
+ {ok, _Bytes} -> ok;
+ {error, enoent} -> ok; %% Path doesn't exist anyway
+ {error, Err} -> {error, {Src, Dest, Err}}
+ end;
+ true -> case file:list_dir(Src) of
+ {ok, FileNames} ->
+ case file:make_dir(Dest) of
+ ok ->
+ lists:foldl(
+ fun (FileName, ok) ->
+ recursive_copy(
+ filename:join(Src, FileName),
+ filename:join(Dest, FileName));
+ (_FileName, Error) ->
+ Error
+ end, ok, FileNames);
+ {error, Err} ->
+ {error, {Src, Dest, Err}}
+ end;
+ {error, Err} ->
+ {error, {Src, Dest, Err}}
+ end
+ end.
+
+%% TODO: When we stop supporting Erlang prior to R14, this should be
+%% replaced with file:open [write, exclusive]
+lock_file(Path) ->
+ case is_file(Path) of
+ true -> {error, eexist};
+ false -> with_fhc_handle(
+ fun () -> {ok, Lock} = prim_file:open(Path, [write]),
+ ok = prim_file:close(Lock)
+ end)
+ end.
diff --git a/src/rabbit_guid.erl b/src/rabbit_guid.erl
index 234bc55be5..cf3fea1a53 100644
--- a/src/rabbit_guid.erl
+++ b/src/rabbit_guid.erl
@@ -52,13 +52,13 @@ start_link() ->
update_disk_serial() ->
Filename = filename:join(rabbit_mnesia:dir(), ?SERIAL_FILENAME),
- Serial = case rabbit_misc:read_term_file(Filename) of
+ Serial = case rabbit_file:read_term_file(Filename) of
{ok, [Num]} -> Num;
{error, enoent} -> 0;
{error, Reason} ->
throw({error, {cannot_read_serial_file, Filename, Reason}})
end,
- case rabbit_misc:write_term_file(Filename, [Serial + 1]) of
+ case rabbit_file:write_term_file(Filename, [Serial + 1]) of
ok -> ok;
{error, Reason1} ->
throw({error, {cannot_write_serial_file, Filename, Reason1}})
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 24468a01fe..8a08d4b673 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -251,6 +251,7 @@ notify_queues(State = #lim{ch_pid = ChPid, queues = Queues}) ->
end, {[], Queues}, Queues),
case length(QList) of
0 -> ok;
+ 1 -> ok = rabbit_amqqueue:unblock(hd(QList), ChPid); %% common case
L ->
%% We randomly vary the position of queues in the list,
%% thus ensuring that each queue has an equal chance of
diff --git a/src/rabbit_log.erl b/src/rabbit_log.erl
index 8207d6bc65..558e095751 100644
--- a/src/rabbit_log.erl
+++ b/src/rabbit_log.erl
@@ -42,6 +42,8 @@
-spec(error/1 :: (string()) -> 'ok').
-spec(error/2 :: (string(), [any()]) -> 'ok').
+-spec(message/4 :: (_,_,_,_) -> 'ok').
+
-endif.
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index ad5fd28f83..5fc6341f50 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -59,6 +59,10 @@
known_senders :: set()
}).
+-type(ack() :: non_neg_integer()).
+-type(state() :: master_state()).
+-include("rabbit_backing_queue_spec.hrl").
+
-spec(promote_backing_queue_state/6 ::
(pid(), atom(), any(), pid(), dict(), [pid()]) -> master_state()).
-spec(sender_death_fun/0 :: () -> death_fun()).
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index cf8e9484f4..baebc52b27 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -22,6 +22,26 @@
-include("rabbit.hrl").
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(remove_from_queue/2 ::
+ (rabbit_amqqueue:name(), [pid()])
+ -> {'ok', pid(), [pid()]} | {'error', 'not_found'}).
+-spec(on_node_up/0 :: () -> 'ok').
+-spec(drop_mirror/2 ::
+ (rabbit_amqqueue:name(), node()) -> rabbit_types:ok_or_error(any())).
+-spec(add_mirror/2 ::
+ (rabbit_amqqueue:name(), node()) -> rabbit_types:ok_or_error(any())).
+-spec(add_mirror/3 ::
+ (rabbit_types:vhost(), binary(), atom())
+ -> rabbit_types:ok_or_error(any())).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
%% If the dead pids include the queue pid (i.e. the master has died)
%% then only remove that if we are about to be promoted. Otherwise we
%% can have the situation where a slave updates the mnesia record for
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 3c45398133..f423760a3d 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -45,8 +45,19 @@
-behaviour(gm).
-include("rabbit.hrl").
+
+%%----------------------------------------------------------------------------
+
-include("gm_specs.hrl").
+-ifdef(use_specs).
+%% Shut dialyzer up
+-spec(promote_me/2 :: (_, _) -> no_return()).
+-endif.
+
+%%----------------------------------------------------------------------------
+
+
-define(CREATION_EVENT_KEYS,
[pid,
name,
@@ -379,9 +390,9 @@ needs_confirming(_Delivery, _State) ->
immediately.
confirm_messages(MsgIds, State = #state { msg_id_status = MS }) ->
- {MS1, CMs} =
+ {CMs, MS1} =
lists:foldl(
- fun (MsgId, {MSN, CMsN} = Acc) ->
+ fun (MsgId, {CMsN, MSN} = Acc) ->
%% We will never see 'discarded' here
case dict:find(MsgId, MSN) of
error ->
@@ -391,12 +402,12 @@ confirm_messages(MsgIds, State = #state { msg_id_status = MS }) ->
{ok, {published, ChPid}} ->
%% Still not seen it from the channel, just
%% record that it's been confirmed.
- {dict:store(MsgId, {confirmed, ChPid}, MSN), CMsN};
+ {CMsN, dict:store(MsgId, {confirmed, ChPid}, MSN)};
{ok, {published, ChPid, MsgSeqNo}} ->
%% Seen from both GM and Channel. Can now
%% confirm.
- {dict:erase(MsgId, MSN),
- gb_trees_cons(ChPid, MsgSeqNo, CMsN)};
+ {rabbit_misc:gb_trees_cons(ChPid, MsgSeqNo, CMsN),
+ dict:erase(MsgId, MSN)};
{ok, {confirmed, _ChPid}} ->
%% It's already been confirmed. This is
%% probably it's been both sync'd to disk
@@ -405,17 +416,10 @@ confirm_messages(MsgIds, State = #state { msg_id_status = MS }) ->
%% channel. Nothing to do here.
Acc
end
- end, {MS, gb_trees:empty()}, MsgIds),
- [ok = rabbit_channel:confirm(ChPid, MsgSeqNos)
- || {ChPid, MsgSeqNos} <- gb_trees:to_list(CMs)],
+ end, {gb_trees:empty(), MS}, MsgIds),
+ rabbit_misc:gb_trees_foreach(fun rabbit_channel:confirm/2, CMs),
State #state { msg_id_status = MS1 }.
-gb_trees_cons(Key, Value, Tree) ->
- case gb_trees:lookup(Key, Tree) of
- {value, Values} -> gb_trees:update(Key, [Value | Values], Tree);
- none -> gb_trees:insert(Key, [Value], Tree)
- end.
-
handle_process_result({ok, State}) -> noreply(State);
handle_process_result({stop, State}) -> {stop, normal, State}.
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index ae28722ab2..13a553f124 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -18,8 +18,6 @@
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
--include_lib("kernel/include/file.hrl").
-
-export([method_record_type/1, polite_pause/0, polite_pause/1]).
-export([die/1, frame_error/2, amqp_error/4,
protocol_error/3, protocol_error/4, protocol_error/1]).
@@ -40,19 +38,17 @@
-export([upmap/2, map_in_order/2]).
-export([table_filter/3]).
-export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]).
--export([read_term_file/1, write_term_file/2, write_file/2, write_file/3]).
--export([append_file/2, ensure_parent_dirs_exist/1]).
--export([format_stderr/2, with_local_io/1]).
+-export([format_stderr/2, with_local_io/1, local_info_msg/2]).
-export([start_applications/1, stop_applications/1]).
-export([unfold/2, ceil/1, queue_fold/3]).
-export([sort_field_table/1]).
-export([pid_to_string/1, string_to_pid/1]).
-export([version_compare/2, version_compare/3]).
--export([recursive_delete/1, recursive_copy/2, dict_cons/3, orddict_cons/3]).
+-export([dict_cons/3, orddict_cons/3, gb_trees_cons/3]).
+-export([gb_trees_fold/3, gb_trees_foreach/2]).
-export([get_options/2]).
-export([all_module_attributes/1, build_acyclic_graph/3]).
-export([now_ms/0]).
--export([lock_file/1]).
-export([const_ok/0, const/1]).
-export([ntoa/1, ntoab/1]).
-export([is_process_alive/1]).
@@ -158,15 +154,9 @@
-spec(dirty_foreach_key/2 :: (fun ((any()) -> any()), atom())
-> 'ok' | 'aborted').
-spec(dirty_dump_log/1 :: (file:filename()) -> ok_or_error()).
--spec(read_term_file/1 ::
- (file:filename()) -> {'ok', [any()]} | rabbit_types:error(any())).
--spec(write_term_file/2 :: (file:filename(), [any()]) -> ok_or_error()).
--spec(write_file/2 :: (file:filename(), iodata()) -> ok_or_error()).
--spec(write_file/3 :: (file:filename(), iodata(), [any()]) -> ok_or_error()).
--spec(append_file/2 :: (file:filename(), string()) -> ok_or_error()).
--spec(ensure_parent_dirs_exist/1 :: (string()) -> 'ok').
-spec(format_stderr/2 :: (string(), [any()]) -> 'ok').
-spec(with_local_io/1 :: (fun (() -> A)) -> A).
+-spec(local_info_msg/2 :: (string(), [any()]) -> 'ok').
-spec(start_applications/1 :: ([atom()]) -> 'ok').
-spec(stop_applications/1 :: ([atom()]) -> 'ok').
-spec(unfold/2 :: (fun ((A) -> ({'true', B, A} | 'false')), A) -> {[B], A}).
@@ -180,14 +170,12 @@
-spec(version_compare/3 ::
(string(), string(), ('lt' | 'lte' | 'eq' | 'gte' | 'gt'))
-> boolean()).
--spec(recursive_delete/1 ::
- ([file:filename()])
- -> rabbit_types:ok_or_error({file:filename(), any()})).
--spec(recursive_copy/2 ::
- (file:filename(), file:filename())
- -> rabbit_types:ok_or_error({file:filename(), file:filename(), any()})).
-spec(dict_cons/3 :: (any(), any(), dict()) -> dict()).
-spec(orddict_cons/3 :: (any(), any(), orddict:orddict()) -> orddict:orddict()).
+-spec(gb_trees_cons/3 :: (any(), any(), gb_tree()) -> gb_tree()).
+-spec(gb_trees_fold/3 :: (fun ((any(), any(), A) -> A), A, gb_tree()) -> A).
+-spec(gb_trees_foreach/2 ::
+ (fun ((any(), any()) -> any()), gb_tree()) -> 'ok').
-spec(get_options/2 :: ([optdef()], [string()])
-> {[string()], [{string(), any()}]}).
-spec(all_module_attributes/1 :: (atom()) -> [{atom(), [term()]}]).
@@ -199,7 +187,6 @@
{bad_edge, [digraph:vertex()]}),
digraph:vertex(), digraph:vertex()})).
-spec(now_ms/0 :: () -> non_neg_integer()).
--spec(lock_file/1 :: (file:filename()) -> rabbit_types:ok_or_error('eexist')).
-spec(const_ok/0 :: () -> 'ok').
-spec(const/1 :: (A) -> thunk(A)).
-spec(ntoa/1 :: (inet:ip_address()) -> string()).
@@ -270,8 +257,15 @@ val({Type, Value}) ->
end,
lists:flatten(io_lib:format(Fmt, [Value, Type])).
-dirty_read(ReadSpec) ->
- case mnesia:dirty_read(ReadSpec) of
+%% Normally we'd call mnesia:dirty_read/1 here, but that is quite
+%% expensive due to general mnesia overheads (figuring out table types
+%% and locations, etc). We get away with bypassing these because we
+%% know that the tables we are looking at here
+%% - are not the schema table
+%% - have a local ram copy
+%% - do not have any indices
+dirty_read({Table, Key}) ->
+ case ets:lookup(Table, Key) of
[Result] -> {ok, Result};
[] -> {error, not_found}
end.
@@ -525,74 +519,6 @@ dirty_dump_log1(LH, {K, Terms, BadBytes}) ->
io:format("Bad Chunk, ~p: ~p~n", [BadBytes, Terms]),
dirty_dump_log1(LH, disk_log:chunk(LH, K)).
-
-read_term_file(File) -> file:consult(File).
-
-write_term_file(File, Terms) ->
- write_file(File, list_to_binary([io_lib:format("~w.~n", [Term]) ||
- Term <- Terms])).
-
-write_file(Path, Data) ->
- write_file(Path, Data, []).
-
-%% write_file/3 and make_binary/1 are both based on corresponding
-%% functions in the kernel/file.erl module of the Erlang R14B02
-%% release, which is licensed under the EPL. That implementation of
-%% write_file/3 does not do an fsync prior to closing the file, hence
-%% the existence of this version. APIs are otherwise identical.
-write_file(Path, Data, Modes) ->
- Modes1 = [binary, write | (Modes -- [binary, write])],
- case make_binary(Data) of
- Bin when is_binary(Bin) ->
- case file:open(Path, Modes1) of
- {ok, Hdl} -> try file:write(Hdl, Bin) of
- ok -> file:sync(Hdl);
- {error, _} = E -> E
- after
- file:close(Hdl)
- end;
- {error, _} = E -> E
- end;
- {error, _} = E -> E
- end.
-
-make_binary(Bin) when is_binary(Bin) ->
- Bin;
-make_binary(List) ->
- try
- iolist_to_binary(List)
- catch error:Reason ->
- {error, Reason}
- end.
-
-
-append_file(File, Suffix) ->
- case file:read_file_info(File) of
- {ok, FInfo} -> append_file(File, FInfo#file_info.size, Suffix);
- {error, enoent} -> append_file(File, 0, Suffix);
- Error -> Error
- end.
-
-append_file(_, _, "") ->
- ok;
-append_file(File, 0, Suffix) ->
- case file:open([File, Suffix], [append]) of
- {ok, Fd} -> file:close(Fd);
- Error -> Error
- end;
-append_file(File, _, Suffix) ->
- case file:read_file(File) of
- {ok, Data} -> write_file([File, Suffix], Data, [append]);
- Error -> Error
- end.
-
-ensure_parent_dirs_exist(Filename) ->
- case filelib:ensure_dir(Filename) of
- ok -> ok;
- {error, Reason} ->
- throw({error, {cannot_create_parent_dirs, Filename, Reason}})
- end.
-
format_stderr(Fmt, Args) ->
case os:type() of
{unix, _} ->
@@ -619,6 +545,12 @@ with_local_io(Fun) ->
group_leader(GL, self())
end.
+%% Log an info message on the local node using the standard logger.
+%% Use this if rabbit isn't running and the call didn't originate on
+%% the local node (e.g. rabbitmqctl calls).
+local_info_msg(Format, Args) ->
+ with_local_io(fun () -> error_logger:info_msg(Format, Args) end).
+
manage_applications(Iterate, Do, Undo, SkipError, ErrorTag, Apps) ->
Iterate(fun (App, Acc) ->
case Do(App) of
@@ -743,73 +675,29 @@ version_compare(A, B) ->
dropdot(A) -> lists:dropwhile(fun (X) -> X =:= $. end, A).
-recursive_delete(Files) ->
- lists:foldl(fun (Path, ok ) -> recursive_delete1(Path);
- (_Path, {error, _Err} = Error) -> Error
- end, ok, Files).
-
-recursive_delete1(Path) ->
- case filelib:is_dir(Path) of
- false -> case file:delete(Path) of
- ok -> ok;
- {error, enoent} -> ok; %% Path doesn't exist anyway
- {error, Err} -> {error, {Path, Err}}
- end;
- true -> case file:list_dir(Path) of
- {ok, FileNames} ->
- case lists:foldl(
- fun (FileName, ok) ->
- recursive_delete1(
- filename:join(Path, FileName));
- (_FileName, Error) ->
- Error
- end, ok, FileNames) of
- ok ->
- case file:del_dir(Path) of
- ok -> ok;
- {error, Err} -> {error, {Path, Err}}
- end;
- {error, _Err} = Error ->
- Error
- end;
- {error, Err} ->
- {error, {Path, Err}}
- end
- end.
-
-recursive_copy(Src, Dest) ->
- case filelib:is_dir(Src) of
- false -> case file:copy(Src, Dest) of
- {ok, _Bytes} -> ok;
- {error, enoent} -> ok; %% Path doesn't exist anyway
- {error, Err} -> {error, {Src, Dest, Err}}
- end;
- true -> case file:list_dir(Src) of
- {ok, FileNames} ->
- case file:make_dir(Dest) of
- ok ->
- lists:foldl(
- fun (FileName, ok) ->
- recursive_copy(
- filename:join(Src, FileName),
- filename:join(Dest, FileName));
- (_FileName, Error) ->
- Error
- end, ok, FileNames);
- {error, Err} ->
- {error, {Src, Dest, Err}}
- end;
- {error, Err} ->
- {error, {Src, Dest, Err}}
- end
- end.
-
dict_cons(Key, Value, Dict) ->
dict:update(Key, fun (List) -> [Value | List] end, [Value], Dict).
orddict_cons(Key, Value, Dict) ->
orddict:update(Key, fun (List) -> [Value | List] end, [Value], Dict).
+gb_trees_cons(Key, Value, Tree) ->
+ case gb_trees:lookup(Key, Tree) of
+ {value, Values} -> gb_trees:update(Key, [Value | Values], Tree);
+ none -> gb_trees:insert(Key, [Value], Tree)
+ end.
+
+gb_trees_fold(Fun, Acc, Tree) ->
+ gb_trees_fold1(Fun, Acc, gb_trees:next(gb_trees:iterator(Tree))).
+
+gb_trees_fold1(_Fun, Acc, none) ->
+ Acc;
+gb_trees_fold1(Fun, Acc, {Key, Val, It}) ->
+ gb_trees_fold1(Fun, Fun(Key, Val, Acc), gb_trees:next(It)).
+
+gb_trees_foreach(Fun, Tree) ->
+ gb_trees_fold(fun (Key, Val, Acc) -> Fun(Key, Val), Acc end, ok, Tree).
+
%% Separate flags and options from arguments.
%% get_options([{flag, "-q"}, {option, "-p", "/"}],
%% ["set_permissions","-p","/","guest",
@@ -893,15 +781,6 @@ build_acyclic_graph(VertexFun, EdgeFun, Graph) ->
{error, Reason}
end.
-%% TODO: When we stop supporting Erlang prior to R14, this should be
-%% replaced with file:open [write, exclusive]
-lock_file(Path) ->
- case filelib:is_file(Path) of
- true -> {error, eexist};
- false -> {ok, Lock} = file:open(Path, [write]),
- ok = file:close(Lock)
- end.
-
const_ok() -> ok.
const(X) -> fun () -> X end.
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 1ea909714e..c8c18843a8 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -70,6 +70,8 @@
-spec(on_node_up/1 :: (node()) -> 'ok').
-spec(on_node_down/1 :: (node()) -> 'ok').
+-spec(table_names/0 :: () -> [atom()]).
+
-endif.
%%----------------------------------------------------------------------------
@@ -119,6 +121,10 @@ force_cluster(ClusterNodes) ->
%% node. If Force is false, only connections to online nodes are
%% allowed.
cluster(ClusterNodes, Force) ->
+ rabbit_misc:local_info_msg("Clustering with ~p~s~n",
+ [ClusterNodes, if Force -> " forcefully";
+ true -> ""
+ end]),
ensure_mnesia_not_running(),
ensure_mnesia_dir(),
@@ -432,7 +438,7 @@ cluster_nodes_config_filename() ->
create_cluster_nodes_config(ClusterNodes) ->
FileName = cluster_nodes_config_filename(),
- case rabbit_misc:write_term_file(FileName, [ClusterNodes]) of
+ case rabbit_file:write_term_file(FileName, [ClusterNodes]) of
ok -> ok;
{error, Reason} ->
throw({error, {cannot_create_cluster_nodes_config,
@@ -441,7 +447,7 @@ create_cluster_nodes_config(ClusterNodes) ->
read_cluster_nodes_config() ->
FileName = cluster_nodes_config_filename(),
- case rabbit_misc:read_term_file(FileName) of
+ case rabbit_file:read_term_file(FileName) of
{ok, [ClusterNodes]} -> ClusterNodes;
{error, enoent} ->
{ok, ClusterNodes} = application:get_env(rabbit, cluster_nodes),
@@ -469,12 +475,12 @@ record_running_nodes() ->
Nodes = running_clustered_nodes() -- [node()],
%% Don't check the result: we're shutting down anyway and this is
%% a best-effort-basis.
- rabbit_misc:write_term_file(FileName, [Nodes]),
+ rabbit_file:write_term_file(FileName, [Nodes]),
ok.
read_previously_running_nodes() ->
FileName = running_nodes_filename(),
- case rabbit_misc:read_term_file(FileName) of
+ case rabbit_file:read_term_file(FileName) of
{ok, [Nodes]} -> Nodes;
{error, enoent} -> [];
{error, Reason} -> throw({error, {cannot_read_previous_nodes_file,
@@ -636,7 +642,7 @@ move_db() ->
copy_db(Destination) ->
ok = ensure_mnesia_not_running(),
- rabbit_misc:recursive_copy(dir(), Destination).
+ rabbit_file:recursive_copy(dir(), Destination).
create_tables() -> create_tables(disc).
@@ -716,6 +722,9 @@ wait_for_tables(TableNames) ->
end.
reset(Force) ->
+ rabbit_misc:local_info_msg("Resetting Rabbit~s~n", [if Force -> " forcefully";
+ true -> ""
+ end]),
ensure_mnesia_not_running(),
case not Force andalso is_clustered() andalso
is_only_disc_node(node(), false)
@@ -743,7 +752,7 @@ reset(Force) ->
end,
ok = delete_cluster_nodes_config(),
%% remove persisted messages and any other garbage we find
- ok = rabbit_misc:recursive_delete(filelib:wildcard(dir() ++ "/*")),
+ ok = rabbit_file:recursive_delete(filelib:wildcard(dir() ++ "/*")),
ok.
leave_cluster([], _) -> ok;
@@ -776,19 +785,13 @@ wait_for(Condition) ->
on_node_up(Node) ->
case is_only_disc_node(Node, true) of
- true -> rabbit_misc:with_local_io(
- fun () -> rabbit_log:info("cluster contains disc "
- "nodes again~n")
- end);
+ true -> rabbit_log:info("cluster contains disc nodes again~n");
false -> ok
end.
on_node_down(Node) ->
case is_only_disc_node(Node, true) of
- true -> rabbit_misc:with_local_io(
- fun () -> rabbit_log:info("only running disc node "
- "went down~n")
- end);
+ true -> rabbit_log:info("only running disc node went down~n");
false -> ok
end.
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 17d5f64b76..fc3cbebd4e 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -146,6 +146,8 @@
-spec(client_terminate/1 :: (client_msstate()) -> 'ok').
-spec(client_delete_and_terminate/1 :: (client_msstate()) -> 'ok').
-spec(client_ref/1 :: (client_msstate()) -> client_ref()).
+-spec(close_all_indicated/1 ::
+ (client_msstate()) -> rabbit_types:ok(client_msstate())).
-spec(write/3 :: (rabbit_types:msg_id(), msg(), client_msstate()) -> 'ok').
-spec(read/2 :: (rabbit_types:msg_id(), client_msstate()) ->
{rabbit_types:ok(msg()) | 'not_found', client_msstate()}).
@@ -587,7 +589,7 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
AttemptFileSummaryRecovery =
case ClientRefs of
- undefined -> ok = rabbit_misc:recursive_delete([Dir]),
+ undefined -> ok = rabbit_file:recursive_delete([Dir]),
ok = filelib:ensure_dir(filename:join(Dir, "nothing")),
false;
_ -> ok = filelib:ensure_dir(filename:join(Dir, "nothing")),
@@ -1338,11 +1340,11 @@ recover_index_and_client_refs(IndexModule, true, ClientRefs, Dir, Server) ->
end.
store_recovery_terms(Terms, Dir) ->
- rabbit_misc:write_term_file(filename:join(Dir, ?CLEAN_FILENAME), Terms).
+ rabbit_file:write_term_file(filename:join(Dir, ?CLEAN_FILENAME), Terms).
read_recovery_terms(Dir) ->
Path = filename:join(Dir, ?CLEAN_FILENAME),
- case rabbit_misc:read_term_file(Path) of
+ case rabbit_file:read_term_file(Path) of
{ok, Terms} -> case file:delete(Path) of
ok -> {true, Terms};
{error, Error} -> {false, Error}
@@ -1899,7 +1901,7 @@ transform_dir(BaseDir, Store, TransformFun) ->
end.
transform_msg_file(FileOld, FileNew, TransformFun) ->
- ok = rabbit_misc:ensure_parent_dirs_exist(FileNew),
+ ok = rabbit_file:ensure_parent_dirs_exist(FileNew),
{ok, RefOld} = file_handle_cache:open(FileOld, [raw, binary, read], []),
{ok, RefNew} = file_handle_cache:open(FileNew, [raw, binary, write],
[{write_buffer,
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index b2abcba651..2c0912dfd6 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -78,6 +78,33 @@
-spec(on_node_down/1 :: (node()) -> 'ok').
-spec(check_tcp_listener_address/2 :: (atom(), listener_config())
-> [{inet:ip_address(), ip_port(), family(), atom()}]).
+-spec(ensure_ssl/0 :: () -> rabbit_types:infos()).
+-spec(ssl_transform_fun/1 ::
+ (rabbit_types:infos())
+ -> fun ((rabbit_net:socket())
+ -> rabbit_types:ok_or_error(#ssl_socket{}))).
+
+-spec(boot/0 :: () -> 'ok').
+-spec(start_client/1 ::
+ (port() | #ssl_socket{ssl::{'sslsocket',_,_}}) ->
+ atom() | pid() | port() | {atom(),atom()}).
+-spec(start_ssl_client/2 ::
+ (_,port() | #ssl_socket{ssl::{'sslsocket',_,_}}) ->
+ atom() | pid() | port() | {atom(),atom()}).
+-spec(tcp_listener_started/3 ::
+ (_,
+ string() |
+ {byte(),byte(),byte(),byte()} |
+ {char(),char(),char(),char(),char(),char(),char(),char()},
+ _) ->
+ 'ok').
+-spec(tcp_listener_stopped/3 ::
+ (_,
+ string() |
+ {byte(),byte(),byte(),byte()} |
+ {char(),char(),char(),char(),char(),char(),char(),char()},
+ _) ->
+ 'ok').
-endif.
@@ -293,6 +320,7 @@ connection_info_all() -> cmap(fun (Q) -> connection_info(Q) end).
connection_info_all(Items) -> cmap(fun (Q) -> connection_info(Q, Items) end).
close_connection(Pid, Explanation) ->
+ rabbit_log:info("Closing connection ~p because ~p~n", [Pid, Explanation]),
case lists:member(Pid, connections()) of
true -> rabbit_reader:shutdown(Pid, Explanation);
false -> throw({error, {not_a_connection_pid, Pid}})
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index cb4f826d8c..8aa24ab53e 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -31,6 +31,7 @@
-ifdef(use_specs).
+-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
-spec(rabbit_running_on/1 :: (node()) -> 'ok').
-spec(notify_cluster/0 :: () -> 'ok').
diff --git a/src/rabbit_prelaunch.erl b/src/rabbit_prelaunch.erl
index 92829e4918..cd0c322b6d 100644
--- a/src/rabbit_prelaunch.erl
+++ b/src/rabbit_prelaunch.erl
@@ -29,6 +29,9 @@
-spec(start/0 :: () -> no_return()).
-spec(stop/0 :: () -> 'ok').
+%% Shut dialyzer up
+-spec(terminate/1 :: (string()) -> no_return()).
+-spec(terminate/2 :: (string(), [any()]) -> no_return()).
-endif.
@@ -67,7 +70,7 @@ start() ->
AppVersions},
%% Write it out to $RABBITMQ_PLUGINS_EXPAND_DIR/rabbit.rel
- rabbit_misc:write_file(RootName ++ ".rel", io_lib:format("~p.~n", [RDesc])),
+ rabbit_file:write_file(RootName ++ ".rel", io_lib:format("~p.~n", [RDesc])),
%% We exclude mochiweb due to its optional use of fdsrv.
XRefExclude = [mochiweb],
@@ -136,38 +139,10 @@ determine_version(App) ->
{App, Vsn}.
delete_recursively(Fn) ->
- case filelib:is_dir(Fn) and not(is_symlink(Fn)) of
- true ->
- case file:list_dir(Fn) of
- {ok, Files} ->
- case lists:foldl(fun ( Fn1, ok) -> delete_recursively(
- Fn ++ "/" ++ Fn1);
- (_Fn1, Err) -> Err
- end, ok, Files) of
- ok -> case file:del_dir(Fn) of
- ok -> ok;
- {error, E} -> {error,
- {cannot_delete, Fn, E}}
- end;
- Err -> Err
- end;
- {error, E} ->
- {error, {cannot_list_files, Fn, E}}
- end;
- false ->
- case filelib:is_file(Fn) of
- true -> case file:delete(Fn) of
- ok -> ok;
- {error, E} -> {error, {cannot_delete, Fn, E}}
- end;
- false -> ok
- end
- end.
-
-is_symlink(Name) ->
- case file:read_link(Name) of
- {ok, _} -> true;
- _ -> false
+ case rabbit_file:recursive_delete([Fn]) of
+ ok -> ok;
+ {error, {Path, E}} -> {error, {cannot_delete, Path, E}};
+ Error -> Error
end.
unpack_ez_plugins(SrcDir, DestDir) ->
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 636913b5c8..f1751e9515 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -229,7 +229,7 @@
init(Name, OnSyncFun) ->
State = #qistate { dir = Dir } = blank_state(Name),
- false = filelib:is_file(Dir), %% is_file == is file or dir
+ false = rabbit_file:is_file(Dir), %% is_file == is file or dir
State #qistate { on_sync = OnSyncFun }.
shutdown_terms(Name) ->
@@ -256,7 +256,7 @@ terminate(Terms, State) ->
delete_and_terminate(State) ->
{_SegmentCounts, State1 = #qistate { dir = Dir }} = terminate(State),
- ok = rabbit_misc:recursive_delete([Dir]),
+ ok = rabbit_file:recursive_delete([Dir]),
State1.
publish(MsgId, SeqId, MsgProps, IsPersistent,
@@ -359,16 +359,16 @@ recover(DurableQueues) ->
{[dict:fetch(QueueDirName, DurableDict) | DurableAcc],
TermsAcc1};
false ->
- ok = rabbit_misc:recursive_delete([QueueDirPath]),
+ ok = rabbit_file:recursive_delete([QueueDirPath]),
{DurableAcc, TermsAcc}
end
end, {[], []}, QueueDirNames),
{DurableTerms, {fun queue_index_walker/1, {start, DurableQueueNames}}}.
all_queue_directory_names(Dir) ->
- case file:list_dir(Dir) of
+ case rabbit_file:list_dir(Dir) of
{ok, Entries} -> [ Entry || Entry <- Entries,
- filelib:is_dir(
+ rabbit_file:is_dir(
filename:join(Dir, Entry)) ];
{error, enoent} -> []
end.
@@ -392,18 +392,18 @@ blank_state(QueueName) ->
clean_file_name(Dir) -> filename:join(Dir, ?CLEAN_FILENAME).
detect_clean_shutdown(Dir) ->
- case file:delete(clean_file_name(Dir)) of
+ case rabbit_file:delete(clean_file_name(Dir)) of
ok -> true;
{error, enoent} -> false
end.
read_shutdown_terms(Dir) ->
- rabbit_misc:read_term_file(clean_file_name(Dir)).
+ rabbit_file:read_term_file(clean_file_name(Dir)).
store_clean_shutdown(Terms, Dir) ->
CleanFileName = clean_file_name(Dir),
- ok = filelib:ensure_dir(CleanFileName),
- rabbit_misc:write_term_file(CleanFileName, Terms).
+ ok = rabbit_file:ensure_dir(CleanFileName),
+ rabbit_file:write_term_file(CleanFileName, Terms).
init_clean(RecoveredCounts, State) ->
%% Load the journal. Since this is a clean recovery this (almost)
@@ -603,8 +603,8 @@ flush_journal(State = #qistate { segments = Segments }) ->
Segments1 =
segment_fold(
fun (#segment { unacked = 0, path = Path }, SegmentsN) ->
- case filelib:is_file(Path) of
- true -> ok = file:delete(Path);
+ case rabbit_file:is_file(Path) of
+ true -> ok = rabbit_file:delete(Path);
false -> ok
end,
SegmentsN;
@@ -630,7 +630,7 @@ append_journal_to_segment(#segment { journal_entries = JEntries,
get_journal_handle(State = #qistate { journal_handle = undefined,
dir = Dir }) ->
Path = filename:join(Dir, ?JOURNAL_FILENAME),
- ok = filelib:ensure_dir(Path),
+ ok = rabbit_file:ensure_dir(Path),
{ok, Hdl} = file_handle_cache:open(Path, ?WRITE_MODE,
[{write_buffer, infinity}]),
{Hdl, State #qistate { journal_handle = Hdl }};
@@ -735,7 +735,7 @@ all_segment_nums(#qistate { dir = Dir, segments = Segments }) ->
lists:takewhile(fun (C) -> $0 =< C andalso C =< $9 end,
SegName)), Set)
end, sets:from_list(segment_nums(Segments)),
- filelib:wildcard("*" ++ ?SEGMENT_EXTENSION, Dir)))).
+ rabbit_file:wildcard(".*\\" ++ ?SEGMENT_EXTENSION, Dir)))).
segment_find_or_new(Seg, Dir, Segments) ->
case segment_find(Seg, Segments) of
@@ -836,7 +836,7 @@ segment_entries_foldr(Fun, Init,
%%
%% Does not do any combining with the journal at all.
load_segment(KeepAcked, #segment { path = Path }) ->
- case filelib:is_file(Path) of
+ case rabbit_file:is_file(Path) of
false -> {array_new(), 0};
true -> {ok, Hdl} = file_handle_cache:open(Path, ?READ_AHEAD_MODE, []),
{ok, 0} = file_handle_cache:position(Hdl, bof),
@@ -1040,12 +1040,12 @@ foreach_queue_index(Funs) ->
transform_queue(Dir, Gatherer, {JournalFun, SegmentFun}) ->
ok = transform_file(filename:join(Dir, ?JOURNAL_FILENAME), JournalFun),
[ok = transform_file(filename:join(Dir, Seg), SegmentFun)
- || Seg <- filelib:wildcard("*" ++ ?SEGMENT_EXTENSION, Dir)],
+ || Seg <- rabbit_file:wildcard(".*\\" ++ ?SEGMENT_EXTENSION, Dir)],
ok = gatherer:finish(Gatherer).
transform_file(Path, Fun) ->
PathTmp = Path ++ ".upgrade",
- case filelib:file_size(Path) of
+ case rabbit_file:file_size(Path) of
0 -> ok;
Size -> {ok, PathTmpHdl} =
file_handle_cache:open(PathTmp, ?WRITE_MODE,
@@ -1059,7 +1059,7 @@ transform_file(Path, Fun) ->
ok = drive_transform_fun(Fun, PathTmpHdl, Content),
ok = file_handle_cache:close(PathTmpHdl),
- ok = file:rename(PathTmp, Path)
+ ok = rabbit_file:rename(PathTmp, Path)
end.
drive_transform_fun(Fun, Hdl, Contents) ->
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 7eec2a2ea0..b4871cefc1 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -85,6 +85,15 @@
rabbit_types:ok_or_error2(
rabbit_net:socket(), any()))) -> no_return()).
+-spec(mainloop/2 :: (_,#v1{}) -> any()).
+-spec(system_code_change/4 :: (_,_,_,_) -> {'ok',_}).
+-spec(system_continue/3 :: (_,_,#v1{}) -> any()).
+-spec(system_terminate/4 :: (_,_,_,_) -> none()).
+
+-spec(process_channel_frame/5 ::
+ (rabbit_command_assembler:frame(), pid(), non_neg_integer(), pid(),
+ tuple()) -> tuple()).
+
-endif.
%%--------------------------------------------------------------------------
@@ -493,20 +502,7 @@ handle_frame(Type, Channel, Payload,
AnalyzedFrame, self(),
Channel, ChPid, FramingState),
put({channel, Channel}, {ChPid, NewAState}),
- case AnalyzedFrame of
- {method, 'channel.close_ok', _} ->
- channel_cleanup(ChPid),
- State;
- {method, MethodName, _} ->
- case (State#v1.connection_state =:= blocking
- andalso
- Protocol:method_has_content(MethodName)) of
- true -> State#v1{connection_state = blocked};
- false -> State
- end;
- _ ->
- State
- end;
+ post_process_frame(AnalyzedFrame, ChPid, State);
undefined ->
case ?IS_RUNNING(State) of
true -> send_to_new_channel(
@@ -518,6 +514,23 @@ handle_frame(Type, Channel, Payload,
end
end.
+post_process_frame({method, 'channel.close_ok', _}, ChPid, State) ->
+ channel_cleanup(ChPid),
+ State;
+post_process_frame({method, MethodName, _}, _ChPid,
+ State = #v1{connection = #connection{
+ protocol = Protocol}}) ->
+ case Protocol:method_has_content(MethodName) of
+ true -> erlang:bump_reductions(2000),
+ case State#v1.connection_state of
+ blocking -> State#v1{connection_state = blocked};
+ _ -> State
+ end;
+ false -> State
+ end;
+post_process_frame(_Frame, _ChPid, State) ->
+ State.
+
handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, State) ->
ensure_stats_timer(
switch_callback(State, {frame_payload, Type, Channel, PayloadSize},
diff --git a/src/rabbit_restartable_sup.erl b/src/rabbit_restartable_sup.erl
index 0491244be0..cda3ccbe0f 100644
--- a/src/rabbit_restartable_sup.erl
+++ b/src/rabbit_restartable_sup.erl
@@ -24,6 +24,16 @@
-include("rabbit.hrl").
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(start_link/2 :: (atom(), mfa()) -> rabbit_types:ok_pid_or_error()).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
start_link(Name, {_M, _F, _A} = Fun) ->
supervisor:start_link({local, Name}, ?MODULE, [Fun]).
diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl
index d453a8707e..e9c4479a63 100644
--- a/src/rabbit_router.erl
+++ b/src/rabbit_router.erl
@@ -107,9 +107,11 @@ check_delivery(true, _ , {false, []}) -> {unroutable, []};
check_delivery(_ , true, {_ , []}) -> {not_delivered, []};
check_delivery(_ , _ , {_ , Qs}) -> {routed, Qs}.
+%% Normally we'd call mnesia:dirty_read/1 here, but that is quite
+%% expensive for the reasons explained in rabbit_misc:dirty_read/1.
lookup_qpids(QNames) ->
lists:foldl(fun (QName, QPids) ->
- case mnesia:dirty_read({rabbit_queue, QName}) of
+ case ets:lookup(rabbit_queue, QName) of
[#amqqueue{pid = QPid, slave_pids = SPids}] ->
[QPid | SPids ++ QPids];
[] ->
@@ -118,16 +120,8 @@ lookup_qpids(QNames) ->
end, [], QNames).
%% Normally we'd call mnesia:dirty_select/2 here, but that is quite
-%% expensive due to
-%%
-%% 1) general mnesia overheads (figuring out table types and
-%% locations, etc). We get away with bypassing these because we know
-%% that the table
-%% - is not the schema table
-%% - has a local ram copy
-%% - does not have any indices
-%%
-%% 2) 'fixing' of the table with ets:safe_fixtable/2, which is wholly
+%% expensive for the same reasons as above, and, additionally, due to
+%% mnesia 'fixing' the table with ets:safe_fixtable/2, which is wholly
%% unnecessary. According to the ets docs (and the code in erl_db.c),
%% 'select' is safe anyway ("Functions that internally traverse over a
%% table, like select and match, will give the same guarantee as
diff --git a/src/rabbit_sasl_report_file_h.erl b/src/rabbit_sasl_report_file_h.erl
index 6f3c5c75bc..963294d924 100644
--- a/src/rabbit_sasl_report_file_h.erl
+++ b/src/rabbit_sasl_report_file_h.erl
@@ -26,12 +26,17 @@
%% with the result of closing the old handler when swapping handlers.
%% The first init/1 additionally allows for simple log rotation
%% when the suffix is not the empty string.
+%% The original init/1 also opened the file in 'write' mode, thus
+%% overwriting old logs. To remedy this, init/1 from
+%% lib/sasl/src/sasl_report_file_h.erl from R14B3 was copied as
+%% init_file/1 and changed so that it opens the file in 'append' mode.
%% Used only when swapping handlers and performing
%% log rotation
init({{File, Suffix}, []}) ->
- case rabbit_misc:append_file(File, Suffix) of
- ok -> ok;
+ case rabbit_file:append_file(File, Suffix) of
+ ok -> file:delete(File),
+ ok;
{error, Error} ->
rabbit_log:error("Failed to append contents of "
"sasl log file '~s' to '~s':~n~p~n",
@@ -47,11 +52,18 @@ init({{File, _}, error}) ->
init({File, []}) ->
init(File);
init({File, _Type} = FileInfo) ->
- rabbit_misc:ensure_parent_dirs_exist(File),
- sasl_report_file_h:init(FileInfo);
+ rabbit_file:ensure_parent_dirs_exist(File),
+ init_file(FileInfo);
init(File) ->
- rabbit_misc:ensure_parent_dirs_exist(File),
- sasl_report_file_h:init({File, sasl_error_logger_type()}).
+ rabbit_file:ensure_parent_dirs_exist(File),
+ init_file({File, sasl_error_logger_type()}).
+
+init_file({File, Type}) ->
+ process_flag(trap_exit, true),
+ case file:open(File, [append]) of
+ {ok,Fd} -> {ok, {Fd, File, Type}};
+ Error -> Error
+ end.
handle_event(Event, State) ->
sasl_report_file_h:handle_event(Event, State).
diff --git a/src/rabbit_sup.erl b/src/rabbit_sup.erl
index 508b127ec5..802ea5e2e7 100644
--- a/src/rabbit_sup.erl
+++ b/src/rabbit_sup.erl
@@ -27,6 +27,21 @@
-define(SERVER, ?MODULE).
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
+-spec(start_child/1 :: (atom()) -> 'ok').
+-spec(start_child/3 :: (atom(), atom(), [any()]) -> 'ok').
+-spec(start_restartable_child/1 :: (atom()) -> 'ok').
+-spec(start_restartable_child/2 :: (atom(), [any()]) -> 'ok').
+-spec(stop_child/1 :: (atom()) -> rabbit_types:ok_or_error(any())).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
start_link() ->
supervisor:start_link({local, ?SERVER}, ?MODULE, []).
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index b90884d914..39f67ced2d 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -757,13 +757,23 @@ test_topic_expect_match(X, List) ->
end, List).
test_app_management() ->
- %% starting, stopping, status
+ control_action(wait, [rabbit_mnesia:dir() ++ ".pid"]),
+ %% Starting, stopping and diagnostics. Note that we don't try
+ %% 'report' when the rabbit app is stopped and that we enable
+ %% tracing for the duration of this function.
+ ok = control_action(trace_on, []),
ok = control_action(stop_app, []),
ok = control_action(stop_app, []),
ok = control_action(status, []),
+ ok = control_action(cluster_status, []),
+ ok = control_action(environment, []),
ok = control_action(start_app, []),
ok = control_action(start_app, []),
ok = control_action(status, []),
+ ok = control_action(report, []),
+ ok = control_action(cluster_status, []),
+ ok = control_action(environment, []),
+ ok = control_action(trace_off, []),
passed.
test_log_management() ->
@@ -795,23 +805,11 @@ test_log_management() ->
ok = control_action(rotate_logs, []),
ok = test_logs_working(MainLog, SaslLog),
- %% log rotation on empty file
+ %% log rotation on empty files (the main log will have a ctl action logged)
ok = clean_logs([MainLog, SaslLog], Suffix),
ok = control_action(rotate_logs, []),
ok = control_action(rotate_logs, [Suffix]),
- [true, true] = empty_files([[MainLog, Suffix], [SaslLog, Suffix]]),
-
- %% original main log file is not writable
- ok = make_files_non_writable([MainLog]),
- {error, {cannot_rotate_main_logs, _}} = control_action(rotate_logs, []),
- ok = clean_logs([MainLog], Suffix),
- ok = add_log_handlers([{rabbit_error_logger_file_h, MainLog}]),
-
- %% original sasl log file is not writable
- ok = make_files_non_writable([SaslLog]),
- {error, {cannot_rotate_sasl_logs, _}} = control_action(rotate_logs, []),
- ok = clean_logs([SaslLog], Suffix),
- ok = add_log_handlers([{rabbit_sasl_report_file_h, SaslLog}]),
+ [false, true] = empty_files([[MainLog, Suffix], [SaslLog, Suffix]]),
%% logs with suffix are not writable
ok = control_action(rotate_logs, [Suffix]),
@@ -819,27 +817,28 @@ test_log_management() ->
ok = control_action(rotate_logs, [Suffix]),
ok = test_logs_working(MainLog, SaslLog),
- %% original log files are not writable
+ %% rotate when original log files are not writable
ok = make_files_non_writable([MainLog, SaslLog]),
- {error, {{cannot_rotate_main_logs, _},
- {cannot_rotate_sasl_logs, _}}} = control_action(rotate_logs, []),
+ ok = control_action(rotate_logs, []),
- %% logging directed to tty (handlers were removed in last test)
+ %% logging directed to tty (first, remove handlers)
+ ok = delete_log_handlers([rabbit_sasl_report_file_h,
+ rabbit_error_logger_file_h]),
ok = clean_logs([MainLog, SaslLog], Suffix),
- ok = application:set_env(sasl, sasl_error_logger, tty),
- ok = application:set_env(kernel, error_logger, tty),
+ ok = application:set_env(rabbit, sasl_error_logger, tty),
+ ok = application:set_env(rabbit, error_logger, tty),
ok = control_action(rotate_logs, []),
[{error, enoent}, {error, enoent}] = empty_files([MainLog, SaslLog]),
%% rotate logs when logging is turned off
- ok = application:set_env(sasl, sasl_error_logger, false),
- ok = application:set_env(kernel, error_logger, silent),
+ ok = application:set_env(rabbit, sasl_error_logger, false),
+ ok = application:set_env(rabbit, error_logger, silent),
ok = control_action(rotate_logs, []),
[{error, enoent}, {error, enoent}] = empty_files([MainLog, SaslLog]),
%% cleanup
- ok = application:set_env(sasl, sasl_error_logger, {file, SaslLog}),
- ok = application:set_env(kernel, error_logger, {file, MainLog}),
+ ok = application:set_env(rabbit, sasl_error_logger, {file, SaslLog}),
+ ok = application:set_env(rabbit, error_logger, {file, MainLog}),
ok = add_log_handlers([{rabbit_error_logger_file_h, MainLog},
{rabbit_sasl_report_file_h, SaslLog}]),
passed.
@@ -850,8 +849,8 @@ test_log_management_during_startup() ->
%% start application with simple tty logging
ok = control_action(stop_app, []),
- ok = application:set_env(kernel, error_logger, tty),
- ok = application:set_env(sasl, sasl_error_logger, tty),
+ ok = application:set_env(rabbit, error_logger, tty),
+ ok = application:set_env(rabbit, sasl_error_logger, tty),
ok = add_log_handlers([{error_logger_tty_h, []},
{sasl_report_tty_h, []}]),
ok = control_action(start_app, []),
@@ -868,13 +867,12 @@ test_log_management_during_startup() ->
end,
%% fix sasl logging
- ok = application:set_env(sasl, sasl_error_logger,
- {file, SaslLog}),
+ ok = application:set_env(rabbit, sasl_error_logger, {file, SaslLog}),
%% start application with logging to non-existing directory
TmpLog = "/tmp/rabbit-tests/test.log",
delete_file(TmpLog),
- ok = application:set_env(kernel, error_logger, {file, TmpLog}),
+ ok = application:set_env(rabbit, error_logger, {file, TmpLog}),
ok = delete_log_handlers([rabbit_error_logger_file_h]),
ok = add_log_handlers([{error_logger_file_h, MainLog}]),
@@ -895,7 +893,7 @@ test_log_management_during_startup() ->
%% start application with logging to a subdirectory which
%% parent directory has no write permissions
TmpTestDir = "/tmp/rabbit-tests/no-permission/test/log",
- ok = application:set_env(kernel, error_logger, {file, TmpTestDir}),
+ ok = application:set_env(rabbit, error_logger, {file, TmpTestDir}),
ok = add_log_handlers([{error_logger_file_h, MainLog}]),
ok = case control_action(start_app, []) of
ok -> exit({got_success_but_expected_failure,
@@ -910,7 +908,7 @@ test_log_management_during_startup() ->
%% start application with standard error_logger_file_h
%% handler not installed
- ok = application:set_env(kernel, error_logger, {file, MainLog}),
+ ok = application:set_env(rabbit, error_logger, {file, MainLog}),
ok = control_action(start_app, []),
ok = control_action(stop_app, []),
@@ -1146,6 +1144,7 @@ test_user_management() ->
ok = control_action(add_user, ["foo", "bar"]),
{error, {user_already_exists, _}} =
control_action(add_user, ["foo", "bar"]),
+ ok = control_action(clear_password, ["foo"]),
ok = control_action(change_password, ["foo", "baz"]),
TestTags = fun (Tags) ->
@@ -1757,7 +1756,11 @@ test_file_handle_cache() ->
[filename:join(TmpDir, Str) || Str <- ["file1", "file2", "file3", "file4"]],
Content = <<"foo">>,
CopyFun = fun (Src, Dst) ->
- ok = rabbit_misc:write_file(Src, Content),
+ {ok, Hdl} = prim_file:open(Src, [binary, write]),
+ ok = prim_file:write(Hdl, Content),
+ ok = prim_file:sync(Hdl),
+ prim_file:close(Hdl),
+
{ok, SrcHdl} = file_handle_cache:open(Src, [read], []),
{ok, DstHdl} = file_handle_cache:open(Dst, [write], []),
Size = size(Content),
@@ -2314,9 +2317,42 @@ test_variable_queue() ->
fun test_variable_queue_all_the_bits_not_covered_elsewhere2/1,
fun test_dropwhile/1,
fun test_dropwhile_varying_ram_duration/1,
- fun test_variable_queue_ack_limiting/1]],
+ fun test_variable_queue_ack_limiting/1,
+ fun test_variable_queue_requeue/1]],
passed.
+test_variable_queue_requeue(VQ0) ->
+ Interval = 50,
+ Count = rabbit_queue_index:next_segment_boundary(0) + 2 * Interval,
+ Seq = lists:seq(1, Count),
+ VQ1 = rabbit_variable_queue:set_ram_duration_target(0, VQ0),
+ VQ2 = variable_queue_publish(false, Count, VQ1),
+ {VQ3, Acks} = lists:foldl(
+ fun (_N, {VQN, AckTags}) ->
+ {{#basic_message{}, false, AckTag, _}, VQM} =
+ rabbit_variable_queue:fetch(true, VQN),
+ {VQM, [AckTag | AckTags]}
+ end, {VQ2, []}, Seq),
+ Subset = lists:foldl(fun ({Ack, N}, Acc) when N rem Interval == 0 ->
+ [Ack | Acc];
+ (_, Acc) ->
+ Acc
+ end, [], lists:zip(Acks, Seq)),
+ {_MsgIds, VQ4} = rabbit_variable_queue:requeue(Acks -- Subset,
+ fun(X) -> X end, VQ3),
+ VQ5 = lists:foldl(fun (AckTag, VQN) ->
+ {_MsgId, VQM} = rabbit_variable_queue:requeue(
+ [AckTag], fun(X) -> X end, VQN),
+ VQM
+ end, VQ4, Subset),
+ VQ6 = lists:foldl(fun (AckTag, VQa) ->
+ {{#basic_message{}, true, AckTag, _}, VQb} =
+ rabbit_variable_queue:fetch(true, VQa),
+ VQb
+ end, VQ5, lists:reverse(Acks)),
+ {empty, VQ7} = rabbit_variable_queue:fetch(true, VQ6),
+ VQ7.
+
test_variable_queue_ack_limiting(VQ0) ->
%% start by sending in a bunch of messages
Len = 1024,
diff --git a/src/rabbit_trace.erl b/src/rabbit_trace.erl
index f9632324be..58079ccf47 100644
--- a/src/rabbit_trace.erl
+++ b/src/rabbit_trace.erl
@@ -67,9 +67,11 @@ tap_trace_out({#resource{name = QName}, _QPid, _QMsgId, Redelivered, Msg},
%%----------------------------------------------------------------------------
start(VHost) ->
+ rabbit_log:info("Enabling tracing for vhost '~s'~n", [VHost]),
update_config(fun (VHosts) -> [VHost | VHosts -- [VHost]] end).
stop(VHost) ->
+ rabbit_log:info("Disabling tracing for vhost '~s'~n", [VHost]),
update_config(fun (VHosts) -> VHosts -- [VHost] end).
update_config(Fun) ->
diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl
index e7a302f80d..717d94a802 100644
--- a/src/rabbit_upgrade.erl
+++ b/src/rabbit_upgrade.erl
@@ -115,7 +115,7 @@ ensure_backup_removed() ->
end.
remove_backup() ->
- ok = rabbit_misc:recursive_delete([backup_dir()]),
+ ok = rabbit_file:recursive_delete([backup_dir()]),
info("upgrades: Mnesia backup removed~n", []).
maybe_upgrade_mnesia() ->
@@ -249,7 +249,7 @@ maybe_upgrade_local() ->
%% -------------------------------------------------------------------
apply_upgrades(Scope, Upgrades, Fun) ->
- ok = rabbit_misc:lock_file(lock_filename()),
+ ok = rabbit_file:lock_file(lock_filename()),
info("~s upgrades: ~w to apply~n", [Scope, length(Upgrades)]),
rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia),
Fun(),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index ea72de66e3..94c0913dfc 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -158,19 +158,18 @@
%% to search through qi segments looking for messages that are yet to
%% be acknowledged.
%%
-%% Pending acks are recorded in memory either as the tuple {SeqId,
-%% MsgId, MsgProps} (tuple-form) or as the message itself (message-
-%% form). Acks for persistent messages are always stored in the tuple-
-%% form. Acks for transient messages are also stored in tuple-form if
-%% the message has been sent to disk as part of the memory reduction
-%% process. For transient messages that haven't already been written
-%% to disk, acks are stored in message-form.
+%% Pending acks are recorded in memory by storing the message itself.
+%% If the message has been sent to disk, we do not store the message
+%% content. During memory reduction, pending acks containing message
+%% content have that content removed and the corresponding messages
+%% are pushed out to disk.
%%
-%% During memory reduction, acks stored in message-form are converted
-%% to tuple-form, and the corresponding messages are pushed out to
-%% disk.
+%% Messages from pending acks are returned to q4, q3 and delta during
+%% requeue, based on the limits of seq_id contained in each. Requeued
+%% messages retain their original seq_id, maintaining order
+%% when requeued.
%%
-%% The order in which alphas are pushed to betas and message-form acks
+%% The order in which alphas are pushed to betas and pending acks
%% are pushed to disk is determined dynamically. We always prefer to
%% push messages for the source (alphas or acks) that is growing the
%% fastest (with growth measured as avg. ingress - avg. egress). In
@@ -281,6 +280,8 @@
end_seq_id %% end_seq_id is exclusive
}).
+-record(merge_funs, {new, join, out, in, publish}).
+
%% When we discover, on publish, that we should write some indices to
%% disk for some betas, the IO_BATCH_SIZE sets the number of betas
%% that we must be due to write indices for before we do any work at
@@ -320,7 +321,7 @@
q3 :: bpqueue:bpqueue(),
q4 :: queue(),
next_seq_id :: seq_id(),
- pending_ack :: dict(),
+ pending_ack :: gb_tree(),
ram_ack_index :: gb_tree(),
index_state :: any(),
msg_store_clients :: 'undefined' | {{any(), binary()},
@@ -410,16 +411,14 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, false,
init(#amqqueue { name = QueueName, durable = true }, true,
AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) ->
Terms = rabbit_queue_index:shutdown_terms(QueueName),
- {PRef, TRef, Terms1} =
- case [persistent_ref, transient_ref] -- proplists:get_keys(Terms) of
- [] -> {proplists:get_value(persistent_ref, Terms),
- proplists:get_value(transient_ref, Terms),
- Terms};
- _ -> {rabbit_guid:guid(), rabbit_guid:guid(), []}
+ {PRef, Terms1} =
+ case proplists:get_value(persistent_ref, Terms) of
+ undefined -> {rabbit_guid:guid(), []};
+ PRef1 -> {PRef1, Terms}
end,
PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef,
MsgOnDiskFun, AsyncCallback),
- TransientClient = msg_store_client_init(?TRANSIENT_MSG_STORE, TRef,
+ TransientClient = msg_store_client_init(?TRANSIENT_MSG_STORE,
undefined, AsyncCallback),
{DeltaCount, IndexState} =
rabbit_queue_index:recover(
@@ -436,17 +435,14 @@ terminate(_Reason, State) ->
State1 = #vqstate { persistent_count = PCount,
index_state = IndexState,
msg_store_clients = {MSCStateP, MSCStateT} } =
- remove_pending_ack(true, State),
+ purge_pending_ack(true, State),
PRef = case MSCStateP of
undefined -> undefined;
_ -> ok = rabbit_msg_store:client_terminate(MSCStateP),
rabbit_msg_store:client_ref(MSCStateP)
end,
- ok = rabbit_msg_store:client_terminate(MSCStateT),
- TRef = rabbit_msg_store:client_ref(MSCStateT),
- Terms = [{persistent_ref, PRef},
- {transient_ref, TRef},
- {persistent_count, PCount}],
+ ok = rabbit_msg_store:client_delete_and_terminate(MSCStateT),
+ Terms = [{persistent_ref, PRef}, {persistent_count, PCount}],
a(State1 #vqstate { index_state = rabbit_queue_index:terminate(
Terms, IndexState),
msg_store_clients = undefined }).
@@ -455,12 +451,12 @@ terminate(_Reason, State) ->
%% needs to delete everything that's been delivered and not ack'd.
delete_and_terminate(_Reason, State) ->
%% TODO: there is no need to interact with qi at all - which we do
- %% as part of 'purge' and 'remove_pending_ack', other than
+ %% as part of 'purge' and 'purge_pending_ack', other than
%% deleting it.
{_PurgeCount, State1} = purge(State),
State2 = #vqstate { index_state = IndexState,
msg_store_clients = {MSCStateP, MSCStateT} } =
- remove_pending_ack(false, State1),
+ purge_pending_ack(false, State1),
IndexState1 = rabbit_queue_index:delete_and_terminate(IndexState),
case MSCStateP of
undefined -> ok;
@@ -537,7 +533,11 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent,
unconfirmed = UC1 }))}.
drain_confirmed(State = #vqstate { confirmed = C }) ->
- {gb_sets:to_list(C), State #vqstate { confirmed = gb_sets:new() }}.
+ case gb_sets:is_empty(C) of
+ true -> {[], State}; %% common case
+ false -> {gb_sets:to_list(C), State #vqstate {
+ confirmed = gb_sets:new() }}
+ end.
dropwhile(Pred, State) ->
case queue_out(State) of
@@ -563,34 +563,51 @@ fetch(AckRequired, State) ->
{Res, a(State3)}
end.
+ack([], State) ->
+ {[], State};
ack(AckTags, State) ->
- {MsgIds, State1} = ack(fun msg_store_remove/3,
- fun (_, State0) -> State0 end,
- AckTags, State),
- {MsgIds, a(State1)}.
-
-requeue(AckTags, MsgPropsFun, State) ->
- MsgPropsFun1 = fun (MsgProps) ->
- (MsgPropsFun(MsgProps)) #message_properties {
- needs_confirming = false }
- end,
- {MsgIds, State1} =
- ack(fun (_, _, _) -> ok end,
- fun (#msg_status { msg = Msg, msg_props = MsgProps }, State1) ->
- {_SeqId, State2} = publish(Msg, MsgPropsFun1(MsgProps),
- true, false, State1),
- State2;
- ({IsPersistent, MsgId, MsgProps}, State1) ->
- #vqstate { msg_store_clients = MSCState } = State1,
- {{ok, Msg = #basic_message{}}, MSCState1} =
- msg_store_read(MSCState, IsPersistent, MsgId),
- State2 = State1 #vqstate { msg_store_clients = MSCState1 },
- {_SeqId, State3} = publish(Msg, MsgPropsFun1(MsgProps),
- true, true, State2),
- State3
- end,
- AckTags, State),
- {MsgIds, a(reduce_memory_use(State1))}.
+ {{IndexOnDiskSeqIds, MsgIdsByStore, AllMsgIds},
+ State1 = #vqstate { index_state = IndexState,
+ msg_store_clients = MSCState,
+ persistent_count = PCount,
+ ack_out_counter = AckOutCount }} =
+ lists:foldl(
+ fun (SeqId, {Acc, State2}) ->
+ {MsgStatus, State3} = remove_pending_ack(SeqId, State2),
+ {accumulate_ack(MsgStatus, Acc), State3}
+ end, {accumulate_ack_init(), State}, AckTags),
+ IndexState1 = rabbit_queue_index:ack(IndexOnDiskSeqIds, IndexState),
+ [ok = msg_store_remove(MSCState, IsPersistent, MsgIds)
+ || {IsPersistent, MsgIds} <- orddict:to_list(MsgIdsByStore)],
+ PCount1 = PCount - find_persistent_count(sum_msg_ids_by_store_to_len(
+ orddict:new(), MsgIdsByStore)),
+ {lists:reverse(AllMsgIds),
+ a(State1 #vqstate { index_state = IndexState1,
+ persistent_count = PCount1,
+ ack_out_counter = AckOutCount + length(AckTags) })}.
+
+requeue(AckTags, MsgPropsFun, #vqstate { delta = Delta,
+ q3 = Q3,
+ q4 = Q4,
+ in_counter = InCounter,
+ len = Len } = State) ->
+ {SeqIds, Q4a, MsgIds, State1} = queue_merge(lists:sort(AckTags), Q4, [],
+ beta_limit(Q3),
+ alpha_funs(),
+ MsgPropsFun, State),
+ {SeqIds1, Q3a, MsgIds1, State2} = queue_merge(SeqIds, Q3, MsgIds,
+ delta_limit(Delta),
+ beta_funs(),
+ MsgPropsFun, State1),
+ {Delta1, MsgIds2, State3} = delta_merge(SeqIds1, Delta, MsgIds1,
+ MsgPropsFun, State2),
+ MsgCount = length(MsgIds2),
+ {MsgIds2, a(reduce_memory_use(
+ State3 #vqstate { delta = Delta1,
+ q3 = Q3a,
+ q4 = Q4a,
+ in_counter = InCounter + MsgCount,
+ len = Len + MsgCount }))}.
len(#vqstate { len = Len }) -> Len.
@@ -714,7 +731,7 @@ status(#vqstate {
{q3 , bpqueue:len(Q3)},
{q4 , queue:len(Q4)},
{len , Len},
- {pending_acks , dict:size(PA)},
+ {pending_acks , gb_trees:size(PA)},
{target_ram_count , TargetRamCount},
{ram_msg_count , RamMsgCount},
{ram_ack_count , gb_trees:size(RAI)},
@@ -787,6 +804,8 @@ msg_status(IsPersistent, SeqId, Msg = #basic_message { id = MsgId },
msg_on_disk = false, index_on_disk = false,
msg_props = MsgProps }.
+trim_msg_status(MsgStatus) -> MsgStatus #msg_status { msg = undefined }.
+
with_msg_store_state({MSCStateP, MSCStateT}, true, Fun) ->
{Result, MSCStateP1} = Fun(MSCStateP),
{Result, {MSCStateP1, MSCStateT}};
@@ -840,26 +859,30 @@ maybe_write_delivered(false, _SeqId, IndexState) ->
maybe_write_delivered(true, SeqId, IndexState) ->
rabbit_queue_index:deliver([SeqId], IndexState).
-betas_from_index_entries(List, TransientThreshold, IndexState) ->
+betas_from_index_entries(List, TransientThreshold, PA, IndexState) ->
{Filtered, Delivers, Acks} =
lists:foldr(
fun ({MsgId, SeqId, MsgProps, IsPersistent, IsDelivered},
- {Filtered1, Delivers1, Acks1}) ->
+ {Filtered1, Delivers1, Acks1} = Acc) ->
case SeqId < TransientThreshold andalso not IsPersistent of
true -> {Filtered1,
cons_if(not IsDelivered, SeqId, Delivers1),
[SeqId | Acks1]};
- false -> {[m(#msg_status { msg = undefined,
- msg_id = MsgId,
- seq_id = SeqId,
- is_persistent = IsPersistent,
- is_delivered = IsDelivered,
- msg_on_disk = true,
- index_on_disk = true,
- msg_props = MsgProps
- }) | Filtered1],
- Delivers1,
- Acks1}
+ false -> case gb_trees:is_defined(SeqId, PA) of
+ false -> {[m(#msg_status {
+ seq_id = SeqId,
+ msg_id = MsgId,
+ msg = undefined,
+ is_persistent = IsPersistent,
+ is_delivered = IsDelivered,
+ msg_on_disk = true,
+ index_on_disk = true,
+ msg_props = MsgProps
+ }) | Filtered1],
+ Delivers1,
+ Acks1};
+ true -> Acc
+ end
end
end, {[], [], []}, List),
{bpqueue:from_list([{true, Filtered}]),
@@ -922,7 +945,7 @@ init(IsDurable, IndexState, DeltaCount, Terms, AsyncCallback,
q3 = bpqueue:new(),
q4 = queue:new(),
next_seq_id = NextSeqId,
- pending_ack = dict:new(),
+ pending_ack = gb_trees:empty(),
ram_ack_index = gb_trees:empty(),
index_state = IndexState1,
msg_store_clients = {PersistentClient, TransientClient},
@@ -1020,11 +1043,10 @@ internal_fetch(AckRequired, MsgStatus = #msg_status {
end,
Ack = fun () -> rabbit_queue_index:ack([SeqId], IndexState1) end,
IndexState2 =
- case {AckRequired, MsgOnDisk, IndexOnDisk, IsPersistent} of
- {false, true, false, _} -> Rem(), IndexState1;
- {false, true, true, _} -> Rem(), Ack();
- { true, true, true, false} -> Ack();
- _ -> IndexState1
+ case {AckRequired, MsgOnDisk, IndexOnDisk} of
+ {false, true, false} -> Rem(), IndexState1;
+ {false, true, true} -> Rem(), Ack();
+ _ -> IndexState1
end,
%% 3. If an ack is required, add something sensible to PA
@@ -1172,29 +1194,34 @@ maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus,
record_pending_ack(#msg_status { seq_id = SeqId,
msg_id = MsgId,
- is_persistent = IsPersistent,
- msg_on_disk = MsgOnDisk,
- msg_props = MsgProps } = MsgStatus,
+ msg_on_disk = MsgOnDisk } = MsgStatus,
State = #vqstate { pending_ack = PA,
ram_ack_index = RAI,
ack_in_counter = AckInCount}) ->
{AckEntry, RAI1} =
case MsgOnDisk of
- true -> {{IsPersistent, MsgId, MsgProps}, RAI};
+ true -> {m(trim_msg_status(MsgStatus)), RAI};
false -> {MsgStatus, gb_trees:insert(SeqId, MsgId, RAI)}
end,
- PA1 = dict:store(SeqId, AckEntry, PA),
- State #vqstate { pending_ack = PA1,
+ State #vqstate { pending_ack = gb_trees:insert(SeqId, AckEntry, PA),
ram_ack_index = RAI1,
ack_in_counter = AckInCount + 1}.
-remove_pending_ack(KeepPersistent,
- State = #vqstate { pending_ack = PA,
- index_state = IndexState,
- msg_store_clients = MSCState }) ->
- {PersistentSeqIds, MsgIdsByStore, _AllMsgIds} =
- dict:fold(fun accumulate_ack/3, accumulate_ack_init(), PA),
- State1 = State #vqstate { pending_ack = dict:new(),
+remove_pending_ack(SeqId, State = #vqstate { pending_ack = PA,
+ ram_ack_index = RAI }) ->
+ {gb_trees:get(SeqId, PA),
+ State #vqstate { pending_ack = gb_trees:delete(SeqId, PA),
+ ram_ack_index = gb_trees:delete_any(SeqId, RAI) }}.
+
+purge_pending_ack(KeepPersistent,
+ State = #vqstate { pending_ack = PA,
+ index_state = IndexState,
+ msg_store_clients = MSCState }) ->
+ {IndexOnDiskSeqIds, MsgIdsByStore, _AllMsgIds} =
+ rabbit_misc:gb_trees_fold(fun (_SeqId, MsgStatus, Acc) ->
+ accumulate_ack(MsgStatus, Acc)
+ end, accumulate_ack_init(), PA),
+ State1 = State #vqstate { pending_ack = gb_trees:empty(),
ram_ack_index = gb_trees:empty() },
case KeepPersistent of
true -> case orddict:find(false, MsgIdsByStore) of
@@ -1204,52 +1231,25 @@ remove_pending_ack(KeepPersistent,
State1
end;
false -> IndexState1 =
- rabbit_queue_index:ack(PersistentSeqIds, IndexState),
+ rabbit_queue_index:ack(IndexOnDiskSeqIds, IndexState),
[ok = msg_store_remove(MSCState, IsPersistent, MsgIds)
|| {IsPersistent, MsgIds} <- orddict:to_list(MsgIdsByStore)],
State1 #vqstate { index_state = IndexState1 }
end.
-ack(_MsgStoreFun, _Fun, [], State) ->
- {[], State};
-ack(MsgStoreFun, Fun, AckTags, State) ->
- {{PersistentSeqIds, MsgIdsByStore, AllMsgIds},
- State1 = #vqstate { index_state = IndexState,
- msg_store_clients = MSCState,
- persistent_count = PCount,
- ack_out_counter = AckOutCount }} =
- lists:foldl(
- fun (SeqId, {Acc, State2 = #vqstate { pending_ack = PA,
- ram_ack_index = RAI }}) ->
- AckEntry = dict:fetch(SeqId, PA),
- {accumulate_ack(SeqId, AckEntry, Acc),
- Fun(AckEntry, State2 #vqstate {
- pending_ack = dict:erase(SeqId, PA),
- ram_ack_index =
- gb_trees:delete_any(SeqId, RAI)})}
- end, {accumulate_ack_init(), State}, AckTags),
- IndexState1 = rabbit_queue_index:ack(PersistentSeqIds, IndexState),
- [ok = MsgStoreFun(MSCState, IsPersistent, MsgIds)
- || {IsPersistent, MsgIds} <- orddict:to_list(MsgIdsByStore)],
- PCount1 = PCount - find_persistent_count(sum_msg_ids_by_store_to_len(
- orddict:new(), MsgIdsByStore)),
- {lists:reverse(AllMsgIds),
- State1 #vqstate { index_state = IndexState1,
- persistent_count = PCount1,
- ack_out_counter = AckOutCount + length(AckTags) }}.
-
accumulate_ack_init() -> {[], orddict:new(), []}.
-accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS
- msg_on_disk = false,
- index_on_disk = false,
- msg_id = MsgId },
- {PersistentSeqIdsAcc, MsgIdsByStore, AllMsgIds}) ->
- {PersistentSeqIdsAcc, MsgIdsByStore, [MsgId | AllMsgIds]};
-accumulate_ack(SeqId, {IsPersistent, MsgId, _MsgProps},
- {PersistentSeqIdsAcc, MsgIdsByStore, AllMsgIds}) ->
- {cons_if(IsPersistent, SeqId, PersistentSeqIdsAcc),
- rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore),
+accumulate_ack(#msg_status { seq_id = SeqId,
+ msg_id = MsgId,
+ is_persistent = IsPersistent,
+ msg_on_disk = MsgOnDisk,
+ index_on_disk = IndexOnDisk },
+ {IndexOnDiskSeqIdsAcc, MsgIdsByStore, AllMsgIds}) ->
+ {cons_if(IndexOnDisk, SeqId, IndexOnDiskSeqIdsAcc),
+ case MsgOnDisk of
+ true -> rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore);
+ false -> MsgIdsByStore
+ end,
[MsgId | AllMsgIds]}.
find_persistent_count(LensByStore) ->
@@ -1324,6 +1324,122 @@ msg_indices_written_to_disk(Callback, MsgIdSet) ->
end).
%%----------------------------------------------------------------------------
+%% Internal plumbing for requeue
+%%----------------------------------------------------------------------------
+
+alpha_funs() ->
+ #merge_funs {
+ new = fun queue:new/0,
+ join = fun queue:join/2,
+ out = fun queue:out/1,
+ in = fun queue:in/2,
+ publish = fun (#msg_status { msg = undefined } = MsgStatus, State) ->
+ read_msg(MsgStatus, State);
+ (MsgStatus, #vqstate {
+ ram_msg_count = RamMsgCount } = State) ->
+ {MsgStatus, State #vqstate {
+ ram_msg_count = RamMsgCount + 1 }}
+ end}.
+
+beta_funs() ->
+ #merge_funs {
+ new = fun bpqueue:new/0,
+ join = fun bpqueue:join/2,
+ out = fun (Q) ->
+ case bpqueue:out(Q) of
+ {{value, _IndexOnDisk, MsgStatus}, Q1} ->
+ {{value, MsgStatus}, Q1};
+ {empty, _Q1} = X ->
+ X
+ end
+ end,
+ in = fun (#msg_status { index_on_disk = IOD } = MsgStatus, Q) ->
+ bpqueue:in(IOD, MsgStatus, Q)
+ end,
+ publish = fun (#msg_status { msg_on_disk = MsgOnDisk } = MsgStatus,
+ State) ->
+ {#msg_status { index_on_disk = IndexOnDisk,
+ msg = Msg} = MsgStatus1,
+ #vqstate { ram_index_count = RamIndexCount,
+ ram_msg_count = RamMsgCount } =
+ State1} =
+ maybe_write_to_disk(not MsgOnDisk, false,
+ MsgStatus, State),
+ {MsgStatus1, State1 #vqstate {
+ ram_msg_count = RamMsgCount +
+ one_if(Msg =/= undefined),
+ ram_index_count = RamIndexCount +
+ one_if(not IndexOnDisk) }}
+ end}.
+
+%% Rebuild queue, inserting sequence ids to maintain ordering
+queue_merge(SeqIds, Q, MsgIds, Limit, #merge_funs { new = QNew } = Funs,
+ MsgPropsFun, State) ->
+ queue_merge(SeqIds, Q, QNew(), MsgIds, Limit, Funs, MsgPropsFun, State).
+
+queue_merge([SeqId | Rest] = SeqIds, Q, Front, MsgIds, Limit,
+ #merge_funs { out = QOut, in = QIn, publish = QPublish } = Funs,
+ MsgPropsFun, State)
+ when Limit == undefined orelse SeqId < Limit ->
+ case QOut(Q) of
+ {{value, #msg_status { seq_id = SeqIdQ } = MsgStatus}, Q1}
+ when SeqIdQ < SeqId ->
+ %% enqueue from the remaining queue
+ queue_merge(SeqIds, Q1, QIn(MsgStatus, Front), MsgIds,
+ Limit, Funs, MsgPropsFun, State);
+ {_, _Q1} ->
+ %% enqueue from the remaining list of sequence ids
+ {MsgStatus, State1} = msg_from_pending_ack(SeqId, MsgPropsFun,
+ State),
+ {#msg_status { msg_id = MsgId } = MsgStatus1, State2} =
+ QPublish(MsgStatus, State1),
+ queue_merge(Rest, Q, QIn(MsgStatus1, Front), [MsgId | MsgIds],
+ Limit, Funs, MsgPropsFun, State2)
+ end;
+queue_merge(SeqIds, Q, Front, MsgIds, _Limit, #merge_funs { join = QJoin },
+ _MsgPropsFun, State) ->
+ {SeqIds, QJoin(Front, Q), MsgIds, State}.
+
+delta_merge([], Delta, MsgIds, _MsgPropsFun, State) ->
+ {Delta, MsgIds, State};
+delta_merge(SeqIds, #delta { start_seq_id = StartSeqId,
+ count = Count,
+ end_seq_id = EndSeqId} = Delta,
+ MsgIds, MsgPropsFun, State) ->
+ lists:foldl(fun (SeqId, {Delta0, MsgIds0, State0}) ->
+ {#msg_status { msg_id = MsgId,
+ index_on_disk = IndexOnDisk,
+ msg_on_disk = MsgOnDisk} = MsgStatus,
+ State1} =
+ msg_from_pending_ack(SeqId, MsgPropsFun, State0),
+ {_MsgStatus, State2} =
+ maybe_write_to_disk(not MsgOnDisk, not IndexOnDisk,
+ MsgStatus, State1),
+ {Delta0 #delta {
+ start_seq_id = lists:min([SeqId, StartSeqId]),
+ count = Count + 1,
+ end_seq_id = lists:max([SeqId + 1, EndSeqId]) },
+ [MsgId | MsgIds0], State2}
+ end, {Delta, MsgIds, State}, SeqIds).
+
+%% Mostly opposite of record_pending_ack/2
+msg_from_pending_ack(SeqId, MsgPropsFun, State) ->
+ {#msg_status { msg_props = MsgProps } = MsgStatus, State1} =
+ remove_pending_ack(SeqId, State),
+ {MsgStatus #msg_status {
+ msg_props = (MsgPropsFun(MsgProps)) #message_properties {
+ needs_confirming = false } }, State1}.
+
+beta_limit(BPQ) ->
+ case bpqueue:out(BPQ) of
+ {{value, _Prefix, #msg_status { seq_id = SeqId }}, _BPQ} -> SeqId;
+ {empty, _BPQ} -> undefined
+ end.
+
+delta_limit(?BLANK_DELTA_PATTERN(_X)) -> undefined;
+delta_limit(#delta { start_seq_id = StartSeqId }) -> StartSeqId.
+
+%%----------------------------------------------------------------------------
%% Phase changes
%%----------------------------------------------------------------------------
@@ -1399,12 +1515,11 @@ limit_ram_acks(Quota, State = #vqstate { pending_ack = PA,
{Quota, State};
false ->
{SeqId, MsgId, RAI1} = gb_trees:take_largest(RAI),
- MsgStatus = #msg_status {
- msg_id = MsgId, %% ASSERTION
- is_persistent = false, %% ASSERTION
- msg_props = MsgProps } = dict:fetch(SeqId, PA),
- {_, State1} = maybe_write_to_disk(true, false, MsgStatus, State),
- PA1 = dict:store(SeqId, {false, MsgId, MsgProps}, PA),
+ MsgStatus = #msg_status { msg_id = MsgId, is_persistent = false} =
+ gb_trees:get(SeqId, PA),
+ {MsgStatus1, State1} =
+ maybe_write_to_disk(true, false, MsgStatus, State),
+ PA1 = gb_trees:update(SeqId, m(trim_msg_status(MsgStatus1)), PA),
limit_ram_acks(Quota - 1,
State1 #vqstate { pending_ack = PA1,
ram_ack_index = RAI1 })
@@ -1510,6 +1625,7 @@ maybe_deltas_to_betas(State = #vqstate {
delta = Delta,
q3 = Q3,
index_state = IndexState,
+ pending_ack = PA,
transient_threshold = TransientThreshold }) ->
#delta { start_seq_id = DeltaSeqId,
count = DeltaCount,
@@ -1520,7 +1636,7 @@ maybe_deltas_to_betas(State = #vqstate {
{List, IndexState1} =
rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1, IndexState),
{Q3a, IndexState2} =
- betas_from_index_entries(List, TransientThreshold, IndexState1),
+ betas_from_index_entries(List, TransientThreshold, PA, IndexState1),
State1 = State #vqstate { index_state = IndexState2 },
case bpqueue:len(Q3a) of
0 ->
@@ -1592,9 +1708,9 @@ maybe_push_alphas_to_betas(Generator, Consumer, Quota, Q, State) ->
State1 = #vqstate { ram_msg_count = RamMsgCount,
ram_index_count = RamIndexCount }} =
maybe_write_to_disk(true, false, MsgStatus, State),
- MsgStatus2 = m(MsgStatus1 #msg_status { msg = undefined }),
+ MsgStatus2 = m(trim_msg_status(MsgStatus1)),
RamIndexCount1 = RamIndexCount + one_if(not IndexOnDisk),
- State2 = State1 #vqstate { ram_msg_count = RamMsgCount - 1,
+ State2 = State1 #vqstate { ram_msg_count = RamMsgCount - 1,
ram_index_count = RamIndexCount1 },
maybe_push_alphas_to_betas(Generator, Consumer, Quota - 1, Qa,
Consumer(MsgStatus2, Qa, State2))
diff --git a/src/rabbit_version.erl b/src/rabbit_version.erl
index 400abc1083..f6bcbb7fd5 100644
--- a/src/rabbit_version.erl
+++ b/src/rabbit_version.erl
@@ -49,12 +49,12 @@
%% -------------------------------------------------------------------
-recorded() -> case rabbit_misc:read_term_file(schema_filename()) of
+recorded() -> case rabbit_file:read_term_file(schema_filename()) of
{ok, [V]} -> {ok, V};
{error, _} = Err -> Err
end.
-record(V) -> ok = rabbit_misc:write_term_file(schema_filename(), [V]).
+record(V) -> ok = rabbit_file:write_term_file(schema_filename(), [V]).
recorded_for_scope(Scope) ->
case recorded() of
diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl
index 08d6c99a9c..38bb76b03b 100644
--- a/src/rabbit_vhost.erl
+++ b/src/rabbit_vhost.erl
@@ -44,6 +44,7 @@
-define(INFO_KEYS, [name, tracing]).
add(VHostPath) ->
+ rabbit_log:info("Adding vhost '~s'~n", [VHostPath]),
R = rabbit_misc:execute_mnesia_transaction(
fun () ->
case mnesia:wread({rabbit_vhost, VHostPath}) of
@@ -69,7 +70,6 @@ add(VHostPath) ->
{<<"amq.rabbitmq.trace">>, topic}]],
ok
end),
- rabbit_log:info("Added vhost ~p~n", [VHostPath]),
R.
delete(VHostPath) ->
@@ -78,6 +78,7 @@ delete(VHostPath) ->
%% process, which in turn results in further mnesia actions and
%% eventually the termination of that process. Exchange deletion causes
%% notifications which must be sent outside the TX
+ rabbit_log:info("Deleting vhost '~s'~n", [VHostPath]),
[{ok,_} = rabbit_amqqueue:delete(Q, false, false) ||
Q <- rabbit_amqqueue:list(VHostPath)],
[ok = rabbit_exchange:delete(Name, false) ||
@@ -86,7 +87,6 @@ delete(VHostPath) ->
with(VHostPath, fun () ->
ok = internal_delete(VHostPath)
end)),
- rabbit_log:info("Deleted vhost ~p~n", [VHostPath]),
R.
internal_delete(VHostPath) ->
diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl
index ac3434d253..091b50e4c6 100644
--- a/src/rabbit_writer.erl
+++ b/src/rabbit_writer.erl
@@ -67,6 +67,9 @@
non_neg_integer(), rabbit_types:protocol())
-> 'ok').
+-spec(mainloop/2 :: (_,_) -> 'done').
+-spec(mainloop1/2 :: (_,_) -> any()).
+
-endif.
%%---------------------------------------------------------------------------
diff --git a/src/tcp_acceptor_sup.erl b/src/tcp_acceptor_sup.erl
index bf0eacd164..4c835598e0 100644
--- a/src/tcp_acceptor_sup.erl
+++ b/src/tcp_acceptor_sup.erl
@@ -22,6 +22,14 @@
-export([init/1]).
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+-spec(start_link/2 :: (atom(), mfa()) -> rabbit_types:ok_pid_or_error()).
+-endif.
+
+%%----------------------------------------------------------------------------
+
start_link(Name, Callback) ->
supervisor:start_link({local,Name}, ?MODULE, Callback).
diff --git a/src/tcp_listener.erl b/src/tcp_listener.erl
index cd64696904..ad2a0d02d0 100644
--- a/src/tcp_listener.erl
+++ b/src/tcp_listener.erl
@@ -25,6 +25,14 @@
-record(state, {sock, on_startup, on_shutdown, label}).
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+-spec(start_link/8 ::
+ (gen_tcp:ip_address(), integer(), rabbit_types:infos(), integer(),
+ atom(), mfa(), mfa(), string()) -> rabbit_types:ok_pid_or_error()).
+-endif.
+
%%--------------------------------------------------------------------
start_link(IPAddress, Port, SocketOpts,
diff --git a/src/tcp_listener_sup.erl b/src/tcp_listener_sup.erl
index 58c2f30c18..5bff5c2701 100644
--- a/src/tcp_listener_sup.erl
+++ b/src/tcp_listener_sup.erl
@@ -22,6 +22,21 @@
-export([init/1]).
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(start_link/7 ::
+ (gen_tcp:ip_address(), integer(), rabbit_types:infos(), mfa(), mfa(),
+ mfa(), string()) -> rabbit_types:ok_pid_or_error()).
+-spec(start_link/8 ::
+ (gen_tcp:ip_address(), integer(), rabbit_types:infos(), mfa(), mfa(),
+ mfa(), integer(), string()) -> rabbit_types:ok_pid_or_error()).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
start_link(IPAddress, Port, SocketOpts, OnStartup, OnShutdown,
AcceptCallback, Label) ->
start_link(IPAddress, Port, SocketOpts, OnStartup, OnShutdown,
diff --git a/src/test_sup.erl b/src/test_sup.erl
index 84c4121c9a..5feb146f64 100644
--- a/src/test_sup.erl
+++ b/src/test_sup.erl
@@ -21,6 +21,18 @@
-export([test_supervisor_delayed_restart/0,
init/1, start_child/0]).
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(test_supervisor_delayed_restart/0 :: () -> 'passed').
+
+-endif.
+
+%%----------------------------------------------------------------------------
+%% Public API
+%%----------------------------------------------------------------------------
+
test_supervisor_delayed_restart() ->
passed = with_sup(simple_one_for_one_terminate,
fun (SupPid) ->
diff --git a/src/vm_memory_monitor.erl b/src/vm_memory_monitor.erl
index 01597a236c..35ee1e5165 100644
--- a/src/vm_memory_monitor.erl
+++ b/src/vm_memory_monitor.erl
@@ -58,15 +58,15 @@
-ifdef(use_specs).
--spec(start_link/1 :: (float()) -> {'ok', pid()} | {'error', any()}).
+-spec(start_link/1 :: (float()) -> rabbit_types:ok_pid_or_error()).
-spec(update/0 :: () -> 'ok').
-spec(get_total_memory/0 :: () -> (non_neg_integer() | 'unknown')).
-spec(get_vm_limit/0 :: () -> non_neg_integer()).
--spec(get_memory_limit/0 :: () -> non_neg_integer()).
-spec(get_check_interval/0 :: () -> non_neg_integer()).
-spec(set_check_interval/1 :: (non_neg_integer()) -> 'ok').
-spec(get_vm_memory_high_watermark/0 :: () -> float()).
-spec(set_vm_memory_high_watermark/1 :: (float()) -> 'ok').
+-spec(get_memory_limit/0 :: () -> non_neg_integer()).
-endif.
diff --git a/src/worker_pool.erl b/src/worker_pool.erl
index e4f260cc50..456ff39f47 100644
--- a/src/worker_pool.erl
+++ b/src/worker_pool.erl
@@ -41,6 +41,7 @@
-spec(submit/1 :: (fun (() -> A) | {atom(), atom(), [any()]}) -> A).
-spec(submit_async/1 ::
(fun (() -> any()) | {atom(), atom(), [any()]}) -> 'ok').
+-spec(idle/1 :: (any()) -> 'ok').
-endif.
diff --git a/src/worker_pool_sup.erl b/src/worker_pool_sup.erl
index 28c1adc681..d37c3a0fd6 100644
--- a/src/worker_pool_sup.erl
+++ b/src/worker_pool_sup.erl
@@ -26,8 +26,8 @@
-ifdef(use_specs).
--spec(start_link/0 :: () -> {'ok', pid()} | {'error', any()}).
--spec(start_link/1 :: (non_neg_integer()) -> {'ok', pid()} | {'error', any()}).
+-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
+-spec(start_link/1 :: (non_neg_integer()) -> rabbit_types:ok_pid_or_error()).
-endif.