diff options
| author | Jerry Kuch <jerryk@vmware.com> | 2011-03-05 11:03:16 -0800 |
|---|---|---|
| committer | Jerry Kuch <jerryk@vmware.com> | 2011-03-05 11:03:16 -0800 |
| commit | 807081deffb9b03cab8723ba0aa354dedbc27e8f (patch) | |
| tree | 50c689573c9d0811132f625a77c34218949fe5e4 /src | |
| parent | 6b60d505ce766f6ef1daa5c3f8743463dc2f1ef5 (diff) | |
| parent | 3fd05897f3a1fa4a98d5d046b2dcc297667e65d9 (diff) | |
| download | rabbitmq-server-git-807081deffb9b03cab8723ba0aa354dedbc27e8f.tar.gz | |
Merge heads.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_alarm.erl | 111 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 101 | ||||
| -rw-r--r-- | src/rabbit_node_monitor.erl | 11 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 2 | ||||
| -rw-r--r-- | src/vm_memory_monitor.erl | 4 |
5 files changed, 137 insertions, 92 deletions
diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl index 37e40981a6..d38ecb91fe 100644 --- a/src/rabbit_alarm.erl +++ b/src/rabbit_alarm.erl @@ -18,12 +18,14 @@ -behaviour(gen_event). --export([start/0, stop/0, register/2]). +-export([start/0, stop/0, register/2, on_node_up/1, on_node_down/1]). -export([init/1, handle_call/2, handle_event/2, handle_info/2, terminate/2, code_change/3]). --record(alarms, {alertees, vm_memory_high_watermark = false}). +-export([remote_conserve_memory/2]). %% Internal use only + +-record(alarms, {alertees, alarmed_nodes}). %%---------------------------------------------------------------------------- @@ -33,6 +35,8 @@ -spec(start/0 :: () -> 'ok'). -spec(stop/0 :: () -> 'ok'). -spec(register/2 :: (pid(), mfa_tuple()) -> boolean()). +-spec(on_node_up/1 :: (node()) -> 'ok'). +-spec(on_node_down/1 :: (node()) -> 'ok'). -endif. @@ -56,39 +60,57 @@ register(Pid, HighMemMFA) -> {register, Pid, HighMemMFA}, infinity). +on_node_up(Node) -> gen_event:notify(alarm_handler, {node_up, Node}). + +on_node_down(Node) -> gen_event:notify(alarm_handler, {node_down, Node}). + +%% Can't use alarm_handler:{set,clear}_alarm because that doesn't +%% permit notifying a remote node. +remote_conserve_memory(Pid, true) -> + gen_event:notify({alarm_handler, node(Pid)}, + {set_alarm, {{vm_memory_high_watermark, node()}, []}}); +remote_conserve_memory(Pid, false) -> + gen_event:notify({alarm_handler, node(Pid)}, + {clear_alarm, {vm_memory_high_watermark, node()}}). + %%---------------------------------------------------------------------------- init([]) -> - {ok, #alarms{alertees = dict:new()}}. + {ok, #alarms{alertees = dict:new(), + alarmed_nodes = sets:new()}}. -handle_call({register, Pid, {M, F, A} = HighMemMFA}, - State = #alarms{alertees = Alertess}) -> - _MRef = erlang:monitor(process, Pid), - ok = case State#alarms.vm_memory_high_watermark of - true -> apply(M, F, A ++ [Pid, true]); - false -> ok - end, - NewAlertees = dict:store(Pid, HighMemMFA, Alertess), - {ok, State#alarms.vm_memory_high_watermark, - State#alarms{alertees = NewAlertees}}; +handle_call({register, Pid, HighMemMFA}, State) -> + {ok, 0 < sets:size(State#alarms.alarmed_nodes), + internal_register(Pid, HighMemMFA, State)}; handle_call(_Request, State) -> {ok, not_understood, State}. -handle_event({set_alarm, {vm_memory_high_watermark, []}}, State) -> - ok = alert(true, State#alarms.alertees), - {ok, State#alarms{vm_memory_high_watermark = true}}; +handle_event({set_alarm, {{vm_memory_high_watermark, Node}, []}}, State) -> + {ok, maybe_alert(fun sets:add_element/2, Node, State)}; -handle_event({clear_alarm, vm_memory_high_watermark}, State) -> - ok = alert(false, State#alarms.alertees), - {ok, State#alarms{vm_memory_high_watermark = false}}; +handle_event({clear_alarm, {vm_memory_high_watermark, Node}}, State) -> + {ok, maybe_alert(fun sets:del_element/2, Node, State)}; + +handle_event({node_up, Node}, State) -> + %% Must do this via notify and not call to avoid possible deadlock. + ok = gen_event:notify( + {alarm_handler, Node}, + {register, self(), {?MODULE, remote_conserve_memory, []}}), + {ok, State}; + +handle_event({node_down, Node}, State) -> + {ok, maybe_alert(fun sets:del_element/2, Node, State)}; + +handle_event({register, Pid, HighMemMFA}, State) -> + {ok, internal_register(Pid, HighMemMFA, State)}; handle_event(_Event, State) -> {ok, State}. handle_info({'DOWN', _MRef, process, Pid, _Reason}, - State = #alarms{alertees = Alertess}) -> - {ok, State#alarms{alertees = dict:erase(Pid, Alertess)}}; + State = #alarms{alertees = Alertees}) -> + {ok, State#alarms{alertees = dict:erase(Pid, Alertees)}}; handle_info(_Info, State) -> {ok, State}. @@ -100,10 +122,45 @@ code_change(_OldVsn, State, _Extra) -> {ok, State}. %%---------------------------------------------------------------------------- -alert(_Alert, undefined) -> - ok; -alert(Alert, Alertees) -> - dict:fold(fun (Pid, {M, F, A}, Acc) -> - ok = erlang:apply(M, F, A ++ [Pid, Alert]), - Acc + +maybe_alert(SetFun, Node, State = #alarms{alarmed_nodes = AN, + alertees = Alertees}) -> + AN1 = SetFun(Node, AN), + BeforeSz = sets:size(AN), + AfterSz = sets:size(AN1), + %% If we have changed our alarm state, inform the remotes. + IsLocal = Node =:= node(), + if IsLocal andalso BeforeSz < AfterSz -> ok = alert_remote(true, Alertees); + IsLocal andalso BeforeSz > AfterSz -> ok = alert_remote(false, Alertees); + true -> ok + end, + %% If the overall alarm state has changed, inform the locals. + case {BeforeSz, AfterSz} of + {0, 1} -> ok = alert_local(true, Alertees); + {1, 0} -> ok = alert_local(false, Alertees); + {_, _} -> ok + end, + State#alarms{alarmed_nodes = AN1}. + +alert_local(Alert, Alertees) -> alert(Alert, Alertees, fun erlang:'=:='/2). + +alert_remote(Alert, Alertees) -> alert(Alert, Alertees, fun erlang:'=/='/2). + +alert(Alert, Alertees, NodeComparator) -> + Node = node(), + dict:fold(fun (Pid, {M, F, A}, ok) -> + case NodeComparator(Node, node(Pid)) of + true -> apply(M, F, A ++ [Pid, Alert]); + false -> ok + end end, ok, Alertees). + +internal_register(Pid, {M, F, A} = HighMemMFA, + State = #alarms{alertees = Alertees}) -> + _MRef = erlang:monitor(process, Pid), + case sets:is_element(node(), State#alarms.alarmed_nodes) of + true -> ok = apply(M, F, A ++ [Pid, true]); + false -> ok + end, + NewAlertees = dict:store(Pid, HighMemMFA, Alertees), + State#alarms{alertees = NewAlertees}. diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 7719dfe739..24de941595 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -283,17 +283,16 @@ lookup_ch(ChPid) -> ch_record(ChPid) -> Key = {ch, ChPid}, case get(Key) of - undefined -> - MonitorRef = erlang:monitor(process, ChPid), - C = #cr{consumer_count = 0, - ch_pid = ChPid, - monitor_ref = MonitorRef, - acktags = sets:new(), - is_limit_active = false, - txn = none, - unsent_message_count = 0}, - put(Key, C), - C; + undefined -> MonitorRef = erlang:monitor(process, ChPid), + C = #cr{consumer_count = 0, + ch_pid = ChPid, + monitor_ref = MonitorRef, + acktags = sets:new(), + is_limit_active = false, + txn = none, + unsent_message_count = 0}, + put(Key, C), + C; C = #cr{} -> C end. @@ -319,18 +318,16 @@ erase_ch_record(#cr{ch_pid = ChPid, erase({ch, ChPid}), ok. -all_ch_record() -> - [C || {{ch, _}, C} <- get()]. +all_ch_record() -> [C || {{ch, _}, C} <- get()]. is_ch_blocked(#cr{unsent_message_count = Count, is_limit_active = Limited}) -> Limited orelse Count >= ?UNSENT_MESSAGE_LIMIT. ch_record_state_transition(OldCR, NewCR) -> - BlockedOld = is_ch_blocked(OldCR), - BlockedNew = is_ch_blocked(NewCR), - if BlockedOld andalso not(BlockedNew) -> unblock; - BlockedNew andalso not(BlockedOld) -> block; - true -> ok + case {is_ch_blocked(OldCR), is_ch_blocked(NewCR)} of + {true, false} -> unblock; + {false, true} -> block; + {_, _} -> ok end. deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, @@ -365,13 +362,12 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, case ch_record_state_transition(C, NewC) of ok -> {queue:in(QEntry, ActiveConsumersTail), BlockedConsumers}; - block -> - {ActiveConsumers1, BlockedConsumers1} = - move_consumers(ChPid, - ActiveConsumersTail, - BlockedConsumers), - {ActiveConsumers1, - queue:in(QEntry, BlockedConsumers1)} + block -> {ActiveConsumers1, BlockedConsumers1} = + move_consumers(ChPid, + ActiveConsumersTail, + BlockedConsumers), + {ActiveConsumers1, + queue:in(QEntry, BlockedConsumers1)} end, State2 = State1#q{ active_consumers = NewActiveConsumers, @@ -396,8 +392,7 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, {FunAcc, State} end. -deliver_from_queue_pred(IsEmpty, _State) -> - not IsEmpty. +deliver_from_queue_pred(IsEmpty, _State) -> not IsEmpty. deliver_from_queue_deliver(AckRequired, false, State) -> {{Message, IsDelivered, AckTag, Remaining}, State1} = @@ -405,17 +400,16 @@ deliver_from_queue_deliver(AckRequired, false, State) -> {{Message, IsDelivered, AckTag}, 0 == Remaining, State1}. confirm_messages(Guids, State = #q{guid_to_channel = GTC}) -> - {CMs, GTC1} = - lists:foldl( - fun(Guid, {CMs, GTC0}) -> - case dict:find(Guid, GTC0) of - {ok, {ChPid, MsgSeqNo}} -> - {gb_trees_cons(ChPid, MsgSeqNo, CMs), - dict:erase(Guid, GTC0)}; - _ -> - {CMs, GTC0} - end - end, {gb_trees:empty(), GTC}, Guids), + {CMs, GTC1} = lists:foldl( + fun(Guid, {CMs, GTC0}) -> + case dict:find(Guid, GTC0) of + {ok, {ChPid, MsgSeqNo}} -> + {gb_trees_cons(ChPid, MsgSeqNo, CMs), + dict:erase(Guid, GTC0)}; + _ -> + {CMs, GTC0} + end + end, {gb_trees:empty(), GTC}, Guids), gb_trees:map(fun(ChPid, MsgSeqNos) -> rabbit_channel:confirm(ChPid, MsgSeqNos) end, CMs), @@ -480,17 +474,14 @@ attempt_delivery(#delivery{txn = none, {Delivered, State1} = deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State), {Delivered, NeedsConfirming, State1}; -attempt_delivery(#delivery{txn = Txn, +attempt_delivery(#delivery{txn = Txn, sender = ChPid, message = Message}, - {NeedsConfirming, - State = #q{backing_queue = BQ, - backing_queue_state = BQS}}) -> + {NeedsConfirming, State = #q{backing_queue = BQ, + backing_queue_state = BQS}}) -> store_ch_record((ch_record(ChPid))#cr{txn = Txn}), - {true, - NeedsConfirming, - State#q{backing_queue_state = - BQ:tx_publish(Txn, Message, ?BASE_MESSAGE_PROPERTIES, BQS)}}. + BQS1 = BQ:tx_publish(Txn, Message, ?BASE_MESSAGE_PROPERTIES, BQS), + {true, NeedsConfirming, State#q{backing_queue_state = BQS1}}. deliver_or_enqueue(Delivery, State) -> case attempt_delivery(Delivery, record_confirm_message(Delivery, State)) of @@ -661,9 +652,8 @@ drop_expired_messages(State = #q{backing_queue_state = BQS, backing_queue = BQ}) -> Now = now_micros(), BQS1 = BQ:dropwhile( - fun (#message_properties{expiry = Expiry}) -> - Now > Expiry - end, BQS), + fun (#message_properties{expiry = Expiry}) -> Now > Expiry end, + BQS), ensure_ttl_timer(State#q{backing_queue_state = BQS1}). ensure_ttl_timer(State = #q{backing_queue = BQ, @@ -814,8 +804,7 @@ handle_call({info, Items}, _From, State) -> handle_call(consumers, _From, State) -> reply(consumers(State), State); -handle_call({deliver_immediately, Delivery}, - _From, State) -> +handle_call({deliver_immediately, Delivery}, _From, State) -> %% Synchronous, "immediate" delivery mode %% %% FIXME: Is this correct semantics? @@ -906,15 +895,13 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, case is_ch_blocked(C) of true -> State1#q{ blocked_consumers = - add_consumer( - ChPid, Consumer, - State1#q.blocked_consumers)}; + add_consumer(ChPid, Consumer, + State1#q.blocked_consumers)}; false -> run_message_queue( State1#q{ active_consumers = - add_consumer( - ChPid, Consumer, - State1#q.active_consumers)}) + add_consumer(ChPid, Consumer, + State1#q.active_consumers)}) end, emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume, not NoAck), diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl index ebae48d492..1f30a2fc35 100644 --- a/src/rabbit_node_monitor.erl +++ b/src/rabbit_node_monitor.erl @@ -69,6 +69,7 @@ handle_call(_Request, _From, State) -> handle_cast({rabbit_running_on, Node}, State) -> rabbit_log:info("node ~p up~n", [Node]), erlang:monitor(process, {rabbit, Node}), + ok = rabbit_alarm:on_node_up(Node), {noreply, State}; handle_cast(_Msg, State) -> {noreply, State}. @@ -92,10 +93,10 @@ code_change(_OldVsn, State, _Extra) -> %%-------------------------------------------------------------------- -%% TODO: This may turn out to be a performance hog when there are -%% lots of nodes. We really only need to execute this code on -%% *one* node, rather than all of them. +%% TODO: This may turn out to be a performance hog when there are lots +%% of nodes. We really only need to execute some of these statements +%% on *one* node, rather than all of them. handle_dead_rabbit(Node) -> ok = rabbit_networking:on_node_down(Node), - ok = rabbit_amqqueue:on_node_down(Node). - + ok = rabbit_amqqueue:on_node_down(Node), + ok = rabbit_alarm:on_node_down(Node). diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 07f31a3a53..591e5a6611 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -544,7 +544,7 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent, dropwhile(Pred, State) -> {_OkOrEmpty, State1} = dropwhile1(Pred, State), - State1. + a(State1). dropwhile1(Pred, State) -> internal_queue_out( diff --git a/src/vm_memory_monitor.erl b/src/vm_memory_monitor.erl index 44e1e4b5ae..dcc6aff5c8 100644 --- a/src/vm_memory_monitor.erl +++ b/src/vm_memory_monitor.erl @@ -175,10 +175,10 @@ internal_update(State = #state { memory_limit = MemLimit, case {Alarmed, NewAlarmed} of {false, true} -> emit_update_info(set, MemUsed, MemLimit), - alarm_handler:set_alarm({vm_memory_high_watermark, []}); + alarm_handler:set_alarm({{vm_memory_high_watermark, node()}, []}); {true, false} -> emit_update_info(clear, MemUsed, MemLimit), - alarm_handler:clear_alarm(vm_memory_high_watermark); + alarm_handler:clear_alarm({vm_memory_high_watermark, node()}); _ -> ok end, |
