diff options
| author | Tim Watson <tim@rabbitmq.com> | 2013-11-18 12:13:14 +0000 |
|---|---|---|
| committer | Tim Watson <tim@rabbitmq.com> | 2013-11-18 12:13:14 +0000 |
| commit | 3f557dc14fc766f50e371a469aa092767acb98e3 (patch) | |
| tree | 5b8eea3179173134b006b83c65f5c66b397330fe | |
| parent | 063126a715524cb39ed8f1173cf8e96e2c91f005 (diff) | |
| parent | cbdd7758327e497d9a5285d3333da4625a73daf6 (diff) | |
| download | rabbitmq-server-git-3f557dc14fc766f50e371a469aa092767acb98e3.tar.gz | |
merge default into bug24926
| -rw-r--r-- | codegen.py | 8 | ||||
| -rw-r--r-- | docs/rabbitmqctl.1.xml | 21 | ||||
| -rw-r--r-- | src/rabbit_binary_parser.erl | 18 | ||||
| -rw-r--r-- | src/rabbit_control_main.erl | 9 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 46 | ||||
| -rw-r--r-- | src/worker_pool.erl | 18 | ||||
| -rw-r--r-- | src/worker_pool_worker.erl | 49 |
7 files changed, 113 insertions, 56 deletions
diff --git a/codegen.py b/codegen.py index 842549cf03..91fa115423 100644 --- a/codegen.py +++ b/codegen.py @@ -187,7 +187,7 @@ def genErl(spec): elif type == 'table': return p+'Len:32/unsigned, '+p+'Tab:'+p+'Len/binary' - def genFieldPostprocessing(packed): + def genFieldPostprocessing(packed, hasContent): for f in packed: type = erlType(f.domain) if type == 'bit': @@ -199,6 +199,10 @@ def genErl(spec): elif type == 'table': print " F%d = rabbit_binary_parser:parse_table(F%dTab)," % \ (f.index, f.index) + # We skip the check on content-bearing methods for + # speed. This is a sanity check, not a security thing. + elif type == 'shortstr' and not hasContent: + print " rabbit_binary_parser:assert_utf8(F%d)," % (f.index) else: pass @@ -214,7 +218,7 @@ def genErl(spec): restSeparator = '' recordConstructorExpr = '#%s{%s}' % (m.erlangName(), fieldMapList(m.arguments)) print "decode_method_fields(%s, <<%s>>) ->" % (m.erlangName(), binaryPattern) - genFieldPostprocessing(packedFields) + genFieldPostprocessing(packedFields, m.hasContent) print " %s;" % (recordConstructorExpr,) def genDecodeProperties(c): diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index d7c93924c2..6ec7ee0768 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -1135,27 +1135,6 @@ <listitem><para>Number of consumers.</para></listitem> </varlistentry> <varlistentry> - <term>active_consumers</term> - <listitem> - <para> - Number of active consumers. An active consumer is - one which could immediately receive any messages - sent to the queue - i.e. it is not limited by its - prefetch count, TCP congestion, flow control, or - because it has issued channel.flow. At least one - of messages_ready and active_consumers must always - be zero. - </para> - <para> - Note that this value is an instantaneous snapshot - - when consumers are restricted by their prefetch - count they may only appear to be active for small - fractions of a second until more messages are sent - out. - </para> - </listitem> - </varlistentry> - <varlistentry> <term>memory</term> <listitem><para>Bytes of memory consumed by the Erlang process associated with the queue, including stack, heap and internal structures.</para></listitem> diff --git a/src/rabbit_binary_parser.erl b/src/rabbit_binary_parser.erl index dc6d090ff0..088ad0e52e 100644 --- a/src/rabbit_binary_parser.erl +++ b/src/rabbit_binary_parser.erl @@ -20,6 +20,7 @@ -export([parse_table/1]). -export([ensure_content_decoded/1, clear_decoded_content/1]). +-export([validate_utf8/1, assert_utf8/1]). %%---------------------------------------------------------------------------- @@ -30,6 +31,8 @@ (rabbit_types:content()) -> rabbit_types:decoded_content()). -spec(clear_decoded_content/1 :: (rabbit_types:content()) -> rabbit_types:undecoded_content()). +-spec(validate_utf8/1 :: (binary()) -> 'ok' | 'error'). +-spec(assert_utf8/1 :: (binary()) -> 'ok'). -endif. @@ -99,3 +102,18 @@ clear_decoded_content(Content = #content{properties_bin = none}) -> Content; clear_decoded_content(Content = #content{}) -> Content#content{properties = none}. + +assert_utf8(B) -> + case validate_utf8(B) of + ok -> ok; + error -> rabbit_misc:protocol_error( + frame_error, "Malformed UTF-8 in shortstr", []) + end. + +validate_utf8(Bin) -> + try + xmerl_ucs:from_utf8(Bin), + ok + catch exit:{ucs, _} -> + error + end. diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl index 6f36f99df5..f34632867a 100644 --- a/src/rabbit_control_main.erl +++ b/src/rabbit_control_main.erl @@ -706,7 +706,14 @@ unsafe_rpc(Node, Mod, Fun, Args) -> end. call(Node, {Mod, Fun, Args}) -> - rpc_call(Node, Mod, Fun, lists:map(fun list_to_binary/1, Args)). + rpc_call(Node, Mod, Fun, lists:map(fun list_to_binary_utf8/1, Args)). + +list_to_binary_utf8(L) -> + B = list_to_binary(L), + case rabbit_binary_parser:validate_utf8(B) of + ok -> B; + error -> throw({error, {not_utf_8, L}}) + end. rpc_call(Node, Mod, Fun, Args) -> rpc:call(Node, Mod, Fun, Args, ?RPC_TIMEOUT). diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index b1a8649344..6f78d1d2b5 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -293,32 +293,41 @@ handle_info({bump_credit, Msg}, State) -> handle_info(Msg, State) -> {stop, {unexpected_info, Msg}, State}. -%% If the Reason is shutdown, or {shutdown, _}, it is not the queue -%% being deleted: it's just the node going down. Even though we're a -%% slave, we have no idea whether or not we'll be the only copy coming -%% back up. Thus we must assume we will be, and preserve anything we -%% have on disk. terminate(_Reason, #state { backing_queue_state = undefined }) -> %% We've received a delete_and_terminate from gm, thus nothing to %% do here. ok; -terminate({shutdown, dropped} = R, #state { backing_queue = BQ, - backing_queue_state = BQS }) -> +terminate({shutdown, dropped} = R, State = #state{backing_queue = BQ, + backing_queue_state = BQS}) -> %% See rabbit_mirror_queue_master:terminate/2 + terminate_common(State), BQ:delete_and_terminate(R, BQS); -terminate(Reason, #state { q = Q, - gm = GM, - backing_queue = BQ, - backing_queue_state = BQS, - rate_timer_ref = RateTRef }) -> - ok = gm:leave(GM), - QueueState = rabbit_amqqueue_process:init_with_backing_queue_state( - Q, BQ, BQS, RateTRef, [], pmon:new(), dict:new()), - rabbit_amqqueue_process:terminate(Reason, QueueState); +terminate(shutdown, State) -> + terminate_shutdown(shutdown, State); +terminate({shutdown, _} = R, State) -> + terminate_shutdown(R, State); +terminate(Reason, State = #state{backing_queue = BQ, + backing_queue_state = BQS}) -> + terminate_common(State), + BQ:delete_and_terminate(Reason, BQS); terminate([_SPid], _Reason) -> %% gm case ok. +%% If the Reason is shutdown, or {shutdown, _}, it is not the queue +%% being deleted: it's just the node going down. Even though we're a +%% slave, we have no idea whether or not we'll be the only copy coming +%% back up. Thus we must assume we will be, and preserve anything we +%% have on disk. +terminate_shutdown(Reason, State = #state{backing_queue = BQ, + backing_queue_state = BQS}) -> + terminate_common(State), + BQ:terminate(Reason, BQS). + +terminate_common(State) -> + ok = rabbit_memory_monitor:deregister(self()), + stop_rate_timer(stop_sync_timer(State)). + code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -663,8 +672,13 @@ confirm_sender_death(Pid) -> ok. forget_sender(_, running) -> false; +forget_sender(down_from_gm, down_from_gm) -> false; %% [1] forget_sender(Down1, Down2) when Down1 =/= Down2 -> true. +%% [1] If another slave goes through confirm_sender_death/1 before we +%% do we can get two GM sender_death messages in a row for the same +%% channel - don't treat that as anything special. + %% Record and process lifetime events from channels. Forget all about a channel %% only when down notifications are received from both the channel and from gm. maybe_forget_sender(ChPid, ChState, State = #state { sender_queues = SQ, diff --git a/src/worker_pool.erl b/src/worker_pool.erl index 488db5ece5..e14c471c7f 100644 --- a/src/worker_pool.erl +++ b/src/worker_pool.erl @@ -63,7 +63,7 @@ start_link() -> submit(Fun) -> case get(worker_pool_worker) of true -> worker_pool_worker:run(Fun); - _ -> Pid = gen_server2:call(?SERVER, next_free, infinity), + _ -> Pid = gen_server2:call(?SERVER, {next_free, self()}, infinity), worker_pool_worker:submit(Pid, Fun) end. @@ -79,15 +79,17 @@ init([]) -> {ok, #state { pending = queue:new(), available = queue:new() }, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. -handle_call(next_free, From, State = #state { available = Avail, - pending = Pending }) -> +handle_call({next_free, CPid}, From, State = #state { available = Avail, + pending = Pending }) -> case queue:out(Avail) of {empty, _Avail} -> {noreply, - State #state { pending = queue:in({next_free, From}, Pending) }, + State#state{pending = queue:in({next_free, From, CPid}, Pending)}, hibernate}; {{value, WId}, Avail1} -> - {reply, get_worker_pid(WId), State #state { available = Avail1 }, + WPid = get_worker_pid(WId), + worker_pool_worker:next_job_from(WPid, CPid), + {reply, WPid, State #state { available = Avail1 }, hibernate} end; @@ -99,8 +101,10 @@ handle_cast({idle, WId}, State = #state { available = Avail, {noreply, case queue:out(Pending) of {empty, _Pending} -> State #state { available = queue:in(WId, Avail) }; - {{value, {next_free, From}}, Pending1} -> - gen_server2:reply(From, get_worker_pid(WId)), + {{value, {next_free, From, CPid}}, Pending1} -> + WPid = get_worker_pid(WId), + worker_pool_worker:next_job_from(WPid, CPid), + gen_server2:reply(From, WPid), State #state { pending = Pending1 }; {{value, {run_async, Fun}}, Pending1} -> worker_pool_worker:submit_async(get_worker_pid(WId), Fun), diff --git a/src/worker_pool_worker.erl b/src/worker_pool_worker.erl index a976503f00..724235bf43 100644 --- a/src/worker_pool_worker.erl +++ b/src/worker_pool_worker.erl @@ -18,7 +18,7 @@ -behaviour(gen_server2). --export([start_link/1, submit/2, submit_async/2, run/1]). +-export([start_link/1, next_job_from/2, submit/2, submit_async/2, run/1]). -export([set_maximum_since_use/2]). @@ -32,6 +32,7 @@ -type(mfargs() :: {atom(), atom(), [any()]}). -spec(start_link/1 :: (any()) -> {'ok', pid()} | {'error', any()}). +-spec(next_job_from/2 :: (pid(), pid()) -> 'ok'). -spec(submit/2 :: (pid(), fun (() -> A) | mfargs()) -> A). -spec(submit_async/2 :: (pid(), fun (() -> any()) | mfargs()) -> 'ok'). -spec(run/1 :: (fun (() -> A)) -> A; (mfargs()) -> any()). @@ -44,13 +45,18 @@ -define(HIBERNATE_AFTER_MIN, 1000). -define(DESIRED_HIBERNATE, 10000). +-record(state, {id, next}). + %%---------------------------------------------------------------------------- start_link(WId) -> gen_server2:start_link(?MODULE, [WId], [{timeout, infinity}]). +next_job_from(Pid, CPid) -> + gen_server2:cast(Pid, {next_job_from, CPid}). + submit(Pid, Fun) -> - gen_server2:call(Pid, {submit, Fun}, infinity). + gen_server2:call(Pid, {submit, Fun, self()}, infinity). submit_async(Pid, Fun) -> gen_server2:cast(Pid, {submit_async, Fun}). @@ -70,32 +76,57 @@ init([WId]) -> [self()]), ok = worker_pool:idle(WId), put(worker_pool_worker, true), - {ok, WId, hibernate, + {ok, #state{id = WId}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. prioritise_cast({set_maximum_since_use, _Age}, _Len, _State) -> 8; +prioritise_cast({next_job_from, _CPid}, _Len, _State) -> 7; prioritise_cast(_Msg, _Len, _State) -> 0. -handle_call({submit, Fun}, From, WId) -> +handle_call({submit, Fun, CPid}, From, State = #state{next = undefined}) -> + {noreply, State#state{next = {job, CPid, From, Fun}}, hibernate}; + +handle_call({submit, Fun, CPid}, From, State = #state{next = {from, CPid, MRef}, + id = WId}) -> + erlang:demonitor(MRef), gen_server2:reply(From, run(Fun)), ok = worker_pool:idle(WId), - {noreply, WId, hibernate}; + {noreply, State#state{next = undefined}, hibernate}; handle_call(Msg, _From, State) -> {stop, {unexpected_call, Msg}, State}. -handle_cast({submit_async, Fun}, WId) -> +handle_cast({next_job_from, CPid}, State = #state{next = undefined}) -> + MRef = erlang:monitor(process, CPid), + {noreply, State#state{next = {from, CPid, MRef}}, hibernate}; + +handle_cast({next_job_from, CPid}, State = #state{next = {job, CPid, From, Fun}, + id = WId}) -> + gen_server2:reply(From, run(Fun)), + ok = worker_pool:idle(WId), + {noreply, State#state{next = undefined}, hibernate}; + +handle_cast({submit_async, Fun}, State = #state{id = WId}) -> run(Fun), ok = worker_pool:idle(WId), - {noreply, WId, hibernate}; + {noreply, State, hibernate}; -handle_cast({set_maximum_since_use, Age}, WId) -> +handle_cast({set_maximum_since_use, Age}, State) -> ok = file_handle_cache:set_maximum_since_use(Age), - {noreply, WId, hibernate}; + {noreply, State, hibernate}; handle_cast(Msg, State) -> {stop, {unexpected_cast, Msg}, State}. +handle_info({'DOWN', MRef, process, CPid, _Reason}, + State = #state{id = WId, + next = {from, CPid, MRef}}) -> + ok = worker_pool:idle(WId), + {noreply, State#state{next = undefined}}; + +handle_info({'DOWN', _MRef, process, _Pid, _Reason}, State) -> + {noreply, State}; + handle_info(Msg, State) -> {stop, {unexpected_info, Msg}, State}. |
