diff options
| -rw-r--r-- | docs/rabbitmq-service.xml | 3 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 17 | ||||
| -rw-r--r-- | src/rabbit_error_logger_file_h.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_misc.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_mnesia.erl | 15 | ||||
| -rw-r--r-- | src/worker_pool.erl | 10 | ||||
| -rw-r--r-- | src/worker_pool_worker.erl | 40 |
7 files changed, 59 insertions, 34 deletions
diff --git a/docs/rabbitmq-service.xml b/docs/rabbitmq-service.xml index a4bd158087..3368960b80 100644 --- a/docs/rabbitmq-service.xml +++ b/docs/rabbitmq-service.xml @@ -66,8 +66,7 @@ Display usage information. <para> Install the service. The service will not be started. Subsequent invocations will update the service parameters if -relevant environment variables were modified or if the active -plugins were changed. +relevant environment variables were modified. </para> </listitem> </varlistentry> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 418653dfb0..e5a90410e1 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -147,9 +147,13 @@ deliver(Pid, ConsumerTag, AckRequired, Msg) -> gen_server2:cast(Pid, {deliver, ConsumerTag, AckRequired, Msg}). deliver_reply(<<"amq.rabbitmq.reply-to.", Rest/binary>>, Delivery) -> - {ok, Pid, Key} = decode_fast_reply_to(Rest), - delegate:invoke_no_result( - Pid, {?MODULE, deliver_reply_local, [Key, Delivery]}). + case decode_fast_reply_to(Rest) of + {ok, Pid, Key} -> + delegate:invoke_no_result( + Pid, {?MODULE, deliver_reply_local, [Key, Delivery]}); + error -> + ok + end. %% We want to ensure people can't use this mechanism to send a message %% to an arbitrary process and kill it! @@ -174,8 +178,8 @@ declare_fast_reply_to(<<"amq.rabbitmq.reply-to.", Rest/binary>>) -> declare_fast_reply_to(_) -> not_found. -decode_fast_reply_to(Suffix) -> - case binary:split(Suffix, <<".">>) of +decode_fast_reply_to(Rest) -> + case string:tokens(binary_to_list(Rest), ".") of [PidEnc, Key] -> Pid = binary_to_term(base64:decode(PidEnc)), {ok, Pid, Key}; _ -> error @@ -867,7 +871,8 @@ handle_method(#'basic.consume'{queue = <<"amq.rabbitmq.reply-to">>, Key = base64:encode(rabbit_guid:gen_secure()), PidEnc = base64:encode(term_to_binary(self())), Suffix = <<PidEnc/binary, ".", Key/binary>>, - State1 = State#ch{reply_consumer = {CTag, Suffix, Key}}, + Consumer = {CTag, Suffix, binary_to_list(Key)}, + State1 = State#ch{reply_consumer = Consumer}, case NoWait of true -> {noreply, State1}; false -> Rep = #'basic.consume_ok'{consumer_tag = CTag}, diff --git a/src/rabbit_error_logger_file_h.erl b/src/rabbit_error_logger_file_h.erl index be84273904..5f9a21e971 100644 --- a/src/rabbit_error_logger_file_h.erl +++ b/src/rabbit_error_logger_file_h.erl @@ -87,8 +87,10 @@ safe_handle_event(HandleEvent, Event, State) -> HandleEvent(Event, State) catch _:Error -> - io:format("Event crashed log handler:~n~P~n~P~n", - [Event, 30, Error, 30]), + io:format( + "Error in log handler~n====================~n" + "Event: ~P~nError: ~P~nStack trace: ~p~n~n", + [Event, 30, Error, 30, erlang:get_stacktrace()]), {ok, State} end. diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index d2456918f2..c4148bbfc5 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -510,7 +510,7 @@ execute_mnesia_transaction(TxFun) -> end; true -> mnesia:sync_transaction(TxFun) end - end) of + end, single) of {sync, {atomic, Result}} -> mnesia_sync:sync(), Result; {sync, {aborted, Reason}} -> throw({error, Reason}); {atomic, Result} -> Result; diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 17fca7bbaf..880c30ebc4 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -801,14 +801,15 @@ find_auto_cluster_node([Node | Nodes]) -> find_auto_cluster_node(Nodes) end, case rpc:call(Node, rabbit_mnesia, node_info, []) of - {badrpc, _} = Reason -> Fail("~p~n", [Reason]); + {badrpc, _} = Reason -> Fail("~p~n", [Reason]); %% old delegate hash check - {_OTP, Rabbit, _Hash, _} -> Fail("version ~s~n", [Rabbit]); - {OTP, Rabbit, _} -> case check_consistency(OTP, Rabbit) of - {error, _} -> Fail("versions ~p~n", - [{OTP, Rabbit}]); - ok -> {ok, Node} - end + {_OTP, RMQ, _Hash, _} -> Fail("version ~s~n", [RMQ]); + {_OTP, _RMQ, {error, _} = E} -> Fail("~p~n", [E]); + {OTP, RMQ, _} -> case check_consistency(OTP, RMQ) of + {error, _} -> Fail("versions ~p~n", + [{OTP, RMQ}]); + ok -> {ok, Node} + end end. is_only_clustered_disc_node() -> diff --git a/src/worker_pool.erl b/src/worker_pool.erl index b1dba5a233..608cea9166 100644 --- a/src/worker_pool.erl +++ b/src/worker_pool.erl @@ -28,7 +28,8 @@ -behaviour(gen_server2). --export([start_link/0, submit/1, submit_async/1, ready/1, idle/1]). +-export([start_link/0, submit/1, submit/2, submit_async/1, ready/1, + idle/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -41,6 +42,7 @@ -spec(start_link/0 :: () -> {'ok', pid()} | {'error', any()}). -spec(submit/1 :: (fun (() -> A) | mfargs()) -> A). +-spec(submit/2 :: (fun (() -> A) | mfargs(), 'reuse' | 'single') -> A). -spec(submit_async/1 :: (fun (() -> any()) | mfargs()) -> 'ok'). -spec(ready/1 :: (pid()) -> 'ok'). -spec(idle/1 :: (pid()) -> 'ok'). @@ -61,10 +63,14 @@ start_link() -> gen_server2:start_link({local, ?SERVER}, ?MODULE, [], [{timeout, infinity}]). submit(Fun) -> + submit(Fun, reuse). + +%% ProcessModel =:= single is for working around the mnesia_locker bug. +submit(Fun, ProcessModel) -> case get(worker_pool_worker) of true -> worker_pool_worker:run(Fun); _ -> Pid = gen_server2:call(?SERVER, {next_free, self()}, infinity), - worker_pool_worker:submit(Pid, Fun) + worker_pool_worker:submit(Pid, Fun, ProcessModel) end. submit_async(Fun) -> gen_server2:cast(?SERVER, {run_async, Fun}). diff --git a/src/worker_pool_worker.erl b/src/worker_pool_worker.erl index beb95bc631..819a6ae8ce 100644 --- a/src/worker_pool_worker.erl +++ b/src/worker_pool_worker.erl @@ -18,7 +18,7 @@ -behaviour(gen_server2). --export([start_link/0, next_job_from/2, submit/2, submit_async/2, run/1]). +-export([start_link/0, next_job_from/2, submit/3, submit_async/2, run/1]). -export([set_maximum_since_use/2]). @@ -33,7 +33,7 @@ -spec(start_link/0 :: () -> {'ok', pid()} | {'error', any()}). -spec(next_job_from/2 :: (pid(), pid()) -> 'ok'). --spec(submit/2 :: (pid(), fun (() -> A) | mfargs()) -> A). +-spec(submit/3 :: (pid(), fun (() -> A) | mfargs(), 'reuse' | 'single') -> A). -spec(submit_async/2 :: (pid(), fun (() -> any()) | mfargs()) -> 'ok'). -spec(run/1 :: (fun (() -> A)) -> A; (mfargs()) -> any()). -spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok'). @@ -53,8 +53,8 @@ start_link() -> next_job_from(Pid, CPid) -> gen_server2:cast(Pid, {next_job_from, CPid}). -submit(Pid, Fun) -> - gen_server2:call(Pid, {submit, Fun, self()}, infinity). +submit(Pid, Fun, ProcessModel) -> + gen_server2:call(Pid, {submit, Fun, self(), ProcessModel}, infinity). submit_async(Pid, Fun) -> gen_server2:cast(Pid, {submit_async, Fun}). @@ -62,10 +62,22 @@ submit_async(Pid, Fun) -> set_maximum_since_use(Pid, Age) -> gen_server2:cast(Pid, {set_maximum_since_use, Age}). -run({M, F, A}) -> - apply(M, F, A); -run(Fun) -> - Fun(). +run({M, F, A}) -> apply(M, F, A); +run(Fun) -> Fun(). + +run(Fun, reuse) -> + run(Fun); +run(Fun, single) -> + Self = self(), + Ref = make_ref(), + spawn_link(fun () -> + put(worker_pool_worker, true), + Self ! {Ref, run(Fun)}, + unlink(Self) + end), + receive + {Ref, Res} -> Res + end. %%---------------------------------------------------------------------------- @@ -81,12 +93,12 @@ prioritise_cast({set_maximum_since_use, _Age}, _Len, _State) -> 8; prioritise_cast({next_job_from, _CPid}, _Len, _State) -> 7; prioritise_cast(_Msg, _Len, _State) -> 0. -handle_call({submit, Fun, CPid}, From, undefined) -> - {noreply, {job, CPid, From, Fun}, hibernate}; +handle_call({submit, Fun, CPid, ProcessModel}, From, undefined) -> + {noreply, {job, CPid, From, Fun, ProcessModel}, hibernate}; -handle_call({submit, Fun, CPid}, From, {from, CPid, MRef}) -> +handle_call({submit, Fun, CPid, ProcessModel}, From, {from, CPid, MRef}) -> erlang:demonitor(MRef), - gen_server2:reply(From, run(Fun)), + gen_server2:reply(From, run(Fun, ProcessModel)), ok = worker_pool:idle(self()), {noreply, undefined, hibernate}; @@ -97,8 +109,8 @@ handle_cast({next_job_from, CPid}, undefined) -> MRef = erlang:monitor(process, CPid), {noreply, {from, CPid, MRef}, hibernate}; -handle_cast({next_job_from, CPid}, {job, CPid, From, Fun}) -> - gen_server2:reply(From, run(Fun)), +handle_cast({next_job_from, CPid}, {job, CPid, From, Fun, ProcessModel}) -> + gen_server2:reply(From, run(Fun, ProcessModel)), ok = worker_pool:idle(self()), {noreply, undefined, hibernate}; |
