summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTim Watson <tim@rabbitmq.com>2013-11-18 12:13:14 +0000
committerTim Watson <tim@rabbitmq.com>2013-11-18 12:13:14 +0000
commit3f557dc14fc766f50e371a469aa092767acb98e3 (patch)
tree5b8eea3179173134b006b83c65f5c66b397330fe
parent063126a715524cb39ed8f1173cf8e96e2c91f005 (diff)
parentcbdd7758327e497d9a5285d3333da4625a73daf6 (diff)
downloadrabbitmq-server-git-3f557dc14fc766f50e371a469aa092767acb98e3.tar.gz
merge default into bug24926
-rw-r--r--codegen.py8
-rw-r--r--docs/rabbitmqctl.1.xml21
-rw-r--r--src/rabbit_binary_parser.erl18
-rw-r--r--src/rabbit_control_main.erl9
-rw-r--r--src/rabbit_mirror_queue_slave.erl46
-rw-r--r--src/worker_pool.erl18
-rw-r--r--src/worker_pool_worker.erl49
7 files changed, 113 insertions, 56 deletions
diff --git a/codegen.py b/codegen.py
index 842549cf03..91fa115423 100644
--- a/codegen.py
+++ b/codegen.py
@@ -187,7 +187,7 @@ def genErl(spec):
elif type == 'table':
return p+'Len:32/unsigned, '+p+'Tab:'+p+'Len/binary'
- def genFieldPostprocessing(packed):
+ def genFieldPostprocessing(packed, hasContent):
for f in packed:
type = erlType(f.domain)
if type == 'bit':
@@ -199,6 +199,10 @@ def genErl(spec):
elif type == 'table':
print " F%d = rabbit_binary_parser:parse_table(F%dTab)," % \
(f.index, f.index)
+ # We skip the check on content-bearing methods for
+ # speed. This is a sanity check, not a security thing.
+ elif type == 'shortstr' and not hasContent:
+ print " rabbit_binary_parser:assert_utf8(F%d)," % (f.index)
else:
pass
@@ -214,7 +218,7 @@ def genErl(spec):
restSeparator = ''
recordConstructorExpr = '#%s{%s}' % (m.erlangName(), fieldMapList(m.arguments))
print "decode_method_fields(%s, <<%s>>) ->" % (m.erlangName(), binaryPattern)
- genFieldPostprocessing(packedFields)
+ genFieldPostprocessing(packedFields, m.hasContent)
print " %s;" % (recordConstructorExpr,)
def genDecodeProperties(c):
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index d7c93924c2..6ec7ee0768 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -1135,27 +1135,6 @@
<listitem><para>Number of consumers.</para></listitem>
</varlistentry>
<varlistentry>
- <term>active_consumers</term>
- <listitem>
- <para>
- Number of active consumers. An active consumer is
- one which could immediately receive any messages
- sent to the queue - i.e. it is not limited by its
- prefetch count, TCP congestion, flow control, or
- because it has issued channel.flow. At least one
- of messages_ready and active_consumers must always
- be zero.
- </para>
- <para>
- Note that this value is an instantaneous snapshot
- - when consumers are restricted by their prefetch
- count they may only appear to be active for small
- fractions of a second until more messages are sent
- out.
- </para>
- </listitem>
- </varlistentry>
- <varlistentry>
<term>memory</term>
<listitem><para>Bytes of memory consumed by the Erlang process associated with the
queue, including stack, heap and internal structures.</para></listitem>
diff --git a/src/rabbit_binary_parser.erl b/src/rabbit_binary_parser.erl
index dc6d090ff0..088ad0e52e 100644
--- a/src/rabbit_binary_parser.erl
+++ b/src/rabbit_binary_parser.erl
@@ -20,6 +20,7 @@
-export([parse_table/1]).
-export([ensure_content_decoded/1, clear_decoded_content/1]).
+-export([validate_utf8/1, assert_utf8/1]).
%%----------------------------------------------------------------------------
@@ -30,6 +31,8 @@
(rabbit_types:content()) -> rabbit_types:decoded_content()).
-spec(clear_decoded_content/1 ::
(rabbit_types:content()) -> rabbit_types:undecoded_content()).
+-spec(validate_utf8/1 :: (binary()) -> 'ok' | 'error').
+-spec(assert_utf8/1 :: (binary()) -> 'ok').
-endif.
@@ -99,3 +102,18 @@ clear_decoded_content(Content = #content{properties_bin = none}) ->
Content;
clear_decoded_content(Content = #content{}) ->
Content#content{properties = none}.
+
+assert_utf8(B) ->
+ case validate_utf8(B) of
+ ok -> ok;
+ error -> rabbit_misc:protocol_error(
+ frame_error, "Malformed UTF-8 in shortstr", [])
+ end.
+
+validate_utf8(Bin) ->
+ try
+ xmerl_ucs:from_utf8(Bin),
+ ok
+ catch exit:{ucs, _} ->
+ error
+ end.
diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl
index 6f36f99df5..f34632867a 100644
--- a/src/rabbit_control_main.erl
+++ b/src/rabbit_control_main.erl
@@ -706,7 +706,14 @@ unsafe_rpc(Node, Mod, Fun, Args) ->
end.
call(Node, {Mod, Fun, Args}) ->
- rpc_call(Node, Mod, Fun, lists:map(fun list_to_binary/1, Args)).
+ rpc_call(Node, Mod, Fun, lists:map(fun list_to_binary_utf8/1, Args)).
+
+list_to_binary_utf8(L) ->
+ B = list_to_binary(L),
+ case rabbit_binary_parser:validate_utf8(B) of
+ ok -> B;
+ error -> throw({error, {not_utf_8, L}})
+ end.
rpc_call(Node, Mod, Fun, Args) ->
rpc:call(Node, Mod, Fun, Args, ?RPC_TIMEOUT).
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index b1a8649344..6f78d1d2b5 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -293,32 +293,41 @@ handle_info({bump_credit, Msg}, State) ->
handle_info(Msg, State) ->
{stop, {unexpected_info, Msg}, State}.
-%% If the Reason is shutdown, or {shutdown, _}, it is not the queue
-%% being deleted: it's just the node going down. Even though we're a
-%% slave, we have no idea whether or not we'll be the only copy coming
-%% back up. Thus we must assume we will be, and preserve anything we
-%% have on disk.
terminate(_Reason, #state { backing_queue_state = undefined }) ->
%% We've received a delete_and_terminate from gm, thus nothing to
%% do here.
ok;
-terminate({shutdown, dropped} = R, #state { backing_queue = BQ,
- backing_queue_state = BQS }) ->
+terminate({shutdown, dropped} = R, State = #state{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
%% See rabbit_mirror_queue_master:terminate/2
+ terminate_common(State),
BQ:delete_and_terminate(R, BQS);
-terminate(Reason, #state { q = Q,
- gm = GM,
- backing_queue = BQ,
- backing_queue_state = BQS,
- rate_timer_ref = RateTRef }) ->
- ok = gm:leave(GM),
- QueueState = rabbit_amqqueue_process:init_with_backing_queue_state(
- Q, BQ, BQS, RateTRef, [], pmon:new(), dict:new()),
- rabbit_amqqueue_process:terminate(Reason, QueueState);
+terminate(shutdown, State) ->
+ terminate_shutdown(shutdown, State);
+terminate({shutdown, _} = R, State) ->
+ terminate_shutdown(R, State);
+terminate(Reason, State = #state{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ terminate_common(State),
+ BQ:delete_and_terminate(Reason, BQS);
terminate([_SPid], _Reason) ->
%% gm case
ok.
+%% If the Reason is shutdown, or {shutdown, _}, it is not the queue
+%% being deleted: it's just the node going down. Even though we're a
+%% slave, we have no idea whether or not we'll be the only copy coming
+%% back up. Thus we must assume we will be, and preserve anything we
+%% have on disk.
+terminate_shutdown(Reason, State = #state{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ terminate_common(State),
+ BQ:terminate(Reason, BQS).
+
+terminate_common(State) ->
+ ok = rabbit_memory_monitor:deregister(self()),
+ stop_rate_timer(stop_sync_timer(State)).
+
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
@@ -663,8 +672,13 @@ confirm_sender_death(Pid) ->
ok.
forget_sender(_, running) -> false;
+forget_sender(down_from_gm, down_from_gm) -> false; %% [1]
forget_sender(Down1, Down2) when Down1 =/= Down2 -> true.
+%% [1] If another slave goes through confirm_sender_death/1 before we
+%% do we can get two GM sender_death messages in a row for the same
+%% channel - don't treat that as anything special.
+
%% Record and process lifetime events from channels. Forget all about a channel
%% only when down notifications are received from both the channel and from gm.
maybe_forget_sender(ChPid, ChState, State = #state { sender_queues = SQ,
diff --git a/src/worker_pool.erl b/src/worker_pool.erl
index 488db5ece5..e14c471c7f 100644
--- a/src/worker_pool.erl
+++ b/src/worker_pool.erl
@@ -63,7 +63,7 @@ start_link() ->
submit(Fun) ->
case get(worker_pool_worker) of
true -> worker_pool_worker:run(Fun);
- _ -> Pid = gen_server2:call(?SERVER, next_free, infinity),
+ _ -> Pid = gen_server2:call(?SERVER, {next_free, self()}, infinity),
worker_pool_worker:submit(Pid, Fun)
end.
@@ -79,15 +79,17 @@ init([]) ->
{ok, #state { pending = queue:new(), available = queue:new() }, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
-handle_call(next_free, From, State = #state { available = Avail,
- pending = Pending }) ->
+handle_call({next_free, CPid}, From, State = #state { available = Avail,
+ pending = Pending }) ->
case queue:out(Avail) of
{empty, _Avail} ->
{noreply,
- State #state { pending = queue:in({next_free, From}, Pending) },
+ State#state{pending = queue:in({next_free, From, CPid}, Pending)},
hibernate};
{{value, WId}, Avail1} ->
- {reply, get_worker_pid(WId), State #state { available = Avail1 },
+ WPid = get_worker_pid(WId),
+ worker_pool_worker:next_job_from(WPid, CPid),
+ {reply, WPid, State #state { available = Avail1 },
hibernate}
end;
@@ -99,8 +101,10 @@ handle_cast({idle, WId}, State = #state { available = Avail,
{noreply, case queue:out(Pending) of
{empty, _Pending} ->
State #state { available = queue:in(WId, Avail) };
- {{value, {next_free, From}}, Pending1} ->
- gen_server2:reply(From, get_worker_pid(WId)),
+ {{value, {next_free, From, CPid}}, Pending1} ->
+ WPid = get_worker_pid(WId),
+ worker_pool_worker:next_job_from(WPid, CPid),
+ gen_server2:reply(From, WPid),
State #state { pending = Pending1 };
{{value, {run_async, Fun}}, Pending1} ->
worker_pool_worker:submit_async(get_worker_pid(WId), Fun),
diff --git a/src/worker_pool_worker.erl b/src/worker_pool_worker.erl
index a976503f00..724235bf43 100644
--- a/src/worker_pool_worker.erl
+++ b/src/worker_pool_worker.erl
@@ -18,7 +18,7 @@
-behaviour(gen_server2).
--export([start_link/1, submit/2, submit_async/2, run/1]).
+-export([start_link/1, next_job_from/2, submit/2, submit_async/2, run/1]).
-export([set_maximum_since_use/2]).
@@ -32,6 +32,7 @@
-type(mfargs() :: {atom(), atom(), [any()]}).
-spec(start_link/1 :: (any()) -> {'ok', pid()} | {'error', any()}).
+-spec(next_job_from/2 :: (pid(), pid()) -> 'ok').
-spec(submit/2 :: (pid(), fun (() -> A) | mfargs()) -> A).
-spec(submit_async/2 :: (pid(), fun (() -> any()) | mfargs()) -> 'ok').
-spec(run/1 :: (fun (() -> A)) -> A; (mfargs()) -> any()).
@@ -44,13 +45,18 @@
-define(HIBERNATE_AFTER_MIN, 1000).
-define(DESIRED_HIBERNATE, 10000).
+-record(state, {id, next}).
+
%%----------------------------------------------------------------------------
start_link(WId) ->
gen_server2:start_link(?MODULE, [WId], [{timeout, infinity}]).
+next_job_from(Pid, CPid) ->
+ gen_server2:cast(Pid, {next_job_from, CPid}).
+
submit(Pid, Fun) ->
- gen_server2:call(Pid, {submit, Fun}, infinity).
+ gen_server2:call(Pid, {submit, Fun, self()}, infinity).
submit_async(Pid, Fun) ->
gen_server2:cast(Pid, {submit_async, Fun}).
@@ -70,32 +76,57 @@ init([WId]) ->
[self()]),
ok = worker_pool:idle(WId),
put(worker_pool_worker, true),
- {ok, WId, hibernate,
+ {ok, #state{id = WId}, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
prioritise_cast({set_maximum_since_use, _Age}, _Len, _State) -> 8;
+prioritise_cast({next_job_from, _CPid}, _Len, _State) -> 7;
prioritise_cast(_Msg, _Len, _State) -> 0.
-handle_call({submit, Fun}, From, WId) ->
+handle_call({submit, Fun, CPid}, From, State = #state{next = undefined}) ->
+ {noreply, State#state{next = {job, CPid, From, Fun}}, hibernate};
+
+handle_call({submit, Fun, CPid}, From, State = #state{next = {from, CPid, MRef},
+ id = WId}) ->
+ erlang:demonitor(MRef),
gen_server2:reply(From, run(Fun)),
ok = worker_pool:idle(WId),
- {noreply, WId, hibernate};
+ {noreply, State#state{next = undefined}, hibernate};
handle_call(Msg, _From, State) ->
{stop, {unexpected_call, Msg}, State}.
-handle_cast({submit_async, Fun}, WId) ->
+handle_cast({next_job_from, CPid}, State = #state{next = undefined}) ->
+ MRef = erlang:monitor(process, CPid),
+ {noreply, State#state{next = {from, CPid, MRef}}, hibernate};
+
+handle_cast({next_job_from, CPid}, State = #state{next = {job, CPid, From, Fun},
+ id = WId}) ->
+ gen_server2:reply(From, run(Fun)),
+ ok = worker_pool:idle(WId),
+ {noreply, State#state{next = undefined}, hibernate};
+
+handle_cast({submit_async, Fun}, State = #state{id = WId}) ->
run(Fun),
ok = worker_pool:idle(WId),
- {noreply, WId, hibernate};
+ {noreply, State, hibernate};
-handle_cast({set_maximum_since_use, Age}, WId) ->
+handle_cast({set_maximum_since_use, Age}, State) ->
ok = file_handle_cache:set_maximum_since_use(Age),
- {noreply, WId, hibernate};
+ {noreply, State, hibernate};
handle_cast(Msg, State) ->
{stop, {unexpected_cast, Msg}, State}.
+handle_info({'DOWN', MRef, process, CPid, _Reason},
+ State = #state{id = WId,
+ next = {from, CPid, MRef}}) ->
+ ok = worker_pool:idle(WId),
+ {noreply, State#state{next = undefined}};
+
+handle_info({'DOWN', _MRef, process, _Pid, _Reason}, State) ->
+ {noreply, State};
+
handle_info(Msg, State) ->
{stop, {unexpected_info, Msg}, State}.