summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_amqqueue_process.erl86
-rw-r--r--src/rabbit_channel.erl108
-rw-r--r--src/rabbit_event.erl111
-rw-r--r--src/rabbit_mirror_queue_master.erl2
-rw-r--r--src/rabbit_reader.erl67
-rw-r--r--src/rabbit_variable_queue.erl56
6 files changed, 211 insertions, 219 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index fe1ddba02b..46f6674b04 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -118,19 +118,19 @@ init(Q) ->
?LOGDEBUG("Queue starting - ~p~n", [Q]),
process_flag(trap_exit, true),
- {ok, #q{q = Q#amqqueue{pid = self()},
- exclusive_consumer = none,
- has_had_consumers = false,
- backing_queue = backing_queue_module(Q),
- backing_queue_state = undefined,
- active_consumers = queue:new(),
- expires = undefined,
- sync_timer_ref = undefined,
- rate_timer_ref = undefined,
- expiry_timer_ref = undefined,
- ttl = undefined,
- stats_timer = rabbit_event:init_stats_timer(),
- msg_id_to_channel = dict:new()}, hibernate,
+ State = #q{q = Q#amqqueue{pid = self()},
+ exclusive_consumer = none,
+ has_had_consumers = false,
+ backing_queue = backing_queue_module(Q),
+ backing_queue_state = undefined,
+ active_consumers = queue:new(),
+ expires = undefined,
+ sync_timer_ref = undefined,
+ rate_timer_ref = undefined,
+ expiry_timer_ref = undefined,
+ ttl = undefined,
+ msg_id_to_channel = dict:new()},
+ {ok, rabbit_event:init_stats_timer(State, #q.stats_timer), hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
@@ -140,25 +140,24 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
none -> ok;
_ -> erlang:monitor(process, Owner)
end,
- State = requeue_and_run(
- AckTags,
- process_args(
- #q{q = Q,
- exclusive_consumer = none,
- has_had_consumers = false,
- backing_queue = BQ,
- backing_queue_state = BQS,
- active_consumers = queue:new(),
- expires = undefined,
- sync_timer_ref = undefined,
- rate_timer_ref = RateTRef,
- expiry_timer_ref = undefined,
- ttl = undefined,
- stats_timer = rabbit_event:init_stats_timer(),
- msg_id_to_channel = MTC})),
+ State = #q{q = Q,
+ exclusive_consumer = none,
+ has_had_consumers = false,
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ active_consumers = queue:new(),
+ expires = undefined,
+ sync_timer_ref = undefined,
+ rate_timer_ref = RateTRef,
+ expiry_timer_ref = undefined,
+ ttl = undefined,
+ msg_id_to_channel = MTC},
+ State1 = requeue_and_run(AckTags, process_args(
+ rabbit_event:init_stats_timer(
+ State, #q.stats_timer))),
lists:foldl(
fun (Delivery, StateN) -> deliver_or_enqueue(Delivery, StateN) end,
- State, Deliveries).
+ State1, Deliveries).
terminate(shutdown = R, State = #q{backing_queue = BQ}) ->
terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State);
@@ -183,9 +182,9 @@ code_change(_OldVsn, State, _Extra) ->
%%----------------------------------------------------------------------------
-declare(Recover, From,
- State = #q{q = Q, backing_queue = BQ, backing_queue_state = undefined,
- stats_timer = StatsTimer}) ->
+declare(Recover, From, State = #q{q = Q,
+ backing_queue = BQ,
+ backing_queue_state = undefined}) ->
case rabbit_amqqueue:internal_declare(Q, Recover) of
not_found -> {stop, normal, not_found, State};
Q -> gen_server2:reply(From, {new, Q}),
@@ -199,7 +198,7 @@ declare(Recover, From,
State1 = process_args(State#q{backing_queue_state = BQS}),
rabbit_event:notify(queue_created,
infos(?CREATION_EVENT_KEYS, State1)),
- rabbit_event:if_enabled(StatsTimer,
+ rabbit_event:if_enabled(State1, #q.stats_timer,
fun() -> emit_stats(State1) end),
noreply(State1);
Q1 -> {stop, normal, {existing, Q1}, State}
@@ -315,10 +314,8 @@ ensure_expiry_timer(State = #q{expires = Expires}) ->
State
end.
-ensure_stats_timer(State = #q{stats_timer = StatsTimer,
- q = #amqqueue{pid = QPid}}) ->
- State#q{stats_timer = rabbit_event:ensure_stats_timer(
- StatsTimer, QPid, emit_stats)}.
+ensure_stats_timer(State) ->
+ rabbit_event:ensure_stats_timer(State, #q.stats_timer, emit_stats).
assert_invariant(#q{active_consumers = AC,
backing_queue = BQ, backing_queue_state = BQS}) ->
@@ -1120,10 +1117,10 @@ handle_info(maybe_expire, State) ->
handle_info(drop_expired, State) ->
noreply(drop_expired_messages(State#q{ttl_timer_ref = undefined}));
-handle_info(emit_stats, State = #q{stats_timer = StatsTimer}) ->
+handle_info(emit_stats, State) ->
%% Do not invoke noreply as it would see no timer and create a new one.
emit_stats(State),
- State1 = State#q{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)},
+ State1 = rabbit_event:reset_stats_timer(State, #q.stats_timer),
assert_invariant(State1),
{noreply, State1, hibernate};
@@ -1167,18 +1164,17 @@ handle_info(Info, State) ->
handle_pre_hibernate(State = #q{backing_queue_state = undefined}) ->
{hibernate, State};
handle_pre_hibernate(State = #q{backing_queue = BQ,
- backing_queue_state = BQS,
- stats_timer = StatsTimer}) ->
+ backing_queue_state = BQS}) ->
{RamDuration, BQS1} = BQ:ram_duration(BQS),
DesiredDuration =
rabbit_memory_monitor:report_ram_duration(self(), RamDuration),
BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1),
BQS3 = BQ:handle_pre_hibernate(BQS2),
rabbit_event:if_enabled(
- StatsTimer,
+ State, #q.stats_timer,
fun () -> emit_stats(State, [{idle_since, now()}]) end),
- State1 = State#q{stats_timer = rabbit_event:stop_stats_timer(StatsTimer),
- backing_queue_state = BQS3},
+ State1 = rabbit_event:stop_stats_timer(State#q{backing_queue_state = BQS3},
+ #q.stats_timer),
{hibernate, stop_rate_timer(State1)}.
format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ).
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 3c61447af2..883e570ad6 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -34,7 +34,7 @@
-record(ch, {state, protocol, channel, reader_pid, writer_pid, conn_pid,
limiter, tx_status, next_tag,
- unacked_message_q, uncommitted_message_q, uncommitted_ack_q,
+ unacked_message_q, uncommitted_message_q, uncommitted_acks,
user, virtual_host, most_recently_declared_queue, queue_monitors,
consumer_mapping, blocking, queue_consumers, queue_collector_pid,
stats_timer, confirm_enabled, publish_seqno, unconfirmed_mq,
@@ -173,7 +173,6 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost,
Capabilities, CollectorPid, Limiter]) ->
process_flag(trap_exit, true),
ok = pg_local:join(rabbit_channels, self()),
- StatsTimer = rabbit_event:init_stats_timer(),
State = #ch{state = starting,
protocol = Protocol,
channel = Channel,
@@ -185,7 +184,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost,
next_tag = 1,
unacked_message_q = queue:new(),
uncommitted_message_q = queue:new(),
- uncommitted_ack_q = queue:new(),
+ uncommitted_acks = [],
user = User,
virtual_host = VHost,
most_recently_declared_queue = <<>>,
@@ -194,7 +193,6 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost,
blocking = sets:new(),
queue_consumers = dict:new(),
queue_collector_pid = CollectorPid,
- stats_timer = StatsTimer,
confirm_enabled = false,
publish_seqno = 1,
unconfirmed_mq = gb_trees:empty(),
@@ -202,10 +200,11 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost,
confirmed = [],
capabilities = Capabilities,
trace_state = rabbit_trace:init(VHost)},
- rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)),
- rabbit_event:if_enabled(StatsTimer,
- fun() -> emit_stats(State) end),
- {ok, State, hibernate,
+ State1 = rabbit_event:init_stats_timer(State, #ch.stats_timer),
+ rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State1)),
+ rabbit_event:if_enabled(State1, #ch.stats_timer,
+ fun() -> emit_stats(State1) end),
+ {ok, State1, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
prioritise_call(Msg, _From, _State) ->
@@ -319,10 +318,10 @@ handle_cast({confirm, MsgSeqNos, From}, State) ->
handle_info(timeout, State) ->
noreply(State);
-handle_info(emit_stats, State = #ch{stats_timer = StatsTimer}) ->
+handle_info(emit_stats, State) ->
emit_stats(State),
noreply([ensure_stats_timer],
- State#ch{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)});
+ rabbit_event:reset_stats_timer(State, #ch.stats_timer));
handle_info({'DOWN', _MRef, process, QPid, Reason}, State) ->
State1 = handle_publishing_queue_down(QPid, Reason, State),
@@ -335,12 +334,12 @@ handle_info({'DOWN', _MRef, process, QPid, Reason}, State) ->
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State}.
-handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) ->
+handle_pre_hibernate(State) ->
ok = clear_permission_cache(),
rabbit_event:if_enabled(
- StatsTimer, fun () -> emit_stats(State, [{idle_since, now()}]) end),
- StatsTimer1 = rabbit_event:stop_stats_timer(StatsTimer),
- {hibernate, State#ch{stats_timer = StatsTimer1}}.
+ State, #ch.stats_timer,
+ fun () -> emit_stats(State, [{idle_since, now()}]) end),
+ {hibernate, rabbit_event:stop_stats_timer(State, #ch.stats_timer)}.
terminate(Reason, State) ->
{Res, _State1} = notify_queues(State),
@@ -385,9 +384,8 @@ next_state(Mask, State) ->
State2 = ?MASKED_CALL(send_confirms, Mask, State1),
State2.
-ensure_stats_timer(State = #ch{stats_timer = StatsTimer}) ->
- State#ch{stats_timer = rabbit_event:ensure_stats_timer(
- StatsTimer, self(), emit_stats)}.
+ensure_stats_timer(State) ->
+ rabbit_event:ensure_stats_timer(State, #ch.stats_timer, emit_stats).
return_ok(State, true, _Msg) -> {noreply, State};
return_ok(State, false, Msg) -> {reply, Msg, State}.
@@ -669,15 +667,14 @@ handle_method(#'basic.nack'{delivery_tag = DeliveryTag,
handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
multiple = Multiple},
- _, State = #ch{unacked_message_q = UAMQ,
- tx_status = TxStatus}) ->
+ _, State = #ch{unacked_message_q = UAMQ, tx_status = TxStatus}) ->
{Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple),
State1 = State#ch{unacked_message_q = Remaining},
{noreply,
case TxStatus of
none -> ack(Acked, State1);
- in_progress -> NewTAQ = queue:join(State1#ch.uncommitted_ack_q, Acked),
- State1#ch{uncommitted_ack_q = NewTAQ}
+ in_progress -> State1#ch{uncommitted_acks =
+ Acked ++ State1#ch.uncommitted_acks}
end};
handle_method(#'basic.get'{queue = QueueNameBin,
@@ -839,6 +836,7 @@ handle_method(#'basic.recover_async'{requeue = true},
_, State = #ch{unacked_message_q = UAMQ,
limiter = Limiter}) ->
OkFun = fun () -> ok end,
+ UAMQL = queue:to_list(UAMQ),
ok = fold_per_queue(
fun (QPid, MsgIds, ok) ->
rabbit_misc:with_exit_handler(
@@ -846,8 +844,8 @@ handle_method(#'basic.recover_async'{requeue = true},
rabbit_amqqueue:requeue(
QPid, MsgIds, self())
end)
- end, ok, UAMQ),
- ok = notify_limiter(Limiter, UAMQ),
+ end, ok, UAMQL),
+ ok = notify_limiter(Limiter, UAMQL),
%% No answer required - basic.recover is the newer, synchronous
%% variant of this method
{noreply, State#ch{unacked_message_q = queue:new()}};
@@ -1071,8 +1069,8 @@ handle_method(#'tx.commit'{}, _, #ch{tx_status = none}) ->
precondition_failed, "channel is not transactional", []);
handle_method(#'tx.commit'{}, _, State = #ch{uncommitted_message_q = TMQ,
- uncommitted_ack_q = TAQ}) ->
- State1 = new_tx(ack(TAQ, rabbit_misc:queue_fold(fun deliver_to_queues/2,
+ uncommitted_acks = TAL}) ->
+ State1 = new_tx(ack(TAL, rabbit_misc:queue_fold(fun deliver_to_queues/2,
State, TMQ))),
{noreply, maybe_complete_tx(State1#ch{tx_status = committing})};
@@ -1080,10 +1078,11 @@ handle_method(#'tx.rollback'{}, _, #ch{tx_status = none}) ->
rabbit_misc:protocol_error(
precondition_failed, "channel is not transactional", []);
-handle_method(#'tx.rollback'{}, _, State = #ch{unacked_message_q = UAMQ,
- uncommitted_ack_q = TAQ}) ->
- {reply, #'tx.rollback_ok'{}, new_tx(State#ch{unacked_message_q =
- queue:join(TAQ, UAMQ)})};
+handle_method(#'tx.rollback'{}, _, State = #ch{unacked_message_q = UAMQ,
+ uncommitted_acks = TAL}) ->
+ TAQ = queue:from_list(lists:reverse(TAL)),
+ {reply, #'tx.rollback_ok'{},
+ new_tx(State#ch{unacked_message_q = queue:join(TAQ, UAMQ)})};
handle_method(#'confirm.select'{}, _, #ch{tx_status = in_progress}) ->
rabbit_misc:protocol_error(
@@ -1162,11 +1161,11 @@ demonitor_queue(QPid, State = #ch{queue_monitors = QMons}) ->
false -> State
end.
-queue_monitor_needed(QPid, #ch{stats_timer = StatsTimer,
- queue_consumers = QCons,
+queue_monitor_needed(QPid, #ch{queue_consumers = QCons,
blocking = Blocking,
- unconfirmed_qm = UQM}) ->
- StatsEnabled = rabbit_event:stats_level(StatsTimer) =:= fine,
+ unconfirmed_qm = UQM} = State) ->
+ StatsEnabled = rabbit_event:stats_level(
+ State, #ch.stats_timer) =:= fine,
ConsumerMonitored = dict:is_key(QPid, QCons),
QueueBlocked = sets:is_element(QPid, Blocking),
ConfirmMonitored = gb_trees:is_defined(QPid, UQM),
@@ -1282,18 +1281,18 @@ ack_record(DeliveryTag, ConsumerTag,
{DeliveryTag, ConsumerTag, {QPid, MsgId}}.
collect_acks(Q, 0, true) ->
- {Q, queue:new()};
+ {queue:to_list(Q), queue:new()};
collect_acks(Q, DeliveryTag, Multiple) ->
- collect_acks(queue:new(), queue:new(), Q, DeliveryTag, Multiple).
+ collect_acks([], queue:new(), Q, DeliveryTag, Multiple).
collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) ->
case queue:out(Q) of
{{value, UnackedMsg = {CurrentDeliveryTag, _ConsumerTag, _Msg}},
QTail} ->
if CurrentDeliveryTag == DeliveryTag ->
- {queue:in(UnackedMsg, ToAcc), queue:join(PrefixAcc, QTail)};
+ {[UnackedMsg | ToAcc], queue:join(PrefixAcc, QTail)};
Multiple ->
- collect_acks(queue:in(UnackedMsg, ToAcc), PrefixAcc,
+ collect_acks([UnackedMsg | ToAcc], PrefixAcc,
QTail, DeliveryTag, Multiple);
true ->
collect_acks(ToAcc, queue:in(UnackedMsg, PrefixAcc),
@@ -1314,7 +1313,7 @@ ack(Acked, State) ->
maybe_incr_stats(QIncs, ack, State).
new_tx(State) -> State#ch{uncommitted_message_q = queue:new(),
- uncommitted_ack_q = queue:new()}.
+ uncommitted_acks = []}.
notify_queues(State = #ch{state = closing}) ->
{ok, State};
@@ -1322,12 +1321,15 @@ notify_queues(State = #ch{consumer_mapping = Consumers}) ->
{rabbit_amqqueue:notify_down_all(consumer_queues(Consumers), self()),
State#ch{state = closing}}.
-fold_per_queue(F, Acc0, UAQ) ->
- T = rabbit_misc:queue_fold(
- fun ({_DTag, _CTag, {QPid, MsgId}}, T) ->
- rabbit_misc:gb_trees_cons(QPid, MsgId, T)
- end, gb_trees:empty(), UAQ),
- rabbit_misc:gb_trees_fold(F, Acc0, T).
+fold_per_queue(_F, Acc, []) ->
+ Acc;
+fold_per_queue(F, Acc, [{_DTag, _CTag, {QPid, MsgId}}]) -> %% common case
+ F(QPid, [MsgId], Acc);
+fold_per_queue(F, Acc, UAL) ->
+ T = lists:foldl(fun ({_DTag, _CTag, {QPid, MsgId}}, T) ->
+ rabbit_misc:gb_trees_cons(QPid, MsgId, T)
+ end, gb_trees:empty(), UAL),
+ rabbit_misc:gb_trees_fold(F, Acc, T).
enable_limiter(State = #ch{unacked_message_q = UAMQ,
limiter = Limiter}) ->
@@ -1349,9 +1351,9 @@ consumer_queues(Consumers) ->
notify_limiter(Limiter, Acked) ->
case rabbit_limiter:is_enabled(Limiter) of
false -> ok;
- true -> case rabbit_misc:queue_fold(fun ({_, none, _}, Acc) -> Acc;
- ({_, _, _}, Acc) -> Acc + 1
- end, 0, Acked) of
+ true -> case lists:foldl(fun ({_, none, _}, Acc) -> Acc;
+ ({_, _, _}, Acc) -> Acc + 1
+ end, 0, Acked) of
0 -> ok;
Count -> rabbit_limiter:ack(Limiter, Count)
end
@@ -1493,8 +1495,8 @@ i(messages_unacknowledged, #ch{unacked_message_q = UAMQ}) ->
queue:len(UAMQ);
i(messages_uncommitted, #ch{uncommitted_message_q = TMQ}) ->
queue:len(TMQ);
-i(acks_uncommitted, #ch{uncommitted_ack_q = TAQ}) ->
- queue:len(TAQ);
+i(acks_uncommitted, #ch{uncommitted_acks = TAL}) ->
+ length(TAL);
i(prefetch_count, #ch{limiter = Limiter}) ->
rabbit_limiter:get_limit(Limiter);
i(client_flow_blocked, #ch{limiter = Limiter}) ->
@@ -1507,8 +1509,8 @@ maybe_incr_redeliver_stats(true, QPid, State) ->
maybe_incr_redeliver_stats(_, _, State) ->
State.
-maybe_incr_stats(QXIncs, Measure, State = #ch{stats_timer = StatsTimer}) ->
- case rabbit_event:stats_level(StatsTimer) of
+maybe_incr_stats(QXIncs, Measure, State) ->
+ case rabbit_event:stats_level(State, #ch.stats_timer) of
fine -> lists:foldl(fun ({QX, Inc}, State0) ->
incr_stats(QX, Inc, Measure, State0)
end, State, QXIncs);
@@ -1540,9 +1542,9 @@ update_measures(Type, QX, Inc, Measure) ->
emit_stats(State) ->
emit_stats(State, []).
-emit_stats(State = #ch{stats_timer = StatsTimer}, Extra) ->
+emit_stats(State, Extra) ->
CoarseStats = infos(?STATISTICS_KEYS, State),
- case rabbit_event:stats_level(StatsTimer) of
+ case rabbit_event:stats_level(State, #ch.stats_timer) of
coarse ->
rabbit_event:notify(channel_stats, Extra ++ CoarseStats);
fine ->
diff --git a/src/rabbit_event.erl b/src/rabbit_event.erl
index bb765566cc..5ae40c784a 100644
--- a/src/rabbit_event.erl
+++ b/src/rabbit_event.erl
@@ -19,9 +19,9 @@
-include("rabbit.hrl").
-export([start_link/0]).
--export([init_stats_timer/0, ensure_stats_timer/3, stop_stats_timer/1]).
--export([reset_stats_timer/1]).
--export([stats_level/1, if_enabled/2]).
+-export([init_stats_timer/2, ensure_stats_timer/3, stop_stats_timer/2]).
+-export([reset_stats_timer/2]).
+-export([stats_level/2, if_enabled/3]).
-export([notify/2, notify_if/3]).
%%----------------------------------------------------------------------------
@@ -39,29 +39,23 @@
-type(event_timestamp() ::
{non_neg_integer(), non_neg_integer(), non_neg_integer()}).
--type(event() :: #event {
- type :: event_type(),
- props :: event_props(),
- timestamp :: event_timestamp()
- }).
+-type(event() :: #event { type :: event_type(),
+ props :: event_props(),
+ timestamp :: event_timestamp() }).
-type(level() :: 'none' | 'coarse' | 'fine').
--opaque(state() :: #state {
- level :: level(),
- interval :: integer(),
- timer :: atom()
- }).
-
-type(timer_fun() :: fun (() -> 'ok')).
+-type(container() :: tuple()).
+-type(pos() :: non_neg_integer()).
-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
--spec(init_stats_timer/0 :: () -> state()).
--spec(ensure_stats_timer/3 :: (state(), pid(), term()) -> state()).
--spec(stop_stats_timer/1 :: (state()) -> state()).
--spec(reset_stats_timer/1 :: (state()) -> state()).
--spec(stats_level/1 :: (state()) -> level()).
--spec(if_enabled/2 :: (state(), timer_fun()) -> 'ok').
+-spec(init_stats_timer/2 :: (container(), pos()) -> container()).
+-spec(ensure_stats_timer/3 :: (container(), pos(), term()) -> container()).
+-spec(stop_stats_timer/2 :: (container(), pos()) -> container()).
+-spec(reset_stats_timer/2 :: (container(), pos()) -> container()).
+-spec(stats_level/2 :: (container(), pos()) -> level()).
+-spec(if_enabled/3 :: (container(), pos(), timer_fun()) -> 'ok').
-spec(notify/2 :: (event_type(), event_props()) -> 'ok').
-spec(notify_if/3 :: (boolean(), event_type(), event_props()) -> 'ok').
@@ -75,58 +69,69 @@ start_link() ->
%% The idea is, for each stat-emitting object:
%%
%% On startup:
-%% Timer = init_stats_timer()
+%% init_stats_timer(State)
%% notify(created event)
%% if_enabled(internal_emit_stats) - so we immediately send something
%%
%% On wakeup:
-%% ensure_stats_timer(Timer, Pid, emit_stats)
+%% ensure_stats_timer(State, emit_stats)
%% (Note we can't emit stats immediately, the timer may have fired 1ms ago.)
%%
%% emit_stats:
%% if_enabled(internal_emit_stats)
-%% reset_stats_timer(Timer) - just bookkeeping
+%% reset_stats_timer(State) - just bookkeeping
%%
%% Pre-hibernation:
%% if_enabled(internal_emit_stats)
-%% stop_stats_timer(Timer)
+%% stop_stats_timer(State)
%%
%% internal_emit_stats:
%% notify(stats)
-init_stats_timer() ->
+init_stats_timer(C, P) ->
{ok, StatsLevel} = application:get_env(rabbit, collect_statistics),
{ok, Interval} = application:get_env(rabbit, collect_statistics_interval),
- #state{level = StatsLevel, interval = Interval, timer = undefined}.
-
-ensure_stats_timer(State = #state{level = none}, _Pid, _Msg) ->
- State;
-ensure_stats_timer(State = #state{interval = Interval,
- timer = undefined}, Pid, Msg) ->
- TRef = erlang:send_after(Interval, Pid, Msg),
- State#state{timer = TRef};
-ensure_stats_timer(State, _Pid, _Msg) ->
- State.
-
-stop_stats_timer(State = #state{level = none}) ->
- State;
-stop_stats_timer(State = #state{timer = undefined}) ->
- State;
-stop_stats_timer(State = #state{timer = TRef}) ->
- erlang:cancel_timer(TRef),
- State#state{timer = undefined}.
-
-reset_stats_timer(State) ->
- State#state{timer = undefined}.
-
-stats_level(#state{level = Level}) ->
+ setelement(P, C, #state{level = StatsLevel, interval = Interval,
+ timer = undefined}).
+
+ensure_stats_timer(C, P, Msg) ->
+ case element(P, C) of
+ #state{level = Level, interval = Interval, timer = undefined} = State
+ when Level =/= none ->
+ TRef = erlang:send_after(Interval, self(), Msg),
+ setelement(P, C, State#state{timer = TRef});
+ #state{} ->
+ C
+ end.
+
+stop_stats_timer(C, P) ->
+ case element(P, C) of
+ #state{level = Level, timer = TRef} = State
+ when Level =/= none andalso TRef =/= undefined ->
+ erlang:cancel_timer(TRef),
+ setelement(P, C, State#state{timer = undefined});
+ #state{} ->
+ C
+ end.
+
+reset_stats_timer(C, P) ->
+ case element(P, C) of
+ #state{timer = TRef} = State
+ when TRef =/= undefined ->
+ setelement(P, C, State#state{timer = undefined});
+ #state{} ->
+ C
+ end.
+
+stats_level(C, P) ->
+ #state{level = Level} = element(P, C),
Level.
-if_enabled(#state{level = none}, _Fun) ->
- ok;
-if_enabled(_State, Fun) ->
- Fun(),
- ok.
+if_enabled(C, P, Fun) ->
+ case element(P, C) of
+ #state{level = none} -> ok;
+ #state{} -> Fun(), ok
+ end.
notify_if(true, Type, Props) -> notify(Type, Props);
notify_if(false, _Type, _Props) -> ok.
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 5fc6341f50..328fe639f7 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -95,7 +95,7 @@ init(#amqqueue { name = QName, mirror_nodes = MNodes } = Q, Recover,
(case MNodes of
all -> rabbit_mnesia:all_clustered_nodes();
undefined -> [];
- _ -> [list_to_atom(binary_to_list(Node)) || Node <- MNodes]
+ _ -> MNodes
end) -- [node()],
[rabbit_mirror_queue_misc:add_mirror(QName, Node) || Node <- MNodes1],
{ok, BQ} = application:get_env(backing_queue_module),
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index b4871cefc1..b359f7d452 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -199,34 +199,32 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
ClientSock = socket_op(Sock, SockTransform),
erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(),
handshake_timeout),
+ State = #v1{parent = Parent,
+ sock = ClientSock,
+ connection = #connection{
+ protocol = none,
+ user = none,
+ timeout_sec = ?HANDSHAKE_TIMEOUT,
+ frame_max = ?FRAME_MIN_SIZE,
+ vhost = none,
+ client_properties = none,
+ capabilities = []},
+ callback = uninitialized_callback,
+ recv_len = 0,
+ pending_recv = false,
+ connection_state = pre_init,
+ queue_collector = Collector,
+ heartbeater = none,
+ channel_sup_sup_pid = ChannelSupSupPid,
+ start_heartbeat_fun = StartHeartbeatFun,
+ buf = [],
+ buf_len = 0,
+ auth_mechanism = none,
+ auth_state = none},
try
- recvloop(Deb, switch_callback(
- #v1{parent = Parent,
- sock = ClientSock,
- connection = #connection{
- protocol = none,
- user = none,
- timeout_sec = ?HANDSHAKE_TIMEOUT,
- frame_max = ?FRAME_MIN_SIZE,
- vhost = none,
- client_properties = none,
- capabilities = []},
- callback = uninitialized_callback,
- recv_len = 0,
- pending_recv = false,
- connection_state = pre_init,
- queue_collector = Collector,
- heartbeater = none,
- stats_timer =
- rabbit_event:init_stats_timer(),
- channel_sup_sup_pid = ChannelSupSupPid,
- start_heartbeat_fun = StartHeartbeatFun,
- buf = [],
- buf_len = 0,
- auth_mechanism = none,
- auth_state = none
- },
- handshake, 8))
+ recvloop(Deb, switch_callback(rabbit_event:init_stats_timer(
+ State, #v1.stats_timer),
+ handshake, 8))
catch
Ex -> (if Ex == connection_closed_abruptly ->
fun rabbit_log:warning/2;
@@ -605,10 +603,8 @@ refuse_connection(Sock, Exception) ->
ok = inet_op(fun () -> rabbit_net:send(Sock, <<"AMQP",0,0,9,1>>) end),
throw(Exception).
-ensure_stats_timer(State = #v1{stats_timer = StatsTimer,
- connection_state = running}) ->
- State#v1{stats_timer = rabbit_event:ensure_stats_timer(
- StatsTimer, self(), emit_stats)};
+ensure_stats_timer(State = #v1{connection_state = running}) ->
+ rabbit_event:ensure_stats_timer(State, #v1.stats_timer, emit_stats);
ensure_stats_timer(State) ->
State.
@@ -695,8 +691,7 @@ handle_method0(#'connection.open'{virtual_host = VHostPath},
connection = Connection = #connection{
user = User,
protocol = Protocol},
- sock = Sock,
- stats_timer = StatsTimer}) ->
+ sock = Sock}) ->
ok = rabbit_access_control:check_vhost_access(User, VHostPath),
NewConnection = Connection#connection{vhost = VHostPath},
ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol),
@@ -707,7 +702,7 @@ handle_method0(#'connection.open'{virtual_host = VHostPath},
rabbit_event:notify(connection_created,
[{type, network} |
infos(?CREATION_EVENT_KEYS, State1)]),
- rabbit_event:if_enabled(StatsTimer,
+ rabbit_event:if_enabled(State1, #v1.stats_timer,
fun() -> emit_stats(State1) end),
State1;
handle_method0(#'connection.close'{}, State) when ?IS_RUNNING(State) ->
@@ -937,6 +932,6 @@ send_exception(State = #v1{connection = #connection{protocol = Protocol}},
State1#v1.sock, 0, CloseMethod, Protocol),
State1.
-emit_stats(State = #v1{stats_timer = StatsTimer}) ->
+emit_stats(State) ->
rabbit_event:notify(connection_stats, infos(?STATISTICS_KEYS, State)),
- State#v1{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)}.
+ rabbit_event:reset_stats_timer(State, #v1.stats_timer).
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 8151d37a73..03004e102c 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -493,9 +493,31 @@ purge(State = #vqstate { q4 = Q4,
ram_index_count = 0,
persistent_count = PCount1 })}.
-publish(Msg, MsgProps, _ChPid, State) ->
- {_SeqId, State1} = publish(Msg, MsgProps, false, false, State),
- a(reduce_memory_use(State1)).
+publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
+ MsgProps = #message_properties { needs_confirming = NeedsConfirming },
+ _ChPid, State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4,
+ next_seq_id = SeqId,
+ len = Len,
+ in_counter = InCount,
+ persistent_count = PCount,
+ durable = IsDurable,
+ ram_msg_count = RamMsgCount,
+ unconfirmed = UC }) ->
+ IsPersistent1 = IsDurable andalso IsPersistent,
+ MsgStatus = msg_status(IsPersistent1, SeqId, Msg, MsgProps),
+ {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
+ State2 = case ?QUEUE:is_empty(Q3) of
+ false -> State1 #vqstate { q1 = ?QUEUE:in(m(MsgStatus1), Q1) };
+ true -> State1 #vqstate { q4 = ?QUEUE:in(m(MsgStatus1), Q4) }
+ end,
+ PCount1 = PCount + one_if(IsPersistent1),
+ UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
+ a(reduce_memory_use(State2 #vqstate { next_seq_id = SeqId + 1,
+ len = Len + 1,
+ in_counter = InCount + 1,
+ persistent_count = PCount1,
+ ram_msg_count = RamMsgCount + 1,
+ unconfirmed = UC1 })).
publish_delivered(false, #basic_message { id = MsgId },
#message_properties { needs_confirming = NeedsConfirming },
@@ -1126,34 +1148,6 @@ sum_msg_ids_by_store_to_len(LensByStore, MsgIdsByStore) ->
%% Internal gubbins for publishing
%%----------------------------------------------------------------------------
-publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
- MsgProps = #message_properties { needs_confirming = NeedsConfirming },
- IsDelivered, MsgOnDisk,
- State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4,
- next_seq_id = SeqId,
- len = Len,
- in_counter = InCount,
- persistent_count = PCount,
- durable = IsDurable,
- ram_msg_count = RamMsgCount,
- unconfirmed = UC }) ->
- IsPersistent1 = IsDurable andalso IsPersistent,
- MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProps))
- #msg_status { is_delivered = IsDelivered, msg_on_disk = MsgOnDisk},
- {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
- State2 = case ?QUEUE:is_empty(Q3) of
- false -> State1 #vqstate { q1 = ?QUEUE:in(m(MsgStatus1), Q1) };
- true -> State1 #vqstate { q4 = ?QUEUE:in(m(MsgStatus1), Q4) }
- end,
- PCount1 = PCount + one_if(IsPersistent1),
- UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
- {SeqId, State2 #vqstate { next_seq_id = SeqId + 1,
- len = Len + 1,
- in_counter = InCount + 1,
- persistent_count = PCount1,
- ram_msg_count = RamMsgCount + 1,
- unconfirmed = UC1 }}.
-
maybe_write_msg_to_disk(_Force, MsgStatus = #msg_status {
msg_on_disk = true }, _MSCState) ->
MsgStatus;