diff options
| author | Matthew Sackman <matthew@rabbitmq.com> | 2011-02-28 17:11:58 +0000 |
|---|---|---|
| committer | Matthew Sackman <matthew@rabbitmq.com> | 2011-02-28 17:11:58 +0000 |
| commit | fd8bffd2df83e0d58ba36a323466a417d38f7c4d (patch) | |
| tree | c2684f87506ac6281e63d6449d6a444f2d44e334 /src | |
| parent | 9566c5bfbb46338ff414f9bd8c56925ff8711b87 (diff) | |
| download | rabbitmq-server-git-fd8bffd2df83e0d58ba36a323466a417d38f7c4d.tar.gz | |
Large amounts of debitrotting due to changes to confirms api and such like. Sadly mirrored confirms aren't working again yet... not really sure why
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 115 |
2 files changed, 92 insertions, 27 deletions
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 11831a2998..e2f9b0208d 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -232,8 +232,8 @@ ram_duration(State = #state { backing_queue = BQ, backing_queue_state = BQS}) -> needs_idle_timeout(#state { backing_queue = BQ, backing_queue_state = BQS}) -> BQ:needs_idle_timeout(BQS). -idle_timeout(#state { backing_queue = BQ, backing_queue_state = BQS}) -> - BQ:idle_timeout(BQS). +idle_timeout(State = #state { backing_queue = BQ, backing_queue_state = BQS}) -> + State #state { backing_queue_state = BQ:idle_timeout(BQS) }. handle_pre_hibernate(State = #state { backing_queue = BQ, backing_queue_state = BQS}) -> diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 4f9d2066be..396e3c3592 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -67,7 +67,8 @@ -export([start_link/1, set_maximum_since_use/2]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, - code_change/3, handle_pre_hibernate/1]). + code_change/3, handle_pre_hibernate/1, prioritise_call/3, + prioritise_cast/2]). -export([joined/2, members_changed/3, handle_msg/3]). @@ -82,6 +83,7 @@ master_node, backing_queue, backing_queue_state, + sync_timer_ref, rate_timer_ref, sender_queues, %% :: Pid -> MsgQ @@ -91,6 +93,7 @@ guid_to_channel %% for confirms }). +-define(SYNC_INTERVAL, 25). %% milliseconds -define(RAM_DURATION_UPDATE_INTERVAL, 5000). start_link(Q) -> @@ -137,6 +140,7 @@ init([#amqqueue { name = QueueName } = Q]) -> backing_queue = BQ, backing_queue_state = BQS, rate_timer_ref = undefined, + sync_timer_ref = undefined, sender_queues = dict:new(), guid_ack = dict:new(), @@ -212,7 +216,14 @@ handle_cast(update_ram_duration, rabbit_memory_monitor:report_ram_duration(self(), RamDuration), BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1), noreply(State #state { rate_timer_ref = just_measured, - backing_queue_state = BQS2 }). + backing_queue_state = BQS2 }); + +handle_cast(sync_timeout, State) -> + noreply(backing_queue_idle_timeout( + State #state { sync_timer_ref = undefined })). + +handle_info(timeout, State) -> + noreply(backing_queue_idle_timeout(State)); handle_info(Msg, State) -> {stop, {unexpected_info, Msg}, State}. @@ -245,12 +256,30 @@ code_change(_OldVsn, State, _Extra) -> handle_pre_hibernate(State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> %% mainly copied from amqqueue_process - BQS1 = BQ:handle_pre_hibernate(BQS), - %% no activity for a while == 0 egress and ingress rates + {RamDuration, BQS1} = BQ:ram_duration(BQS), DesiredDuration = - rabbit_memory_monitor:report_ram_duration(self(), infinity), + rabbit_memory_monitor:report_ram_duration(self(), RamDuration), BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1), - {hibernate, stop_rate_timer(State #state { backing_queue_state = BQS2 })}. + BQS3 = BQ:handle_pre_hibernate(BQS2), + {hibernate, stop_rate_timer(State #state { backing_queue_state = BQS3 })}. + +prioritise_call(Msg, _From, _State) -> + case Msg of + {maybe_run_queue_via_backing_queue, _Mod, _Fun} -> 6; + {gm_deaths, _Deaths} -> 5; + _ -> 0 + end. + +prioritise_cast(Msg, _State) -> + case Msg of + update_ram_duration -> 8; + {set_ram_duration_target, _Duration} -> 8; + {set_maximum_since_use, _Age} -> 8; + {maybe_run_queue_via_backing_queue, _Mod, _Fun} -> 6; + sync_timeout -> 6; + {gm, _Msg} -> 5; + _ -> 0 + end. %% --------------------------------------------------------------------------- %% GM @@ -285,12 +314,9 @@ handle_msg([SPid], _From, Msg) -> maybe_run_queue_via_backing_queue( Mod, Fun, State = #state { backing_queue = BQ, - backing_queue_state = BQS, - guid_to_channel = GTC }) -> + backing_queue_state = BQS }) -> {Guids, BQS1} = BQ:invoke(Mod, Fun, BQS), - GTC1 = lists:foldl(fun maybe_confirm_message/2, GTC, Guids), - State #state { backing_queue_state = BQS1, - guid_to_channel = GTC1 }. + confirm_messages(Guids, State #state { backing_queue_state = BQS1 }). record_confirm_or_confirm(#delivery { msg_seq_no = undefined }, _Q, GTC) -> GTC; @@ -305,13 +331,27 @@ record_confirm_or_confirm(#delivery { sender = ChPid, msg_seq_no = MsgSeqNo }, ok = rabbit_channel:confirm(ChPid, MsgSeqNo), GTC. -maybe_confirm_message(Guid, GTC) -> - case dict:find(Guid, GTC) of - {ok, {ChPid, MsgSeqNo}} when MsgSeqNo =/= undefined -> - ok = rabbit_channel:confirm(ChPid, MsgSeqNo), - dict:erase(Guid, GTC); - error -> - GTC +confirm_messages(Guids, State = #state { guid_to_channel = GTC }) -> + {CMs, GTC1} = + lists:foldl( + fun(Guid, {CMs, GTC0}) -> + case dict:find(Guid, GTC0) of + {ok, {ChPid, MsgSeqNo}} -> + {gb_trees_cons(ChPid, MsgSeqNo, CMs), + dict:erase(Guid, GTC0)}; + _ -> + {CMs, GTC0} + end + end, {gb_trees:empty(), GTC}, Guids), + gb_trees:map(fun(ChPid, MsgSeqNos) -> + rabbit_channel:confirm(ChPid, MsgSeqNos) + end, CMs), + State #state { guid_to_channel = GTC1 }. + +gb_trees_cons(Key, Value, Tree) -> + case gb_trees:lookup(Key, Tree) of + {value, Values} -> gb_trees:update(Key, [Value | Values], Tree); + none -> gb_trees:insert(Key, [Value], Tree) end. handle_process_result({ok, State}) -> noreply(State); @@ -348,15 +388,39 @@ promote_me(From, #state { q = Q, {become, rabbit_amqqueue_process, QueueState, hibernate}. noreply(State) -> - {noreply, next_state(State), hibernate}. + {NewState, Timeout} = next_state(State), + {noreply, NewState, Timeout}. reply(Reply, State) -> - {reply, Reply, next_state(State), hibernate}. + {NewState, Timeout} = next_state(State), + {reply, Reply, NewState, Timeout}. next_state(State) -> - ensure_rate_timer(State). + State1 = #state { backing_queue = BQ, backing_queue_state = BQS } = + ensure_rate_timer(State), + case BQ:needs_idle_timeout(BQS) of + true -> {ensure_sync_timer(State1), 0}; + false -> {stop_sync_timer(State1), hibernate} + end. %% copied+pasted from amqqueue_process +backing_queue_idle_timeout(State = #state { backing_queue = BQ }) -> + maybe_run_queue_via_backing_queue( + BQ, fun (BQS) -> {[], BQ:idle_timeout(BQS)} end, State). + +ensure_sync_timer(State = #state { sync_timer_ref = undefined }) -> + {ok, TRef} = timer:apply_after( + ?SYNC_INTERVAL, rabbit_amqqueue, sync_timeout, [self()]), + State #state { sync_timer_ref = TRef }; +ensure_sync_timer(State) -> + State. + +stop_sync_timer(State = #state { sync_timer_ref = undefined }) -> + State; +stop_sync_timer(State = #state { sync_timer_ref = TRef }) -> + {ok, cancel} = timer:cancel(TRef), + State #state { sync_timer_ref = undefined }. + ensure_rate_timer(State = #state { rate_timer_ref = undefined }) -> {ok, TRef} = timer:apply_after( ?RAM_DURATION_UPDATE_INTERVAL, @@ -438,10 +502,11 @@ process_instruction( {true, AckRequired} -> {AckTag, BQS1} = BQ:publish_delivered(AckRequired, Msg, MsgProps, ChPid, BQS), - {GA1, GTC3} = case AckRequired of - true -> {dict:store(Guid, AckTag, GA), GTC1}; - false -> {GA, maybe_confirm_message(Guid, GTC1)} - end, + {GA1, GTC3} = + case AckRequired of + true -> {dict:store(Guid, AckTag, GA), GTC1}; + false -> {GA, confirm_messages([Guid], GTC1)} + end, State1 #state { backing_queue_state = BQS1, guid_ack = GA1, guid_to_channel = GTC3 } |
