diff options
| author | Matthias Radestock <matthias@rabbitmq.com> | 2014-01-21 17:55:37 +0000 |
|---|---|---|
| committer | Matthias Radestock <matthias@rabbitmq.com> | 2014-01-21 17:55:37 +0000 |
| commit | 7bced52c8cffa8b46cf383aad02dfbc0d99bdaaa (patch) | |
| tree | e7815c7f1e5f04be5e7653e58089c955b9033fb2 /src | |
| parent | f7431d5f427441aa457cffcd202a25283233f8ee (diff) | |
| parent | 4735326a076ac5f00d11118bd223a6079b7262e7 (diff) | |
| download | rabbitmq-server-git-7bced52c8cffa8b46cf383aad02dfbc0d99bdaaa.tar.gz | |
merge bug25962 into default
Diffstat (limited to 'src')
| -rw-r--r-- | src/gm.erl | 106 | ||||
| -rw-r--r-- | src/rabbit_access_control.erl | 29 | ||||
| -rw-r--r-- | src/rabbit_amqqueue.erl | 7 | ||||
| -rw-r--r-- | src/rabbit_basic.erl | 7 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 23 | ||||
| -rw-r--r-- | src/rabbit_error_logger_file_h.erl | 19 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_reader.erl | 18 |
8 files changed, 149 insertions, 66 deletions
diff --git a/src/gm.erl b/src/gm.erl index df1c258d70..5a82950a41 100644 --- a/src/gm.erl +++ b/src/gm.erl @@ -382,7 +382,7 @@ -behaviour(gen_server2). --export([create_tables/0, start_link/4, leave/1, broadcast/2, +-export([create_tables/0, start_link/4, leave/1, broadcast/2, broadcast/3, confirmed_broadcast/2, info/1, validate_members/2, forget_group/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, @@ -395,6 +395,7 @@ -export([table_definitions/0]). -define(GROUP_TABLE, gm_group). +-define(MAX_BUFFER_SIZE, 100000000). %% 100MB -define(HIBERNATE_AFTER_MIN, 1000). -define(DESIRED_HIBERNATE, 10000). -define(BROADCAST_TIMER, 25). @@ -414,6 +415,7 @@ callback_args, confirms, broadcast_buffer, + broadcast_buffer_sz, broadcast_timer, txn_executor }). @@ -522,8 +524,10 @@ start_link(GroupName, Module, Args, TxnFun) -> leave(Server) -> gen_server2:cast(Server, leave). -broadcast(Server, Msg) -> - gen_server2:cast(Server, {broadcast, Msg}). +broadcast(Server, Msg) -> broadcast(Server, Msg, 0). + +broadcast(Server, Msg, SizeHint) -> + gen_server2:cast(Server, {broadcast, Msg, SizeHint}). confirmed_broadcast(Server, Msg) -> gen_server2:call(Server, {confirmed_broadcast, Msg}, infinity). @@ -547,19 +551,20 @@ init([GroupName, Module, Args, TxnFun]) -> random:seed(MegaSecs, Secs, MicroSecs), Self = make_member(GroupName), gen_server2:cast(self(), join), - {ok, #state { self = Self, - left = {Self, undefined}, - right = {Self, undefined}, - group_name = GroupName, - module = Module, - view = undefined, - pub_count = -1, - members_state = undefined, - callback_args = Args, - confirms = queue:new(), - broadcast_buffer = [], - broadcast_timer = undefined, - txn_executor = TxnFun }, hibernate, + {ok, #state { self = Self, + left = {Self, undefined}, + right = {Self, undefined}, + group_name = GroupName, + module = Module, + view = undefined, + pub_count = -1, + members_state = undefined, + callback_args = Args, + confirms = queue:new(), + broadcast_buffer = [], + broadcast_buffer_sz = 0, + broadcast_timer = undefined, + txn_executor = TxnFun }, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -576,7 +581,7 @@ handle_call({confirmed_broadcast, Msg}, _From, ok, State}); handle_call({confirmed_broadcast, Msg}, From, State) -> - internal_broadcast(Msg, From, State); + internal_broadcast(Msg, From, 0, State); handle_call(info, _From, State = #state { members_state = undefined }) -> @@ -639,10 +644,11 @@ handle_cast({?TAG, ReqVer, Msg}, if_callback_success( Result, fun handle_msg_true/3, fun handle_msg_false/3, Msg, State1)); -handle_cast({broadcast, _Msg}, State = #state { members_state = undefined }) -> +handle_cast({broadcast, _Msg, _SizeHint}, + State = #state { members_state = undefined }) -> noreply(State); -handle_cast({broadcast, Msg}, +handle_cast({broadcast, Msg, _SizeHint}, State = #state { self = Self, right = {Self, undefined}, module = Module, @@ -650,8 +656,8 @@ handle_cast({broadcast, Msg}, handle_callback_result({Module:handle_msg(Args, get_pid(Self), Msg), State}); -handle_cast({broadcast, Msg}, State) -> - internal_broadcast(Msg, none, State); +handle_cast({broadcast, Msg, SizeHint}, State) -> + internal_broadcast(Msg, none, SizeHint, State); handle_cast(join, State = #state { self = Self, group_name = GroupName, @@ -883,12 +889,14 @@ ensure_broadcast_timer(State = #state { broadcast_timer = undefined }) -> ensure_broadcast_timer(State) -> State. -internal_broadcast(Msg, From, State = #state { self = Self, - pub_count = PubCount, - module = Module, - confirms = Confirms, - callback_args = Args, - broadcast_buffer = Buffer }) -> +internal_broadcast(Msg, From, SizeHint, + State = #state { self = Self, + pub_count = PubCount, + module = Module, + confirms = Confirms, + callback_args = Args, + broadcast_buffer = Buffer, + broadcast_buffer_sz = BufferSize }) -> PubCount1 = PubCount + 1, Result = Module:handle_msg(Args, get_pid(Self), Msg), Buffer1 = [{PubCount1, Msg} | Buffer], @@ -896,13 +904,38 @@ internal_broadcast(Msg, From, State = #state { self = Self, none -> Confirms; _ -> queue:in({PubCount1, From}, Confirms) end, - State1 = State #state { pub_count = PubCount1, - confirms = Confirms1, - broadcast_buffer = Buffer1 }, - handle_callback_result({Result, case From of - none -> State1; - _ -> flush_broadcast_buffer(State1) - end}). + State1 = State #state { pub_count = PubCount1, + confirms = Confirms1, + broadcast_buffer = Buffer1, + broadcast_buffer_sz = BufferSize + SizeHint}, + handle_callback_result( + {Result, case From of + none -> maybe_flush_broadcast_buffer(State1); + _ -> flush_broadcast_buffer(State1) + end}). + +%% The Erlang distribution mechanism has an interesting quirk - it +%% will kill the VM cold with "Absurdly large distribution output data +%% buffer" if you attempt to send a message which serialises out to +%% more than 2^31 bytes in size. It's therefore a very good idea to +%% make sure that we don't exceed that size! +%% +%% Now, we could figure out the size of messages as they come in using +%% size(term_to_binary(Msg)) or similar. The trouble is, that requires +%% us to serialise the message only to throw the serialised form +%% away. Hard to believe that's a sensible thing to do. So instead we +%% accept a size hint from the application, via broadcast/3. This size +%% hint can be the size of anything in the message which we expect +%% could be large, and we just ignore the size of any small bits of +%% the message term. Therefore MAX_BUFFER_SIZE is set somewhat +%% conservatively at 100MB - but the buffer is only to allow us to +%% buffer tiny messages anyway, so 100MB is plenty. + +maybe_flush_broadcast_buffer(State = #state{broadcast_buffer_sz = Size}) -> + case Size > ?MAX_BUFFER_SIZE of + true -> flush_broadcast_buffer(State); + false -> State + end. flush_broadcast_buffer(State = #state { broadcast_buffer = [] }) -> State; @@ -920,8 +953,9 @@ flush_broadcast_buffer(State = #state { self = Self, Member #member { pending_ack = PA1, last_pub = PubCount } end, Self, MembersState), - State #state { members_state = MembersState1, - broadcast_buffer = [] }. + State #state { members_state = MembersState1, + broadcast_buffer = [], + broadcast_buffer_sz = 0}. %% --------------------------------------------------------------------------- diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl index d54c2a8dba..19171659e3 100644 --- a/src/rabbit_access_control.erl +++ b/src/rabbit_access_control.erl @@ -52,18 +52,31 @@ check_user_pass_login(Username, Password) -> check_user_login(Username, AuthProps) -> {ok, Modules} = application:get_env(rabbit, auth_backends), lists:foldl( - fun(Module, {refused, _, _}) -> - case Module:check_user_login(Username, AuthProps) of - {error, E} -> - {refused, "~s failed authenticating ~s: ~p~n", - [Module, Username, E]}; - Else -> - Else + fun ({ModN, ModZ}, {refused, _, _}) -> + %% Different modules for authN vs authZ. So authenticate + %% with authN module, then if that succeeds do + %% passwordless (i.e pre-authenticated) login with authZ + %% module, and use the #user{} the latter gives us. + case try_login(ModN, Username, AuthProps) of + {ok, _} -> try_login(ModZ, Username, []); + Else -> Else end; - (_, {ok, User}) -> + (Mod, {refused, _, _}) -> + %% Same module for authN and authZ. Just take the result + %% it gives us + try_login(Mod, Username, AuthProps); + (_, {ok, User}) -> + %% We've successfully authenticated. Skip to the end... {ok, User} end, {refused, "No modules checked '~s'", [Username]}, Modules). +try_login(Module, Username, AuthProps) -> + case Module:check_user_login(Username, AuthProps) of + {error, E} -> {refused, "~s failed authenticating ~s: ~p~n", + [Module, Username, E]}; + Else -> Else + end. + check_vhost_access(User = #user{ username = Username, auth_backend = Module }, VHostPath) -> check_access( diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index aadf335c9b..c66c8981ad 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -113,10 +113,9 @@ -> [rabbit_types:infos()]). -spec(force_event_refresh/0 :: () -> 'ok'). -spec(notify_policy_changed/1 :: (rabbit_types:amqqueue()) -> 'ok'). --spec(consumers/1 :: - (rabbit_types:amqqueue()) - -> [{pid(), rabbit_types:ctag(), boolean(), - rabbit_framing:amqp_table()}]). +-spec(consumers/1 :: (rabbit_types:amqqueue()) + -> [{pid(), rabbit_types:ctag(), boolean(), + rabbit_framing:amqp_table()}]). -spec(consumer_info_keys/0 :: () -> rabbit_types:info_keys()). -spec(consumers_all/1 :: (rabbit_types:vhost()) diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index 2e825536b2..3d70be4bc3 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -22,7 +22,7 @@ message/3, message/4, properties/1, prepend_table_header/3, extract_headers/1, map_headers/2, delivery/3, header_routes/1, parse_expiration/1]). --export([build_content/2, from_content/1]). +-export([build_content/2, from_content/1, msg_size/1]). %%---------------------------------------------------------------------------- @@ -77,6 +77,9 @@ (rabbit_framing:amqp_property_record()) -> rabbit_types:ok_or_error2('undefined' | non_neg_integer(), any())). +-spec(msg_size/1 :: (rabbit_types:content() | rabbit_types:message()) -> + non_neg_integer()). + -endif. %%---------------------------------------------------------------------------- @@ -274,3 +277,5 @@ parse_expiration(#'P_basic'{expiration = Expiration}) -> {error, {leftover_string, S}} end. +msg_size(#content{payload_fragments_rev = PFR}) -> iolist_size(PFR); +msg_size(#basic_message{content = Content}) -> msg_size(Content). diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index b0010f9011..2d49b8b23b 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -469,9 +469,8 @@ check_resource_access(User, Resource, Perm) -> put(permission_cache, [V | CacheTail]) end. -clear_permission_cache() -> - erase(permission_cache), - ok. +clear_permission_cache() -> erase(permission_cache), + ok. check_configure_permitted(Resource, #ch{user = User}) -> check_resource_access(User, Resource, configure). @@ -511,6 +510,14 @@ check_internal_exchange(#exchange{name = Name, internal = true}) -> check_internal_exchange(_) -> ok. +check_msg_size(Content) -> + Size = rabbit_basic:msg_size(Content), + case Size > ?MAX_MSG_SIZE of + true -> precondition_failed("message size ~B larger than max size ~B", + [Size, ?MAX_MSG_SIZE]); + false -> ok + end. + qbin_to_resource(QueueNameBin, State) -> name_to_resource(queue, QueueNameBin, State). @@ -518,8 +525,7 @@ name_to_resource(Type, NameBin, #ch{virtual_host = VHostPath}) -> rabbit_misc:r(VHostPath, Type, NameBin). expand_queue_name_shortcut(<<>>, #ch{most_recently_declared_queue = <<>>}) -> - rabbit_misc:protocol_error( - not_found, "no previously declared queue", []); + rabbit_misc:protocol_error(not_found, "no previously declared queue", []); expand_queue_name_shortcut(<<>>, #ch{most_recently_declared_queue = MRDQ}) -> MRDQ; expand_queue_name_shortcut(QueueNameBin, _) -> @@ -527,8 +533,7 @@ expand_queue_name_shortcut(QueueNameBin, _) -> expand_routing_key_shortcut(<<>>, <<>>, #ch{most_recently_declared_queue = <<>>}) -> - rabbit_misc:protocol_error( - not_found, "no previously declared queue", []); + rabbit_misc:protocol_error(not_found, "no previously declared queue", []); expand_routing_key_shortcut(<<>>, <<>>, #ch{most_recently_declared_queue = MRDQ}) -> MRDQ; @@ -654,6 +659,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, tx = Tx, confirm_enabled = ConfirmEnabled, trace_state = TraceState}) -> + check_msg_size(Content), ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_write_permitted(ExchangeName, State), Exchange = rabbit_exchange:lookup_or_die(ExchangeName), @@ -1614,8 +1620,7 @@ update_measures(Type, Key, Inc, Measure) -> end, put({Type, Key}, orddict:store(Measure, Cur + Inc, Measures)). -emit_stats(State) -> - emit_stats(State, []). +emit_stats(State) -> emit_stats(State, []). emit_stats(State, Extra) -> Coarse = infos(?STATISTICS_KEYS, State), diff --git a/src/rabbit_error_logger_file_h.erl b/src/rabbit_error_logger_file_h.erl index d59641b0d1..9421b52e83 100644 --- a/src/rabbit_error_logger_file_h.erl +++ b/src/rabbit_error_logger_file_h.erl @@ -79,6 +79,25 @@ init_file(File, PrevHandler) -> %% filter out "application: foo; exited: stopped; type: temporary" handle_event({info_report, _, {_, std_info, _}}, State) -> {ok, State}; +%% When a node restarts quickly it is possible the rest of the cluster +%% will not have had the chance to remove its queues from +%% Mnesia. That's why rabbit_amqqueue:recover/0 invokes +%% on_node_down(node()). But before we get there we can receive lots +%% of messages intended for the old version of the node. The emulator +%% logs an event for every one of those messages; in extremis this can +%% bring the server to its knees just logging "Discarding..." +%% again and again. So just log the first one, then go silent. +handle_event(Event = {error, _, {emulator, _, ["Discarding message" ++ _]}}, + State) -> + case get(discarding_message_seen) of + true -> {ok, State}; + undefined -> put(discarding_message_seen, true), + error_logger_file_h:handle_event(Event, State) + end; +%% Clear this state if we log anything else (but not a progress report). +handle_event(Event = {info_msg, _, _}, State) -> + erase(discarding_message_seen), + error_logger_file_h:handle_event(Event, State); handle_event(Event, State) -> error_logger_file_h:handle_event(Event, State). diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 4f50e1a503..9ce5afcb45 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -212,7 +212,8 @@ publish(Msg = #basic_message { id = MsgId }, MsgProps, IsDelivered, ChPid, backing_queue = BQ, backing_queue_state = BQS }) -> false = dict:is_key(MsgId, SS), %% ASSERTION - ok = gm:broadcast(GM, {publish, ChPid, MsgProps, Msg}), + ok = gm:broadcast(GM, {publish, ChPid, MsgProps, Msg}, + rabbit_basic:msg_size(Msg)), BQS1 = BQ:publish(Msg, MsgProps, IsDelivered, ChPid, BQS), ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1 }). @@ -222,7 +223,8 @@ publish_delivered(Msg = #basic_message { id = MsgId }, MsgProps, backing_queue = BQ, backing_queue_state = BQS }) -> false = dict:is_key(MsgId, SS), %% ASSERTION - ok = gm:broadcast(GM, {publish_delivered, ChPid, MsgProps, Msg}), + ok = gm:broadcast(GM, {publish_delivered, ChPid, MsgProps, Msg}, + rabbit_basic:msg_size(Msg)), {AckTag, BQS1} = BQ:publish_delivered(Msg, MsgProps, ChPid, BQS), State1 = State #state { backing_queue_state = BQS1 }, {AckTag, ensure_monitoring(ChPid, State1)}. diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 8553e36d50..64debcab46 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -285,8 +285,11 @@ recvloop(Deb, Buf, BufLen, State = #v1{connection_state = {become, F}}) -> throw({become, F(Deb, Buf, BufLen, State)}); recvloop(Deb, Buf, BufLen, State = #v1{sock = Sock, recv_len = RecvLen}) when BufLen < RecvLen -> - ok = rabbit_net:setopts(Sock, [{active, once}]), - mainloop(Deb, Buf, BufLen, State#v1{pending_recv = true}); + case rabbit_net:setopts(Sock, [{active, once}]) of + ok -> mainloop(Deb, Buf, BufLen, + State#v1{pending_recv = true}); + {error, Reason} -> stop(Reason, State) + end; recvloop(Deb, [B], _BufLen, State) -> {Rest, State1} = handle_input(State#v1.callback, B, State), recvloop(Deb, [Rest], size(Rest), State1); @@ -312,11 +315,9 @@ mainloop(Deb, Buf, BufLen, State = #v1{sock = Sock}) -> closed when State#v1.connection_state =:= closed -> ok; closed -> - maybe_emit_stats(State), - throw(connection_closed_abruptly); + stop(closed, State); {error, Reason} -> - maybe_emit_stats(State), - throw({inet_error, Reason}); + stop(Reason, State); {other, {system, From, Request}} -> sys:handle_system_msg(Request, From, State#v1.parent, ?MODULE, Deb, {Buf, BufLen, State}); @@ -327,6 +328,11 @@ mainloop(Deb, Buf, BufLen, State = #v1{sock = Sock}) -> end end. +stop(closed, State) -> maybe_emit_stats(State), + throw(connection_closed_abruptly); +stop(Reason, State) -> maybe_emit_stats(State), + throw({inet_error, Reason}). + handle_other({conserve_resources, Source, Conserve}, State = #v1{throttle = Throttle = #throttle{alarmed_by = CR}}) -> |
