diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 109 | ||||
| -rw-r--r-- | src/rabbit_node_monitor.erl | 21 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 14 |
3 files changed, 63 insertions, 81 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 76b5defb68..68032d7641 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -229,24 +229,24 @@ bq_init(BQ, Q, Recover) -> end). process_args(State = #q{q = #amqqueue{arguments = Arguments}}) -> - lists:foldl(fun({Arg, Fun}, State1) -> - case rabbit_misc:table_lookup(Arguments, Arg) of - {_Type, Val} -> Fun(Val, State1); - undefined -> State1 - end - end, State, [{<<"x-expires">>, fun init_expires/2}, - {<<"x-message-ttl">>, fun init_ttl/2}, - {<<"x-dead-letter-exchange">>, fun init_dlx/2}, - {<<"x-dead-letter-routing-key">>, - fun init_dlx_routing_key/2}]). + lists:foldl( + fun({Arg, Fun}, State1) -> + case rabbit_misc:table_lookup(Arguments, Arg) of + {_Type, Val} -> Fun(Val, State1); + undefined -> State1 + end + end, State, + [{<<"x-expires">>, fun init_expires/2}, + {<<"x-message-ttl">>, fun init_ttl/2}, + {<<"x-dead-letter-exchange">>, fun init_dlx/2}, + {<<"x-dead-letter-routing-key">>, fun init_dlx_routing_key/2}]). init_expires(Expires, State) -> ensure_expiry_timer(State#q{expires = Expires}). init_ttl(TTL, State) -> drop_expired_messages(State#q{ttl = TTL}). -init_dlx(DLX, State = #q{q = #amqqueue{name = #resource{ - virtual_host = VHostPath}}}) -> - State#q{dlx = rabbit_misc:r(VHostPath, exchange, DLX)}. +init_dlx(DLX, State = #q{q = #amqqueue{name = QName}}) -> + State#q{dlx = rabbit_misc:r(QName, exchange, DLX)}. init_dlx_routing_key(RoutingKey, State) -> State#q{dlx_routing_key = RoutingKey}. @@ -863,6 +863,8 @@ cleanup_after_confirm(State = #q{blocked_op = Op, noreply(State) end. +already_been_here(_Delivery, #q{dlx = undefined}) -> + false; already_been_here(#delivery{message = #basic_message{content = Content}}, State) -> #content{properties = #'P_basic'{headers = Headers}} = @@ -951,15 +953,12 @@ infos(Items, State) -> || Item <- (Items1 -- [synchronised_slave_pids])]. slaves_status(#q{q = #amqqueue{name = Name}}) -> - {ok, #amqqueue{mirror_nodes = MNodes, slave_pids = SPids}} = - rabbit_amqqueue:lookup(Name), - case MNodes of - undefined -> + case rabbit_amqqueue:lookup(Name) of + {ok, #amqqueue{mirror_nodes = undefined}} -> [{slave_pids, ''}, {synchronised_slave_pids, ''}]; - _ -> + {ok, #amqqueue{slave_pids = SPids}} -> {Results, _Bad} = - delegate:invoke( - SPids, fun (Pid) -> rabbit_mirror_queue_slave:info(Pid) end), + delegate:invoke(SPids, fun rabbit_mirror_queue_slave:info/1), {SPids1, SSPids} = lists:foldl( fun ({Pid, Infos}, {SPidsN, SSPidsN}) -> @@ -1003,11 +1002,9 @@ i(memory, _) -> {memory, M} = process_info(self(), memory), M; i(slave_pids, #q{q = #amqqueue{name = Name}}) -> - {ok, #amqqueue{mirror_nodes = MNodes, - slave_pids = SPids}} = rabbit_amqqueue:lookup(Name), - case MNodes of - undefined -> []; - _ -> SPids + case rabbit_amqqueue:lookup(Name) of + {ok, #amqqueue{mirror_nodes = undefined}} -> []; + {ok, #amqqueue{slave_pids = SPids}} -> SPids end; i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) -> BQ:status(BQS); @@ -1234,12 +1231,9 @@ handle_call({delete, IfUnused, IfEmpty}, From, IsEmpty = BQ:is_empty(BQS), IsUnused = is_unused(State), if - IfEmpty and not(IsEmpty) -> - reply({error, not_empty}, State); - IfUnused and not(IsUnused) -> - reply({error, in_use}, State); - true -> - dead_letter_deleted_queue(From, State) + IfEmpty and not(IsEmpty) -> reply({error, not_empty}, State); + IfUnused and not(IsUnused) -> reply({error, in_use}, State); + true -> dead_letter_deleted_queue(From, State) end; handle_call(purge, _From, State = #q{backing_queue = BQ, @@ -1250,10 +1244,9 @@ handle_call(purge, _From, State = #q{backing_queue = BQ, handle_call(purge, From, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> - BQS1 = BQ:dropwhile( - fun (_) -> true end, - mk_dead_letter_fun(queue_purged, State), - BQS), + BQS1 = BQ:dropwhile(fun (_) -> true end, + mk_dead_letter_fun(queue_purged, State), + BQS), case BQ:len(BQS) of 0 -> reply({ok, 0}, State#q{backing_queue_state = BQS1}); _ -> noreply( @@ -1271,37 +1264,23 @@ handle_cast({run_backing_queue, Mod, Fun}, State) -> noreply(run_backing_queue(Mod, Fun, State)); handle_cast({deliver, Delivery = #delivery{sender = Sender, - msg_seq_no = MsgSeqNo}, - Flow}, State = #q{dlx = DLX}) -> + msg_seq_no = MsgSeqNo}, Flow}, + State) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. - ShouldDeliver = - case DLX of - undefined -> - true; - _ -> - case already_been_here(Delivery, State) of - false -> true; - Qs -> log_cycle_once(Qs), - rabbit_misc:confirm_to_sender(Sender, - [MsgSeqNo]), - false - end - end, - case ShouldDeliver of - false -> noreply(State); - true -> case Flow of - flow -> - Key = {ch_publisher, Sender}, - case get(Key) of - undefined -> put(Key, erlang:monitor(process, - Sender)); - _ -> ok - end, - credit_flow:ack(Sender); - noflow -> - ok - end, - noreply(deliver_or_enqueue(Delivery, State)) + case Flow of + flow -> Key = {ch_publisher, Sender}, + case get(Key) of + undefined -> put(Key, erlang:monitor(process, Sender)); + _ -> ok + end, + credit_flow:ack(Sender); + noflow -> ok + end, + case already_been_here(Delivery, State) of + false -> noreply(deliver_or_enqueue(Delivery, State)); + Qs -> log_cycle_once(Qs), + rabbit_misc:confirm_to_sender(Sender, [MsgSeqNo]), + noreply(State) end; handle_cast({ack, AckTags, ChPid}, State) -> diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl index 54a7add2e6..323cf0ce9e 100644 --- a/src/rabbit_node_monitor.erl +++ b/src/rabbit_node_monitor.erl @@ -61,23 +61,26 @@ notify_cluster() -> %%-------------------------------------------------------------------- init([]) -> - {ok, no_state}. + {ok, ordsets:new()}. handle_call(_Request, _From, State) -> {noreply, State}. -handle_cast({rabbit_running_on, Node}, State) -> - rabbit_log:info("rabbit on ~p up~n", [Node]), - erlang:monitor(process, {rabbit, Node}), - ok = handle_live_rabbit(Node), - {noreply, State}; +handle_cast({rabbit_running_on, Node}, Nodes) -> + case ordsets:is_element(Node, Nodes) of + true -> {noreply, Nodes}; + false -> rabbit_log:info("rabbit on node ~p up~n", [Node]), + erlang:monitor(process, {rabbit, Node}), + ok = handle_live_rabbit(Node), + {noreply, ordsets:add_element(Node, Nodes)} + end; handle_cast(_Msg, State) -> {noreply, State}. -handle_info({'DOWN', _MRef, process, {rabbit, Node}, _Reason}, State) -> - rabbit_log:info("node ~p lost 'rabbit'~n", [Node]), +handle_info({'DOWN', _MRef, process, {rabbit, Node}, _Reason}, Nodes) -> + rabbit_log:info("rabbit on node ~p down~n", [Node]), ok = handle_dead_rabbit(Node), - {noreply, State}; + {noreply, ordsets:del_element(Node, Nodes)}; handle_info(_Info, State) -> {noreply, State}. diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index a4a2d655cb..95b47d8343 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -593,8 +593,8 @@ dropwhile(Pred, MsgFun, State) -> {true, _} -> {{_, _, AckTag, _}, State2} = internal_fetch(true, MsgStatus, State1), - State3 = MsgFun(read_msg_callback(MsgStatus), AckTag, State2), - dropwhile(Pred, MsgFun, State3); + dropwhile(Pred, MsgFun, MsgFun(read_msg_callback(MsgStatus), + AckTag, State2)); {false, _} -> a(in_r(MsgStatus, State1)) end @@ -648,11 +648,11 @@ ack(AckTags, undefined, State) -> ack_out_counter = AckOutCount + length(AckTags) })}; ack(AckTags, MsgFun, State = #vqstate{pending_ack = PA}) -> - State2 = lists:foldl(fun(SeqId, State1) -> - AckEntry = gb_trees:get(SeqId, PA), - MsgFun(read_msg_callback(AckEntry), SeqId, State1) - end, State, AckTags), - {[], State2}. + {[], lists:foldl( + fun(SeqId, State1) -> + AckEntry = gb_trees:get(SeqId, PA), + MsgFun(read_msg_callback(AckEntry), SeqId, State1) + end, State, AckTags)}. requeue(AckTags, #vqstate { delta = Delta, q3 = Q3, |
