diff options
| author | Daniil Fedotov <dfedotov@pivotal.io> | 2016-11-11 18:00:36 +0000 |
|---|---|---|
| committer | Daniil Fedotov <dfedotov@pivotal.io> | 2016-11-11 18:00:36 +0000 |
| commit | cc75e5672ad23b63fb7c1f93e263b706a95d0cf6 (patch) | |
| tree | 7d7eea904ccb08231ec5b46d5286795c1b2bdb27 | |
| parent | 0b37027731d14c22e8aa0b9e45d7c3281b304509 (diff) | |
| download | rabbitmq-server-git-cc75e5672ad23b63fb7c1f93e263b706a95d0cf6.tar.gz | |
Avoid atom exhaustion in vhost message stores
| -rw-r--r-- | src/rabbit_msg_store.erl | 32 | ||||
| -rw-r--r-- | src/rabbit_msg_store_vhost_sup.erl | 77 | ||||
| -rw-r--r-- | test/channel_operation_timeout_test_queue.erl | 2 |
3 files changed, 65 insertions, 46 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index b0beb975e8..1e929c7f7c 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -474,15 +474,15 @@ %% public API %%---------------------------------------------------------------------------- -start_link(Server, Dir, ClientRefs, StartupFunState) -> - gen_server2:start_link({local, Server}, ?MODULE, - [Server, Dir, ClientRefs, StartupFunState], +start_link(Name, Dir, ClientRefs, StartupFunState) when is_atom(Name) -> + gen_server2:start_link(?MODULE, + [Name, Dir, ClientRefs, StartupFunState], [{timeout, infinity}]). successfully_recovered_state(Server) -> gen_server2:call(Server, successfully_recovered_state, infinity). -client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun) -> +client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun) when is_pid(Server) -> {IState, IModule, Dir, GCPid, FileHandlesEts, FileSummaryEts, CurFileCacheEts, FlyingEts} = gen_server2:call( @@ -522,7 +522,7 @@ write_flow(MsgId, Msg, %% rabbit_amqqueue_process process via the %% rabbit_variable_queue. We are accessing the %% rabbit_amqqueue_process process dictionary. - credit_flow:send(whereis(Server), CreditDiscBound), + credit_flow:send(Server, CreditDiscBound), client_write(MsgId, Msg, flow, CState). write(MsgId, Msg, CState) -> client_write(MsgId, Msg, noflow, CState). @@ -548,7 +548,7 @@ remove(MsgIds, CState = #client_msstate { client_ref = CRef }) -> [client_update_flying(-1, MsgId, CState) || MsgId <- MsgIds], server_cast(CState, {remove, CRef, MsgIds}). -set_maximum_since_use(Server, Age) -> +set_maximum_since_use(Server, Age) when is_pid(Server) -> gen_server2:cast(Server, {set_maximum_since_use, Age}). %%---------------------------------------------------------------------------- @@ -710,16 +710,16 @@ clear_client(CRef, State = #msstate { cref_to_msg_ids = CTM, %% gen_server callbacks %%---------------------------------------------------------------------------- -init([Server, BaseDir, ClientRefs, StartupFunState]) -> +init([Name, BaseDir, ClientRefs, StartupFunState]) -> process_flag(trap_exit, true), ok = file_handle_cache:register_callback(?MODULE, set_maximum_since_use, [self()]), - Dir = filename:join(BaseDir, atom_to_list(Server)), + Dir = filename:join(BaseDir, atom_to_list(Name)), {ok, IndexModule} = application:get_env(rabbit, msg_store_index_module), - rabbit_log:info("~w: using ~p to provide index~n", [Server, IndexModule]), + rabbit_log:info("~tp: using ~p to provide index~n", [Dir, IndexModule]), AttemptFileSummaryRecovery = case ClientRefs of @@ -738,7 +738,7 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> {CleanShutdown, IndexState, ClientRefs1} = recover_index_and_client_refs(IndexModule, FileSummaryRecovered, - ClientRefs, Dir, Server), + ClientRefs, Dir), Clients = dict:from_list( [{CRef, {undefined, undefined, undefined}} || CRef <- ClientRefs1]), @@ -1551,16 +1551,16 @@ index_delete_by_file(File, #msstate { index_module = Index, %% shutdown and recovery %%---------------------------------------------------------------------------- -recover_index_and_client_refs(IndexModule, _Recover, undefined, Dir, _Server) -> +recover_index_and_client_refs(IndexModule, _Recover, undefined, Dir) -> {false, IndexModule:new(Dir), []}; -recover_index_and_client_refs(IndexModule, false, _ClientRefs, Dir, Server) -> - rabbit_log:warning("~w: rebuilding indices from scratch~n", [Server]), +recover_index_and_client_refs(IndexModule, false, _ClientRefs, Dir) -> + rabbit_log:warning("~tp : rebuilding indices from scratch~n", [Dir]), {false, IndexModule:new(Dir), []}; -recover_index_and_client_refs(IndexModule, true, ClientRefs, Dir, Server) -> +recover_index_and_client_refs(IndexModule, true, ClientRefs, Dir) -> Fresh = fun (ErrorMsg, ErrorArgs) -> - rabbit_log:warning("~w: " ++ ErrorMsg ++ "~n" + rabbit_log:warning("~tp : " ++ ErrorMsg ++ "~n" "rebuilding indices from scratch~n", - [Server | ErrorArgs]), + [Dir | ErrorArgs]), {false, IndexModule:new(Dir), []} end, case read_recovery_terms(Dir) of diff --git a/src/rabbit_msg_store_vhost_sup.erl b/src/rabbit_msg_store_vhost_sup.erl index a5be431093..5784f199ca 100644 --- a/src/rabbit_msg_store_vhost_sup.erl +++ b/src/rabbit_msg_store_vhost_sup.erl @@ -13,6 +13,7 @@ start_link(Name, ClientRefs, StartupFunState) -> [Name, ClientRefs, StartupFunState]). init([Name, ClientRefs, StartupFunState]) -> + ets:new(Name, [named_table, public]), {ok, {{simple_one_for_one, 1, 1}, [{rabbit_msg_store_vhost, {rabbit_msg_store_vhost_sup, start_vhost, [Name, ClientRefs, StartupFunState]}, @@ -23,37 +24,54 @@ add_vhost(Name, VHost) -> supervisor2:start_child(Name, [VHost]). start_vhost(Name, ClientRefs, StartupFunState, VHost) -> - VHostName = vhost_store_name(Name, VHost), - VHostDir = vhost_store_dir(VHost), - ok = rabbit_file:ensure_dir(VHostDir), - rabbit_msg_store:start_link(VHostName, VHostDir, - ClientRefs, StartupFunState). + case vhost_store_pid(Name, VHost) of + no_pid -> + VHostDir = vhost_store_dir(VHost), + ok = rabbit_file:ensure_dir(VHostDir), + case rabbit_msg_store:start_link(Name, VHostDir, ClientRefs, StartupFunState) of + {ok, Pid} -> + ets:insert(Name, {VHost, Pid}), + {ok, Pid}; + Other -> Other + end; + Pid when is_pid(Pid) -> + {error, {already_started, Pid}} + end. delete_vhost(Name, VHost) -> - VHostName = vhost_store_name(Name, VHost), - case whereis(VHostName) of - undefined -> ok; - Pid -> supervisor2:terminate_child(Name, Pid) + case vhost_store_pid(Name, VHost) of + no_pid -> ok; + Pid when is_pid(Pid) -> + supervisor2:terminate_child(Name, Pid), + cleanup_vhost_store(Name, VHost, Pid) end, ok. -client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun, VHost) -> - VHostName = maybe_start_vhost(Server, VHost), - rabbit_msg_store:client_init(VHostName, Ref, MsgOnDiskFun, CloseFDsFun). +client_init(Name, Ref, MsgOnDiskFun, CloseFDsFun, VHost) -> + VHostPid = maybe_start_vhost(Name, VHost), + rabbit_msg_store:client_init(VHostPid, Ref, MsgOnDiskFun, CloseFDsFun). -maybe_start_vhost(Server, VHost) -> - VHostName = vhost_store_name(Server, VHost), - case whereis(VHostName) of - undefined -> add_vhost(Server, VHost); - _ -> ok - end, - VHostName. +maybe_start_vhost(Name, VHost) -> + case add_vhost(Name, VHost) of + {ok, Pid} -> Pid; + {error, {already_started, Pid}} -> Pid; + Error -> throw(Error) + end. -vhost_store_name(Name, VHost) -> - Base64EncodedName = rabbit_vhost:dir(VHost), - binary_to_atom(<<(atom_to_binary(Name, utf8))/binary, "_", - Base64EncodedName/binary>>, - utf8). +vhost_store_pid(Name, VHost) -> + case ets:lookup(Name, VHost) of + [] -> no_pid; + [Pid] -> + case erlang:is_process_alive(Pid) of + true -> Pid; + false -> + cleanup_vhost_store(Name, VHost, Pid), + no_pid + end + end. + +cleanup_vhost_store(Name, VHost, Pid) -> + ets:delete_object(Name, {VHost, Pid}). vhost_store_dir(VHost) -> Dir = rabbit_mnesia:dir(), @@ -61,8 +79,9 @@ vhost_store_dir(VHost) -> binary_to_list(filename:join([Dir, Base64EncodedName])). successfully_recovered_state(Name, VHost) -> - VHostName = vhost_store_name(Name, VHost), - rabbit_msg_store:successfully_recovered_state(VHostName). - -% force_recovery -% transform_dir + case vhost_store_pid(Name, VHost) of + no_pid -> + throw({message_store_not_started, Name, VHost}); + Pid when is_pid(Pid) -> + rabbit_msg_store:successfully_recovered_state(Pid) + end. diff --git a/test/channel_operation_timeout_test_queue.erl b/test/channel_operation_timeout_test_queue.erl index 07b1235672..5e256b1381 100644 --- a/test/channel_operation_timeout_test_queue.erl +++ b/test/channel_operation_timeout_test_queue.erl @@ -296,7 +296,7 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, Terms, {DeltaCount, DeltaBytes, IndexState} = rabbit_queue_index:recover( QueueName, RecoveryTerms, - rabbit_msg_store_vhost_sup:successfully_recovered_state(?PERSISTENT_MSG_STORE), + rabbit_msg_store_vhost_sup:successfully_recovered_state(?PERSISTENT_MSG_STORE, VHost), ContainsCheckFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun), init(IsDurable, IndexState, DeltaCount, DeltaBytes, RecoveryTerms, PersistentClient, TransientClient). |
