summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-05-22 13:32:47 +0100
committerSimon MacMullen <simon@rabbitmq.com>2014-05-22 13:32:47 +0100
commit7ab9f4b1caba2fa34ed505c870d08605b0a779f8 (patch)
treeec86c4b9458f28f9ae7ce122b758ca6114c46704
parenta1881d7e02c1b36ef449b2268fc3afab064d005c (diff)
parent3a7b6c7a3d32dc610c3b67f4bb26b83bea38cb38 (diff)
downloadrabbitmq-server-git-7ab9f4b1caba2fa34ed505c870d08605b0a779f8.tar.gz
Merge bug26150
-rw-r--r--docs/rabbitmq.config.example14
-rw-r--r--src/file_handle_cache.erl261
-rw-r--r--src/gm.erl3
-rw-r--r--src/rabbit_channel.erl16
-rw-r--r--src/rabbit_channel_interceptor.erl25
-rw-r--r--src/rabbit_file.erl45
-rw-r--r--src/rabbit_mirror_queue_slave.erl10
-rw-r--r--src/rabbit_misc.erl2
-rw-r--r--src/rabbit_policy.erl16
-rw-r--r--src/rabbit_runtime_parameters.erl21
-rw-r--r--src/rabbit_vhost.erl21
-rw-r--r--src/truncate.erl11
12 files changed, 255 insertions, 190 deletions
diff --git a/docs/rabbitmq.config.example b/docs/rabbitmq.config.example
index b0e13b1b8d..26de71b70d 100644
--- a/docs/rabbitmq.config.example
+++ b/docs/rabbitmq.config.example
@@ -257,9 +257,13 @@
%% {certfile, "/path/to/cert.pem"},
%% {keyfile, "/path/to/key.pem"}]}]},
+ %% One of 'basic', 'detailed' or 'none'. See
+ %% http://www.rabbitmq.com/management.html#fine-stats for more details.
+ %% {rates_mode, basic},
+
%% Configure how long aggregated data (such as message rates and queue
%% lengths) is retained. Please read the plugin's documentation in
- %% https://www.rabbitmq.com/management.html#configuration for more
+ %% http://www.rabbitmq.com/management.html#configuration for more
%% details.
%%
%% {sample_retention_policies,
@@ -268,14 +272,6 @@
%% {detailed, [{10, 5}]}]}
]},
- {rabbitmq_management_agent,
- [%% Misc/Advanced Options
- %%
- %% NB: Change these only if you understand what you are doing!
- %%
- %% {force_fine_statistics, true}
- ]},
-
%% ----------------------------------------------------------------------------
%% RabbitMQ Shovel Plugin
%%
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index 71645a3c9a..3a7a692c5c 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -147,7 +147,7 @@
truncate/1, current_virtual_offset/1, current_raw_offset/1, flush/1,
copy/3, set_maximum_since_use/1, delete/1, clear/1]).
-export([obtain/0, obtain/1, release/0, release/1, transfer/1, transfer/2,
- set_limit/1, get_limit/0, info_keys/0,
+ set_limit/1, get_limit/0, info_keys/0, with_handle/1, with_handle/2,
info/0, info/1]).
-export([ulimit/0]).
@@ -192,9 +192,11 @@
limit,
open_count,
open_pending,
- obtain_limit,
- obtain_count,
- obtain_pending,
+ obtain_limit, %%socket
+ obtain_count_socket,
+ obtain_count_file,
+ obtain_pending_socket,
+ obtain_pending_file,
clients,
timer_ref,
alarm_set,
@@ -205,7 +207,8 @@
{ pid,
callback,
opened,
- obtained,
+ obtained_socket,
+ obtained_file,
blocked,
pending_closes
}).
@@ -257,6 +260,8 @@
-spec(release/1 :: (non_neg_integer()) -> 'ok').
-spec(transfer/1 :: (pid()) -> 'ok').
-spec(transfer/2 :: (pid(), non_neg_integer()) -> 'ok').
+-spec(with_handle/1 :: (fun(() -> A)) -> A).
+-spec(with_handle/2 :: (non_neg_integer(), fun(() -> A)) -> A).
-spec(set_limit/1 :: (non_neg_integer()) -> 'ok').
-spec(get_limit/0 :: () -> non_neg_integer()).
-spec(info_keys/0 :: () -> rabbit_types:info_keys()).
@@ -493,15 +498,28 @@ obtain() -> obtain(1).
release() -> release(1).
transfer(Pid) -> transfer(Pid, 1).
-obtain(Count) when Count > 0 ->
+obtain(Count) -> obtain(Count, socket).
+release(Count) -> release(Count, socket).
+
+with_handle(Fun) ->
+ with_handle(1, Fun).
+
+with_handle(N, Fun) ->
+ ok = obtain(N, file),
+ try Fun()
+ after ok = release(N, file)
+ end.
+
+obtain(Count, Type) when Count > 0 ->
%% If the FHC isn't running, obtains succeed immediately.
case whereis(?SERVER) of
undefined -> ok;
- _ -> gen_server2:call(?SERVER, {obtain, Count, self()}, infinity)
+ _ -> gen_server2:call(
+ ?SERVER, {obtain, Count, Type, self()}, infinity)
end.
-release(Count) when Count > 0 ->
- gen_server2:cast(?SERVER, {release, Count, self()}).
+release(Count, Type) when Count > 0 ->
+ gen_server2:cast(?SERVER, {release, Count, Type, self()}).
transfer(Pid, Count) when Count > 0 ->
gen_server2:cast(?SERVER, {transfer, Count, self(), Pid}).
@@ -810,12 +828,16 @@ write_buffer(Handle = #handle { hdl = Hdl, offset = Offset,
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
-i(total_limit, #fhc_state{limit = Limit}) -> Limit;
-i(total_used, #fhc_state{open_count = C1, obtain_count = C2}) -> C1 + C2;
-i(sockets_limit, #fhc_state{obtain_limit = Limit}) -> Limit;
-i(sockets_used, #fhc_state{obtain_count = Count}) -> Count;
+i(total_limit, #fhc_state{limit = Limit}) -> Limit;
+i(total_used, State) -> used(State);
+i(sockets_limit, #fhc_state{obtain_limit = Limit}) -> Limit;
+i(sockets_used, #fhc_state{obtain_count_socket = Count}) -> Count;
i(Item, _) -> throw({bad_argument, Item}).
+used(#fhc_state{open_count = C1,
+ obtain_count_socket = C2,
+ obtain_count_file = C3}) -> C1 + C2 + C3.
+
%%----------------------------------------------------------------------------
%% gen_server2 callbacks
%%----------------------------------------------------------------------------
@@ -836,21 +858,23 @@ init([AlarmSet, AlarmClear]) ->
[Limit, ObtainLimit]),
Clients = ets:new(?CLIENT_ETS_TABLE, [set, private, {keypos, #cstate.pid}]),
Elders = ets:new(?ELDERS_ETS_TABLE, [set, private]),
- {ok, #fhc_state { elders = Elders,
- limit = Limit,
- open_count = 0,
- open_pending = pending_new(),
- obtain_limit = ObtainLimit,
- obtain_count = 0,
- obtain_pending = pending_new(),
- clients = Clients,
- timer_ref = undefined,
- alarm_set = AlarmSet,
- alarm_clear = AlarmClear }}.
+ {ok, #fhc_state { elders = Elders,
+ limit = Limit,
+ open_count = 0,
+ open_pending = pending_new(),
+ obtain_limit = ObtainLimit,
+ obtain_count_file = 0,
+ obtain_pending_file = pending_new(),
+ obtain_count_socket = 0,
+ obtain_pending_socket = pending_new(),
+ clients = Clients,
+ timer_ref = undefined,
+ alarm_set = AlarmSet,
+ alarm_clear = AlarmClear }}.
prioritise_cast(Msg, _Len, _State) ->
case Msg of
- {release, _, _} -> 5;
+ {release, _, _, _} -> 5;
_ -> 0
end.
@@ -883,23 +907,24 @@ handle_call({open, Pid, Requested, EldestUnusedSince}, From,
false -> {noreply, run_pending_item(Item, State)}
end;
-handle_call({obtain, N, Pid}, From, State = #fhc_state {
- obtain_count = Count,
- obtain_pending = Pending,
- clients = Clients }) ->
+handle_call({obtain, N, Type, Pid}, From,
+ State = #fhc_state { clients = Clients }) ->
+ Count = obtain_state(Type, count, State),
+ Pending = obtain_state(Type, pending, State),
ok = track_client(Pid, Clients),
- Item = #pending { kind = obtain, pid = Pid, requested = N, from = From },
+ Item = #pending { kind = {obtain, Type}, pid = Pid,
+ requested = N, from = From },
Enqueue = fun () ->
true = ets:update_element(Clients, Pid,
{#cstate.blocked, true}),
- State #fhc_state {
- obtain_pending = pending_in(Item, Pending) }
+ set_obtain_state(Type, pending,
+ pending_in(Item, Pending), State)
end,
{noreply,
- case obtain_limit_reached(State) of
+ case obtain_limit_reached(Type, State) of
true -> Enqueue();
- false -> case needs_reduce(State #fhc_state {
- obtain_count = Count + N }) of
+ false -> case needs_reduce(
+ set_obtain_state(Type, count, Count + 1, State)) of
true -> reduce(Enqueue());
false -> adjust_alarm(
State, run_pending_item(Item, State))
@@ -934,9 +959,9 @@ handle_cast({update, Pid, EldestUnusedSince},
%% storm of messages
{noreply, State};
-handle_cast({release, N, Pid}, State) ->
- {noreply, adjust_alarm(State, process_pending(
- update_counts(obtain, Pid, -N, State)))};
+handle_cast({release, N, Type, Pid}, State) ->
+ State1 = process_pending(update_counts({obtain, Type}, Pid, -N, State)),
+ {noreply, adjust_alarm(State, State1)};
handle_cast({close, Pid, EldestUnusedSince},
State = #fhc_state { elders = Elders, clients = Clients }) ->
@@ -951,32 +976,38 @@ handle_cast({close, Pid, EldestUnusedSince},
handle_cast({transfer, N, FromPid, ToPid}, State) ->
ok = track_client(ToPid, State#fhc_state.clients),
{noreply, process_pending(
- update_counts(obtain, ToPid, +N,
- update_counts(obtain, FromPid, -N, State)))}.
+ update_counts({obtain, socket}, ToPid, +N,
+ update_counts({obtain, socket}, FromPid, -N,
+ State)))}.
handle_info(check_counts, State) ->
{noreply, maybe_reduce(State #fhc_state { timer_ref = undefined })};
handle_info({'DOWN', _MRef, process, Pid, _Reason},
- State = #fhc_state { elders = Elders,
- open_count = OpenCount,
- open_pending = OpenPending,
- obtain_count = ObtainCount,
- obtain_pending = ObtainPending,
- clients = Clients }) ->
- [#cstate { opened = Opened, obtained = Obtained }] =
+ State = #fhc_state { elders = Elders,
+ open_count = OpenCount,
+ open_pending = OpenPending,
+ obtain_count_file = ObtainCountF,
+ obtain_count_socket = ObtainCountS,
+ obtain_pending_file = ObtainPendingF,
+ obtain_pending_socket = ObtainPendingS,
+ clients = Clients }) ->
+ [#cstate { opened = Opened,
+ obtained_file = ObtainedFile,
+ obtained_socket = ObtainedSocket}] =
ets:lookup(Clients, Pid),
true = ets:delete(Clients, Pid),
true = ets:delete(Elders, Pid),
- FilterFun = fun (#pending { pid = Pid1 }) -> Pid1 =/= Pid end,
- {noreply, adjust_alarm(
- State,
- process_pending(
- State #fhc_state {
- open_count = OpenCount - Opened,
- open_pending = filter_pending(FilterFun, OpenPending),
- obtain_count = ObtainCount - Obtained,
- obtain_pending = filter_pending(FilterFun, ObtainPending) }))}.
+ Fun = fun (#pending { pid = Pid1 }) -> Pid1 =/= Pid end,
+ State1 = process_pending(
+ State #fhc_state {
+ open_count = OpenCount - Opened,
+ open_pending = filter_pending(Fun, OpenPending),
+ obtain_count_file = ObtainCountF - ObtainedFile,
+ obtain_count_socket = ObtainCountS - ObtainedSocket,
+ obtain_pending_file = filter_pending(Fun, ObtainPendingF),
+ obtain_pending_socket = filter_pending(Fun, ObtainPendingS) }),
+ {noreply, adjust_alarm(State, State1)}.
terminate(_Reason, State = #fhc_state { clients = Clients,
elders = Elders }) ->
@@ -1039,10 +1070,23 @@ obtain_limit(Limit) -> case ?OBTAIN_LIMIT(Limit) of
OLimit -> OLimit
end.
-obtain_limit_reached(#fhc_state { obtain_limit = Limit,
- obtain_count = Count}) ->
+obtain_limit_reached(socket, State) -> obtain_limit_reached(State);
+obtain_limit_reached(file, State) -> needs_reduce(State).
+
+obtain_limit_reached(#fhc_state{obtain_limit = Limit,
+ obtain_count_socket = Count}) ->
Limit =/= infinity andalso Count >= Limit.
+obtain_state(file, count, #fhc_state{obtain_count_file = N}) -> N;
+obtain_state(socket, count, #fhc_state{obtain_count_socket = N}) -> N;
+obtain_state(file, pending, #fhc_state{obtain_pending_file = N}) -> N;
+obtain_state(socket, pending, #fhc_state{obtain_pending_socket = N}) -> N.
+
+set_obtain_state(file, count, N, S) -> S#fhc_state{obtain_count_file = N};
+set_obtain_state(socket, count, N, S) -> S#fhc_state{obtain_count_socket = N};
+set_obtain_state(file, pending, N, S) -> S#fhc_state{obtain_pending_file = N};
+set_obtain_state(socket, pending, N, S) -> S#fhc_state{obtain_pending_socket = N}.
+
adjust_alarm(OldState = #fhc_state { alarm_set = AlarmSet,
alarm_clear = AlarmClear }, NewState) ->
case {obtain_limit_reached(OldState), obtain_limit_reached(NewState)} of
@@ -1055,25 +1099,24 @@ adjust_alarm(OldState = #fhc_state { alarm_set = AlarmSet,
process_pending(State = #fhc_state { limit = infinity }) ->
State;
process_pending(State) ->
- process_open(process_obtain(State)).
+ process_open(process_obtain(socket, process_obtain(file, State))).
process_open(State = #fhc_state { limit = Limit,
- open_pending = Pending,
- open_count = OpenCount,
- obtain_count = ObtainCount }) ->
- {Pending1, State1} =
- process_pending(Pending, Limit - (ObtainCount + OpenCount), State),
+ open_pending = Pending}) ->
+ {Pending1, State1} = process_pending(Pending, Limit - used(State), State),
State1 #fhc_state { open_pending = Pending1 }.
-process_obtain(State = #fhc_state { limit = Limit,
- obtain_pending = Pending,
- obtain_limit = ObtainLimit,
- obtain_count = ObtainCount,
- open_count = OpenCount }) ->
- Quota = lists:min([ObtainLimit - ObtainCount,
- Limit - (ObtainCount + OpenCount)]),
+process_obtain(Type, State = #fhc_state { limit = Limit,
+ obtain_limit = ObtainLimit }) ->
+ ObtainCount = obtain_state(Type, count, State),
+ Pending = obtain_state(Type, pending, State),
+ Quota = case Type of
+ file -> Limit - (used(State));
+ socket -> lists:min([ObtainLimit - ObtainCount,
+ Limit - (used(State))])
+ end,
{Pending1, State1} = process_pending(Pending, Quota, State),
- State1 #fhc_state { obtain_pending = Pending1 }.
+ set_obtain_state(Type, pending, Pending1, State1).
process_pending(Pending, Quota, State) when Quota =< 0 ->
{Pending, State};
@@ -1099,19 +1142,25 @@ run_pending_item(#pending { kind = Kind,
update_counts(Kind, Pid, Requested, State).
update_counts(Kind, Pid, Delta,
- State = #fhc_state { open_count = OpenCount,
- obtain_count = ObtainCount,
- clients = Clients }) ->
- {OpenDelta, ObtainDelta} = update_counts1(Kind, Pid, Delta, Clients),
- State #fhc_state { open_count = OpenCount + OpenDelta,
- obtain_count = ObtainCount + ObtainDelta }.
+ State = #fhc_state { open_count = OpenCount,
+ obtain_count_file = ObtainCountF,
+ obtain_count_socket = ObtainCountS,
+ clients = Clients }) ->
+ {OpenDelta, ObtainDeltaF, ObtainDeltaS} =
+ update_counts1(Kind, Pid, Delta, Clients),
+ State #fhc_state { open_count = OpenCount + OpenDelta,
+ obtain_count_file = ObtainCountF + ObtainDeltaF,
+ obtain_count_socket = ObtainCountS + ObtainDeltaS }.
update_counts1(open, Pid, Delta, Clients) ->
ets:update_counter(Clients, Pid, {#cstate.opened, Delta}),
- {Delta, 0};
-update_counts1(obtain, Pid, Delta, Clients) ->
- ets:update_counter(Clients, Pid, {#cstate.obtained, Delta}),
- {0, Delta}.
+ {Delta, 0, 0};
+update_counts1({obtain, file}, Pid, Delta, Clients) ->
+ ets:update_counter(Clients, Pid, {#cstate.obtained_file, Delta}),
+ {0, Delta, 0};
+update_counts1({obtain, socket}, Pid, Delta, Clients) ->
+ ets:update_counter(Clients, Pid, {#cstate.obtained_socket, Delta}),
+ {0, 0, Delta}.
maybe_reduce(State) ->
case needs_reduce(State) of
@@ -1119,23 +1168,25 @@ maybe_reduce(State) ->
false -> State
end.
-needs_reduce(#fhc_state { limit = Limit,
- open_count = OpenCount,
- open_pending = OpenPending,
- obtain_count = ObtainCount,
- obtain_limit = ObtainLimit,
- obtain_pending = ObtainPending }) ->
+needs_reduce(State = #fhc_state { limit = Limit,
+ open_pending = OpenPending,
+ obtain_limit = ObtainLimit,
+ obtain_count_socket = ObtainCountS,
+ obtain_pending_file = ObtainPendingF,
+ obtain_pending_socket = ObtainPendingS }) ->
Limit =/= infinity
- andalso ((OpenCount + ObtainCount > Limit)
+ andalso ((used(State) > Limit)
orelse (not pending_is_empty(OpenPending))
- orelse (ObtainCount < ObtainLimit
- andalso not pending_is_empty(ObtainPending))).
-
-reduce(State = #fhc_state { open_pending = OpenPending,
- obtain_pending = ObtainPending,
- elders = Elders,
- clients = Clients,
- timer_ref = TRef }) ->
+ orelse (not pending_is_empty(ObtainPendingF))
+ orelse (ObtainCountS < ObtainLimit
+ andalso not pending_is_empty(ObtainPendingS))).
+
+reduce(State = #fhc_state { open_pending = OpenPending,
+ obtain_pending_file = ObtainPendingFile,
+ obtain_pending_socket = ObtainPendingSocket,
+ elders = Elders,
+ clients = Clients,
+ timer_ref = TRef }) ->
Now = now(),
{CStates, Sum, ClientCount} =
ets:foldl(fun ({Pid, Eldest}, {CStatesAcc, SumAcc, CountAcc} = Accs) ->
@@ -1159,7 +1210,8 @@ reduce(State = #fhc_state { open_pending = OpenPending,
_ ->
notify_age0(Clients, CStates,
pending_count(OpenPending) +
- pending_count(ObtainPending))
+ pending_count(ObtainPendingFile) +
+ pending_count(ObtainPendingSocket))
end
end,
case TRef of
@@ -1196,12 +1248,13 @@ notify(Clients, Required, [#cstate{ pid = Pid,
notify(Clients, Required - Opened, Notifications).
track_client(Pid, Clients) ->
- case ets:insert_new(Clients, #cstate { pid = Pid,
- callback = undefined,
- opened = 0,
- obtained = 0,
- blocked = false,
- pending_closes = 0 }) of
+ case ets:insert_new(Clients, #cstate { pid = Pid,
+ callback = undefined,
+ opened = 0,
+ obtained_file = 0,
+ obtained_socket = 0,
+ blocked = false,
+ pending_closes = 0 }) of
true -> _MRef = erlang:monitor(process, Pid),
ok;
false -> ok
diff --git a/src/gm.erl b/src/gm.erl
index 0c0ac349b0..fb59b9cb0b 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -1036,7 +1036,8 @@ join_group(Self, GroupName, #gm_group { members = Members } = Group, TxnFun) ->
case lists:filter(fun is_member_alive/1, Members) of
[] ->
join_group(Self, GroupName,
- prune_or_create_group(Self, GroupName, TxnFun));
+ prune_or_create_group(Self, GroupName, TxnFun),
+ TxnFun);
Alive ->
Left = lists:nth(random:uniform(length(Alive)), Alive),
Handler =
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 74f9cacf76..eb9ed4ed07 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -187,7 +187,7 @@ force_event_refresh(Ref) ->
%%---------------------------------------------------------------------------
-init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
+init([Channel, Foo, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
Capabilities, CollectorPid, LimiterPid]) ->
process_flag(trap_exit, true),
?store_proc_name({ConnName, Channel}),
@@ -195,7 +195,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
State = #ch{state = starting,
protocol = Protocol,
channel = Channel,
- reader_pid = ReaderPid,
+ reader_pid = Foo,
writer_pid = WriterPid,
conn_pid = ConnPid,
conn_name = ConnName,
@@ -894,8 +894,7 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
_, State = #ch{virtual_host = VHostPath}) ->
CheckedType = rabbit_exchange:check_type(TypeNameBin),
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
- check_not_default_exchange(ExchangeName),
- check_configure_permitted(ExchangeName, State),
+ test(State, ExchangeName),
X = case rabbit_exchange:lookup(ExchangeName) of
{ok, FoundX} -> FoundX;
{error, not_found} ->
@@ -1119,7 +1118,7 @@ handle_method(#'tx.commit'{}, _, #ch{tx = none}) ->
handle_method(#'tx.commit'{}, _, State = #ch{tx = {Msgs, Acks},
limiter = Limiter}) ->
- State1 = rabbit_misc:queue_fold(fun deliver_to_queues/2, State, Msgs),
+ State1 = test2(State, Msgs),
Rev = fun (X) -> lists:reverse(lists:sort(X)) end,
lists:foreach(fun ({ack, A}) -> ack(Rev(A), State1);
({Requeue, A}) -> reject(Requeue, Rev(A), Limiter)
@@ -1165,6 +1164,13 @@ handle_method(_MethodRecord, _Content, _State) ->
rabbit_misc:protocol_error(
command_invalid, "unimplemented method", []).
+test2(State, Msgs) ->
+ rabbit_misc:queue_fold(fun deliver_to_queues/2, State, Msgs).
+
+test(State, ExchangeName) ->
+ check_not_default_exchange(ExchangeName),
+ check_configure_permitted(ExchangeName, State).
+
%%----------------------------------------------------------------------------
%% We get the queue process to send the consume_ok on our behalf. This
diff --git a/src/rabbit_channel_interceptor.erl b/src/rabbit_channel_interceptor.erl
index 81c17fbfbe..db9349acfb 100644
--- a/src/rabbit_channel_interceptor.erl
+++ b/src/rabbit_channel_interceptor.erl
@@ -33,7 +33,7 @@
-callback description() -> [proplists:property()].
-callback intercept(original_method(), rabbit_types:vhost()) ->
- rabbit_types:ok_or_error2(processed_method(), any()).
+ processed_method() | rabbit_misc:channel_or_connection_exit().
%% Whether the interceptor wishes to intercept the amqp method
-callback applies_to(intercept_method()) -> boolean().
@@ -62,20 +62,15 @@ intercept_method(M, VHost) ->
intercept_method(M, _VHost, []) ->
M;
intercept_method(M, VHost, [I]) ->
- case I:intercept(M, VHost) of
- {ok, M2} ->
- case validate_method(M, M2) of
- true ->
- M2;
- _ ->
- internal_error("Interceptor: ~p expected "
- "to return method: ~p but returned: ~p",
- [I, rabbit_misc:method_record_type(M),
- rabbit_misc:method_record_type(M2)])
- end;
- {error, Reason} ->
- internal_error("Interceptor: ~p failed with reason: ~p",
- [I, Reason])
+ M2 = I:intercept(M, VHost),
+ case validate_method(M, M2) of
+ true ->
+ M2;
+ _ ->
+ internal_error("Interceptor: ~p expected "
+ "to return method: ~p but returned: ~p",
+ [I, rabbit_misc:method_record_type(M),
+ rabbit_misc:method_record_type(M2)])
end;
intercept_method(M, _VHost, Is) ->
internal_error("More than one interceptor for method: ~p -- ~p",
diff --git a/src/rabbit_file.erl b/src/rabbit_file.erl
index d71818c869..81a617a821 100644
--- a/src/rabbit_file.erl
+++ b/src/rabbit_file.erl
@@ -24,6 +24,8 @@
-export([rename/2, delete/1, recursive_delete/1, recursive_copy/2]).
-export([lock_file/1]).
+-import(file_handle_cache, [with_handle/1, with_handle/2]).
+
-define(TMP_EXT, ".tmp").
%%----------------------------------------------------------------------------
@@ -81,7 +83,7 @@ file_size(File) ->
_ -> 0
end.
-ensure_dir(File) -> with_fhc_handle(fun () -> ensure_dir_internal(File) end).
+ensure_dir(File) -> with_handle(fun () -> ensure_dir_internal(File) end).
ensure_dir_internal("/") ->
ok;
@@ -101,23 +103,14 @@ wildcard(Pattern, Dir) ->
{error, _} -> []
end.
-list_dir(Dir) -> with_fhc_handle(fun () -> prim_file:list_dir(Dir) end).
+list_dir(Dir) -> with_handle(fun () -> prim_file:list_dir(Dir) end).
read_file_info(File) ->
- with_fhc_handle(fun () -> prim_file:read_file_info(File) end).
-
-with_fhc_handle(Fun) ->
- with_fhc_handle(1, Fun).
-
-with_fhc_handle(N, Fun) ->
- ok = file_handle_cache:obtain(N),
- try Fun()
- after ok = file_handle_cache:release(N)
- end.
+ with_handle(fun () -> prim_file:read_file_info(File) end).
read_term_file(File) ->
try
- {ok, Data} = with_fhc_handle(fun () -> prim_file:read_file(File) end),
+ {ok, Data} = with_handle(fun () -> prim_file:read_file(File) end),
{ok, Tokens, _} = erl_scan:string(binary_to_list(Data)),
TokenGroups = group_tokens(Tokens),
{ok, [begin
@@ -177,7 +170,7 @@ with_synced_copy(Path, Modes, Fun) ->
true ->
{error, append_not_supported, Path};
false ->
- with_fhc_handle(
+ with_handle(
fun () ->
Bak = Path ++ ?TMP_EXT,
case prim_file:open(Bak, Modes) of
@@ -206,16 +199,16 @@ append_file(File, Suffix) ->
append_file(_, _, "") ->
ok;
append_file(File, 0, Suffix) ->
- with_fhc_handle(fun () ->
- case prim_file:open([File, Suffix], [append]) of
- {ok, Fd} -> prim_file:close(Fd);
- Error -> Error
- end
- end);
+ with_handle(fun () ->
+ case prim_file:open([File, Suffix], [append]) of
+ {ok, Fd} -> prim_file:close(Fd);
+ Error -> Error
+ end
+ end);
append_file(File, _, Suffix) ->
- case with_fhc_handle(2, fun () ->
+ case with_handle(2, fun () ->
file:copy(File, {[File, Suffix], [append]})
- end) of
+ end) of
{ok, _BytesCopied} -> ok;
Error -> Error
end.
@@ -227,12 +220,12 @@ ensure_parent_dirs_exist(Filename) ->
throw({error, {cannot_create_parent_dirs, Filename, Reason}})
end.
-rename(Old, New) -> with_fhc_handle(fun () -> prim_file:rename(Old, New) end).
+rename(Old, New) -> with_handle(fun () -> prim_file:rename(Old, New) end).
-delete(File) -> with_fhc_handle(fun () -> prim_file:delete(File) end).
+delete(File) -> with_handle(fun () -> prim_file:delete(File) end).
recursive_delete(Files) ->
- with_fhc_handle(
+ with_handle(
fun () -> lists:foldl(fun (Path, ok) -> recursive_delete1(Path);
(_Path, {error, _Err} = Error) -> Error
end, ok, Files)
@@ -307,7 +300,7 @@ recursive_copy(Src, Dest) ->
lock_file(Path) ->
case is_file(Path) of
true -> {error, eexist};
- false -> with_fhc_handle(
+ false -> with_handle(
fun () -> {ok, Lock} = prim_file:open(Path, [write]),
ok = prim_file:close(Lock)
end)
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index ee889f8442..f18c319e5c 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -102,7 +102,15 @@ handle_go(Q = #amqqueue{name = QName}) ->
process_flag(trap_exit, true), %% amqqueue_process traps exits too.
{ok, GM} = gm:start_link(QName, ?MODULE, [self()],
fun rabbit_misc:execute_mnesia_transaction/1),
- receive {joined, GM} -> ok end,
+ MRef = erlang:monitor(process, GM),
+ %% We ignore the DOWN message because we are also linked and
+ %% trapping exits, we just want to not get stuck and we will exit
+ %% later.
+ receive
+ {joined, GM} -> erlang:demonitor(MRef, [flush]),
+ ok;
+ {'DOWN', MRef, _, _, _} -> ok
+ end,
Self = self(),
Node = node(),
case rabbit_misc:execute_mnesia_transaction(
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 58e93a3f9e..18c07f86f1 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -81,7 +81,7 @@
-ifdef(use_specs).
--export_type([resource_name/0, thunk/1]).
+-export_type([resource_name/0, thunk/1, channel_or_connection_exit/0]).
-type(ok_or_error() :: rabbit_types:ok_or_error(any())).
-type(thunk(T) :: fun(() -> T)).
diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl
index 6e0abd6928..0a69fb325b 100644
--- a/src/rabbit_policy.erl
+++ b/src/rabbit_policy.erl
@@ -213,17 +213,23 @@ notify_clear(VHost, <<"policy">>, Name) ->
%% [1] We need to prevent this from becoming O(n^2) in a similar
%% manner to rabbit_binding:remove_for_{source,destination}. So see
%% the comment in rabbit_binding:lock_route_tables/0 for more rationale.
+%% [2] We could be here in a post-tx fun after the vhost has been
+%% deleted; in which case it's fine to do nothing.
update_policies(VHost) ->
Tabs = [rabbit_queue, rabbit_durable_queue,
rabbit_exchange, rabbit_durable_exchange],
{Xs, Qs} = rabbit_misc:execute_mnesia_transaction(
fun() ->
[mnesia:lock({table, T}, write) || T <- Tabs], %% [1]
- Policies = list(VHost),
- {[update_exchange(X, Policies) ||
- X <- rabbit_exchange:list(VHost)],
- [update_queue(Q, Policies) ||
- Q <- rabbit_amqqueue:list(VHost)]}
+ case catch list(VHost) of
+ {error, {no_such_vhost, _}} ->
+ ok; %% [2]
+ Policies ->
+ {[update_exchange(X, Policies) ||
+ X <- rabbit_exchange:list(VHost)],
+ [update_queue(Q, Policies) ||
+ Q <- rabbit_amqqueue:list(VHost)]}
+ end
end),
[catch notify(X) || X <- Xs],
[catch notify(Q) || Q <- Qs],
diff --git a/src/rabbit_runtime_parameters.erl b/src/rabbit_runtime_parameters.erl
index 7307330b51..cf12591351 100644
--- a/src/rabbit_runtime_parameters.erl
+++ b/src/rabbit_runtime_parameters.erl
@@ -29,6 +29,7 @@
-ifdef(use_specs).
-type(ok_or_error_string() :: 'ok' | {'error_string', string()}).
+-type(ok_thunk_or_error_string() :: ok_or_error_string() | fun(() -> 'ok')).
-spec(parse_set/5 :: (rabbit_types:vhost(), binary(), binary(), string(),
rabbit_types:user() | 'none') -> ok_or_error_string()).
@@ -38,9 +39,9 @@
rabbit_types:user() | 'none') -> ok_or_error_string()).
-spec(set_global/2 :: (atom(), term()) -> 'ok').
-spec(clear/3 :: (rabbit_types:vhost(), binary(), binary())
- -> ok_or_error_string()).
+ -> ok_thunk_or_error_string()).
-spec(clear_any/3 :: (rabbit_types:vhost(), binary(), binary())
- -> ok_or_error_string()).
+ -> ok_thunk_or_error_string()).
-spec(list/0 :: () -> [rabbit_types:infos()]).
-spec(list/1 :: (rabbit_types:vhost() | '_') -> [rabbit_types:infos()]).
-spec(list_component/1 :: (binary()) -> [rabbit_types:infos()]).
@@ -137,16 +138,22 @@ clear(VHost, Component, Name) ->
clear_any(VHost, Component, Name).
clear_any(VHost, Component, Name) ->
- case lookup(VHost, Component, Name) of
- not_found -> {error_string, "Parameter does not exist"};
- _ -> mnesia_clear(VHost, Component, Name),
+ Notify = fun () ->
case lookup_component(Component) of
{ok, Mod} -> event_notify(
- parameter_cleared, VHost, Component,
- [{name, Name}]),
+ parameter_cleared, VHost, Component,
+ [{name, Name}]),
Mod:notify_clear(VHost, Component, Name);
_ -> ok
end
+ end,
+ case lookup(VHost, Component, Name) of
+ not_found -> {error_string, "Parameter does not exist"};
+ _ -> mnesia_clear(VHost, Component, Name),
+ case mnesia:is_transaction() of
+ true -> Notify;
+ false -> Notify()
+ end
end.
mnesia_clear(VHost, Component, Name) ->
diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl
index b57627e401..cfa3add44a 100644
--- a/src/rabbit_vhost.erl
+++ b/src/rabbit_vhost.erl
@@ -88,12 +88,11 @@ delete(VHostPath) ->
#amqqueue{name = Name} <- rabbit_amqqueue:list(VHostPath)],
[assert_benign(rabbit_exchange:delete(Name, false)) ||
#exchange{name = Name} <- rabbit_exchange:list(VHostPath)],
- R = rabbit_misc:execute_mnesia_transaction(
- with(VHostPath, fun () ->
- ok = internal_delete(VHostPath)
- end)),
+ Funs = rabbit_misc:execute_mnesia_transaction(
+ with(VHostPath, fun () -> internal_delete(VHostPath) end)),
ok = rabbit_event:notify(vhost_deleted, [{name, VHostPath}]),
- R.
+ [ok = Fun() || Fun <- Funs],
+ ok.
assert_benign(ok) -> ok;
assert_benign({ok, _}) -> ok;
@@ -111,14 +110,14 @@ internal_delete(VHostPath) ->
[ok = rabbit_auth_backend_internal:clear_permissions(
proplists:get_value(user, Info), VHostPath)
|| Info <- rabbit_auth_backend_internal:list_vhost_permissions(VHostPath)],
- [ok = rabbit_runtime_parameters:clear(VHostPath,
- proplists:get_value(component, Info),
- proplists:get_value(name, Info))
+ Fs1 = [rabbit_runtime_parameters:clear(VHostPath,
+ proplists:get_value(component, Info),
+ proplists:get_value(name, Info))
|| Info <- rabbit_runtime_parameters:list(VHostPath)],
- [ok = rabbit_policy:delete(VHostPath, proplists:get_value(name, Info))
- || Info <- rabbit_policy:list(VHostPath)],
+ Fs2 = [rabbit_policy:delete(VHostPath, proplists:get_value(name, Info))
+ || Info <- rabbit_policy:list(VHostPath)],
ok = mnesia:delete({rabbit_vhost, VHostPath}),
- ok.
+ Fs1 ++ Fs2.
exists(VHostPath) ->
mnesia:dirty_read({rabbit_vhost, VHostPath}) /= [].
diff --git a/src/truncate.erl b/src/truncate.erl
index 02dba2e36a..1d69de56ac 100644
--- a/src/truncate.erl
+++ b/src/truncate.erl
@@ -37,11 +37,12 @@ log_event({Type, GL, {Pid, ReportType, Report}}, Params)
log_event(Event, _Params) ->
Event.
-report([[Thing]], Params) -> report([Thing], Params);
-report(List, Params) -> [case Item of
- {K, V} -> {K, term(V, Params)};
- _ -> term(Item, Params)
- end || Item <- List].
+report([[Thing]], Params) -> report([Thing], Params);
+report(List, Params) when is_list(List) -> [case Item of
+ {K, V} -> {K, term(V, Params)};
+ _ -> term(Item, Params)
+ end || Item <- List];
+report(Other, Params) -> term(Other, Params).
term(Thing, {Content, Struct, ContentDec, StructDec}) ->
term(Thing, true, #params{content = Content,