summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2014-01-22 05:33:08 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2014-01-22 05:33:08 +0000
commit0363ebd8e391931635725a23c9372e6752bcf52b (patch)
tree5a8c220b1044348996d89aa5cdcb6f5016f98768 /src
parent6a07ff8c1d76b0bb4d4c807d0c65968656d73c10 (diff)
parentfa4968970f6160afdd64cceb07021a669e51f57c (diff)
downloadrabbitmq-server-git-0363ebd8e391931635725a23c9372e6752bcf52b.tar.gz
merge default into bug24297
(and resolve merge conflicts in limiter)
Diffstat (limited to 'src')
-rw-r--r--src/gm.erl106
-rw-r--r--src/rabbit_access_control.erl29
-rw-r--r--src/rabbit_amqqueue.erl12
-rw-r--r--src/rabbit_amqqueue_process.erl4
-rw-r--r--src/rabbit_basic.erl7
-rw-r--r--src/rabbit_channel.erl111
-rw-r--r--src/rabbit_error_logger_file_h.erl19
-rw-r--r--src/rabbit_limiter.erl67
-rw-r--r--src/rabbit_mirror_queue_master.erl6
-rw-r--r--src/rabbit_reader.erl18
10 files changed, 183 insertions, 196 deletions
diff --git a/src/gm.erl b/src/gm.erl
index df1c258d70..5a82950a41 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -382,7 +382,7 @@
-behaviour(gen_server2).
--export([create_tables/0, start_link/4, leave/1, broadcast/2,
+-export([create_tables/0, start_link/4, leave/1, broadcast/2, broadcast/3,
confirmed_broadcast/2, info/1, validate_members/2, forget_group/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
@@ -395,6 +395,7 @@
-export([table_definitions/0]).
-define(GROUP_TABLE, gm_group).
+-define(MAX_BUFFER_SIZE, 100000000). %% 100MB
-define(HIBERNATE_AFTER_MIN, 1000).
-define(DESIRED_HIBERNATE, 10000).
-define(BROADCAST_TIMER, 25).
@@ -414,6 +415,7 @@
callback_args,
confirms,
broadcast_buffer,
+ broadcast_buffer_sz,
broadcast_timer,
txn_executor
}).
@@ -522,8 +524,10 @@ start_link(GroupName, Module, Args, TxnFun) ->
leave(Server) ->
gen_server2:cast(Server, leave).
-broadcast(Server, Msg) ->
- gen_server2:cast(Server, {broadcast, Msg}).
+broadcast(Server, Msg) -> broadcast(Server, Msg, 0).
+
+broadcast(Server, Msg, SizeHint) ->
+ gen_server2:cast(Server, {broadcast, Msg, SizeHint}).
confirmed_broadcast(Server, Msg) ->
gen_server2:call(Server, {confirmed_broadcast, Msg}, infinity).
@@ -547,19 +551,20 @@ init([GroupName, Module, Args, TxnFun]) ->
random:seed(MegaSecs, Secs, MicroSecs),
Self = make_member(GroupName),
gen_server2:cast(self(), join),
- {ok, #state { self = Self,
- left = {Self, undefined},
- right = {Self, undefined},
- group_name = GroupName,
- module = Module,
- view = undefined,
- pub_count = -1,
- members_state = undefined,
- callback_args = Args,
- confirms = queue:new(),
- broadcast_buffer = [],
- broadcast_timer = undefined,
- txn_executor = TxnFun }, hibernate,
+ {ok, #state { self = Self,
+ left = {Self, undefined},
+ right = {Self, undefined},
+ group_name = GroupName,
+ module = Module,
+ view = undefined,
+ pub_count = -1,
+ members_state = undefined,
+ callback_args = Args,
+ confirms = queue:new(),
+ broadcast_buffer = [],
+ broadcast_buffer_sz = 0,
+ broadcast_timer = undefined,
+ txn_executor = TxnFun }, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -576,7 +581,7 @@ handle_call({confirmed_broadcast, Msg}, _From,
ok, State});
handle_call({confirmed_broadcast, Msg}, From, State) ->
- internal_broadcast(Msg, From, State);
+ internal_broadcast(Msg, From, 0, State);
handle_call(info, _From,
State = #state { members_state = undefined }) ->
@@ -639,10 +644,11 @@ handle_cast({?TAG, ReqVer, Msg},
if_callback_success(
Result, fun handle_msg_true/3, fun handle_msg_false/3, Msg, State1));
-handle_cast({broadcast, _Msg}, State = #state { members_state = undefined }) ->
+handle_cast({broadcast, _Msg, _SizeHint},
+ State = #state { members_state = undefined }) ->
noreply(State);
-handle_cast({broadcast, Msg},
+handle_cast({broadcast, Msg, _SizeHint},
State = #state { self = Self,
right = {Self, undefined},
module = Module,
@@ -650,8 +656,8 @@ handle_cast({broadcast, Msg},
handle_callback_result({Module:handle_msg(Args, get_pid(Self), Msg),
State});
-handle_cast({broadcast, Msg}, State) ->
- internal_broadcast(Msg, none, State);
+handle_cast({broadcast, Msg, SizeHint}, State) ->
+ internal_broadcast(Msg, none, SizeHint, State);
handle_cast(join, State = #state { self = Self,
group_name = GroupName,
@@ -883,12 +889,14 @@ ensure_broadcast_timer(State = #state { broadcast_timer = undefined }) ->
ensure_broadcast_timer(State) ->
State.
-internal_broadcast(Msg, From, State = #state { self = Self,
- pub_count = PubCount,
- module = Module,
- confirms = Confirms,
- callback_args = Args,
- broadcast_buffer = Buffer }) ->
+internal_broadcast(Msg, From, SizeHint,
+ State = #state { self = Self,
+ pub_count = PubCount,
+ module = Module,
+ confirms = Confirms,
+ callback_args = Args,
+ broadcast_buffer = Buffer,
+ broadcast_buffer_sz = BufferSize }) ->
PubCount1 = PubCount + 1,
Result = Module:handle_msg(Args, get_pid(Self), Msg),
Buffer1 = [{PubCount1, Msg} | Buffer],
@@ -896,13 +904,38 @@ internal_broadcast(Msg, From, State = #state { self = Self,
none -> Confirms;
_ -> queue:in({PubCount1, From}, Confirms)
end,
- State1 = State #state { pub_count = PubCount1,
- confirms = Confirms1,
- broadcast_buffer = Buffer1 },
- handle_callback_result({Result, case From of
- none -> State1;
- _ -> flush_broadcast_buffer(State1)
- end}).
+ State1 = State #state { pub_count = PubCount1,
+ confirms = Confirms1,
+ broadcast_buffer = Buffer1,
+ broadcast_buffer_sz = BufferSize + SizeHint},
+ handle_callback_result(
+ {Result, case From of
+ none -> maybe_flush_broadcast_buffer(State1);
+ _ -> flush_broadcast_buffer(State1)
+ end}).
+
+%% The Erlang distribution mechanism has an interesting quirk - it
+%% will kill the VM cold with "Absurdly large distribution output data
+%% buffer" if you attempt to send a message which serialises out to
+%% more than 2^31 bytes in size. It's therefore a very good idea to
+%% make sure that we don't exceed that size!
+%%
+%% Now, we could figure out the size of messages as they come in using
+%% size(term_to_binary(Msg)) or similar. The trouble is, that requires
+%% us to serialise the message only to throw the serialised form
+%% away. Hard to believe that's a sensible thing to do. So instead we
+%% accept a size hint from the application, via broadcast/3. This size
+%% hint can be the size of anything in the message which we expect
+%% could be large, and we just ignore the size of any small bits of
+%% the message term. Therefore MAX_BUFFER_SIZE is set somewhat
+%% conservatively at 100MB - but the buffer is only to allow us to
+%% buffer tiny messages anyway, so 100MB is plenty.
+
+maybe_flush_broadcast_buffer(State = #state{broadcast_buffer_sz = Size}) ->
+ case Size > ?MAX_BUFFER_SIZE of
+ true -> flush_broadcast_buffer(State);
+ false -> State
+ end.
flush_broadcast_buffer(State = #state { broadcast_buffer = [] }) ->
State;
@@ -920,8 +953,9 @@ flush_broadcast_buffer(State = #state { self = Self,
Member #member { pending_ack = PA1,
last_pub = PubCount }
end, Self, MembersState),
- State #state { members_state = MembersState1,
- broadcast_buffer = [] }.
+ State #state { members_state = MembersState1,
+ broadcast_buffer = [],
+ broadcast_buffer_sz = 0}.
%% ---------------------------------------------------------------------------
diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl
index d54c2a8dba..19171659e3 100644
--- a/src/rabbit_access_control.erl
+++ b/src/rabbit_access_control.erl
@@ -52,18 +52,31 @@ check_user_pass_login(Username, Password) ->
check_user_login(Username, AuthProps) ->
{ok, Modules} = application:get_env(rabbit, auth_backends),
lists:foldl(
- fun(Module, {refused, _, _}) ->
- case Module:check_user_login(Username, AuthProps) of
- {error, E} ->
- {refused, "~s failed authenticating ~s: ~p~n",
- [Module, Username, E]};
- Else ->
- Else
+ fun ({ModN, ModZ}, {refused, _, _}) ->
+ %% Different modules for authN vs authZ. So authenticate
+ %% with authN module, then if that succeeds do
+ %% passwordless (i.e pre-authenticated) login with authZ
+ %% module, and use the #user{} the latter gives us.
+ case try_login(ModN, Username, AuthProps) of
+ {ok, _} -> try_login(ModZ, Username, []);
+ Else -> Else
end;
- (_, {ok, User}) ->
+ (Mod, {refused, _, _}) ->
+ %% Same module for authN and authZ. Just take the result
+ %% it gives us
+ try_login(Mod, Username, AuthProps);
+ (_, {ok, User}) ->
+ %% We've successfully authenticated. Skip to the end...
{ok, User}
end, {refused, "No modules checked '~s'", [Username]}, Modules).
+try_login(Module, Username, AuthProps) ->
+ case Module:check_user_login(Username, AuthProps) of
+ {error, E} -> {refused, "~s failed authenticating ~s: ~p~n",
+ [Module, Username, E]};
+ Else -> Else
+ end.
+
check_vhost_access(User = #user{ username = Username,
auth_backend = Module }, VHostPath) ->
check_access(
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index cf94e61cfa..76e11dffb2 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -27,7 +27,7 @@
-export([force_event_refresh/0, notify_policy_changed/1]).
-export([consumers/1, consumers_all/1, consumer_info_keys/0]).
-export([basic_get/4, basic_consume/9, basic_cancel/4, notify_decorators/1]).
--export([notify_sent/2, notify_sent_queue_down/1, resume/2, flush_all/2]).
+-export([notify_sent/2, notify_sent_queue_down/1, resume/2]).
-export([notify_down_all/2, activate_limit_all/2, credit/5]).
-export([on_node_down/1]).
-export([update/2, store_queue/1, policy_changed/2]).
@@ -113,10 +113,9 @@
-> [rabbit_types:infos()]).
-spec(force_event_refresh/0 :: () -> 'ok').
-spec(notify_policy_changed/1 :: (rabbit_types:amqqueue()) -> 'ok').
--spec(consumers/1 ::
- (rabbit_types:amqqueue())
- -> [{pid(), rabbit_types:ctag(), boolean(),
- rabbit_framing:amqp_table()}]).
+-spec(consumers/1 :: (rabbit_types:amqqueue())
+ -> [{pid(), rabbit_types:ctag(), boolean(),
+ rabbit_framing:amqp_table()}]).
-spec(consumer_info_keys/0 :: () -> rabbit_types:info_keys()).
-spec(consumers_all/1 ::
(rabbit_types:vhost())
@@ -162,7 +161,6 @@
-spec(notify_sent/2 :: (pid(), pid()) -> 'ok').
-spec(notify_sent_queue_down/1 :: (pid()) -> 'ok').
-spec(resume/2 :: (pid(), pid()) -> 'ok').
--spec(flush_all/2 :: (qpids(), pid()) -> 'ok').
-spec(internal_delete/1 ::
(name()) -> rabbit_types:ok_or_error('not_found') |
rabbit_types:connection_exit() |
@@ -607,8 +605,6 @@ notify_sent_queue_down(QPid) ->
resume(QPid, ChPid) -> delegate:cast(QPid, {resume, ChPid}).
-flush_all(QPids, ChPid) -> delegate:cast(QPids, {flush, ChPid}).
-
internal_delete1(QueueName) ->
ok = mnesia:delete({rabbit_queue, QueueName}),
%% this 'guarded' delete prevents unnecessary writes to the mnesia
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 801c48d11f..b4355287f9 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -1083,10 +1083,6 @@ handle_cast({activate_limit, ChPid}, State) ->
noreply(possibly_unblock(rabbit_queue_consumers:activate_limit_fun(),
ChPid, State));
-handle_cast({flush, ChPid}, State) ->
- ok = rabbit_channel:flushed(ChPid, self()),
- noreply(State);
-
handle_cast({set_ram_duration_target, Duration},
State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
BQS1 = BQ:set_ram_duration_target(Duration, BQS),
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index 2e825536b2..3d70be4bc3 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -22,7 +22,7 @@
message/3, message/4, properties/1, prepend_table_header/3,
extract_headers/1, map_headers/2, delivery/3, header_routes/1,
parse_expiration/1]).
--export([build_content/2, from_content/1]).
+-export([build_content/2, from_content/1, msg_size/1]).
%%----------------------------------------------------------------------------
@@ -77,6 +77,9 @@
(rabbit_framing:amqp_property_record())
-> rabbit_types:ok_or_error2('undefined' | non_neg_integer(), any())).
+-spec(msg_size/1 :: (rabbit_types:content() | rabbit_types:message()) ->
+ non_neg_integer()).
+
-endif.
%%----------------------------------------------------------------------------
@@ -274,3 +277,5 @@ parse_expiration(#'P_basic'{expiration = Expiration}) ->
{error, {leftover_string, S}}
end.
+msg_size(#content{payload_fragments_rev = PFR}) -> iolist_size(PFR);
+msg_size(#basic_message{content = Content}) -> msg_size(Content).
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index da2ba2676c..e030096f5e 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -21,8 +21,7 @@
-behaviour(gen_server2).
-export([start_link/11, do/2, do/3, do_flow/3, flush/1, shutdown/1]).
--export([send_command/2, deliver/4, send_credit_reply/2, send_drained/2,
- flushed/2]).
+-export([send_command/2, deliver/4, send_credit_reply/2, send_drained/2]).
-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]).
-export([refresh_config_local/0, ready_for_close/1]).
-export([force_event_refresh/0]).
@@ -37,7 +36,7 @@
conn_name, limiter, tx, next_tag, unacked_message_q, user,
virtual_host, most_recently_declared_queue,
queue_names, queue_monitors, consumer_mapping,
- blocking, queue_consumers, delivering_queues,
+ queue_consumers, delivering_queues,
queue_collector_pid, stats_timer, confirm_enabled, publish_seqno,
unconfirmed, confirmed, capabilities, trace_state}).
@@ -53,7 +52,6 @@
messages_uncommitted,
acks_uncommitted,
prefetch_count,
- client_flow_blocked,
state]).
-define(CREATION_EVENT_KEYS,
@@ -99,7 +97,6 @@
-spec(send_credit_reply/2 :: (pid(), non_neg_integer()) -> 'ok').
-spec(send_drained/2 :: (pid(), [{rabbit_types:ctag(), non_neg_integer()}])
-> 'ok').
--spec(flushed/2 :: (pid(), pid()) -> 'ok').
-spec(list/0 :: () -> [pid()]).
-spec(list_local/0 :: () -> [pid()]).
-spec(info_keys/0 :: () -> rabbit_types:info_keys()).
@@ -149,9 +146,6 @@ send_credit_reply(Pid, Len) ->
send_drained(Pid, CTagCredit) ->
gen_server2:cast(Pid, {send_drained, CTagCredit}).
-flushed(Pid, QPid) ->
- gen_server2:cast(Pid, {flushed, QPid}).
-
list() ->
rabbit_misc:append_rpc_all_nodes(rabbit_mnesia:cluster_nodes(running),
rabbit_channel, list_local, []).
@@ -213,7 +207,6 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
queue_names = dict:new(),
queue_monitors = pmon:new(),
consumer_mapping = dict:new(),
- blocking = sets:new(),
queue_consumers = dict:new(),
delivering_queues = sets:new(),
queue_collector_pid = CollectorPid,
@@ -291,9 +284,6 @@ handle_cast({method, Method, Content, Flow},
{stop, {Reason, erlang:get_stacktrace()}, State}
end;
-handle_cast({flushed, QPid}, State) ->
- {noreply, queue_blocked(QPid, State), hibernate};
-
handle_cast(ready_for_close, State = #ch{state = closing,
writer_pid = WriterPid}) ->
ok = rabbit_writer:send_command_sync(WriterPid, #'channel.close_ok'{}),
@@ -368,8 +358,7 @@ handle_info(emit_stats, State) ->
handle_info({'DOWN', _MRef, process, QPid, Reason}, State) ->
State1 = handle_publishing_queue_down(QPid, Reason, State),
- State2 = queue_blocked(QPid, State1),
- State3 = handle_consuming_queue_down(QPid, State2),
+ State3 = handle_consuming_queue_down(QPid, State1),
State4 = handle_delivering_queue_down(QPid, State3),
credit_flow:peer_down(QPid),
#ch{queue_names = QNames, queue_monitors = QMons} = State4,
@@ -480,9 +469,8 @@ check_resource_access(User, Resource, Perm) ->
put(permission_cache, [V | CacheTail])
end.
-clear_permission_cache() ->
- erase(permission_cache),
- ok.
+clear_permission_cache() -> erase(permission_cache),
+ ok.
check_configure_permitted(Resource, #ch{user = User}) ->
check_resource_access(User, Resource, configure).
@@ -522,6 +510,14 @@ check_internal_exchange(#exchange{name = Name, internal = true}) ->
check_internal_exchange(_) ->
ok.
+check_msg_size(Content) ->
+ Size = rabbit_basic:msg_size(Content),
+ case Size > ?MAX_MSG_SIZE of
+ true -> precondition_failed("message size ~B larger than max size ~B",
+ [Size, ?MAX_MSG_SIZE]);
+ false -> ok
+ end.
+
qbin_to_resource(QueueNameBin, State) ->
name_to_resource(queue, QueueNameBin, State).
@@ -529,8 +525,7 @@ name_to_resource(Type, NameBin, #ch{virtual_host = VHostPath}) ->
rabbit_misc:r(VHostPath, Type, NameBin).
expand_queue_name_shortcut(<<>>, #ch{most_recently_declared_queue = <<>>}) ->
- rabbit_misc:protocol_error(
- not_found, "no previously declared queue", []);
+ rabbit_misc:protocol_error(not_found, "no previously declared queue", []);
expand_queue_name_shortcut(<<>>, #ch{most_recently_declared_queue = MRDQ}) ->
MRDQ;
expand_queue_name_shortcut(QueueNameBin, _) ->
@@ -538,8 +533,7 @@ expand_queue_name_shortcut(QueueNameBin, _) ->
expand_routing_key_shortcut(<<>>, <<>>,
#ch{most_recently_declared_queue = <<>>}) ->
- rabbit_misc:protocol_error(
- not_found, "no previously declared queue", []);
+ rabbit_misc:protocol_error(not_found, "no previously declared queue", []);
expand_routing_key_shortcut(<<>>, <<>>,
#ch{most_recently_declared_queue = MRDQ}) ->
MRDQ;
@@ -593,20 +587,6 @@ check_name(Kind, NameBin = <<"amq.", _/binary>>) ->
check_name(_Kind, NameBin) ->
NameBin.
-queue_blocked(QPid, State = #ch{blocking = Blocking}) ->
- case sets:is_element(QPid, Blocking) of
- false -> State;
- true -> maybe_send_flow_ok(
- State#ch{blocking = sets:del_element(QPid, Blocking)})
- end.
-
-maybe_send_flow_ok(State = #ch{blocking = Blocking}) ->
- case sets:size(Blocking) of
- 0 -> ok = send(#'channel.flow_ok'{active = false}, State);
- _ -> ok
- end,
- State.
-
record_confirms([], State) ->
State;
record_confirms(MXs, State = #ch{confirmed = C}) ->
@@ -679,6 +659,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
tx = Tx,
confirm_enabled = ConfirmEnabled,
trace_state = TraceState}) ->
+ check_msg_size(Content),
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
check_write_permitted(ExchangeName, State),
Exchange = rabbit_exchange:lookup_or_die(ExchangeName),
@@ -873,8 +854,13 @@ handle_method(#'basic.qos'{prefetch_count = PrefetchCount},
%% unacked messages from basic.get too. Pretty obscure though.
Limiter1 = rabbit_limiter:limit_prefetch(Limiter,
PrefetchCount, queue:len(UAMQ)),
- {reply, #'basic.qos_ok'{},
- maybe_limit_queues(Limiter, Limiter1, State#ch{limiter = Limiter1})};
+ case ((not rabbit_limiter:is_active(Limiter)) andalso
+ rabbit_limiter:is_active(Limiter1)) of
+ true -> rabbit_amqqueue:activate_limit_all(
+ consumer_queues(State#ch.consumer_mapping), self());
+ false -> ok
+ end,
+ {reply, #'basic.qos_ok'{}, State#ch{limiter = Limiter1}};
handle_method(#'basic.recover_async'{requeue = true},
_, State = #ch{unacked_message_q = UAMQ, limiter = Limiter}) ->
@@ -1165,36 +1151,11 @@ handle_method(#'confirm.select'{nowait = NoWait}, _, State) ->
return_ok(State#ch{confirm_enabled = true},
NoWait, #'confirm.select_ok'{});
-handle_method(#'channel.flow'{active = true},
- _, State = #ch{limiter = Limiter}) ->
- Limiter1 = rabbit_limiter:unblock(Limiter),
- {reply, #'channel.flow_ok'{active = true},
- maybe_limit_queues(Limiter, Limiter1, State#ch{limiter = Limiter1})};
-
-handle_method(#'channel.flow'{active = false},
- _, State = #ch{consumer_mapping = Consumers,
- limiter = Limiter}) ->
- case rabbit_limiter:is_blocked(Limiter) of
- true -> {noreply, maybe_send_flow_ok(State)};
- false -> Limiter1 = rabbit_limiter:block(Limiter),
- State1 = maybe_limit_queues(Limiter, Limiter1,
- State#ch{limiter = Limiter1}),
- %% The semantics of channel.flow{active=false}
- %% require that no messages are delivered after the
- %% channel.flow_ok has been sent. We accomplish that
- %% by "flushing" all messages in flight from the
- %% consumer queues to us. To do this we tell all the
- %% queues to invoke rabbit_channel:flushed/2, which
- %% will send us a {flushed, ...} message that appears
- %% *after* all the {deliver, ...} messages. We keep
- %% track of all the QPids thus asked, and once all of
- %% them have responded (or died) we send the
- %% channel.flow_ok.
- QPids = consumer_queues(Consumers),
- ok = rabbit_amqqueue:flush_all(QPids, self()),
- {noreply, maybe_send_flow_ok(
- State1#ch{blocking = sets:from_list(QPids)})}
- end;
+handle_method(#'channel.flow'{active = true}, _, State) ->
+ {reply, #'channel.flow_ok'{active = true}, State};
+
+handle_method(#'channel.flow'{active = false}, _, _State) ->
+ rabbit_misc:protocol_error(not_implemented, "active=false", []);
handle_method(#'basic.credit'{consumer_tag = CTag,
credit = Credit,
@@ -1446,15 +1407,6 @@ foreach_per_consumer(F, UAL) ->
end, gb_trees:empty(), UAL),
rabbit_misc:gb_trees_foreach(F, T).
-maybe_limit_queues(OldLimiter, NewLimiter, State) ->
- case ((not rabbit_limiter:is_active(OldLimiter)) andalso
- rabbit_limiter:is_active(NewLimiter)) of
- true -> Queues = consumer_queues(State#ch.consumer_mapping),
- rabbit_amqqueue:activate_limit_all(Queues, self());
- false -> ok
- end,
- State.
-
consumer_queues(Consumers) ->
lists:usort([QPid ||
{_Key, #amqqueue{pid = QPid}} <- dict:to_list(Consumers)]).
@@ -1466,7 +1418,7 @@ consumer_queues(Consumers) ->
notify_limiter(Limiter, Acked) ->
%% optimisation: avoid the potentially expensive 'foldl' in the
%% common case.
- case rabbit_limiter:is_prefetch_limited(Limiter) of
+ case rabbit_limiter:is_active(Limiter) of
false -> ok;
true -> case lists:foldl(fun ({_, none, _}, Acc) -> Acc;
({_, _, _}, Acc) -> Acc + 1
@@ -1634,8 +1586,6 @@ i(state, #ch{state = running}) -> credit_flow:state();
i(state, #ch{state = State}) -> State;
i(prefetch_count, #ch{limiter = Limiter}) ->
rabbit_limiter:get_prefetch_limit(Limiter);
-i(client_flow_blocked, #ch{limiter = Limiter}) ->
- rabbit_limiter:is_blocked(Limiter);
i(Item, _) ->
throw({bad_argument, Item}).
@@ -1656,8 +1606,7 @@ update_measures(Type, Key, Inc, Measure) ->
end,
put({Type, Key}, orddict:store(Measure, Cur + Inc, Measures)).
-emit_stats(State) ->
- emit_stats(State, []).
+emit_stats(State) -> emit_stats(State, []).
emit_stats(State, Extra) ->
Coarse = infos(?STATISTICS_KEYS, State),
diff --git a/src/rabbit_error_logger_file_h.erl b/src/rabbit_error_logger_file_h.erl
index d59641b0d1..9421b52e83 100644
--- a/src/rabbit_error_logger_file_h.erl
+++ b/src/rabbit_error_logger_file_h.erl
@@ -79,6 +79,25 @@ init_file(File, PrevHandler) ->
%% filter out "application: foo; exited: stopped; type: temporary"
handle_event({info_report, _, {_, std_info, _}}, State) ->
{ok, State};
+%% When a node restarts quickly it is possible the rest of the cluster
+%% will not have had the chance to remove its queues from
+%% Mnesia. That's why rabbit_amqqueue:recover/0 invokes
+%% on_node_down(node()). But before we get there we can receive lots
+%% of messages intended for the old version of the node. The emulator
+%% logs an event for every one of those messages; in extremis this can
+%% bring the server to its knees just logging "Discarding..."
+%% again and again. So just log the first one, then go silent.
+handle_event(Event = {error, _, {emulator, _, ["Discarding message" ++ _]}},
+ State) ->
+ case get(discarding_message_seen) of
+ true -> {ok, State};
+ undefined -> put(discarding_message_seen, true),
+ error_logger_file_h:handle_event(Event, State)
+ end;
+%% Clear this state if we log anything else (but not a progress report).
+handle_event(Event = {info_msg, _, _}, State) ->
+ erase(discarding_message_seen),
+ error_logger_file_h:handle_event(Event, State);
handle_event(Event, State) ->
error_logger_file_h:handle_event(Event, State).
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 5dcd12b8e9..8d92ef9c46 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -17,8 +17,8 @@
%% The purpose of the limiter is to stem the flow of messages from
%% queues to channels, in order to act upon various protocol-level
%% flow control mechanisms, specifically AMQP 0-9-1's basic.qos
-%% prefetch_count, channel.flow, and our consumer prefetch extension,
-%% and AMQP 1.0's link (aka consumer) credit mechanism.
+%% prefetch_count, our consumer prefetch extension, and AMQP 1.0's
+%% link (aka consumer) credit mechanism.
%%
%% Each channel has an associated limiter process, created with
%% start_link/1, which it passes to queues on consumer creation with
@@ -69,11 +69,9 @@
%%
%% 1. Channels tell the limiter about basic.qos prefetch counts -
%% that's what the limit_prefetch/3, unlimit_prefetch/1,
-%% is_prefetch_limited/1, get_prefetch_limit/1 API functions are
-%% about - and channel.flow blocking - that's what block/1,
-%% unblock/1 and is_blocked/1 are for. They also tell the limiter
-%% queue state (via the queue) about consumer credit changes and
-%% messaeg acknowledgement - that's what credit/5 and
+%% get_prefetch_limit/1 API functions are about. They also tell the
+%% limiter queue state (via the queue) about consumer credit
+%% changes and message acknowledgement - that's what credit/5 and
%% ack_from_queue/3 are for.
%%
%% 2. Queues also tell the limiter queue state about the queue
@@ -88,12 +86,11 @@
%%
%% 5. Queues ask the limiter for permission (with can_send/3) whenever
%% they want to deliver a message to a channel. The limiter checks
-%% whether a) the channel isn't blocked by channel.flow, b) the
-%% volume has not yet reached the prefetch limit, and c) whether
-%% the consumer has enough credit. If so it increments the volume
-%% and tells the queue to proceed. Otherwise it marks the queue as
-%% requiring notification (see below) and tells the queue not to
-%% proceed.
+%% whether a) the volume has not yet reached the prefetch limit,
+%% and b) whether the consumer has enough credit. If so it
+%% increments the volume and tells the queue to proceed. Otherwise
+%% it marks the queue as requiring notification (see below) and
+%% tells the queue not to proceed.
%%
%% 6. A queue that has been told to proceed (by the return value of
%% can_send/3) sends the message to the channel. Conversely, a
@@ -128,8 +125,7 @@
-export([start_link/1]).
%% channel API
--export([new/1, limit_prefetch/3, unlimit_prefetch/1, block/1, unblock/1,
- is_prefetch_limited/1, is_blocked/1, is_active/1,
+-export([new/1, limit_prefetch/3, unlimit_prefetch/1, is_active/1,
get_prefetch_limit/1, ack/2, pid/1]).
%% queue API
-export([client/1, activate/1, can_send/3, resume/1, deactivate/1,
@@ -141,14 +137,13 @@
%%----------------------------------------------------------------------------
--record(lstate, {pid, prefetch_limited, blocked}).
+-record(lstate, {pid, prefetch_limited}).
-record(qstate, {pid, state, credits}).
-ifdef(use_specs).
-type(lstate() :: #lstate{pid :: pid(),
- prefetch_limited :: boolean(),
- blocked :: boolean()}).
+ prefetch_limited :: boolean()}).
-type(qstate() :: #qstate{pid :: pid(),
state :: 'dormant' | 'active' | 'suspended'}).
@@ -161,10 +156,6 @@
-spec(limit_prefetch/3 :: (lstate(), non_neg_integer(), non_neg_integer())
-> lstate()).
-spec(unlimit_prefetch/1 :: (lstate()) -> lstate()).
--spec(block/1 :: (lstate()) -> lstate()).
--spec(unblock/1 :: (lstate()) -> lstate()).
--spec(is_prefetch_limited/1 :: (lstate()) -> boolean()).
--spec(is_blocked/1 :: (lstate()) -> boolean()).
-spec(is_active/1 :: (lstate()) -> boolean()).
-spec(get_prefetch_limit/1 :: (lstate()) -> non_neg_integer()).
-spec(ack/2 :: (lstate(), non_neg_integer()) -> 'ok').
@@ -192,7 +183,6 @@
-record(lim, {prefetch_count = 0,
ch_pid,
- blocked = false,
queues = orddict:new(), % QPid -> {MonitorRef, Notify}
volume = 0}).
%% 'Notify' is a boolean that indicates whether a queue should be
@@ -210,7 +200,7 @@ start_link(ProcName) -> gen_server2:start_link(?MODULE, [ProcName], []).
new(Pid) ->
%% this a 'call' to ensure that it is invoked at most once.
ok = gen_server:call(Pid, {new, self()}, infinity),
- #lstate{pid = Pid, prefetch_limited = false, blocked = false}.
+ #lstate{pid = Pid, prefetch_limited = false}.
limit_prefetch(L, PrefetchCount, UnackedCount) when PrefetchCount > 0 ->
ok = gen_server:call(
@@ -222,19 +212,7 @@ unlimit_prefetch(L) ->
ok = gen_server:call(L#lstate.pid, unlimit_prefetch, infinity),
L#lstate{prefetch_limited = false}.
-block(L) ->
- ok = gen_server:call(L#lstate.pid, block, infinity),
- L#lstate{blocked = true}.
-
-unblock(L) ->
- ok = gen_server:call(L#lstate.pid, unblock, infinity),
- L#lstate{blocked = false}.
-
-is_prefetch_limited(#lstate{prefetch_limited = Limited}) -> Limited.
-
-is_blocked(#lstate{blocked = Blocked}) -> Blocked.
-
-is_active(L) -> is_prefetch_limited(L) orelse is_blocked(L).
+is_active(#lstate{prefetch_limited = Limited}) -> Limited.
get_prefetch_limit(#lstate{prefetch_limited = false}) -> 0;
get_prefetch_limit(L) ->
@@ -377,19 +355,10 @@ handle_call(unlimit_prefetch, _From, State) ->
{reply, ok, maybe_notify(State, State#lim{prefetch_count = 0,
volume = 0})};
-handle_call(block, _From, State) ->
- {reply, ok, State#lim{blocked = true}};
-
-handle_call(unblock, _From, State) ->
- {reply, ok, maybe_notify(State, State#lim{blocked = false})};
-
handle_call(get_prefetch_limit, _From,
State = #lim{prefetch_count = PrefetchCount}) ->
{reply, PrefetchCount, State};
-handle_call({can_send, QPid, _AckRequired}, _From,
- State = #lim{blocked = true}) ->
- {reply, false, limit_queue(QPid, State)};
handle_call({can_send, QPid, AckRequired}, _From,
State = #lim{volume = Volume}) ->
case prefetch_limit_reached(State) of
@@ -425,8 +394,8 @@ code_change(_, State, _) ->
%%----------------------------------------------------------------------------
maybe_notify(OldState, NewState) ->
- case (prefetch_limit_reached(OldState) orelse blocked(OldState)) andalso
- not (prefetch_limit_reached(NewState) orelse blocked(NewState)) of
+ case prefetch_limit_reached(OldState) andalso
+ not prefetch_limit_reached(NewState) of
true -> notify_queues(NewState);
false -> NewState
end.
@@ -434,8 +403,6 @@ maybe_notify(OldState, NewState) ->
prefetch_limit_reached(#lim{prefetch_count = Limit, volume = Volume}) ->
Limit =/= 0 andalso Volume >= Limit.
-blocked(#lim{blocked = Blocked}) -> Blocked.
-
remember_queue(QPid, State = #lim{queues = Queues}) ->
case orddict:is_key(QPid, Queues) of
false -> MRef = erlang:monitor(process, QPid),
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 4f50e1a503..9ce5afcb45 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -212,7 +212,8 @@ publish(Msg = #basic_message { id = MsgId }, MsgProps, IsDelivered, ChPid,
backing_queue = BQ,
backing_queue_state = BQS }) ->
false = dict:is_key(MsgId, SS), %% ASSERTION
- ok = gm:broadcast(GM, {publish, ChPid, MsgProps, Msg}),
+ ok = gm:broadcast(GM, {publish, ChPid, MsgProps, Msg},
+ rabbit_basic:msg_size(Msg)),
BQS1 = BQ:publish(Msg, MsgProps, IsDelivered, ChPid, BQS),
ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1 }).
@@ -222,7 +223,8 @@ publish_delivered(Msg = #basic_message { id = MsgId }, MsgProps,
backing_queue = BQ,
backing_queue_state = BQS }) ->
false = dict:is_key(MsgId, SS), %% ASSERTION
- ok = gm:broadcast(GM, {publish_delivered, ChPid, MsgProps, Msg}),
+ ok = gm:broadcast(GM, {publish_delivered, ChPid, MsgProps, Msg},
+ rabbit_basic:msg_size(Msg)),
{AckTag, BQS1} = BQ:publish_delivered(Msg, MsgProps, ChPid, BQS),
State1 = State #state { backing_queue_state = BQS1 },
{AckTag, ensure_monitoring(ChPid, State1)}.
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 7389f2c6df..58f6e728ed 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -286,8 +286,11 @@ recvloop(Deb, Buf, BufLen, State = #v1{connection_state = {become, F}}) ->
throw({become, F(Deb, Buf, BufLen, State)});
recvloop(Deb, Buf, BufLen, State = #v1{sock = Sock, recv_len = RecvLen})
when BufLen < RecvLen ->
- ok = rabbit_net:setopts(Sock, [{active, once}]),
- mainloop(Deb, Buf, BufLen, State#v1{pending_recv = true});
+ case rabbit_net:setopts(Sock, [{active, once}]) of
+ ok -> mainloop(Deb, Buf, BufLen,
+ State#v1{pending_recv = true});
+ {error, Reason} -> stop(Reason, State)
+ end;
recvloop(Deb, [B], _BufLen, State) ->
{Rest, State1} = handle_input(State#v1.callback, B, State),
recvloop(Deb, [Rest], size(Rest), State1);
@@ -313,11 +316,9 @@ mainloop(Deb, Buf, BufLen, State = #v1{sock = Sock}) ->
closed when State#v1.connection_state =:= closed ->
ok;
closed ->
- maybe_emit_stats(State),
- throw(connection_closed_abruptly);
+ stop(closed, State);
{error, Reason} ->
- maybe_emit_stats(State),
- throw({inet_error, Reason});
+ stop(Reason, State);
{other, {system, From, Request}} ->
sys:handle_system_msg(Request, From, State#v1.parent,
?MODULE, Deb, {Buf, BufLen, State});
@@ -328,6 +329,11 @@ mainloop(Deb, Buf, BufLen, State = #v1{sock = Sock}) ->
end
end.
+stop(closed, State) -> maybe_emit_stats(State),
+ throw(connection_closed_abruptly);
+stop(Reason, State) -> maybe_emit_stats(State),
+ throw({inet_error, Reason}).
+
handle_other({conserve_resources, Source, Conserve},
State = #v1{throttle = Throttle =
#throttle{alarmed_by = CR}}) ->