diff options
| author | Matthias Radestock <matthias@rabbitmq.com> | 2014-01-22 05:33:08 +0000 |
|---|---|---|
| committer | Matthias Radestock <matthias@rabbitmq.com> | 2014-01-22 05:33:08 +0000 |
| commit | 0363ebd8e391931635725a23c9372e6752bcf52b (patch) | |
| tree | 5a8c220b1044348996d89aa5cdcb6f5016f98768 /src | |
| parent | 6a07ff8c1d76b0bb4d4c807d0c65968656d73c10 (diff) | |
| parent | fa4968970f6160afdd64cceb07021a669e51f57c (diff) | |
| download | rabbitmq-server-git-0363ebd8e391931635725a23c9372e6752bcf52b.tar.gz | |
merge default into bug24297
(and resolve merge conflicts in limiter)
Diffstat (limited to 'src')
| -rw-r--r-- | src/gm.erl | 106 | ||||
| -rw-r--r-- | src/rabbit_access_control.erl | 29 | ||||
| -rw-r--r-- | src/rabbit_amqqueue.erl | 12 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_basic.erl | 7 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 111 | ||||
| -rw-r--r-- | src/rabbit_error_logger_file_h.erl | 19 | ||||
| -rw-r--r-- | src/rabbit_limiter.erl | 67 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_reader.erl | 18 |
10 files changed, 183 insertions, 196 deletions
diff --git a/src/gm.erl b/src/gm.erl index df1c258d70..5a82950a41 100644 --- a/src/gm.erl +++ b/src/gm.erl @@ -382,7 +382,7 @@ -behaviour(gen_server2). --export([create_tables/0, start_link/4, leave/1, broadcast/2, +-export([create_tables/0, start_link/4, leave/1, broadcast/2, broadcast/3, confirmed_broadcast/2, info/1, validate_members/2, forget_group/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, @@ -395,6 +395,7 @@ -export([table_definitions/0]). -define(GROUP_TABLE, gm_group). +-define(MAX_BUFFER_SIZE, 100000000). %% 100MB -define(HIBERNATE_AFTER_MIN, 1000). -define(DESIRED_HIBERNATE, 10000). -define(BROADCAST_TIMER, 25). @@ -414,6 +415,7 @@ callback_args, confirms, broadcast_buffer, + broadcast_buffer_sz, broadcast_timer, txn_executor }). @@ -522,8 +524,10 @@ start_link(GroupName, Module, Args, TxnFun) -> leave(Server) -> gen_server2:cast(Server, leave). -broadcast(Server, Msg) -> - gen_server2:cast(Server, {broadcast, Msg}). +broadcast(Server, Msg) -> broadcast(Server, Msg, 0). + +broadcast(Server, Msg, SizeHint) -> + gen_server2:cast(Server, {broadcast, Msg, SizeHint}). confirmed_broadcast(Server, Msg) -> gen_server2:call(Server, {confirmed_broadcast, Msg}, infinity). @@ -547,19 +551,20 @@ init([GroupName, Module, Args, TxnFun]) -> random:seed(MegaSecs, Secs, MicroSecs), Self = make_member(GroupName), gen_server2:cast(self(), join), - {ok, #state { self = Self, - left = {Self, undefined}, - right = {Self, undefined}, - group_name = GroupName, - module = Module, - view = undefined, - pub_count = -1, - members_state = undefined, - callback_args = Args, - confirms = queue:new(), - broadcast_buffer = [], - broadcast_timer = undefined, - txn_executor = TxnFun }, hibernate, + {ok, #state { self = Self, + left = {Self, undefined}, + right = {Self, undefined}, + group_name = GroupName, + module = Module, + view = undefined, + pub_count = -1, + members_state = undefined, + callback_args = Args, + confirms = queue:new(), + broadcast_buffer = [], + broadcast_buffer_sz = 0, + broadcast_timer = undefined, + txn_executor = TxnFun }, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -576,7 +581,7 @@ handle_call({confirmed_broadcast, Msg}, _From, ok, State}); handle_call({confirmed_broadcast, Msg}, From, State) -> - internal_broadcast(Msg, From, State); + internal_broadcast(Msg, From, 0, State); handle_call(info, _From, State = #state { members_state = undefined }) -> @@ -639,10 +644,11 @@ handle_cast({?TAG, ReqVer, Msg}, if_callback_success( Result, fun handle_msg_true/3, fun handle_msg_false/3, Msg, State1)); -handle_cast({broadcast, _Msg}, State = #state { members_state = undefined }) -> +handle_cast({broadcast, _Msg, _SizeHint}, + State = #state { members_state = undefined }) -> noreply(State); -handle_cast({broadcast, Msg}, +handle_cast({broadcast, Msg, _SizeHint}, State = #state { self = Self, right = {Self, undefined}, module = Module, @@ -650,8 +656,8 @@ handle_cast({broadcast, Msg}, handle_callback_result({Module:handle_msg(Args, get_pid(Self), Msg), State}); -handle_cast({broadcast, Msg}, State) -> - internal_broadcast(Msg, none, State); +handle_cast({broadcast, Msg, SizeHint}, State) -> + internal_broadcast(Msg, none, SizeHint, State); handle_cast(join, State = #state { self = Self, group_name = GroupName, @@ -883,12 +889,14 @@ ensure_broadcast_timer(State = #state { broadcast_timer = undefined }) -> ensure_broadcast_timer(State) -> State. -internal_broadcast(Msg, From, State = #state { self = Self, - pub_count = PubCount, - module = Module, - confirms = Confirms, - callback_args = Args, - broadcast_buffer = Buffer }) -> +internal_broadcast(Msg, From, SizeHint, + State = #state { self = Self, + pub_count = PubCount, + module = Module, + confirms = Confirms, + callback_args = Args, + broadcast_buffer = Buffer, + broadcast_buffer_sz = BufferSize }) -> PubCount1 = PubCount + 1, Result = Module:handle_msg(Args, get_pid(Self), Msg), Buffer1 = [{PubCount1, Msg} | Buffer], @@ -896,13 +904,38 @@ internal_broadcast(Msg, From, State = #state { self = Self, none -> Confirms; _ -> queue:in({PubCount1, From}, Confirms) end, - State1 = State #state { pub_count = PubCount1, - confirms = Confirms1, - broadcast_buffer = Buffer1 }, - handle_callback_result({Result, case From of - none -> State1; - _ -> flush_broadcast_buffer(State1) - end}). + State1 = State #state { pub_count = PubCount1, + confirms = Confirms1, + broadcast_buffer = Buffer1, + broadcast_buffer_sz = BufferSize + SizeHint}, + handle_callback_result( + {Result, case From of + none -> maybe_flush_broadcast_buffer(State1); + _ -> flush_broadcast_buffer(State1) + end}). + +%% The Erlang distribution mechanism has an interesting quirk - it +%% will kill the VM cold with "Absurdly large distribution output data +%% buffer" if you attempt to send a message which serialises out to +%% more than 2^31 bytes in size. It's therefore a very good idea to +%% make sure that we don't exceed that size! +%% +%% Now, we could figure out the size of messages as they come in using +%% size(term_to_binary(Msg)) or similar. The trouble is, that requires +%% us to serialise the message only to throw the serialised form +%% away. Hard to believe that's a sensible thing to do. So instead we +%% accept a size hint from the application, via broadcast/3. This size +%% hint can be the size of anything in the message which we expect +%% could be large, and we just ignore the size of any small bits of +%% the message term. Therefore MAX_BUFFER_SIZE is set somewhat +%% conservatively at 100MB - but the buffer is only to allow us to +%% buffer tiny messages anyway, so 100MB is plenty. + +maybe_flush_broadcast_buffer(State = #state{broadcast_buffer_sz = Size}) -> + case Size > ?MAX_BUFFER_SIZE of + true -> flush_broadcast_buffer(State); + false -> State + end. flush_broadcast_buffer(State = #state { broadcast_buffer = [] }) -> State; @@ -920,8 +953,9 @@ flush_broadcast_buffer(State = #state { self = Self, Member #member { pending_ack = PA1, last_pub = PubCount } end, Self, MembersState), - State #state { members_state = MembersState1, - broadcast_buffer = [] }. + State #state { members_state = MembersState1, + broadcast_buffer = [], + broadcast_buffer_sz = 0}. %% --------------------------------------------------------------------------- diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl index d54c2a8dba..19171659e3 100644 --- a/src/rabbit_access_control.erl +++ b/src/rabbit_access_control.erl @@ -52,18 +52,31 @@ check_user_pass_login(Username, Password) -> check_user_login(Username, AuthProps) -> {ok, Modules} = application:get_env(rabbit, auth_backends), lists:foldl( - fun(Module, {refused, _, _}) -> - case Module:check_user_login(Username, AuthProps) of - {error, E} -> - {refused, "~s failed authenticating ~s: ~p~n", - [Module, Username, E]}; - Else -> - Else + fun ({ModN, ModZ}, {refused, _, _}) -> + %% Different modules for authN vs authZ. So authenticate + %% with authN module, then if that succeeds do + %% passwordless (i.e pre-authenticated) login with authZ + %% module, and use the #user{} the latter gives us. + case try_login(ModN, Username, AuthProps) of + {ok, _} -> try_login(ModZ, Username, []); + Else -> Else end; - (_, {ok, User}) -> + (Mod, {refused, _, _}) -> + %% Same module for authN and authZ. Just take the result + %% it gives us + try_login(Mod, Username, AuthProps); + (_, {ok, User}) -> + %% We've successfully authenticated. Skip to the end... {ok, User} end, {refused, "No modules checked '~s'", [Username]}, Modules). +try_login(Module, Username, AuthProps) -> + case Module:check_user_login(Username, AuthProps) of + {error, E} -> {refused, "~s failed authenticating ~s: ~p~n", + [Module, Username, E]}; + Else -> Else + end. + check_vhost_access(User = #user{ username = Username, auth_backend = Module }, VHostPath) -> check_access( diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index cf94e61cfa..76e11dffb2 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -27,7 +27,7 @@ -export([force_event_refresh/0, notify_policy_changed/1]). -export([consumers/1, consumers_all/1, consumer_info_keys/0]). -export([basic_get/4, basic_consume/9, basic_cancel/4, notify_decorators/1]). --export([notify_sent/2, notify_sent_queue_down/1, resume/2, flush_all/2]). +-export([notify_sent/2, notify_sent_queue_down/1, resume/2]). -export([notify_down_all/2, activate_limit_all/2, credit/5]). -export([on_node_down/1]). -export([update/2, store_queue/1, policy_changed/2]). @@ -113,10 +113,9 @@ -> [rabbit_types:infos()]). -spec(force_event_refresh/0 :: () -> 'ok'). -spec(notify_policy_changed/1 :: (rabbit_types:amqqueue()) -> 'ok'). --spec(consumers/1 :: - (rabbit_types:amqqueue()) - -> [{pid(), rabbit_types:ctag(), boolean(), - rabbit_framing:amqp_table()}]). +-spec(consumers/1 :: (rabbit_types:amqqueue()) + -> [{pid(), rabbit_types:ctag(), boolean(), + rabbit_framing:amqp_table()}]). -spec(consumer_info_keys/0 :: () -> rabbit_types:info_keys()). -spec(consumers_all/1 :: (rabbit_types:vhost()) @@ -162,7 +161,6 @@ -spec(notify_sent/2 :: (pid(), pid()) -> 'ok'). -spec(notify_sent_queue_down/1 :: (pid()) -> 'ok'). -spec(resume/2 :: (pid(), pid()) -> 'ok'). --spec(flush_all/2 :: (qpids(), pid()) -> 'ok'). -spec(internal_delete/1 :: (name()) -> rabbit_types:ok_or_error('not_found') | rabbit_types:connection_exit() | @@ -607,8 +605,6 @@ notify_sent_queue_down(QPid) -> resume(QPid, ChPid) -> delegate:cast(QPid, {resume, ChPid}). -flush_all(QPids, ChPid) -> delegate:cast(QPids, {flush, ChPid}). - internal_delete1(QueueName) -> ok = mnesia:delete({rabbit_queue, QueueName}), %% this 'guarded' delete prevents unnecessary writes to the mnesia diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 801c48d11f..b4355287f9 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -1083,10 +1083,6 @@ handle_cast({activate_limit, ChPid}, State) -> noreply(possibly_unblock(rabbit_queue_consumers:activate_limit_fun(), ChPid, State)); -handle_cast({flush, ChPid}, State) -> - ok = rabbit_channel:flushed(ChPid, self()), - noreply(State); - handle_cast({set_ram_duration_target, Duration}, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> BQS1 = BQ:set_ram_duration_target(Duration, BQS), diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index 2e825536b2..3d70be4bc3 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -22,7 +22,7 @@ message/3, message/4, properties/1, prepend_table_header/3, extract_headers/1, map_headers/2, delivery/3, header_routes/1, parse_expiration/1]). --export([build_content/2, from_content/1]). +-export([build_content/2, from_content/1, msg_size/1]). %%---------------------------------------------------------------------------- @@ -77,6 +77,9 @@ (rabbit_framing:amqp_property_record()) -> rabbit_types:ok_or_error2('undefined' | non_neg_integer(), any())). +-spec(msg_size/1 :: (rabbit_types:content() | rabbit_types:message()) -> + non_neg_integer()). + -endif. %%---------------------------------------------------------------------------- @@ -274,3 +277,5 @@ parse_expiration(#'P_basic'{expiration = Expiration}) -> {error, {leftover_string, S}} end. +msg_size(#content{payload_fragments_rev = PFR}) -> iolist_size(PFR); +msg_size(#basic_message{content = Content}) -> msg_size(Content). diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index da2ba2676c..e030096f5e 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -21,8 +21,7 @@ -behaviour(gen_server2). -export([start_link/11, do/2, do/3, do_flow/3, flush/1, shutdown/1]). --export([send_command/2, deliver/4, send_credit_reply/2, send_drained/2, - flushed/2]). +-export([send_command/2, deliver/4, send_credit_reply/2, send_drained/2]). -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]). -export([refresh_config_local/0, ready_for_close/1]). -export([force_event_refresh/0]). @@ -37,7 +36,7 @@ conn_name, limiter, tx, next_tag, unacked_message_q, user, virtual_host, most_recently_declared_queue, queue_names, queue_monitors, consumer_mapping, - blocking, queue_consumers, delivering_queues, + queue_consumers, delivering_queues, queue_collector_pid, stats_timer, confirm_enabled, publish_seqno, unconfirmed, confirmed, capabilities, trace_state}). @@ -53,7 +52,6 @@ messages_uncommitted, acks_uncommitted, prefetch_count, - client_flow_blocked, state]). -define(CREATION_EVENT_KEYS, @@ -99,7 +97,6 @@ -spec(send_credit_reply/2 :: (pid(), non_neg_integer()) -> 'ok'). -spec(send_drained/2 :: (pid(), [{rabbit_types:ctag(), non_neg_integer()}]) -> 'ok'). --spec(flushed/2 :: (pid(), pid()) -> 'ok'). -spec(list/0 :: () -> [pid()]). -spec(list_local/0 :: () -> [pid()]). -spec(info_keys/0 :: () -> rabbit_types:info_keys()). @@ -149,9 +146,6 @@ send_credit_reply(Pid, Len) -> send_drained(Pid, CTagCredit) -> gen_server2:cast(Pid, {send_drained, CTagCredit}). -flushed(Pid, QPid) -> - gen_server2:cast(Pid, {flushed, QPid}). - list() -> rabbit_misc:append_rpc_all_nodes(rabbit_mnesia:cluster_nodes(running), rabbit_channel, list_local, []). @@ -213,7 +207,6 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, queue_names = dict:new(), queue_monitors = pmon:new(), consumer_mapping = dict:new(), - blocking = sets:new(), queue_consumers = dict:new(), delivering_queues = sets:new(), queue_collector_pid = CollectorPid, @@ -291,9 +284,6 @@ handle_cast({method, Method, Content, Flow}, {stop, {Reason, erlang:get_stacktrace()}, State} end; -handle_cast({flushed, QPid}, State) -> - {noreply, queue_blocked(QPid, State), hibernate}; - handle_cast(ready_for_close, State = #ch{state = closing, writer_pid = WriterPid}) -> ok = rabbit_writer:send_command_sync(WriterPid, #'channel.close_ok'{}), @@ -368,8 +358,7 @@ handle_info(emit_stats, State) -> handle_info({'DOWN', _MRef, process, QPid, Reason}, State) -> State1 = handle_publishing_queue_down(QPid, Reason, State), - State2 = queue_blocked(QPid, State1), - State3 = handle_consuming_queue_down(QPid, State2), + State3 = handle_consuming_queue_down(QPid, State1), State4 = handle_delivering_queue_down(QPid, State3), credit_flow:peer_down(QPid), #ch{queue_names = QNames, queue_monitors = QMons} = State4, @@ -480,9 +469,8 @@ check_resource_access(User, Resource, Perm) -> put(permission_cache, [V | CacheTail]) end. -clear_permission_cache() -> - erase(permission_cache), - ok. +clear_permission_cache() -> erase(permission_cache), + ok. check_configure_permitted(Resource, #ch{user = User}) -> check_resource_access(User, Resource, configure). @@ -522,6 +510,14 @@ check_internal_exchange(#exchange{name = Name, internal = true}) -> check_internal_exchange(_) -> ok. +check_msg_size(Content) -> + Size = rabbit_basic:msg_size(Content), + case Size > ?MAX_MSG_SIZE of + true -> precondition_failed("message size ~B larger than max size ~B", + [Size, ?MAX_MSG_SIZE]); + false -> ok + end. + qbin_to_resource(QueueNameBin, State) -> name_to_resource(queue, QueueNameBin, State). @@ -529,8 +525,7 @@ name_to_resource(Type, NameBin, #ch{virtual_host = VHostPath}) -> rabbit_misc:r(VHostPath, Type, NameBin). expand_queue_name_shortcut(<<>>, #ch{most_recently_declared_queue = <<>>}) -> - rabbit_misc:protocol_error( - not_found, "no previously declared queue", []); + rabbit_misc:protocol_error(not_found, "no previously declared queue", []); expand_queue_name_shortcut(<<>>, #ch{most_recently_declared_queue = MRDQ}) -> MRDQ; expand_queue_name_shortcut(QueueNameBin, _) -> @@ -538,8 +533,7 @@ expand_queue_name_shortcut(QueueNameBin, _) -> expand_routing_key_shortcut(<<>>, <<>>, #ch{most_recently_declared_queue = <<>>}) -> - rabbit_misc:protocol_error( - not_found, "no previously declared queue", []); + rabbit_misc:protocol_error(not_found, "no previously declared queue", []); expand_routing_key_shortcut(<<>>, <<>>, #ch{most_recently_declared_queue = MRDQ}) -> MRDQ; @@ -593,20 +587,6 @@ check_name(Kind, NameBin = <<"amq.", _/binary>>) -> check_name(_Kind, NameBin) -> NameBin. -queue_blocked(QPid, State = #ch{blocking = Blocking}) -> - case sets:is_element(QPid, Blocking) of - false -> State; - true -> maybe_send_flow_ok( - State#ch{blocking = sets:del_element(QPid, Blocking)}) - end. - -maybe_send_flow_ok(State = #ch{blocking = Blocking}) -> - case sets:size(Blocking) of - 0 -> ok = send(#'channel.flow_ok'{active = false}, State); - _ -> ok - end, - State. - record_confirms([], State) -> State; record_confirms(MXs, State = #ch{confirmed = C}) -> @@ -679,6 +659,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, tx = Tx, confirm_enabled = ConfirmEnabled, trace_state = TraceState}) -> + check_msg_size(Content), ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_write_permitted(ExchangeName, State), Exchange = rabbit_exchange:lookup_or_die(ExchangeName), @@ -873,8 +854,13 @@ handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, %% unacked messages from basic.get too. Pretty obscure though. Limiter1 = rabbit_limiter:limit_prefetch(Limiter, PrefetchCount, queue:len(UAMQ)), - {reply, #'basic.qos_ok'{}, - maybe_limit_queues(Limiter, Limiter1, State#ch{limiter = Limiter1})}; + case ((not rabbit_limiter:is_active(Limiter)) andalso + rabbit_limiter:is_active(Limiter1)) of + true -> rabbit_amqqueue:activate_limit_all( + consumer_queues(State#ch.consumer_mapping), self()); + false -> ok + end, + {reply, #'basic.qos_ok'{}, State#ch{limiter = Limiter1}}; handle_method(#'basic.recover_async'{requeue = true}, _, State = #ch{unacked_message_q = UAMQ, limiter = Limiter}) -> @@ -1165,36 +1151,11 @@ handle_method(#'confirm.select'{nowait = NoWait}, _, State) -> return_ok(State#ch{confirm_enabled = true}, NoWait, #'confirm.select_ok'{}); -handle_method(#'channel.flow'{active = true}, - _, State = #ch{limiter = Limiter}) -> - Limiter1 = rabbit_limiter:unblock(Limiter), - {reply, #'channel.flow_ok'{active = true}, - maybe_limit_queues(Limiter, Limiter1, State#ch{limiter = Limiter1})}; - -handle_method(#'channel.flow'{active = false}, - _, State = #ch{consumer_mapping = Consumers, - limiter = Limiter}) -> - case rabbit_limiter:is_blocked(Limiter) of - true -> {noreply, maybe_send_flow_ok(State)}; - false -> Limiter1 = rabbit_limiter:block(Limiter), - State1 = maybe_limit_queues(Limiter, Limiter1, - State#ch{limiter = Limiter1}), - %% The semantics of channel.flow{active=false} - %% require that no messages are delivered after the - %% channel.flow_ok has been sent. We accomplish that - %% by "flushing" all messages in flight from the - %% consumer queues to us. To do this we tell all the - %% queues to invoke rabbit_channel:flushed/2, which - %% will send us a {flushed, ...} message that appears - %% *after* all the {deliver, ...} messages. We keep - %% track of all the QPids thus asked, and once all of - %% them have responded (or died) we send the - %% channel.flow_ok. - QPids = consumer_queues(Consumers), - ok = rabbit_amqqueue:flush_all(QPids, self()), - {noreply, maybe_send_flow_ok( - State1#ch{blocking = sets:from_list(QPids)})} - end; +handle_method(#'channel.flow'{active = true}, _, State) -> + {reply, #'channel.flow_ok'{active = true}, State}; + +handle_method(#'channel.flow'{active = false}, _, _State) -> + rabbit_misc:protocol_error(not_implemented, "active=false", []); handle_method(#'basic.credit'{consumer_tag = CTag, credit = Credit, @@ -1446,15 +1407,6 @@ foreach_per_consumer(F, UAL) -> end, gb_trees:empty(), UAL), rabbit_misc:gb_trees_foreach(F, T). -maybe_limit_queues(OldLimiter, NewLimiter, State) -> - case ((not rabbit_limiter:is_active(OldLimiter)) andalso - rabbit_limiter:is_active(NewLimiter)) of - true -> Queues = consumer_queues(State#ch.consumer_mapping), - rabbit_amqqueue:activate_limit_all(Queues, self()); - false -> ok - end, - State. - consumer_queues(Consumers) -> lists:usort([QPid || {_Key, #amqqueue{pid = QPid}} <- dict:to_list(Consumers)]). @@ -1466,7 +1418,7 @@ consumer_queues(Consumers) -> notify_limiter(Limiter, Acked) -> %% optimisation: avoid the potentially expensive 'foldl' in the %% common case. - case rabbit_limiter:is_prefetch_limited(Limiter) of + case rabbit_limiter:is_active(Limiter) of false -> ok; true -> case lists:foldl(fun ({_, none, _}, Acc) -> Acc; ({_, _, _}, Acc) -> Acc + 1 @@ -1634,8 +1586,6 @@ i(state, #ch{state = running}) -> credit_flow:state(); i(state, #ch{state = State}) -> State; i(prefetch_count, #ch{limiter = Limiter}) -> rabbit_limiter:get_prefetch_limit(Limiter); -i(client_flow_blocked, #ch{limiter = Limiter}) -> - rabbit_limiter:is_blocked(Limiter); i(Item, _) -> throw({bad_argument, Item}). @@ -1656,8 +1606,7 @@ update_measures(Type, Key, Inc, Measure) -> end, put({Type, Key}, orddict:store(Measure, Cur + Inc, Measures)). -emit_stats(State) -> - emit_stats(State, []). +emit_stats(State) -> emit_stats(State, []). emit_stats(State, Extra) -> Coarse = infos(?STATISTICS_KEYS, State), diff --git a/src/rabbit_error_logger_file_h.erl b/src/rabbit_error_logger_file_h.erl index d59641b0d1..9421b52e83 100644 --- a/src/rabbit_error_logger_file_h.erl +++ b/src/rabbit_error_logger_file_h.erl @@ -79,6 +79,25 @@ init_file(File, PrevHandler) -> %% filter out "application: foo; exited: stopped; type: temporary" handle_event({info_report, _, {_, std_info, _}}, State) -> {ok, State}; +%% When a node restarts quickly it is possible the rest of the cluster +%% will not have had the chance to remove its queues from +%% Mnesia. That's why rabbit_amqqueue:recover/0 invokes +%% on_node_down(node()). But before we get there we can receive lots +%% of messages intended for the old version of the node. The emulator +%% logs an event for every one of those messages; in extremis this can +%% bring the server to its knees just logging "Discarding..." +%% again and again. So just log the first one, then go silent. +handle_event(Event = {error, _, {emulator, _, ["Discarding message" ++ _]}}, + State) -> + case get(discarding_message_seen) of + true -> {ok, State}; + undefined -> put(discarding_message_seen, true), + error_logger_file_h:handle_event(Event, State) + end; +%% Clear this state if we log anything else (but not a progress report). +handle_event(Event = {info_msg, _, _}, State) -> + erase(discarding_message_seen), + error_logger_file_h:handle_event(Event, State); handle_event(Event, State) -> error_logger_file_h:handle_event(Event, State). diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 5dcd12b8e9..8d92ef9c46 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -17,8 +17,8 @@ %% The purpose of the limiter is to stem the flow of messages from %% queues to channels, in order to act upon various protocol-level %% flow control mechanisms, specifically AMQP 0-9-1's basic.qos -%% prefetch_count, channel.flow, and our consumer prefetch extension, -%% and AMQP 1.0's link (aka consumer) credit mechanism. +%% prefetch_count, our consumer prefetch extension, and AMQP 1.0's +%% link (aka consumer) credit mechanism. %% %% Each channel has an associated limiter process, created with %% start_link/1, which it passes to queues on consumer creation with @@ -69,11 +69,9 @@ %% %% 1. Channels tell the limiter about basic.qos prefetch counts - %% that's what the limit_prefetch/3, unlimit_prefetch/1, -%% is_prefetch_limited/1, get_prefetch_limit/1 API functions are -%% about - and channel.flow blocking - that's what block/1, -%% unblock/1 and is_blocked/1 are for. They also tell the limiter -%% queue state (via the queue) about consumer credit changes and -%% messaeg acknowledgement - that's what credit/5 and +%% get_prefetch_limit/1 API functions are about. They also tell the +%% limiter queue state (via the queue) about consumer credit +%% changes and message acknowledgement - that's what credit/5 and %% ack_from_queue/3 are for. %% %% 2. Queues also tell the limiter queue state about the queue @@ -88,12 +86,11 @@ %% %% 5. Queues ask the limiter for permission (with can_send/3) whenever %% they want to deliver a message to a channel. The limiter checks -%% whether a) the channel isn't blocked by channel.flow, b) the -%% volume has not yet reached the prefetch limit, and c) whether -%% the consumer has enough credit. If so it increments the volume -%% and tells the queue to proceed. Otherwise it marks the queue as -%% requiring notification (see below) and tells the queue not to -%% proceed. +%% whether a) the volume has not yet reached the prefetch limit, +%% and b) whether the consumer has enough credit. If so it +%% increments the volume and tells the queue to proceed. Otherwise +%% it marks the queue as requiring notification (see below) and +%% tells the queue not to proceed. %% %% 6. A queue that has been told to proceed (by the return value of %% can_send/3) sends the message to the channel. Conversely, a @@ -128,8 +125,7 @@ -export([start_link/1]). %% channel API --export([new/1, limit_prefetch/3, unlimit_prefetch/1, block/1, unblock/1, - is_prefetch_limited/1, is_blocked/1, is_active/1, +-export([new/1, limit_prefetch/3, unlimit_prefetch/1, is_active/1, get_prefetch_limit/1, ack/2, pid/1]). %% queue API -export([client/1, activate/1, can_send/3, resume/1, deactivate/1, @@ -141,14 +137,13 @@ %%---------------------------------------------------------------------------- --record(lstate, {pid, prefetch_limited, blocked}). +-record(lstate, {pid, prefetch_limited}). -record(qstate, {pid, state, credits}). -ifdef(use_specs). -type(lstate() :: #lstate{pid :: pid(), - prefetch_limited :: boolean(), - blocked :: boolean()}). + prefetch_limited :: boolean()}). -type(qstate() :: #qstate{pid :: pid(), state :: 'dormant' | 'active' | 'suspended'}). @@ -161,10 +156,6 @@ -spec(limit_prefetch/3 :: (lstate(), non_neg_integer(), non_neg_integer()) -> lstate()). -spec(unlimit_prefetch/1 :: (lstate()) -> lstate()). --spec(block/1 :: (lstate()) -> lstate()). --spec(unblock/1 :: (lstate()) -> lstate()). --spec(is_prefetch_limited/1 :: (lstate()) -> boolean()). --spec(is_blocked/1 :: (lstate()) -> boolean()). -spec(is_active/1 :: (lstate()) -> boolean()). -spec(get_prefetch_limit/1 :: (lstate()) -> non_neg_integer()). -spec(ack/2 :: (lstate(), non_neg_integer()) -> 'ok'). @@ -192,7 +183,6 @@ -record(lim, {prefetch_count = 0, ch_pid, - blocked = false, queues = orddict:new(), % QPid -> {MonitorRef, Notify} volume = 0}). %% 'Notify' is a boolean that indicates whether a queue should be @@ -210,7 +200,7 @@ start_link(ProcName) -> gen_server2:start_link(?MODULE, [ProcName], []). new(Pid) -> %% this a 'call' to ensure that it is invoked at most once. ok = gen_server:call(Pid, {new, self()}, infinity), - #lstate{pid = Pid, prefetch_limited = false, blocked = false}. + #lstate{pid = Pid, prefetch_limited = false}. limit_prefetch(L, PrefetchCount, UnackedCount) when PrefetchCount > 0 -> ok = gen_server:call( @@ -222,19 +212,7 @@ unlimit_prefetch(L) -> ok = gen_server:call(L#lstate.pid, unlimit_prefetch, infinity), L#lstate{prefetch_limited = false}. -block(L) -> - ok = gen_server:call(L#lstate.pid, block, infinity), - L#lstate{blocked = true}. - -unblock(L) -> - ok = gen_server:call(L#lstate.pid, unblock, infinity), - L#lstate{blocked = false}. - -is_prefetch_limited(#lstate{prefetch_limited = Limited}) -> Limited. - -is_blocked(#lstate{blocked = Blocked}) -> Blocked. - -is_active(L) -> is_prefetch_limited(L) orelse is_blocked(L). +is_active(#lstate{prefetch_limited = Limited}) -> Limited. get_prefetch_limit(#lstate{prefetch_limited = false}) -> 0; get_prefetch_limit(L) -> @@ -377,19 +355,10 @@ handle_call(unlimit_prefetch, _From, State) -> {reply, ok, maybe_notify(State, State#lim{prefetch_count = 0, volume = 0})}; -handle_call(block, _From, State) -> - {reply, ok, State#lim{blocked = true}}; - -handle_call(unblock, _From, State) -> - {reply, ok, maybe_notify(State, State#lim{blocked = false})}; - handle_call(get_prefetch_limit, _From, State = #lim{prefetch_count = PrefetchCount}) -> {reply, PrefetchCount, State}; -handle_call({can_send, QPid, _AckRequired}, _From, - State = #lim{blocked = true}) -> - {reply, false, limit_queue(QPid, State)}; handle_call({can_send, QPid, AckRequired}, _From, State = #lim{volume = Volume}) -> case prefetch_limit_reached(State) of @@ -425,8 +394,8 @@ code_change(_, State, _) -> %%---------------------------------------------------------------------------- maybe_notify(OldState, NewState) -> - case (prefetch_limit_reached(OldState) orelse blocked(OldState)) andalso - not (prefetch_limit_reached(NewState) orelse blocked(NewState)) of + case prefetch_limit_reached(OldState) andalso + not prefetch_limit_reached(NewState) of true -> notify_queues(NewState); false -> NewState end. @@ -434,8 +403,6 @@ maybe_notify(OldState, NewState) -> prefetch_limit_reached(#lim{prefetch_count = Limit, volume = Volume}) -> Limit =/= 0 andalso Volume >= Limit. -blocked(#lim{blocked = Blocked}) -> Blocked. - remember_queue(QPid, State = #lim{queues = Queues}) -> case orddict:is_key(QPid, Queues) of false -> MRef = erlang:monitor(process, QPid), diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 4f50e1a503..9ce5afcb45 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -212,7 +212,8 @@ publish(Msg = #basic_message { id = MsgId }, MsgProps, IsDelivered, ChPid, backing_queue = BQ, backing_queue_state = BQS }) -> false = dict:is_key(MsgId, SS), %% ASSERTION - ok = gm:broadcast(GM, {publish, ChPid, MsgProps, Msg}), + ok = gm:broadcast(GM, {publish, ChPid, MsgProps, Msg}, + rabbit_basic:msg_size(Msg)), BQS1 = BQ:publish(Msg, MsgProps, IsDelivered, ChPid, BQS), ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1 }). @@ -222,7 +223,8 @@ publish_delivered(Msg = #basic_message { id = MsgId }, MsgProps, backing_queue = BQ, backing_queue_state = BQS }) -> false = dict:is_key(MsgId, SS), %% ASSERTION - ok = gm:broadcast(GM, {publish_delivered, ChPid, MsgProps, Msg}), + ok = gm:broadcast(GM, {publish_delivered, ChPid, MsgProps, Msg}, + rabbit_basic:msg_size(Msg)), {AckTag, BQS1} = BQ:publish_delivered(Msg, MsgProps, ChPid, BQS), State1 = State #state { backing_queue_state = BQS1 }, {AckTag, ensure_monitoring(ChPid, State1)}. diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 7389f2c6df..58f6e728ed 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -286,8 +286,11 @@ recvloop(Deb, Buf, BufLen, State = #v1{connection_state = {become, F}}) -> throw({become, F(Deb, Buf, BufLen, State)}); recvloop(Deb, Buf, BufLen, State = #v1{sock = Sock, recv_len = RecvLen}) when BufLen < RecvLen -> - ok = rabbit_net:setopts(Sock, [{active, once}]), - mainloop(Deb, Buf, BufLen, State#v1{pending_recv = true}); + case rabbit_net:setopts(Sock, [{active, once}]) of + ok -> mainloop(Deb, Buf, BufLen, + State#v1{pending_recv = true}); + {error, Reason} -> stop(Reason, State) + end; recvloop(Deb, [B], _BufLen, State) -> {Rest, State1} = handle_input(State#v1.callback, B, State), recvloop(Deb, [Rest], size(Rest), State1); @@ -313,11 +316,9 @@ mainloop(Deb, Buf, BufLen, State = #v1{sock = Sock}) -> closed when State#v1.connection_state =:= closed -> ok; closed -> - maybe_emit_stats(State), - throw(connection_closed_abruptly); + stop(closed, State); {error, Reason} -> - maybe_emit_stats(State), - throw({inet_error, Reason}); + stop(Reason, State); {other, {system, From, Request}} -> sys:handle_system_msg(Request, From, State#v1.parent, ?MODULE, Deb, {Buf, BufLen, State}); @@ -328,6 +329,11 @@ mainloop(Deb, Buf, BufLen, State = #v1{sock = Sock}) -> end end. +stop(closed, State) -> maybe_emit_stats(State), + throw(connection_closed_abruptly); +stop(Reason, State) -> maybe_emit_stats(State), + throw({inet_error, Reason}). + handle_other({conserve_resources, Source, Conserve}, State = #v1{throttle = Throttle = #throttle{alarmed_by = CR}}) -> |
