diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2014-05-22 13:32:47 +0100 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2014-05-22 13:32:47 +0100 |
| commit | 7ab9f4b1caba2fa34ed505c870d08605b0a779f8 (patch) | |
| tree | ec86c4b9458f28f9ae7ce122b758ca6114c46704 | |
| parent | a1881d7e02c1b36ef449b2268fc3afab064d005c (diff) | |
| parent | 3a7b6c7a3d32dc610c3b67f4bb26b83bea38cb38 (diff) | |
| download | rabbitmq-server-git-7ab9f4b1caba2fa34ed505c870d08605b0a779f8.tar.gz | |
Merge bug26150
| -rw-r--r-- | docs/rabbitmq.config.example | 14 | ||||
| -rw-r--r-- | src/file_handle_cache.erl | 261 | ||||
| -rw-r--r-- | src/gm.erl | 3 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 16 | ||||
| -rw-r--r-- | src/rabbit_channel_interceptor.erl | 25 | ||||
| -rw-r--r-- | src/rabbit_file.erl | 45 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 10 | ||||
| -rw-r--r-- | src/rabbit_misc.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_policy.erl | 16 | ||||
| -rw-r--r-- | src/rabbit_runtime_parameters.erl | 21 | ||||
| -rw-r--r-- | src/rabbit_vhost.erl | 21 | ||||
| -rw-r--r-- | src/truncate.erl | 11 |
12 files changed, 255 insertions, 190 deletions
diff --git a/docs/rabbitmq.config.example b/docs/rabbitmq.config.example index b0e13b1b8d..26de71b70d 100644 --- a/docs/rabbitmq.config.example +++ b/docs/rabbitmq.config.example @@ -257,9 +257,13 @@ %% {certfile, "/path/to/cert.pem"}, %% {keyfile, "/path/to/key.pem"}]}]}, + %% One of 'basic', 'detailed' or 'none'. See + %% http://www.rabbitmq.com/management.html#fine-stats for more details. + %% {rates_mode, basic}, + %% Configure how long aggregated data (such as message rates and queue %% lengths) is retained. Please read the plugin's documentation in - %% https://www.rabbitmq.com/management.html#configuration for more + %% http://www.rabbitmq.com/management.html#configuration for more %% details. %% %% {sample_retention_policies, @@ -268,14 +272,6 @@ %% {detailed, [{10, 5}]}]} ]}, - {rabbitmq_management_agent, - [%% Misc/Advanced Options - %% - %% NB: Change these only if you understand what you are doing! - %% - %% {force_fine_statistics, true} - ]}, - %% ---------------------------------------------------------------------------- %% RabbitMQ Shovel Plugin %% diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index 71645a3c9a..3a7a692c5c 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -147,7 +147,7 @@ truncate/1, current_virtual_offset/1, current_raw_offset/1, flush/1, copy/3, set_maximum_since_use/1, delete/1, clear/1]). -export([obtain/0, obtain/1, release/0, release/1, transfer/1, transfer/2, - set_limit/1, get_limit/0, info_keys/0, + set_limit/1, get_limit/0, info_keys/0, with_handle/1, with_handle/2, info/0, info/1]). -export([ulimit/0]). @@ -192,9 +192,11 @@ limit, open_count, open_pending, - obtain_limit, - obtain_count, - obtain_pending, + obtain_limit, %%socket + obtain_count_socket, + obtain_count_file, + obtain_pending_socket, + obtain_pending_file, clients, timer_ref, alarm_set, @@ -205,7 +207,8 @@ { pid, callback, opened, - obtained, + obtained_socket, + obtained_file, blocked, pending_closes }). @@ -257,6 +260,8 @@ -spec(release/1 :: (non_neg_integer()) -> 'ok'). -spec(transfer/1 :: (pid()) -> 'ok'). -spec(transfer/2 :: (pid(), non_neg_integer()) -> 'ok'). +-spec(with_handle/1 :: (fun(() -> A)) -> A). +-spec(with_handle/2 :: (non_neg_integer(), fun(() -> A)) -> A). -spec(set_limit/1 :: (non_neg_integer()) -> 'ok'). -spec(get_limit/0 :: () -> non_neg_integer()). -spec(info_keys/0 :: () -> rabbit_types:info_keys()). @@ -493,15 +498,28 @@ obtain() -> obtain(1). release() -> release(1). transfer(Pid) -> transfer(Pid, 1). -obtain(Count) when Count > 0 -> +obtain(Count) -> obtain(Count, socket). +release(Count) -> release(Count, socket). + +with_handle(Fun) -> + with_handle(1, Fun). + +with_handle(N, Fun) -> + ok = obtain(N, file), + try Fun() + after ok = release(N, file) + end. + +obtain(Count, Type) when Count > 0 -> %% If the FHC isn't running, obtains succeed immediately. case whereis(?SERVER) of undefined -> ok; - _ -> gen_server2:call(?SERVER, {obtain, Count, self()}, infinity) + _ -> gen_server2:call( + ?SERVER, {obtain, Count, Type, self()}, infinity) end. -release(Count) when Count > 0 -> - gen_server2:cast(?SERVER, {release, Count, self()}). +release(Count, Type) when Count > 0 -> + gen_server2:cast(?SERVER, {release, Count, Type, self()}). transfer(Pid, Count) when Count > 0 -> gen_server2:cast(?SERVER, {transfer, Count, self(), Pid}). @@ -810,12 +828,16 @@ write_buffer(Handle = #handle { hdl = Hdl, offset = Offset, infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. -i(total_limit, #fhc_state{limit = Limit}) -> Limit; -i(total_used, #fhc_state{open_count = C1, obtain_count = C2}) -> C1 + C2; -i(sockets_limit, #fhc_state{obtain_limit = Limit}) -> Limit; -i(sockets_used, #fhc_state{obtain_count = Count}) -> Count; +i(total_limit, #fhc_state{limit = Limit}) -> Limit; +i(total_used, State) -> used(State); +i(sockets_limit, #fhc_state{obtain_limit = Limit}) -> Limit; +i(sockets_used, #fhc_state{obtain_count_socket = Count}) -> Count; i(Item, _) -> throw({bad_argument, Item}). +used(#fhc_state{open_count = C1, + obtain_count_socket = C2, + obtain_count_file = C3}) -> C1 + C2 + C3. + %%---------------------------------------------------------------------------- %% gen_server2 callbacks %%---------------------------------------------------------------------------- @@ -836,21 +858,23 @@ init([AlarmSet, AlarmClear]) -> [Limit, ObtainLimit]), Clients = ets:new(?CLIENT_ETS_TABLE, [set, private, {keypos, #cstate.pid}]), Elders = ets:new(?ELDERS_ETS_TABLE, [set, private]), - {ok, #fhc_state { elders = Elders, - limit = Limit, - open_count = 0, - open_pending = pending_new(), - obtain_limit = ObtainLimit, - obtain_count = 0, - obtain_pending = pending_new(), - clients = Clients, - timer_ref = undefined, - alarm_set = AlarmSet, - alarm_clear = AlarmClear }}. + {ok, #fhc_state { elders = Elders, + limit = Limit, + open_count = 0, + open_pending = pending_new(), + obtain_limit = ObtainLimit, + obtain_count_file = 0, + obtain_pending_file = pending_new(), + obtain_count_socket = 0, + obtain_pending_socket = pending_new(), + clients = Clients, + timer_ref = undefined, + alarm_set = AlarmSet, + alarm_clear = AlarmClear }}. prioritise_cast(Msg, _Len, _State) -> case Msg of - {release, _, _} -> 5; + {release, _, _, _} -> 5; _ -> 0 end. @@ -883,23 +907,24 @@ handle_call({open, Pid, Requested, EldestUnusedSince}, From, false -> {noreply, run_pending_item(Item, State)} end; -handle_call({obtain, N, Pid}, From, State = #fhc_state { - obtain_count = Count, - obtain_pending = Pending, - clients = Clients }) -> +handle_call({obtain, N, Type, Pid}, From, + State = #fhc_state { clients = Clients }) -> + Count = obtain_state(Type, count, State), + Pending = obtain_state(Type, pending, State), ok = track_client(Pid, Clients), - Item = #pending { kind = obtain, pid = Pid, requested = N, from = From }, + Item = #pending { kind = {obtain, Type}, pid = Pid, + requested = N, from = From }, Enqueue = fun () -> true = ets:update_element(Clients, Pid, {#cstate.blocked, true}), - State #fhc_state { - obtain_pending = pending_in(Item, Pending) } + set_obtain_state(Type, pending, + pending_in(Item, Pending), State) end, {noreply, - case obtain_limit_reached(State) of + case obtain_limit_reached(Type, State) of true -> Enqueue(); - false -> case needs_reduce(State #fhc_state { - obtain_count = Count + N }) of + false -> case needs_reduce( + set_obtain_state(Type, count, Count + 1, State)) of true -> reduce(Enqueue()); false -> adjust_alarm( State, run_pending_item(Item, State)) @@ -934,9 +959,9 @@ handle_cast({update, Pid, EldestUnusedSince}, %% storm of messages {noreply, State}; -handle_cast({release, N, Pid}, State) -> - {noreply, adjust_alarm(State, process_pending( - update_counts(obtain, Pid, -N, State)))}; +handle_cast({release, N, Type, Pid}, State) -> + State1 = process_pending(update_counts({obtain, Type}, Pid, -N, State)), + {noreply, adjust_alarm(State, State1)}; handle_cast({close, Pid, EldestUnusedSince}, State = #fhc_state { elders = Elders, clients = Clients }) -> @@ -951,32 +976,38 @@ handle_cast({close, Pid, EldestUnusedSince}, handle_cast({transfer, N, FromPid, ToPid}, State) -> ok = track_client(ToPid, State#fhc_state.clients), {noreply, process_pending( - update_counts(obtain, ToPid, +N, - update_counts(obtain, FromPid, -N, State)))}. + update_counts({obtain, socket}, ToPid, +N, + update_counts({obtain, socket}, FromPid, -N, + State)))}. handle_info(check_counts, State) -> {noreply, maybe_reduce(State #fhc_state { timer_ref = undefined })}; handle_info({'DOWN', _MRef, process, Pid, _Reason}, - State = #fhc_state { elders = Elders, - open_count = OpenCount, - open_pending = OpenPending, - obtain_count = ObtainCount, - obtain_pending = ObtainPending, - clients = Clients }) -> - [#cstate { opened = Opened, obtained = Obtained }] = + State = #fhc_state { elders = Elders, + open_count = OpenCount, + open_pending = OpenPending, + obtain_count_file = ObtainCountF, + obtain_count_socket = ObtainCountS, + obtain_pending_file = ObtainPendingF, + obtain_pending_socket = ObtainPendingS, + clients = Clients }) -> + [#cstate { opened = Opened, + obtained_file = ObtainedFile, + obtained_socket = ObtainedSocket}] = ets:lookup(Clients, Pid), true = ets:delete(Clients, Pid), true = ets:delete(Elders, Pid), - FilterFun = fun (#pending { pid = Pid1 }) -> Pid1 =/= Pid end, - {noreply, adjust_alarm( - State, - process_pending( - State #fhc_state { - open_count = OpenCount - Opened, - open_pending = filter_pending(FilterFun, OpenPending), - obtain_count = ObtainCount - Obtained, - obtain_pending = filter_pending(FilterFun, ObtainPending) }))}. + Fun = fun (#pending { pid = Pid1 }) -> Pid1 =/= Pid end, + State1 = process_pending( + State #fhc_state { + open_count = OpenCount - Opened, + open_pending = filter_pending(Fun, OpenPending), + obtain_count_file = ObtainCountF - ObtainedFile, + obtain_count_socket = ObtainCountS - ObtainedSocket, + obtain_pending_file = filter_pending(Fun, ObtainPendingF), + obtain_pending_socket = filter_pending(Fun, ObtainPendingS) }), + {noreply, adjust_alarm(State, State1)}. terminate(_Reason, State = #fhc_state { clients = Clients, elders = Elders }) -> @@ -1039,10 +1070,23 @@ obtain_limit(Limit) -> case ?OBTAIN_LIMIT(Limit) of OLimit -> OLimit end. -obtain_limit_reached(#fhc_state { obtain_limit = Limit, - obtain_count = Count}) -> +obtain_limit_reached(socket, State) -> obtain_limit_reached(State); +obtain_limit_reached(file, State) -> needs_reduce(State). + +obtain_limit_reached(#fhc_state{obtain_limit = Limit, + obtain_count_socket = Count}) -> Limit =/= infinity andalso Count >= Limit. +obtain_state(file, count, #fhc_state{obtain_count_file = N}) -> N; +obtain_state(socket, count, #fhc_state{obtain_count_socket = N}) -> N; +obtain_state(file, pending, #fhc_state{obtain_pending_file = N}) -> N; +obtain_state(socket, pending, #fhc_state{obtain_pending_socket = N}) -> N. + +set_obtain_state(file, count, N, S) -> S#fhc_state{obtain_count_file = N}; +set_obtain_state(socket, count, N, S) -> S#fhc_state{obtain_count_socket = N}; +set_obtain_state(file, pending, N, S) -> S#fhc_state{obtain_pending_file = N}; +set_obtain_state(socket, pending, N, S) -> S#fhc_state{obtain_pending_socket = N}. + adjust_alarm(OldState = #fhc_state { alarm_set = AlarmSet, alarm_clear = AlarmClear }, NewState) -> case {obtain_limit_reached(OldState), obtain_limit_reached(NewState)} of @@ -1055,25 +1099,24 @@ adjust_alarm(OldState = #fhc_state { alarm_set = AlarmSet, process_pending(State = #fhc_state { limit = infinity }) -> State; process_pending(State) -> - process_open(process_obtain(State)). + process_open(process_obtain(socket, process_obtain(file, State))). process_open(State = #fhc_state { limit = Limit, - open_pending = Pending, - open_count = OpenCount, - obtain_count = ObtainCount }) -> - {Pending1, State1} = - process_pending(Pending, Limit - (ObtainCount + OpenCount), State), + open_pending = Pending}) -> + {Pending1, State1} = process_pending(Pending, Limit - used(State), State), State1 #fhc_state { open_pending = Pending1 }. -process_obtain(State = #fhc_state { limit = Limit, - obtain_pending = Pending, - obtain_limit = ObtainLimit, - obtain_count = ObtainCount, - open_count = OpenCount }) -> - Quota = lists:min([ObtainLimit - ObtainCount, - Limit - (ObtainCount + OpenCount)]), +process_obtain(Type, State = #fhc_state { limit = Limit, + obtain_limit = ObtainLimit }) -> + ObtainCount = obtain_state(Type, count, State), + Pending = obtain_state(Type, pending, State), + Quota = case Type of + file -> Limit - (used(State)); + socket -> lists:min([ObtainLimit - ObtainCount, + Limit - (used(State))]) + end, {Pending1, State1} = process_pending(Pending, Quota, State), - State1 #fhc_state { obtain_pending = Pending1 }. + set_obtain_state(Type, pending, Pending1, State1). process_pending(Pending, Quota, State) when Quota =< 0 -> {Pending, State}; @@ -1099,19 +1142,25 @@ run_pending_item(#pending { kind = Kind, update_counts(Kind, Pid, Requested, State). update_counts(Kind, Pid, Delta, - State = #fhc_state { open_count = OpenCount, - obtain_count = ObtainCount, - clients = Clients }) -> - {OpenDelta, ObtainDelta} = update_counts1(Kind, Pid, Delta, Clients), - State #fhc_state { open_count = OpenCount + OpenDelta, - obtain_count = ObtainCount + ObtainDelta }. + State = #fhc_state { open_count = OpenCount, + obtain_count_file = ObtainCountF, + obtain_count_socket = ObtainCountS, + clients = Clients }) -> + {OpenDelta, ObtainDeltaF, ObtainDeltaS} = + update_counts1(Kind, Pid, Delta, Clients), + State #fhc_state { open_count = OpenCount + OpenDelta, + obtain_count_file = ObtainCountF + ObtainDeltaF, + obtain_count_socket = ObtainCountS + ObtainDeltaS }. update_counts1(open, Pid, Delta, Clients) -> ets:update_counter(Clients, Pid, {#cstate.opened, Delta}), - {Delta, 0}; -update_counts1(obtain, Pid, Delta, Clients) -> - ets:update_counter(Clients, Pid, {#cstate.obtained, Delta}), - {0, Delta}. + {Delta, 0, 0}; +update_counts1({obtain, file}, Pid, Delta, Clients) -> + ets:update_counter(Clients, Pid, {#cstate.obtained_file, Delta}), + {0, Delta, 0}; +update_counts1({obtain, socket}, Pid, Delta, Clients) -> + ets:update_counter(Clients, Pid, {#cstate.obtained_socket, Delta}), + {0, 0, Delta}. maybe_reduce(State) -> case needs_reduce(State) of @@ -1119,23 +1168,25 @@ maybe_reduce(State) -> false -> State end. -needs_reduce(#fhc_state { limit = Limit, - open_count = OpenCount, - open_pending = OpenPending, - obtain_count = ObtainCount, - obtain_limit = ObtainLimit, - obtain_pending = ObtainPending }) -> +needs_reduce(State = #fhc_state { limit = Limit, + open_pending = OpenPending, + obtain_limit = ObtainLimit, + obtain_count_socket = ObtainCountS, + obtain_pending_file = ObtainPendingF, + obtain_pending_socket = ObtainPendingS }) -> Limit =/= infinity - andalso ((OpenCount + ObtainCount > Limit) + andalso ((used(State) > Limit) orelse (not pending_is_empty(OpenPending)) - orelse (ObtainCount < ObtainLimit - andalso not pending_is_empty(ObtainPending))). - -reduce(State = #fhc_state { open_pending = OpenPending, - obtain_pending = ObtainPending, - elders = Elders, - clients = Clients, - timer_ref = TRef }) -> + orelse (not pending_is_empty(ObtainPendingF)) + orelse (ObtainCountS < ObtainLimit + andalso not pending_is_empty(ObtainPendingS))). + +reduce(State = #fhc_state { open_pending = OpenPending, + obtain_pending_file = ObtainPendingFile, + obtain_pending_socket = ObtainPendingSocket, + elders = Elders, + clients = Clients, + timer_ref = TRef }) -> Now = now(), {CStates, Sum, ClientCount} = ets:foldl(fun ({Pid, Eldest}, {CStatesAcc, SumAcc, CountAcc} = Accs) -> @@ -1159,7 +1210,8 @@ reduce(State = #fhc_state { open_pending = OpenPending, _ -> notify_age0(Clients, CStates, pending_count(OpenPending) + - pending_count(ObtainPending)) + pending_count(ObtainPendingFile) + + pending_count(ObtainPendingSocket)) end end, case TRef of @@ -1196,12 +1248,13 @@ notify(Clients, Required, [#cstate{ pid = Pid, notify(Clients, Required - Opened, Notifications). track_client(Pid, Clients) -> - case ets:insert_new(Clients, #cstate { pid = Pid, - callback = undefined, - opened = 0, - obtained = 0, - blocked = false, - pending_closes = 0 }) of + case ets:insert_new(Clients, #cstate { pid = Pid, + callback = undefined, + opened = 0, + obtained_file = 0, + obtained_socket = 0, + blocked = false, + pending_closes = 0 }) of true -> _MRef = erlang:monitor(process, Pid), ok; false -> ok diff --git a/src/gm.erl b/src/gm.erl index 0c0ac349b0..fb59b9cb0b 100644 --- a/src/gm.erl +++ b/src/gm.erl @@ -1036,7 +1036,8 @@ join_group(Self, GroupName, #gm_group { members = Members } = Group, TxnFun) -> case lists:filter(fun is_member_alive/1, Members) of [] -> join_group(Self, GroupName, - prune_or_create_group(Self, GroupName, TxnFun)); + prune_or_create_group(Self, GroupName, TxnFun), + TxnFun); Alive -> Left = lists:nth(random:uniform(length(Alive)), Alive), Handler = diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 74f9cacf76..eb9ed4ed07 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -187,7 +187,7 @@ force_event_refresh(Ref) -> %%--------------------------------------------------------------------------- -init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, +init([Channel, Foo, WriterPid, ConnPid, ConnName, Protocol, User, VHost, Capabilities, CollectorPid, LimiterPid]) -> process_flag(trap_exit, true), ?store_proc_name({ConnName, Channel}), @@ -195,7 +195,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, State = #ch{state = starting, protocol = Protocol, channel = Channel, - reader_pid = ReaderPid, + reader_pid = Foo, writer_pid = WriterPid, conn_pid = ConnPid, conn_name = ConnName, @@ -894,8 +894,7 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin, _, State = #ch{virtual_host = VHostPath}) -> CheckedType = rabbit_exchange:check_type(TypeNameBin), ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), - check_not_default_exchange(ExchangeName), - check_configure_permitted(ExchangeName, State), + test(State, ExchangeName), X = case rabbit_exchange:lookup(ExchangeName) of {ok, FoundX} -> FoundX; {error, not_found} -> @@ -1119,7 +1118,7 @@ handle_method(#'tx.commit'{}, _, #ch{tx = none}) -> handle_method(#'tx.commit'{}, _, State = #ch{tx = {Msgs, Acks}, limiter = Limiter}) -> - State1 = rabbit_misc:queue_fold(fun deliver_to_queues/2, State, Msgs), + State1 = test2(State, Msgs), Rev = fun (X) -> lists:reverse(lists:sort(X)) end, lists:foreach(fun ({ack, A}) -> ack(Rev(A), State1); ({Requeue, A}) -> reject(Requeue, Rev(A), Limiter) @@ -1165,6 +1164,13 @@ handle_method(_MethodRecord, _Content, _State) -> rabbit_misc:protocol_error( command_invalid, "unimplemented method", []). +test2(State, Msgs) -> + rabbit_misc:queue_fold(fun deliver_to_queues/2, State, Msgs). + +test(State, ExchangeName) -> + check_not_default_exchange(ExchangeName), + check_configure_permitted(ExchangeName, State). + %%---------------------------------------------------------------------------- %% We get the queue process to send the consume_ok on our behalf. This diff --git a/src/rabbit_channel_interceptor.erl b/src/rabbit_channel_interceptor.erl index 81c17fbfbe..db9349acfb 100644 --- a/src/rabbit_channel_interceptor.erl +++ b/src/rabbit_channel_interceptor.erl @@ -33,7 +33,7 @@ -callback description() -> [proplists:property()]. -callback intercept(original_method(), rabbit_types:vhost()) -> - rabbit_types:ok_or_error2(processed_method(), any()). + processed_method() | rabbit_misc:channel_or_connection_exit(). %% Whether the interceptor wishes to intercept the amqp method -callback applies_to(intercept_method()) -> boolean(). @@ -62,20 +62,15 @@ intercept_method(M, VHost) -> intercept_method(M, _VHost, []) -> M; intercept_method(M, VHost, [I]) -> - case I:intercept(M, VHost) of - {ok, M2} -> - case validate_method(M, M2) of - true -> - M2; - _ -> - internal_error("Interceptor: ~p expected " - "to return method: ~p but returned: ~p", - [I, rabbit_misc:method_record_type(M), - rabbit_misc:method_record_type(M2)]) - end; - {error, Reason} -> - internal_error("Interceptor: ~p failed with reason: ~p", - [I, Reason]) + M2 = I:intercept(M, VHost), + case validate_method(M, M2) of + true -> + M2; + _ -> + internal_error("Interceptor: ~p expected " + "to return method: ~p but returned: ~p", + [I, rabbit_misc:method_record_type(M), + rabbit_misc:method_record_type(M2)]) end; intercept_method(M, _VHost, Is) -> internal_error("More than one interceptor for method: ~p -- ~p", diff --git a/src/rabbit_file.erl b/src/rabbit_file.erl index d71818c869..81a617a821 100644 --- a/src/rabbit_file.erl +++ b/src/rabbit_file.erl @@ -24,6 +24,8 @@ -export([rename/2, delete/1, recursive_delete/1, recursive_copy/2]). -export([lock_file/1]). +-import(file_handle_cache, [with_handle/1, with_handle/2]). + -define(TMP_EXT, ".tmp"). %%---------------------------------------------------------------------------- @@ -81,7 +83,7 @@ file_size(File) -> _ -> 0 end. -ensure_dir(File) -> with_fhc_handle(fun () -> ensure_dir_internal(File) end). +ensure_dir(File) -> with_handle(fun () -> ensure_dir_internal(File) end). ensure_dir_internal("/") -> ok; @@ -101,23 +103,14 @@ wildcard(Pattern, Dir) -> {error, _} -> [] end. -list_dir(Dir) -> with_fhc_handle(fun () -> prim_file:list_dir(Dir) end). +list_dir(Dir) -> with_handle(fun () -> prim_file:list_dir(Dir) end). read_file_info(File) -> - with_fhc_handle(fun () -> prim_file:read_file_info(File) end). - -with_fhc_handle(Fun) -> - with_fhc_handle(1, Fun). - -with_fhc_handle(N, Fun) -> - ok = file_handle_cache:obtain(N), - try Fun() - after ok = file_handle_cache:release(N) - end. + with_handle(fun () -> prim_file:read_file_info(File) end). read_term_file(File) -> try - {ok, Data} = with_fhc_handle(fun () -> prim_file:read_file(File) end), + {ok, Data} = with_handle(fun () -> prim_file:read_file(File) end), {ok, Tokens, _} = erl_scan:string(binary_to_list(Data)), TokenGroups = group_tokens(Tokens), {ok, [begin @@ -177,7 +170,7 @@ with_synced_copy(Path, Modes, Fun) -> true -> {error, append_not_supported, Path}; false -> - with_fhc_handle( + with_handle( fun () -> Bak = Path ++ ?TMP_EXT, case prim_file:open(Bak, Modes) of @@ -206,16 +199,16 @@ append_file(File, Suffix) -> append_file(_, _, "") -> ok; append_file(File, 0, Suffix) -> - with_fhc_handle(fun () -> - case prim_file:open([File, Suffix], [append]) of - {ok, Fd} -> prim_file:close(Fd); - Error -> Error - end - end); + with_handle(fun () -> + case prim_file:open([File, Suffix], [append]) of + {ok, Fd} -> prim_file:close(Fd); + Error -> Error + end + end); append_file(File, _, Suffix) -> - case with_fhc_handle(2, fun () -> + case with_handle(2, fun () -> file:copy(File, {[File, Suffix], [append]}) - end) of + end) of {ok, _BytesCopied} -> ok; Error -> Error end. @@ -227,12 +220,12 @@ ensure_parent_dirs_exist(Filename) -> throw({error, {cannot_create_parent_dirs, Filename, Reason}}) end. -rename(Old, New) -> with_fhc_handle(fun () -> prim_file:rename(Old, New) end). +rename(Old, New) -> with_handle(fun () -> prim_file:rename(Old, New) end). -delete(File) -> with_fhc_handle(fun () -> prim_file:delete(File) end). +delete(File) -> with_handle(fun () -> prim_file:delete(File) end). recursive_delete(Files) -> - with_fhc_handle( + with_handle( fun () -> lists:foldl(fun (Path, ok) -> recursive_delete1(Path); (_Path, {error, _Err} = Error) -> Error end, ok, Files) @@ -307,7 +300,7 @@ recursive_copy(Src, Dest) -> lock_file(Path) -> case is_file(Path) of true -> {error, eexist}; - false -> with_fhc_handle( + false -> with_handle( fun () -> {ok, Lock} = prim_file:open(Path, [write]), ok = prim_file:close(Lock) end) diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index ee889f8442..f18c319e5c 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -102,7 +102,15 @@ handle_go(Q = #amqqueue{name = QName}) -> process_flag(trap_exit, true), %% amqqueue_process traps exits too. {ok, GM} = gm:start_link(QName, ?MODULE, [self()], fun rabbit_misc:execute_mnesia_transaction/1), - receive {joined, GM} -> ok end, + MRef = erlang:monitor(process, GM), + %% We ignore the DOWN message because we are also linked and + %% trapping exits, we just want to not get stuck and we will exit + %% later. + receive + {joined, GM} -> erlang:demonitor(MRef, [flush]), + ok; + {'DOWN', MRef, _, _, _} -> ok + end, Self = self(), Node = node(), case rabbit_misc:execute_mnesia_transaction( diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 58e93a3f9e..18c07f86f1 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -81,7 +81,7 @@ -ifdef(use_specs). --export_type([resource_name/0, thunk/1]). +-export_type([resource_name/0, thunk/1, channel_or_connection_exit/0]). -type(ok_or_error() :: rabbit_types:ok_or_error(any())). -type(thunk(T) :: fun(() -> T)). diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl index 6e0abd6928..0a69fb325b 100644 --- a/src/rabbit_policy.erl +++ b/src/rabbit_policy.erl @@ -213,17 +213,23 @@ notify_clear(VHost, <<"policy">>, Name) -> %% [1] We need to prevent this from becoming O(n^2) in a similar %% manner to rabbit_binding:remove_for_{source,destination}. So see %% the comment in rabbit_binding:lock_route_tables/0 for more rationale. +%% [2] We could be here in a post-tx fun after the vhost has been +%% deleted; in which case it's fine to do nothing. update_policies(VHost) -> Tabs = [rabbit_queue, rabbit_durable_queue, rabbit_exchange, rabbit_durable_exchange], {Xs, Qs} = rabbit_misc:execute_mnesia_transaction( fun() -> [mnesia:lock({table, T}, write) || T <- Tabs], %% [1] - Policies = list(VHost), - {[update_exchange(X, Policies) || - X <- rabbit_exchange:list(VHost)], - [update_queue(Q, Policies) || - Q <- rabbit_amqqueue:list(VHost)]} + case catch list(VHost) of + {error, {no_such_vhost, _}} -> + ok; %% [2] + Policies -> + {[update_exchange(X, Policies) || + X <- rabbit_exchange:list(VHost)], + [update_queue(Q, Policies) || + Q <- rabbit_amqqueue:list(VHost)]} + end end), [catch notify(X) || X <- Xs], [catch notify(Q) || Q <- Qs], diff --git a/src/rabbit_runtime_parameters.erl b/src/rabbit_runtime_parameters.erl index 7307330b51..cf12591351 100644 --- a/src/rabbit_runtime_parameters.erl +++ b/src/rabbit_runtime_parameters.erl @@ -29,6 +29,7 @@ -ifdef(use_specs). -type(ok_or_error_string() :: 'ok' | {'error_string', string()}). +-type(ok_thunk_or_error_string() :: ok_or_error_string() | fun(() -> 'ok')). -spec(parse_set/5 :: (rabbit_types:vhost(), binary(), binary(), string(), rabbit_types:user() | 'none') -> ok_or_error_string()). @@ -38,9 +39,9 @@ rabbit_types:user() | 'none') -> ok_or_error_string()). -spec(set_global/2 :: (atom(), term()) -> 'ok'). -spec(clear/3 :: (rabbit_types:vhost(), binary(), binary()) - -> ok_or_error_string()). + -> ok_thunk_or_error_string()). -spec(clear_any/3 :: (rabbit_types:vhost(), binary(), binary()) - -> ok_or_error_string()). + -> ok_thunk_or_error_string()). -spec(list/0 :: () -> [rabbit_types:infos()]). -spec(list/1 :: (rabbit_types:vhost() | '_') -> [rabbit_types:infos()]). -spec(list_component/1 :: (binary()) -> [rabbit_types:infos()]). @@ -137,16 +138,22 @@ clear(VHost, Component, Name) -> clear_any(VHost, Component, Name). clear_any(VHost, Component, Name) -> - case lookup(VHost, Component, Name) of - not_found -> {error_string, "Parameter does not exist"}; - _ -> mnesia_clear(VHost, Component, Name), + Notify = fun () -> case lookup_component(Component) of {ok, Mod} -> event_notify( - parameter_cleared, VHost, Component, - [{name, Name}]), + parameter_cleared, VHost, Component, + [{name, Name}]), Mod:notify_clear(VHost, Component, Name); _ -> ok end + end, + case lookup(VHost, Component, Name) of + not_found -> {error_string, "Parameter does not exist"}; + _ -> mnesia_clear(VHost, Component, Name), + case mnesia:is_transaction() of + true -> Notify; + false -> Notify() + end end. mnesia_clear(VHost, Component, Name) -> diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl index b57627e401..cfa3add44a 100644 --- a/src/rabbit_vhost.erl +++ b/src/rabbit_vhost.erl @@ -88,12 +88,11 @@ delete(VHostPath) -> #amqqueue{name = Name} <- rabbit_amqqueue:list(VHostPath)], [assert_benign(rabbit_exchange:delete(Name, false)) || #exchange{name = Name} <- rabbit_exchange:list(VHostPath)], - R = rabbit_misc:execute_mnesia_transaction( - with(VHostPath, fun () -> - ok = internal_delete(VHostPath) - end)), + Funs = rabbit_misc:execute_mnesia_transaction( + with(VHostPath, fun () -> internal_delete(VHostPath) end)), ok = rabbit_event:notify(vhost_deleted, [{name, VHostPath}]), - R. + [ok = Fun() || Fun <- Funs], + ok. assert_benign(ok) -> ok; assert_benign({ok, _}) -> ok; @@ -111,14 +110,14 @@ internal_delete(VHostPath) -> [ok = rabbit_auth_backend_internal:clear_permissions( proplists:get_value(user, Info), VHostPath) || Info <- rabbit_auth_backend_internal:list_vhost_permissions(VHostPath)], - [ok = rabbit_runtime_parameters:clear(VHostPath, - proplists:get_value(component, Info), - proplists:get_value(name, Info)) + Fs1 = [rabbit_runtime_parameters:clear(VHostPath, + proplists:get_value(component, Info), + proplists:get_value(name, Info)) || Info <- rabbit_runtime_parameters:list(VHostPath)], - [ok = rabbit_policy:delete(VHostPath, proplists:get_value(name, Info)) - || Info <- rabbit_policy:list(VHostPath)], + Fs2 = [rabbit_policy:delete(VHostPath, proplists:get_value(name, Info)) + || Info <- rabbit_policy:list(VHostPath)], ok = mnesia:delete({rabbit_vhost, VHostPath}), - ok. + Fs1 ++ Fs2. exists(VHostPath) -> mnesia:dirty_read({rabbit_vhost, VHostPath}) /= []. diff --git a/src/truncate.erl b/src/truncate.erl index 02dba2e36a..1d69de56ac 100644 --- a/src/truncate.erl +++ b/src/truncate.erl @@ -37,11 +37,12 @@ log_event({Type, GL, {Pid, ReportType, Report}}, Params) log_event(Event, _Params) -> Event. -report([[Thing]], Params) -> report([Thing], Params); -report(List, Params) -> [case Item of - {K, V} -> {K, term(V, Params)}; - _ -> term(Item, Params) - end || Item <- List]. +report([[Thing]], Params) -> report([Thing], Params); +report(List, Params) when is_list(List) -> [case Item of + {K, V} -> {K, term(V, Params)}; + _ -> term(Item, Params) + end || Item <- List]; +report(Other, Params) -> term(Other, Params). term(Thing, {Content, Struct, ContentDec, StructDec}) -> term(Thing, true, #params{content = Content, |
