summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2014-01-21 17:55:37 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2014-01-21 17:55:37 +0000
commit7bced52c8cffa8b46cf383aad02dfbc0d99bdaaa (patch)
treee7815c7f1e5f04be5e7653e58089c955b9033fb2 /src
parentf7431d5f427441aa457cffcd202a25283233f8ee (diff)
parent4735326a076ac5f00d11118bd223a6079b7262e7 (diff)
downloadrabbitmq-server-git-7bced52c8cffa8b46cf383aad02dfbc0d99bdaaa.tar.gz
merge bug25962 into default
Diffstat (limited to 'src')
-rw-r--r--src/gm.erl106
-rw-r--r--src/rabbit_access_control.erl29
-rw-r--r--src/rabbit_amqqueue.erl7
-rw-r--r--src/rabbit_basic.erl7
-rw-r--r--src/rabbit_channel.erl23
-rw-r--r--src/rabbit_error_logger_file_h.erl19
-rw-r--r--src/rabbit_mirror_queue_master.erl6
-rw-r--r--src/rabbit_reader.erl18
8 files changed, 149 insertions, 66 deletions
diff --git a/src/gm.erl b/src/gm.erl
index df1c258d70..5a82950a41 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -382,7 +382,7 @@
-behaviour(gen_server2).
--export([create_tables/0, start_link/4, leave/1, broadcast/2,
+-export([create_tables/0, start_link/4, leave/1, broadcast/2, broadcast/3,
confirmed_broadcast/2, info/1, validate_members/2, forget_group/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
@@ -395,6 +395,7 @@
-export([table_definitions/0]).
-define(GROUP_TABLE, gm_group).
+-define(MAX_BUFFER_SIZE, 100000000). %% 100MB
-define(HIBERNATE_AFTER_MIN, 1000).
-define(DESIRED_HIBERNATE, 10000).
-define(BROADCAST_TIMER, 25).
@@ -414,6 +415,7 @@
callback_args,
confirms,
broadcast_buffer,
+ broadcast_buffer_sz,
broadcast_timer,
txn_executor
}).
@@ -522,8 +524,10 @@ start_link(GroupName, Module, Args, TxnFun) ->
leave(Server) ->
gen_server2:cast(Server, leave).
-broadcast(Server, Msg) ->
- gen_server2:cast(Server, {broadcast, Msg}).
+broadcast(Server, Msg) -> broadcast(Server, Msg, 0).
+
+broadcast(Server, Msg, SizeHint) ->
+ gen_server2:cast(Server, {broadcast, Msg, SizeHint}).
confirmed_broadcast(Server, Msg) ->
gen_server2:call(Server, {confirmed_broadcast, Msg}, infinity).
@@ -547,19 +551,20 @@ init([GroupName, Module, Args, TxnFun]) ->
random:seed(MegaSecs, Secs, MicroSecs),
Self = make_member(GroupName),
gen_server2:cast(self(), join),
- {ok, #state { self = Self,
- left = {Self, undefined},
- right = {Self, undefined},
- group_name = GroupName,
- module = Module,
- view = undefined,
- pub_count = -1,
- members_state = undefined,
- callback_args = Args,
- confirms = queue:new(),
- broadcast_buffer = [],
- broadcast_timer = undefined,
- txn_executor = TxnFun }, hibernate,
+ {ok, #state { self = Self,
+ left = {Self, undefined},
+ right = {Self, undefined},
+ group_name = GroupName,
+ module = Module,
+ view = undefined,
+ pub_count = -1,
+ members_state = undefined,
+ callback_args = Args,
+ confirms = queue:new(),
+ broadcast_buffer = [],
+ broadcast_buffer_sz = 0,
+ broadcast_timer = undefined,
+ txn_executor = TxnFun }, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -576,7 +581,7 @@ handle_call({confirmed_broadcast, Msg}, _From,
ok, State});
handle_call({confirmed_broadcast, Msg}, From, State) ->
- internal_broadcast(Msg, From, State);
+ internal_broadcast(Msg, From, 0, State);
handle_call(info, _From,
State = #state { members_state = undefined }) ->
@@ -639,10 +644,11 @@ handle_cast({?TAG, ReqVer, Msg},
if_callback_success(
Result, fun handle_msg_true/3, fun handle_msg_false/3, Msg, State1));
-handle_cast({broadcast, _Msg}, State = #state { members_state = undefined }) ->
+handle_cast({broadcast, _Msg, _SizeHint},
+ State = #state { members_state = undefined }) ->
noreply(State);
-handle_cast({broadcast, Msg},
+handle_cast({broadcast, Msg, _SizeHint},
State = #state { self = Self,
right = {Self, undefined},
module = Module,
@@ -650,8 +656,8 @@ handle_cast({broadcast, Msg},
handle_callback_result({Module:handle_msg(Args, get_pid(Self), Msg),
State});
-handle_cast({broadcast, Msg}, State) ->
- internal_broadcast(Msg, none, State);
+handle_cast({broadcast, Msg, SizeHint}, State) ->
+ internal_broadcast(Msg, none, SizeHint, State);
handle_cast(join, State = #state { self = Self,
group_name = GroupName,
@@ -883,12 +889,14 @@ ensure_broadcast_timer(State = #state { broadcast_timer = undefined }) ->
ensure_broadcast_timer(State) ->
State.
-internal_broadcast(Msg, From, State = #state { self = Self,
- pub_count = PubCount,
- module = Module,
- confirms = Confirms,
- callback_args = Args,
- broadcast_buffer = Buffer }) ->
+internal_broadcast(Msg, From, SizeHint,
+ State = #state { self = Self,
+ pub_count = PubCount,
+ module = Module,
+ confirms = Confirms,
+ callback_args = Args,
+ broadcast_buffer = Buffer,
+ broadcast_buffer_sz = BufferSize }) ->
PubCount1 = PubCount + 1,
Result = Module:handle_msg(Args, get_pid(Self), Msg),
Buffer1 = [{PubCount1, Msg} | Buffer],
@@ -896,13 +904,38 @@ internal_broadcast(Msg, From, State = #state { self = Self,
none -> Confirms;
_ -> queue:in({PubCount1, From}, Confirms)
end,
- State1 = State #state { pub_count = PubCount1,
- confirms = Confirms1,
- broadcast_buffer = Buffer1 },
- handle_callback_result({Result, case From of
- none -> State1;
- _ -> flush_broadcast_buffer(State1)
- end}).
+ State1 = State #state { pub_count = PubCount1,
+ confirms = Confirms1,
+ broadcast_buffer = Buffer1,
+ broadcast_buffer_sz = BufferSize + SizeHint},
+ handle_callback_result(
+ {Result, case From of
+ none -> maybe_flush_broadcast_buffer(State1);
+ _ -> flush_broadcast_buffer(State1)
+ end}).
+
+%% The Erlang distribution mechanism has an interesting quirk - it
+%% will kill the VM cold with "Absurdly large distribution output data
+%% buffer" if you attempt to send a message which serialises out to
+%% more than 2^31 bytes in size. It's therefore a very good idea to
+%% make sure that we don't exceed that size!
+%%
+%% Now, we could figure out the size of messages as they come in using
+%% size(term_to_binary(Msg)) or similar. The trouble is, that requires
+%% us to serialise the message only to throw the serialised form
+%% away. Hard to believe that's a sensible thing to do. So instead we
+%% accept a size hint from the application, via broadcast/3. This size
+%% hint can be the size of anything in the message which we expect
+%% could be large, and we just ignore the size of any small bits of
+%% the message term. Therefore MAX_BUFFER_SIZE is set somewhat
+%% conservatively at 100MB - but the buffer is only to allow us to
+%% buffer tiny messages anyway, so 100MB is plenty.
+
+maybe_flush_broadcast_buffer(State = #state{broadcast_buffer_sz = Size}) ->
+ case Size > ?MAX_BUFFER_SIZE of
+ true -> flush_broadcast_buffer(State);
+ false -> State
+ end.
flush_broadcast_buffer(State = #state { broadcast_buffer = [] }) ->
State;
@@ -920,8 +953,9 @@ flush_broadcast_buffer(State = #state { self = Self,
Member #member { pending_ack = PA1,
last_pub = PubCount }
end, Self, MembersState),
- State #state { members_state = MembersState1,
- broadcast_buffer = [] }.
+ State #state { members_state = MembersState1,
+ broadcast_buffer = [],
+ broadcast_buffer_sz = 0}.
%% ---------------------------------------------------------------------------
diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl
index d54c2a8dba..19171659e3 100644
--- a/src/rabbit_access_control.erl
+++ b/src/rabbit_access_control.erl
@@ -52,18 +52,31 @@ check_user_pass_login(Username, Password) ->
check_user_login(Username, AuthProps) ->
{ok, Modules} = application:get_env(rabbit, auth_backends),
lists:foldl(
- fun(Module, {refused, _, _}) ->
- case Module:check_user_login(Username, AuthProps) of
- {error, E} ->
- {refused, "~s failed authenticating ~s: ~p~n",
- [Module, Username, E]};
- Else ->
- Else
+ fun ({ModN, ModZ}, {refused, _, _}) ->
+ %% Different modules for authN vs authZ. So authenticate
+ %% with authN module, then if that succeeds do
+ %% passwordless (i.e pre-authenticated) login with authZ
+ %% module, and use the #user{} the latter gives us.
+ case try_login(ModN, Username, AuthProps) of
+ {ok, _} -> try_login(ModZ, Username, []);
+ Else -> Else
end;
- (_, {ok, User}) ->
+ (Mod, {refused, _, _}) ->
+ %% Same module for authN and authZ. Just take the result
+ %% it gives us
+ try_login(Mod, Username, AuthProps);
+ (_, {ok, User}) ->
+ %% We've successfully authenticated. Skip to the end...
{ok, User}
end, {refused, "No modules checked '~s'", [Username]}, Modules).
+try_login(Module, Username, AuthProps) ->
+ case Module:check_user_login(Username, AuthProps) of
+ {error, E} -> {refused, "~s failed authenticating ~s: ~p~n",
+ [Module, Username, E]};
+ Else -> Else
+ end.
+
check_vhost_access(User = #user{ username = Username,
auth_backend = Module }, VHostPath) ->
check_access(
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index aadf335c9b..c66c8981ad 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -113,10 +113,9 @@
-> [rabbit_types:infos()]).
-spec(force_event_refresh/0 :: () -> 'ok').
-spec(notify_policy_changed/1 :: (rabbit_types:amqqueue()) -> 'ok').
--spec(consumers/1 ::
- (rabbit_types:amqqueue())
- -> [{pid(), rabbit_types:ctag(), boolean(),
- rabbit_framing:amqp_table()}]).
+-spec(consumers/1 :: (rabbit_types:amqqueue())
+ -> [{pid(), rabbit_types:ctag(), boolean(),
+ rabbit_framing:amqp_table()}]).
-spec(consumer_info_keys/0 :: () -> rabbit_types:info_keys()).
-spec(consumers_all/1 ::
(rabbit_types:vhost())
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index 2e825536b2..3d70be4bc3 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -22,7 +22,7 @@
message/3, message/4, properties/1, prepend_table_header/3,
extract_headers/1, map_headers/2, delivery/3, header_routes/1,
parse_expiration/1]).
--export([build_content/2, from_content/1]).
+-export([build_content/2, from_content/1, msg_size/1]).
%%----------------------------------------------------------------------------
@@ -77,6 +77,9 @@
(rabbit_framing:amqp_property_record())
-> rabbit_types:ok_or_error2('undefined' | non_neg_integer(), any())).
+-spec(msg_size/1 :: (rabbit_types:content() | rabbit_types:message()) ->
+ non_neg_integer()).
+
-endif.
%%----------------------------------------------------------------------------
@@ -274,3 +277,5 @@ parse_expiration(#'P_basic'{expiration = Expiration}) ->
{error, {leftover_string, S}}
end.
+msg_size(#content{payload_fragments_rev = PFR}) -> iolist_size(PFR);
+msg_size(#basic_message{content = Content}) -> msg_size(Content).
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index b0010f9011..2d49b8b23b 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -469,9 +469,8 @@ check_resource_access(User, Resource, Perm) ->
put(permission_cache, [V | CacheTail])
end.
-clear_permission_cache() ->
- erase(permission_cache),
- ok.
+clear_permission_cache() -> erase(permission_cache),
+ ok.
check_configure_permitted(Resource, #ch{user = User}) ->
check_resource_access(User, Resource, configure).
@@ -511,6 +510,14 @@ check_internal_exchange(#exchange{name = Name, internal = true}) ->
check_internal_exchange(_) ->
ok.
+check_msg_size(Content) ->
+ Size = rabbit_basic:msg_size(Content),
+ case Size > ?MAX_MSG_SIZE of
+ true -> precondition_failed("message size ~B larger than max size ~B",
+ [Size, ?MAX_MSG_SIZE]);
+ false -> ok
+ end.
+
qbin_to_resource(QueueNameBin, State) ->
name_to_resource(queue, QueueNameBin, State).
@@ -518,8 +525,7 @@ name_to_resource(Type, NameBin, #ch{virtual_host = VHostPath}) ->
rabbit_misc:r(VHostPath, Type, NameBin).
expand_queue_name_shortcut(<<>>, #ch{most_recently_declared_queue = <<>>}) ->
- rabbit_misc:protocol_error(
- not_found, "no previously declared queue", []);
+ rabbit_misc:protocol_error(not_found, "no previously declared queue", []);
expand_queue_name_shortcut(<<>>, #ch{most_recently_declared_queue = MRDQ}) ->
MRDQ;
expand_queue_name_shortcut(QueueNameBin, _) ->
@@ -527,8 +533,7 @@ expand_queue_name_shortcut(QueueNameBin, _) ->
expand_routing_key_shortcut(<<>>, <<>>,
#ch{most_recently_declared_queue = <<>>}) ->
- rabbit_misc:protocol_error(
- not_found, "no previously declared queue", []);
+ rabbit_misc:protocol_error(not_found, "no previously declared queue", []);
expand_routing_key_shortcut(<<>>, <<>>,
#ch{most_recently_declared_queue = MRDQ}) ->
MRDQ;
@@ -654,6 +659,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
tx = Tx,
confirm_enabled = ConfirmEnabled,
trace_state = TraceState}) ->
+ check_msg_size(Content),
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
check_write_permitted(ExchangeName, State),
Exchange = rabbit_exchange:lookup_or_die(ExchangeName),
@@ -1614,8 +1620,7 @@ update_measures(Type, Key, Inc, Measure) ->
end,
put({Type, Key}, orddict:store(Measure, Cur + Inc, Measures)).
-emit_stats(State) ->
- emit_stats(State, []).
+emit_stats(State) -> emit_stats(State, []).
emit_stats(State, Extra) ->
Coarse = infos(?STATISTICS_KEYS, State),
diff --git a/src/rabbit_error_logger_file_h.erl b/src/rabbit_error_logger_file_h.erl
index d59641b0d1..9421b52e83 100644
--- a/src/rabbit_error_logger_file_h.erl
+++ b/src/rabbit_error_logger_file_h.erl
@@ -79,6 +79,25 @@ init_file(File, PrevHandler) ->
%% filter out "application: foo; exited: stopped; type: temporary"
handle_event({info_report, _, {_, std_info, _}}, State) ->
{ok, State};
+%% When a node restarts quickly it is possible the rest of the cluster
+%% will not have had the chance to remove its queues from
+%% Mnesia. That's why rabbit_amqqueue:recover/0 invokes
+%% on_node_down(node()). But before we get there we can receive lots
+%% of messages intended for the old version of the node. The emulator
+%% logs an event for every one of those messages; in extremis this can
+%% bring the server to its knees just logging "Discarding..."
+%% again and again. So just log the first one, then go silent.
+handle_event(Event = {error, _, {emulator, _, ["Discarding message" ++ _]}},
+ State) ->
+ case get(discarding_message_seen) of
+ true -> {ok, State};
+ undefined -> put(discarding_message_seen, true),
+ error_logger_file_h:handle_event(Event, State)
+ end;
+%% Clear this state if we log anything else (but not a progress report).
+handle_event(Event = {info_msg, _, _}, State) ->
+ erase(discarding_message_seen),
+ error_logger_file_h:handle_event(Event, State);
handle_event(Event, State) ->
error_logger_file_h:handle_event(Event, State).
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 4f50e1a503..9ce5afcb45 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -212,7 +212,8 @@ publish(Msg = #basic_message { id = MsgId }, MsgProps, IsDelivered, ChPid,
backing_queue = BQ,
backing_queue_state = BQS }) ->
false = dict:is_key(MsgId, SS), %% ASSERTION
- ok = gm:broadcast(GM, {publish, ChPid, MsgProps, Msg}),
+ ok = gm:broadcast(GM, {publish, ChPid, MsgProps, Msg},
+ rabbit_basic:msg_size(Msg)),
BQS1 = BQ:publish(Msg, MsgProps, IsDelivered, ChPid, BQS),
ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1 }).
@@ -222,7 +223,8 @@ publish_delivered(Msg = #basic_message { id = MsgId }, MsgProps,
backing_queue = BQ,
backing_queue_state = BQS }) ->
false = dict:is_key(MsgId, SS), %% ASSERTION
- ok = gm:broadcast(GM, {publish_delivered, ChPid, MsgProps, Msg}),
+ ok = gm:broadcast(GM, {publish_delivered, ChPid, MsgProps, Msg},
+ rabbit_basic:msg_size(Msg)),
{AckTag, BQS1} = BQ:publish_delivered(Msg, MsgProps, ChPid, BQS),
State1 = State #state { backing_queue_state = BQS1 },
{AckTag, ensure_monitoring(ChPid, State1)}.
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 8553e36d50..64debcab46 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -285,8 +285,11 @@ recvloop(Deb, Buf, BufLen, State = #v1{connection_state = {become, F}}) ->
throw({become, F(Deb, Buf, BufLen, State)});
recvloop(Deb, Buf, BufLen, State = #v1{sock = Sock, recv_len = RecvLen})
when BufLen < RecvLen ->
- ok = rabbit_net:setopts(Sock, [{active, once}]),
- mainloop(Deb, Buf, BufLen, State#v1{pending_recv = true});
+ case rabbit_net:setopts(Sock, [{active, once}]) of
+ ok -> mainloop(Deb, Buf, BufLen,
+ State#v1{pending_recv = true});
+ {error, Reason} -> stop(Reason, State)
+ end;
recvloop(Deb, [B], _BufLen, State) ->
{Rest, State1} = handle_input(State#v1.callback, B, State),
recvloop(Deb, [Rest], size(Rest), State1);
@@ -312,11 +315,9 @@ mainloop(Deb, Buf, BufLen, State = #v1{sock = Sock}) ->
closed when State#v1.connection_state =:= closed ->
ok;
closed ->
- maybe_emit_stats(State),
- throw(connection_closed_abruptly);
+ stop(closed, State);
{error, Reason} ->
- maybe_emit_stats(State),
- throw({inet_error, Reason});
+ stop(Reason, State);
{other, {system, From, Request}} ->
sys:handle_system_msg(Request, From, State#v1.parent,
?MODULE, Deb, {Buf, BufLen, State});
@@ -327,6 +328,11 @@ mainloop(Deb, Buf, BufLen, State = #v1{sock = Sock}) ->
end
end.
+stop(closed, State) -> maybe_emit_stats(State),
+ throw(connection_closed_abruptly);
+stop(Reason, State) -> maybe_emit_stats(State),
+ throw({inet_error, Reason}).
+
handle_other({conserve_resources, Source, Conserve},
State = #v1{throttle = Throttle =
#throttle{alarmed_by = CR}}) ->