summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--docs/rabbitmq-service.xml3
-rw-r--r--src/rabbit_channel.erl17
-rw-r--r--src/rabbit_error_logger_file_h.erl6
-rw-r--r--src/rabbit_misc.erl2
-rw-r--r--src/rabbit_mnesia.erl15
-rw-r--r--src/worker_pool.erl10
-rw-r--r--src/worker_pool_worker.erl40
7 files changed, 59 insertions, 34 deletions
diff --git a/docs/rabbitmq-service.xml b/docs/rabbitmq-service.xml
index a4bd158087..3368960b80 100644
--- a/docs/rabbitmq-service.xml
+++ b/docs/rabbitmq-service.xml
@@ -66,8 +66,7 @@ Display usage information.
<para>
Install the service. The service will not be started.
Subsequent invocations will update the service parameters if
-relevant environment variables were modified or if the active
-plugins were changed.
+relevant environment variables were modified.
</para>
</listitem>
</varlistentry>
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 418653dfb0..e5a90410e1 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -147,9 +147,13 @@ deliver(Pid, ConsumerTag, AckRequired, Msg) ->
gen_server2:cast(Pid, {deliver, ConsumerTag, AckRequired, Msg}).
deliver_reply(<<"amq.rabbitmq.reply-to.", Rest/binary>>, Delivery) ->
- {ok, Pid, Key} = decode_fast_reply_to(Rest),
- delegate:invoke_no_result(
- Pid, {?MODULE, deliver_reply_local, [Key, Delivery]}).
+ case decode_fast_reply_to(Rest) of
+ {ok, Pid, Key} ->
+ delegate:invoke_no_result(
+ Pid, {?MODULE, deliver_reply_local, [Key, Delivery]});
+ error ->
+ ok
+ end.
%% We want to ensure people can't use this mechanism to send a message
%% to an arbitrary process and kill it!
@@ -174,8 +178,8 @@ declare_fast_reply_to(<<"amq.rabbitmq.reply-to.", Rest/binary>>) ->
declare_fast_reply_to(_) ->
not_found.
-decode_fast_reply_to(Suffix) ->
- case binary:split(Suffix, <<".">>) of
+decode_fast_reply_to(Rest) ->
+ case string:tokens(binary_to_list(Rest), ".") of
[PidEnc, Key] -> Pid = binary_to_term(base64:decode(PidEnc)),
{ok, Pid, Key};
_ -> error
@@ -867,7 +871,8 @@ handle_method(#'basic.consume'{queue = <<"amq.rabbitmq.reply-to">>,
Key = base64:encode(rabbit_guid:gen_secure()),
PidEnc = base64:encode(term_to_binary(self())),
Suffix = <<PidEnc/binary, ".", Key/binary>>,
- State1 = State#ch{reply_consumer = {CTag, Suffix, Key}},
+ Consumer = {CTag, Suffix, binary_to_list(Key)},
+ State1 = State#ch{reply_consumer = Consumer},
case NoWait of
true -> {noreply, State1};
false -> Rep = #'basic.consume_ok'{consumer_tag = CTag},
diff --git a/src/rabbit_error_logger_file_h.erl b/src/rabbit_error_logger_file_h.erl
index be84273904..5f9a21e971 100644
--- a/src/rabbit_error_logger_file_h.erl
+++ b/src/rabbit_error_logger_file_h.erl
@@ -87,8 +87,10 @@ safe_handle_event(HandleEvent, Event, State) ->
HandleEvent(Event, State)
catch
_:Error ->
- io:format("Event crashed log handler:~n~P~n~P~n",
- [Event, 30, Error, 30]),
+ io:format(
+ "Error in log handler~n====================~n"
+ "Event: ~P~nError: ~P~nStack trace: ~p~n~n",
+ [Event, 30, Error, 30, erlang:get_stacktrace()]),
{ok, State}
end.
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index d2456918f2..c4148bbfc5 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -510,7 +510,7 @@ execute_mnesia_transaction(TxFun) ->
end;
true -> mnesia:sync_transaction(TxFun)
end
- end) of
+ end, single) of
{sync, {atomic, Result}} -> mnesia_sync:sync(), Result;
{sync, {aborted, Reason}} -> throw({error, Reason});
{atomic, Result} -> Result;
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 17fca7bbaf..880c30ebc4 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -801,14 +801,15 @@ find_auto_cluster_node([Node | Nodes]) ->
find_auto_cluster_node(Nodes)
end,
case rpc:call(Node, rabbit_mnesia, node_info, []) of
- {badrpc, _} = Reason -> Fail("~p~n", [Reason]);
+ {badrpc, _} = Reason -> Fail("~p~n", [Reason]);
%% old delegate hash check
- {_OTP, Rabbit, _Hash, _} -> Fail("version ~s~n", [Rabbit]);
- {OTP, Rabbit, _} -> case check_consistency(OTP, Rabbit) of
- {error, _} -> Fail("versions ~p~n",
- [{OTP, Rabbit}]);
- ok -> {ok, Node}
- end
+ {_OTP, RMQ, _Hash, _} -> Fail("version ~s~n", [RMQ]);
+ {_OTP, _RMQ, {error, _} = E} -> Fail("~p~n", [E]);
+ {OTP, RMQ, _} -> case check_consistency(OTP, RMQ) of
+ {error, _} -> Fail("versions ~p~n",
+ [{OTP, RMQ}]);
+ ok -> {ok, Node}
+ end
end.
is_only_clustered_disc_node() ->
diff --git a/src/worker_pool.erl b/src/worker_pool.erl
index b1dba5a233..608cea9166 100644
--- a/src/worker_pool.erl
+++ b/src/worker_pool.erl
@@ -28,7 +28,8 @@
-behaviour(gen_server2).
--export([start_link/0, submit/1, submit_async/1, ready/1, idle/1]).
+-export([start_link/0, submit/1, submit/2, submit_async/1, ready/1,
+ idle/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -41,6 +42,7 @@
-spec(start_link/0 :: () -> {'ok', pid()} | {'error', any()}).
-spec(submit/1 :: (fun (() -> A) | mfargs()) -> A).
+-spec(submit/2 :: (fun (() -> A) | mfargs(), 'reuse' | 'single') -> A).
-spec(submit_async/1 :: (fun (() -> any()) | mfargs()) -> 'ok').
-spec(ready/1 :: (pid()) -> 'ok').
-spec(idle/1 :: (pid()) -> 'ok').
@@ -61,10 +63,14 @@ start_link() -> gen_server2:start_link({local, ?SERVER}, ?MODULE, [],
[{timeout, infinity}]).
submit(Fun) ->
+ submit(Fun, reuse).
+
+%% ProcessModel =:= single is for working around the mnesia_locker bug.
+submit(Fun, ProcessModel) ->
case get(worker_pool_worker) of
true -> worker_pool_worker:run(Fun);
_ -> Pid = gen_server2:call(?SERVER, {next_free, self()}, infinity),
- worker_pool_worker:submit(Pid, Fun)
+ worker_pool_worker:submit(Pid, Fun, ProcessModel)
end.
submit_async(Fun) -> gen_server2:cast(?SERVER, {run_async, Fun}).
diff --git a/src/worker_pool_worker.erl b/src/worker_pool_worker.erl
index beb95bc631..819a6ae8ce 100644
--- a/src/worker_pool_worker.erl
+++ b/src/worker_pool_worker.erl
@@ -18,7 +18,7 @@
-behaviour(gen_server2).
--export([start_link/0, next_job_from/2, submit/2, submit_async/2, run/1]).
+-export([start_link/0, next_job_from/2, submit/3, submit_async/2, run/1]).
-export([set_maximum_since_use/2]).
@@ -33,7 +33,7 @@
-spec(start_link/0 :: () -> {'ok', pid()} | {'error', any()}).
-spec(next_job_from/2 :: (pid(), pid()) -> 'ok').
--spec(submit/2 :: (pid(), fun (() -> A) | mfargs()) -> A).
+-spec(submit/3 :: (pid(), fun (() -> A) | mfargs(), 'reuse' | 'single') -> A).
-spec(submit_async/2 :: (pid(), fun (() -> any()) | mfargs()) -> 'ok').
-spec(run/1 :: (fun (() -> A)) -> A; (mfargs()) -> any()).
-spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok').
@@ -53,8 +53,8 @@ start_link() ->
next_job_from(Pid, CPid) ->
gen_server2:cast(Pid, {next_job_from, CPid}).
-submit(Pid, Fun) ->
- gen_server2:call(Pid, {submit, Fun, self()}, infinity).
+submit(Pid, Fun, ProcessModel) ->
+ gen_server2:call(Pid, {submit, Fun, self(), ProcessModel}, infinity).
submit_async(Pid, Fun) ->
gen_server2:cast(Pid, {submit_async, Fun}).
@@ -62,10 +62,22 @@ submit_async(Pid, Fun) ->
set_maximum_since_use(Pid, Age) ->
gen_server2:cast(Pid, {set_maximum_since_use, Age}).
-run({M, F, A}) ->
- apply(M, F, A);
-run(Fun) ->
- Fun().
+run({M, F, A}) -> apply(M, F, A);
+run(Fun) -> Fun().
+
+run(Fun, reuse) ->
+ run(Fun);
+run(Fun, single) ->
+ Self = self(),
+ Ref = make_ref(),
+ spawn_link(fun () ->
+ put(worker_pool_worker, true),
+ Self ! {Ref, run(Fun)},
+ unlink(Self)
+ end),
+ receive
+ {Ref, Res} -> Res
+ end.
%%----------------------------------------------------------------------------
@@ -81,12 +93,12 @@ prioritise_cast({set_maximum_since_use, _Age}, _Len, _State) -> 8;
prioritise_cast({next_job_from, _CPid}, _Len, _State) -> 7;
prioritise_cast(_Msg, _Len, _State) -> 0.
-handle_call({submit, Fun, CPid}, From, undefined) ->
- {noreply, {job, CPid, From, Fun}, hibernate};
+handle_call({submit, Fun, CPid, ProcessModel}, From, undefined) ->
+ {noreply, {job, CPid, From, Fun, ProcessModel}, hibernate};
-handle_call({submit, Fun, CPid}, From, {from, CPid, MRef}) ->
+handle_call({submit, Fun, CPid, ProcessModel}, From, {from, CPid, MRef}) ->
erlang:demonitor(MRef),
- gen_server2:reply(From, run(Fun)),
+ gen_server2:reply(From, run(Fun, ProcessModel)),
ok = worker_pool:idle(self()),
{noreply, undefined, hibernate};
@@ -97,8 +109,8 @@ handle_cast({next_job_from, CPid}, undefined) ->
MRef = erlang:monitor(process, CPid),
{noreply, {from, CPid, MRef}, hibernate};
-handle_cast({next_job_from, CPid}, {job, CPid, From, Fun}) ->
- gen_server2:reply(From, run(Fun)),
+handle_cast({next_job_from, CPid}, {job, CPid, From, Fun, ProcessModel}) ->
+ gen_server2:reply(From, run(Fun, ProcessModel)),
ok = worker_pool:idle(self()),
{noreply, undefined, hibernate};