diff options
| author | Matthew Sackman <matthew@rabbitmq.com> | 2011-10-11 13:40:29 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@rabbitmq.com> | 2011-10-11 13:40:29 +0100 |
| commit | 3f3b79aa89a937f6c351cd29541060dc704ef37e (patch) | |
| tree | f09e204a60587c32cb0cd03afbbf50e443dbb740 /src | |
| parent | f04c2946b50aa7f9c0eb4becbcdea726f00f120b (diff) | |
| parent | f88351e1078750fcfc5f8b8a66bda5a3b1aece12 (diff) | |
| download | rabbitmq-server-git-3f3b79aa89a937f6c351cd29541060dc704ef37e.tar.gz | |
Merging default to bug24455
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 86 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 108 | ||||
| -rw-r--r-- | src/rabbit_event.erl | 111 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_reader.erl | 67 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 56 |
6 files changed, 211 insertions, 219 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index fe1ddba02b..46f6674b04 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -118,19 +118,19 @@ init(Q) -> ?LOGDEBUG("Queue starting - ~p~n", [Q]), process_flag(trap_exit, true), - {ok, #q{q = Q#amqqueue{pid = self()}, - exclusive_consumer = none, - has_had_consumers = false, - backing_queue = backing_queue_module(Q), - backing_queue_state = undefined, - active_consumers = queue:new(), - expires = undefined, - sync_timer_ref = undefined, - rate_timer_ref = undefined, - expiry_timer_ref = undefined, - ttl = undefined, - stats_timer = rabbit_event:init_stats_timer(), - msg_id_to_channel = dict:new()}, hibernate, + State = #q{q = Q#amqqueue{pid = self()}, + exclusive_consumer = none, + has_had_consumers = false, + backing_queue = backing_queue_module(Q), + backing_queue_state = undefined, + active_consumers = queue:new(), + expires = undefined, + sync_timer_ref = undefined, + rate_timer_ref = undefined, + expiry_timer_ref = undefined, + ttl = undefined, + msg_id_to_channel = dict:new()}, + {ok, rabbit_event:init_stats_timer(State, #q.stats_timer), hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, @@ -140,25 +140,24 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, none -> ok; _ -> erlang:monitor(process, Owner) end, - State = requeue_and_run( - AckTags, - process_args( - #q{q = Q, - exclusive_consumer = none, - has_had_consumers = false, - backing_queue = BQ, - backing_queue_state = BQS, - active_consumers = queue:new(), - expires = undefined, - sync_timer_ref = undefined, - rate_timer_ref = RateTRef, - expiry_timer_ref = undefined, - ttl = undefined, - stats_timer = rabbit_event:init_stats_timer(), - msg_id_to_channel = MTC})), + State = #q{q = Q, + exclusive_consumer = none, + has_had_consumers = false, + backing_queue = BQ, + backing_queue_state = BQS, + active_consumers = queue:new(), + expires = undefined, + sync_timer_ref = undefined, + rate_timer_ref = RateTRef, + expiry_timer_ref = undefined, + ttl = undefined, + msg_id_to_channel = MTC}, + State1 = requeue_and_run(AckTags, process_args( + rabbit_event:init_stats_timer( + State, #q.stats_timer))), lists:foldl( fun (Delivery, StateN) -> deliver_or_enqueue(Delivery, StateN) end, - State, Deliveries). + State1, Deliveries). terminate(shutdown = R, State = #q{backing_queue = BQ}) -> terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State); @@ -183,9 +182,9 @@ code_change(_OldVsn, State, _Extra) -> %%---------------------------------------------------------------------------- -declare(Recover, From, - State = #q{q = Q, backing_queue = BQ, backing_queue_state = undefined, - stats_timer = StatsTimer}) -> +declare(Recover, From, State = #q{q = Q, + backing_queue = BQ, + backing_queue_state = undefined}) -> case rabbit_amqqueue:internal_declare(Q, Recover) of not_found -> {stop, normal, not_found, State}; Q -> gen_server2:reply(From, {new, Q}), @@ -199,7 +198,7 @@ declare(Recover, From, State1 = process_args(State#q{backing_queue_state = BQS}), rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State1)), - rabbit_event:if_enabled(StatsTimer, + rabbit_event:if_enabled(State1, #q.stats_timer, fun() -> emit_stats(State1) end), noreply(State1); Q1 -> {stop, normal, {existing, Q1}, State} @@ -315,10 +314,8 @@ ensure_expiry_timer(State = #q{expires = Expires}) -> State end. -ensure_stats_timer(State = #q{stats_timer = StatsTimer, - q = #amqqueue{pid = QPid}}) -> - State#q{stats_timer = rabbit_event:ensure_stats_timer( - StatsTimer, QPid, emit_stats)}. +ensure_stats_timer(State) -> + rabbit_event:ensure_stats_timer(State, #q.stats_timer, emit_stats). assert_invariant(#q{active_consumers = AC, backing_queue = BQ, backing_queue_state = BQS}) -> @@ -1120,10 +1117,10 @@ handle_info(maybe_expire, State) -> handle_info(drop_expired, State) -> noreply(drop_expired_messages(State#q{ttl_timer_ref = undefined})); -handle_info(emit_stats, State = #q{stats_timer = StatsTimer}) -> +handle_info(emit_stats, State) -> %% Do not invoke noreply as it would see no timer and create a new one. emit_stats(State), - State1 = State#q{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)}, + State1 = rabbit_event:reset_stats_timer(State, #q.stats_timer), assert_invariant(State1), {noreply, State1, hibernate}; @@ -1167,18 +1164,17 @@ handle_info(Info, State) -> handle_pre_hibernate(State = #q{backing_queue_state = undefined}) -> {hibernate, State}; handle_pre_hibernate(State = #q{backing_queue = BQ, - backing_queue_state = BQS, - stats_timer = StatsTimer}) -> + backing_queue_state = BQS}) -> {RamDuration, BQS1} = BQ:ram_duration(BQS), DesiredDuration = rabbit_memory_monitor:report_ram_duration(self(), RamDuration), BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1), BQS3 = BQ:handle_pre_hibernate(BQS2), rabbit_event:if_enabled( - StatsTimer, + State, #q.stats_timer, fun () -> emit_stats(State, [{idle_since, now()}]) end), - State1 = State#q{stats_timer = rabbit_event:stop_stats_timer(StatsTimer), - backing_queue_state = BQS3}, + State1 = rabbit_event:stop_stats_timer(State#q{backing_queue_state = BQS3}, + #q.stats_timer), {hibernate, stop_rate_timer(State1)}. format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ). diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 3c61447af2..883e570ad6 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -34,7 +34,7 @@ -record(ch, {state, protocol, channel, reader_pid, writer_pid, conn_pid, limiter, tx_status, next_tag, - unacked_message_q, uncommitted_message_q, uncommitted_ack_q, + unacked_message_q, uncommitted_message_q, uncommitted_acks, user, virtual_host, most_recently_declared_queue, queue_monitors, consumer_mapping, blocking, queue_consumers, queue_collector_pid, stats_timer, confirm_enabled, publish_seqno, unconfirmed_mq, @@ -173,7 +173,6 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost, Capabilities, CollectorPid, Limiter]) -> process_flag(trap_exit, true), ok = pg_local:join(rabbit_channels, self()), - StatsTimer = rabbit_event:init_stats_timer(), State = #ch{state = starting, protocol = Protocol, channel = Channel, @@ -185,7 +184,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost, next_tag = 1, unacked_message_q = queue:new(), uncommitted_message_q = queue:new(), - uncommitted_ack_q = queue:new(), + uncommitted_acks = [], user = User, virtual_host = VHost, most_recently_declared_queue = <<>>, @@ -194,7 +193,6 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost, blocking = sets:new(), queue_consumers = dict:new(), queue_collector_pid = CollectorPid, - stats_timer = StatsTimer, confirm_enabled = false, publish_seqno = 1, unconfirmed_mq = gb_trees:empty(), @@ -202,10 +200,11 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost, confirmed = [], capabilities = Capabilities, trace_state = rabbit_trace:init(VHost)}, - rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)), - rabbit_event:if_enabled(StatsTimer, - fun() -> emit_stats(State) end), - {ok, State, hibernate, + State1 = rabbit_event:init_stats_timer(State, #ch.stats_timer), + rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State1)), + rabbit_event:if_enabled(State1, #ch.stats_timer, + fun() -> emit_stats(State1) end), + {ok, State1, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. prioritise_call(Msg, _From, _State) -> @@ -319,10 +318,10 @@ handle_cast({confirm, MsgSeqNos, From}, State) -> handle_info(timeout, State) -> noreply(State); -handle_info(emit_stats, State = #ch{stats_timer = StatsTimer}) -> +handle_info(emit_stats, State) -> emit_stats(State), noreply([ensure_stats_timer], - State#ch{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)}); + rabbit_event:reset_stats_timer(State, #ch.stats_timer)); handle_info({'DOWN', _MRef, process, QPid, Reason}, State) -> State1 = handle_publishing_queue_down(QPid, Reason, State), @@ -335,12 +334,12 @@ handle_info({'DOWN', _MRef, process, QPid, Reason}, State) -> handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}. -handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) -> +handle_pre_hibernate(State) -> ok = clear_permission_cache(), rabbit_event:if_enabled( - StatsTimer, fun () -> emit_stats(State, [{idle_since, now()}]) end), - StatsTimer1 = rabbit_event:stop_stats_timer(StatsTimer), - {hibernate, State#ch{stats_timer = StatsTimer1}}. + State, #ch.stats_timer, + fun () -> emit_stats(State, [{idle_since, now()}]) end), + {hibernate, rabbit_event:stop_stats_timer(State, #ch.stats_timer)}. terminate(Reason, State) -> {Res, _State1} = notify_queues(State), @@ -385,9 +384,8 @@ next_state(Mask, State) -> State2 = ?MASKED_CALL(send_confirms, Mask, State1), State2. -ensure_stats_timer(State = #ch{stats_timer = StatsTimer}) -> - State#ch{stats_timer = rabbit_event:ensure_stats_timer( - StatsTimer, self(), emit_stats)}. +ensure_stats_timer(State) -> + rabbit_event:ensure_stats_timer(State, #ch.stats_timer, emit_stats). return_ok(State, true, _Msg) -> {noreply, State}; return_ok(State, false, Msg) -> {reply, Msg, State}. @@ -669,15 +667,14 @@ handle_method(#'basic.nack'{delivery_tag = DeliveryTag, handle_method(#'basic.ack'{delivery_tag = DeliveryTag, multiple = Multiple}, - _, State = #ch{unacked_message_q = UAMQ, - tx_status = TxStatus}) -> + _, State = #ch{unacked_message_q = UAMQ, tx_status = TxStatus}) -> {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple), State1 = State#ch{unacked_message_q = Remaining}, {noreply, case TxStatus of none -> ack(Acked, State1); - in_progress -> NewTAQ = queue:join(State1#ch.uncommitted_ack_q, Acked), - State1#ch{uncommitted_ack_q = NewTAQ} + in_progress -> State1#ch{uncommitted_acks = + Acked ++ State1#ch.uncommitted_acks} end}; handle_method(#'basic.get'{queue = QueueNameBin, @@ -839,6 +836,7 @@ handle_method(#'basic.recover_async'{requeue = true}, _, State = #ch{unacked_message_q = UAMQ, limiter = Limiter}) -> OkFun = fun () -> ok end, + UAMQL = queue:to_list(UAMQ), ok = fold_per_queue( fun (QPid, MsgIds, ok) -> rabbit_misc:with_exit_handler( @@ -846,8 +844,8 @@ handle_method(#'basic.recover_async'{requeue = true}, rabbit_amqqueue:requeue( QPid, MsgIds, self()) end) - end, ok, UAMQ), - ok = notify_limiter(Limiter, UAMQ), + end, ok, UAMQL), + ok = notify_limiter(Limiter, UAMQL), %% No answer required - basic.recover is the newer, synchronous %% variant of this method {noreply, State#ch{unacked_message_q = queue:new()}}; @@ -1071,8 +1069,8 @@ handle_method(#'tx.commit'{}, _, #ch{tx_status = none}) -> precondition_failed, "channel is not transactional", []); handle_method(#'tx.commit'{}, _, State = #ch{uncommitted_message_q = TMQ, - uncommitted_ack_q = TAQ}) -> - State1 = new_tx(ack(TAQ, rabbit_misc:queue_fold(fun deliver_to_queues/2, + uncommitted_acks = TAL}) -> + State1 = new_tx(ack(TAL, rabbit_misc:queue_fold(fun deliver_to_queues/2, State, TMQ))), {noreply, maybe_complete_tx(State1#ch{tx_status = committing})}; @@ -1080,10 +1078,11 @@ handle_method(#'tx.rollback'{}, _, #ch{tx_status = none}) -> rabbit_misc:protocol_error( precondition_failed, "channel is not transactional", []); -handle_method(#'tx.rollback'{}, _, State = #ch{unacked_message_q = UAMQ, - uncommitted_ack_q = TAQ}) -> - {reply, #'tx.rollback_ok'{}, new_tx(State#ch{unacked_message_q = - queue:join(TAQ, UAMQ)})}; +handle_method(#'tx.rollback'{}, _, State = #ch{unacked_message_q = UAMQ, + uncommitted_acks = TAL}) -> + TAQ = queue:from_list(lists:reverse(TAL)), + {reply, #'tx.rollback_ok'{}, + new_tx(State#ch{unacked_message_q = queue:join(TAQ, UAMQ)})}; handle_method(#'confirm.select'{}, _, #ch{tx_status = in_progress}) -> rabbit_misc:protocol_error( @@ -1162,11 +1161,11 @@ demonitor_queue(QPid, State = #ch{queue_monitors = QMons}) -> false -> State end. -queue_monitor_needed(QPid, #ch{stats_timer = StatsTimer, - queue_consumers = QCons, +queue_monitor_needed(QPid, #ch{queue_consumers = QCons, blocking = Blocking, - unconfirmed_qm = UQM}) -> - StatsEnabled = rabbit_event:stats_level(StatsTimer) =:= fine, + unconfirmed_qm = UQM} = State) -> + StatsEnabled = rabbit_event:stats_level( + State, #ch.stats_timer) =:= fine, ConsumerMonitored = dict:is_key(QPid, QCons), QueueBlocked = sets:is_element(QPid, Blocking), ConfirmMonitored = gb_trees:is_defined(QPid, UQM), @@ -1282,18 +1281,18 @@ ack_record(DeliveryTag, ConsumerTag, {DeliveryTag, ConsumerTag, {QPid, MsgId}}. collect_acks(Q, 0, true) -> - {Q, queue:new()}; + {queue:to_list(Q), queue:new()}; collect_acks(Q, DeliveryTag, Multiple) -> - collect_acks(queue:new(), queue:new(), Q, DeliveryTag, Multiple). + collect_acks([], queue:new(), Q, DeliveryTag, Multiple). collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) -> case queue:out(Q) of {{value, UnackedMsg = {CurrentDeliveryTag, _ConsumerTag, _Msg}}, QTail} -> if CurrentDeliveryTag == DeliveryTag -> - {queue:in(UnackedMsg, ToAcc), queue:join(PrefixAcc, QTail)}; + {[UnackedMsg | ToAcc], queue:join(PrefixAcc, QTail)}; Multiple -> - collect_acks(queue:in(UnackedMsg, ToAcc), PrefixAcc, + collect_acks([UnackedMsg | ToAcc], PrefixAcc, QTail, DeliveryTag, Multiple); true -> collect_acks(ToAcc, queue:in(UnackedMsg, PrefixAcc), @@ -1314,7 +1313,7 @@ ack(Acked, State) -> maybe_incr_stats(QIncs, ack, State). new_tx(State) -> State#ch{uncommitted_message_q = queue:new(), - uncommitted_ack_q = queue:new()}. + uncommitted_acks = []}. notify_queues(State = #ch{state = closing}) -> {ok, State}; @@ -1322,12 +1321,15 @@ notify_queues(State = #ch{consumer_mapping = Consumers}) -> {rabbit_amqqueue:notify_down_all(consumer_queues(Consumers), self()), State#ch{state = closing}}. -fold_per_queue(F, Acc0, UAQ) -> - T = rabbit_misc:queue_fold( - fun ({_DTag, _CTag, {QPid, MsgId}}, T) -> - rabbit_misc:gb_trees_cons(QPid, MsgId, T) - end, gb_trees:empty(), UAQ), - rabbit_misc:gb_trees_fold(F, Acc0, T). +fold_per_queue(_F, Acc, []) -> + Acc; +fold_per_queue(F, Acc, [{_DTag, _CTag, {QPid, MsgId}}]) -> %% common case + F(QPid, [MsgId], Acc); +fold_per_queue(F, Acc, UAL) -> + T = lists:foldl(fun ({_DTag, _CTag, {QPid, MsgId}}, T) -> + rabbit_misc:gb_trees_cons(QPid, MsgId, T) + end, gb_trees:empty(), UAL), + rabbit_misc:gb_trees_fold(F, Acc, T). enable_limiter(State = #ch{unacked_message_q = UAMQ, limiter = Limiter}) -> @@ -1349,9 +1351,9 @@ consumer_queues(Consumers) -> notify_limiter(Limiter, Acked) -> case rabbit_limiter:is_enabled(Limiter) of false -> ok; - true -> case rabbit_misc:queue_fold(fun ({_, none, _}, Acc) -> Acc; - ({_, _, _}, Acc) -> Acc + 1 - end, 0, Acked) of + true -> case lists:foldl(fun ({_, none, _}, Acc) -> Acc; + ({_, _, _}, Acc) -> Acc + 1 + end, 0, Acked) of 0 -> ok; Count -> rabbit_limiter:ack(Limiter, Count) end @@ -1493,8 +1495,8 @@ i(messages_unacknowledged, #ch{unacked_message_q = UAMQ}) -> queue:len(UAMQ); i(messages_uncommitted, #ch{uncommitted_message_q = TMQ}) -> queue:len(TMQ); -i(acks_uncommitted, #ch{uncommitted_ack_q = TAQ}) -> - queue:len(TAQ); +i(acks_uncommitted, #ch{uncommitted_acks = TAL}) -> + length(TAL); i(prefetch_count, #ch{limiter = Limiter}) -> rabbit_limiter:get_limit(Limiter); i(client_flow_blocked, #ch{limiter = Limiter}) -> @@ -1507,8 +1509,8 @@ maybe_incr_redeliver_stats(true, QPid, State) -> maybe_incr_redeliver_stats(_, _, State) -> State. -maybe_incr_stats(QXIncs, Measure, State = #ch{stats_timer = StatsTimer}) -> - case rabbit_event:stats_level(StatsTimer) of +maybe_incr_stats(QXIncs, Measure, State) -> + case rabbit_event:stats_level(State, #ch.stats_timer) of fine -> lists:foldl(fun ({QX, Inc}, State0) -> incr_stats(QX, Inc, Measure, State0) end, State, QXIncs); @@ -1540,9 +1542,9 @@ update_measures(Type, QX, Inc, Measure) -> emit_stats(State) -> emit_stats(State, []). -emit_stats(State = #ch{stats_timer = StatsTimer}, Extra) -> +emit_stats(State, Extra) -> CoarseStats = infos(?STATISTICS_KEYS, State), - case rabbit_event:stats_level(StatsTimer) of + case rabbit_event:stats_level(State, #ch.stats_timer) of coarse -> rabbit_event:notify(channel_stats, Extra ++ CoarseStats); fine -> diff --git a/src/rabbit_event.erl b/src/rabbit_event.erl index bb765566cc..5ae40c784a 100644 --- a/src/rabbit_event.erl +++ b/src/rabbit_event.erl @@ -19,9 +19,9 @@ -include("rabbit.hrl"). -export([start_link/0]). --export([init_stats_timer/0, ensure_stats_timer/3, stop_stats_timer/1]). --export([reset_stats_timer/1]). --export([stats_level/1, if_enabled/2]). +-export([init_stats_timer/2, ensure_stats_timer/3, stop_stats_timer/2]). +-export([reset_stats_timer/2]). +-export([stats_level/2, if_enabled/3]). -export([notify/2, notify_if/3]). %%---------------------------------------------------------------------------- @@ -39,29 +39,23 @@ -type(event_timestamp() :: {non_neg_integer(), non_neg_integer(), non_neg_integer()}). --type(event() :: #event { - type :: event_type(), - props :: event_props(), - timestamp :: event_timestamp() - }). +-type(event() :: #event { type :: event_type(), + props :: event_props(), + timestamp :: event_timestamp() }). -type(level() :: 'none' | 'coarse' | 'fine'). --opaque(state() :: #state { - level :: level(), - interval :: integer(), - timer :: atom() - }). - -type(timer_fun() :: fun (() -> 'ok')). +-type(container() :: tuple()). +-type(pos() :: non_neg_integer()). -spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). --spec(init_stats_timer/0 :: () -> state()). --spec(ensure_stats_timer/3 :: (state(), pid(), term()) -> state()). --spec(stop_stats_timer/1 :: (state()) -> state()). --spec(reset_stats_timer/1 :: (state()) -> state()). --spec(stats_level/1 :: (state()) -> level()). --spec(if_enabled/2 :: (state(), timer_fun()) -> 'ok'). +-spec(init_stats_timer/2 :: (container(), pos()) -> container()). +-spec(ensure_stats_timer/3 :: (container(), pos(), term()) -> container()). +-spec(stop_stats_timer/2 :: (container(), pos()) -> container()). +-spec(reset_stats_timer/2 :: (container(), pos()) -> container()). +-spec(stats_level/2 :: (container(), pos()) -> level()). +-spec(if_enabled/3 :: (container(), pos(), timer_fun()) -> 'ok'). -spec(notify/2 :: (event_type(), event_props()) -> 'ok'). -spec(notify_if/3 :: (boolean(), event_type(), event_props()) -> 'ok'). @@ -75,58 +69,69 @@ start_link() -> %% The idea is, for each stat-emitting object: %% %% On startup: -%% Timer = init_stats_timer() +%% init_stats_timer(State) %% notify(created event) %% if_enabled(internal_emit_stats) - so we immediately send something %% %% On wakeup: -%% ensure_stats_timer(Timer, Pid, emit_stats) +%% ensure_stats_timer(State, emit_stats) %% (Note we can't emit stats immediately, the timer may have fired 1ms ago.) %% %% emit_stats: %% if_enabled(internal_emit_stats) -%% reset_stats_timer(Timer) - just bookkeeping +%% reset_stats_timer(State) - just bookkeeping %% %% Pre-hibernation: %% if_enabled(internal_emit_stats) -%% stop_stats_timer(Timer) +%% stop_stats_timer(State) %% %% internal_emit_stats: %% notify(stats) -init_stats_timer() -> +init_stats_timer(C, P) -> {ok, StatsLevel} = application:get_env(rabbit, collect_statistics), {ok, Interval} = application:get_env(rabbit, collect_statistics_interval), - #state{level = StatsLevel, interval = Interval, timer = undefined}. - -ensure_stats_timer(State = #state{level = none}, _Pid, _Msg) -> - State; -ensure_stats_timer(State = #state{interval = Interval, - timer = undefined}, Pid, Msg) -> - TRef = erlang:send_after(Interval, Pid, Msg), - State#state{timer = TRef}; -ensure_stats_timer(State, _Pid, _Msg) -> - State. - -stop_stats_timer(State = #state{level = none}) -> - State; -stop_stats_timer(State = #state{timer = undefined}) -> - State; -stop_stats_timer(State = #state{timer = TRef}) -> - erlang:cancel_timer(TRef), - State#state{timer = undefined}. - -reset_stats_timer(State) -> - State#state{timer = undefined}. - -stats_level(#state{level = Level}) -> + setelement(P, C, #state{level = StatsLevel, interval = Interval, + timer = undefined}). + +ensure_stats_timer(C, P, Msg) -> + case element(P, C) of + #state{level = Level, interval = Interval, timer = undefined} = State + when Level =/= none -> + TRef = erlang:send_after(Interval, self(), Msg), + setelement(P, C, State#state{timer = TRef}); + #state{} -> + C + end. + +stop_stats_timer(C, P) -> + case element(P, C) of + #state{level = Level, timer = TRef} = State + when Level =/= none andalso TRef =/= undefined -> + erlang:cancel_timer(TRef), + setelement(P, C, State#state{timer = undefined}); + #state{} -> + C + end. + +reset_stats_timer(C, P) -> + case element(P, C) of + #state{timer = TRef} = State + when TRef =/= undefined -> + setelement(P, C, State#state{timer = undefined}); + #state{} -> + C + end. + +stats_level(C, P) -> + #state{level = Level} = element(P, C), Level. -if_enabled(#state{level = none}, _Fun) -> - ok; -if_enabled(_State, Fun) -> - Fun(), - ok. +if_enabled(C, P, Fun) -> + case element(P, C) of + #state{level = none} -> ok; + #state{} -> Fun(), ok + end. notify_if(true, Type, Props) -> notify(Type, Props); notify_if(false, _Type, _Props) -> ok. diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 5fc6341f50..328fe639f7 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -95,7 +95,7 @@ init(#amqqueue { name = QName, mirror_nodes = MNodes } = Q, Recover, (case MNodes of all -> rabbit_mnesia:all_clustered_nodes(); undefined -> []; - _ -> [list_to_atom(binary_to_list(Node)) || Node <- MNodes] + _ -> MNodes end) -- [node()], [rabbit_mirror_queue_misc:add_mirror(QName, Node) || Node <- MNodes1], {ok, BQ} = application:get_env(backing_queue_module), diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index b4871cefc1..b359f7d452 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -199,34 +199,32 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, ClientSock = socket_op(Sock, SockTransform), erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(), handshake_timeout), + State = #v1{parent = Parent, + sock = ClientSock, + connection = #connection{ + protocol = none, + user = none, + timeout_sec = ?HANDSHAKE_TIMEOUT, + frame_max = ?FRAME_MIN_SIZE, + vhost = none, + client_properties = none, + capabilities = []}, + callback = uninitialized_callback, + recv_len = 0, + pending_recv = false, + connection_state = pre_init, + queue_collector = Collector, + heartbeater = none, + channel_sup_sup_pid = ChannelSupSupPid, + start_heartbeat_fun = StartHeartbeatFun, + buf = [], + buf_len = 0, + auth_mechanism = none, + auth_state = none}, try - recvloop(Deb, switch_callback( - #v1{parent = Parent, - sock = ClientSock, - connection = #connection{ - protocol = none, - user = none, - timeout_sec = ?HANDSHAKE_TIMEOUT, - frame_max = ?FRAME_MIN_SIZE, - vhost = none, - client_properties = none, - capabilities = []}, - callback = uninitialized_callback, - recv_len = 0, - pending_recv = false, - connection_state = pre_init, - queue_collector = Collector, - heartbeater = none, - stats_timer = - rabbit_event:init_stats_timer(), - channel_sup_sup_pid = ChannelSupSupPid, - start_heartbeat_fun = StartHeartbeatFun, - buf = [], - buf_len = 0, - auth_mechanism = none, - auth_state = none - }, - handshake, 8)) + recvloop(Deb, switch_callback(rabbit_event:init_stats_timer( + State, #v1.stats_timer), + handshake, 8)) catch Ex -> (if Ex == connection_closed_abruptly -> fun rabbit_log:warning/2; @@ -605,10 +603,8 @@ refuse_connection(Sock, Exception) -> ok = inet_op(fun () -> rabbit_net:send(Sock, <<"AMQP",0,0,9,1>>) end), throw(Exception). -ensure_stats_timer(State = #v1{stats_timer = StatsTimer, - connection_state = running}) -> - State#v1{stats_timer = rabbit_event:ensure_stats_timer( - StatsTimer, self(), emit_stats)}; +ensure_stats_timer(State = #v1{connection_state = running}) -> + rabbit_event:ensure_stats_timer(State, #v1.stats_timer, emit_stats); ensure_stats_timer(State) -> State. @@ -695,8 +691,7 @@ handle_method0(#'connection.open'{virtual_host = VHostPath}, connection = Connection = #connection{ user = User, protocol = Protocol}, - sock = Sock, - stats_timer = StatsTimer}) -> + sock = Sock}) -> ok = rabbit_access_control:check_vhost_access(User, VHostPath), NewConnection = Connection#connection{vhost = VHostPath}, ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol), @@ -707,7 +702,7 @@ handle_method0(#'connection.open'{virtual_host = VHostPath}, rabbit_event:notify(connection_created, [{type, network} | infos(?CREATION_EVENT_KEYS, State1)]), - rabbit_event:if_enabled(StatsTimer, + rabbit_event:if_enabled(State1, #v1.stats_timer, fun() -> emit_stats(State1) end), State1; handle_method0(#'connection.close'{}, State) when ?IS_RUNNING(State) -> @@ -937,6 +932,6 @@ send_exception(State = #v1{connection = #connection{protocol = Protocol}}, State1#v1.sock, 0, CloseMethod, Protocol), State1. -emit_stats(State = #v1{stats_timer = StatsTimer}) -> +emit_stats(State) -> rabbit_event:notify(connection_stats, infos(?STATISTICS_KEYS, State)), - State#v1{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)}. + rabbit_event:reset_stats_timer(State, #v1.stats_timer). diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 8151d37a73..03004e102c 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -493,9 +493,31 @@ purge(State = #vqstate { q4 = Q4, ram_index_count = 0, persistent_count = PCount1 })}. -publish(Msg, MsgProps, _ChPid, State) -> - {_SeqId, State1} = publish(Msg, MsgProps, false, false, State), - a(reduce_memory_use(State1)). +publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, + MsgProps = #message_properties { needs_confirming = NeedsConfirming }, + _ChPid, State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4, + next_seq_id = SeqId, + len = Len, + in_counter = InCount, + persistent_count = PCount, + durable = IsDurable, + ram_msg_count = RamMsgCount, + unconfirmed = UC }) -> + IsPersistent1 = IsDurable andalso IsPersistent, + MsgStatus = msg_status(IsPersistent1, SeqId, Msg, MsgProps), + {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State), + State2 = case ?QUEUE:is_empty(Q3) of + false -> State1 #vqstate { q1 = ?QUEUE:in(m(MsgStatus1), Q1) }; + true -> State1 #vqstate { q4 = ?QUEUE:in(m(MsgStatus1), Q4) } + end, + PCount1 = PCount + one_if(IsPersistent1), + UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC), + a(reduce_memory_use(State2 #vqstate { next_seq_id = SeqId + 1, + len = Len + 1, + in_counter = InCount + 1, + persistent_count = PCount1, + ram_msg_count = RamMsgCount + 1, + unconfirmed = UC1 })). publish_delivered(false, #basic_message { id = MsgId }, #message_properties { needs_confirming = NeedsConfirming }, @@ -1126,34 +1148,6 @@ sum_msg_ids_by_store_to_len(LensByStore, MsgIdsByStore) -> %% Internal gubbins for publishing %%---------------------------------------------------------------------------- -publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, - MsgProps = #message_properties { needs_confirming = NeedsConfirming }, - IsDelivered, MsgOnDisk, - State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4, - next_seq_id = SeqId, - len = Len, - in_counter = InCount, - persistent_count = PCount, - durable = IsDurable, - ram_msg_count = RamMsgCount, - unconfirmed = UC }) -> - IsPersistent1 = IsDurable andalso IsPersistent, - MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProps)) - #msg_status { is_delivered = IsDelivered, msg_on_disk = MsgOnDisk}, - {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State), - State2 = case ?QUEUE:is_empty(Q3) of - false -> State1 #vqstate { q1 = ?QUEUE:in(m(MsgStatus1), Q1) }; - true -> State1 #vqstate { q4 = ?QUEUE:in(m(MsgStatus1), Q4) } - end, - PCount1 = PCount + one_if(IsPersistent1), - UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC), - {SeqId, State2 #vqstate { next_seq_id = SeqId + 1, - len = Len + 1, - in_counter = InCount + 1, - persistent_count = PCount1, - ram_msg_count = RamMsgCount + 1, - unconfirmed = UC1 }}. - maybe_write_msg_to_disk(_Force, MsgStatus = #msg_status { msg_on_disk = true }, _MSCState) -> MsgStatus; |
