diff options
| author | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2019-01-16 12:57:28 +0100 |
|---|---|---|
| committer | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2019-01-16 12:57:28 +0100 |
| commit | 2cc1fa0b3301079ed034ffc1662d3c260c85efa4 (patch) | |
| tree | 188599efc615d2e631beb448a20a75194e511da5 /src | |
| parent | caef9d20782b30a44efa3d2707446018f4c32e7b (diff) | |
| parent | 5a390d37a3bd2412d16306a07657f45c1e10e0d0 (diff) | |
| download | rabbitmq-server-git-2cc1fa0b3301079ed034ffc1662d3c260c85efa4.tar.gz | |
Merge branch 'master' into rabbitmq-management-649-single-active-consumer
Conflicts:
src/rabbit_fifo.erl
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 12 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 46 | ||||
| -rw-r--r-- | src/rabbit_fifo.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 14 | ||||
| -rw-r--r-- | src/rabbit_networking.erl | 16 | ||||
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 47 |
6 files changed, 66 insertions, 73 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 5eb72bfffd..af42d68359 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -15,8 +15,8 @@ %% -module(rabbit_amqqueue_process). --include("rabbit.hrl"). --include("rabbit_framing.hrl"). +-include_lib("rabbit_common/include/rabbit.hrl"). +-include_lib("rabbit_common/include/rabbit_framing.hrl"). -behaviour(gen_server2). @@ -606,13 +606,6 @@ send_or_record_confirm(#delivery{confirm = true, rabbit_misc:confirm_to_sender(SenderPid, [MsgSeqNo]), {immediately, State}. -send_mandatory(#delivery{mandatory = false}) -> - ok; -send_mandatory(#delivery{mandatory = true, - sender = SenderPid, - msg_seq_no = MsgSeqNo}) -> - gen_server2:cast(SenderPid, {mandatory_received, MsgSeqNo}). - discard(#delivery{confirm = Confirm, sender = SenderPid, flow = Flow, @@ -676,7 +669,6 @@ maybe_deliver_or_enqueue(Delivery = #delivery{message = Message}, State = #q{overflow = Overflow, backing_queue = BQ, backing_queue_state = BQS}) -> - send_mandatory(Delivery), %% must do this before confirms case {will_overflow(Delivery, State), Overflow} of {true, 'reject-publish'} -> %% Drop publish and nack to publisher diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 805d9f538f..634789adab 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -34,8 +34,6 @@ %% * Keeping track of consumers %% * Keeping track of unacknowledged deliveries to consumers %% * Keeping track of publisher confirms -%% * Keeping track of mandatory message routing confirmations -%% and returns %% * Transaction management %% * Authorisation (enforcing permissions) %% * Publishing trace events if tracing is enabled @@ -143,9 +141,6 @@ %% a list of tags for published messages that were %% rejected but are yet to be sent to the client rejected, - %% a dtree used to track oustanding notifications - %% for messages published as mandatory - mandatory, %% same as capabilities in the reader capabilities, %% tracing exchange resource if tracing is enabled, @@ -469,7 +464,6 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, unconfirmed = dtree:empty(), rejected = [], confirmed = [], - mandatory = dtree:empty(), capabilities = Capabilities, trace_state = rabbit_trace:init(VHost), consumer_prefetch = Prefetch, @@ -502,7 +496,6 @@ prioritise_cast(Msg, _Len, _State) -> case Msg of {confirm, _MsgSeqNos, _QPid} -> 5; {reject_publish, _MsgSeqNos, _QPid} -> 5; - {mandatory_received, _MsgSeqNo, _QPid} -> 5; _ -> 0 end. @@ -637,10 +630,6 @@ handle_cast({send_drained, CTagCredit}, State = #ch{writer_pid = WriterPid}) -> || {ConsumerTag, CreditDrained} <- CTagCredit], noreply(State); -handle_cast({mandatory_received, MsgSeqNo}, State = #ch{mandatory = Mand}) -> - %% NB: don't call noreply/1 since we don't want to send confirms. - noreply_coalesce(State#ch{mandatory = dtree:drop(MsgSeqNo, Mand)}); - handle_cast({reject_publish, MsgSeqNo, _QPid}, State = #ch{unconfirmed = UC}) -> %% It does not matter which queue rejected the message, %% if any queue rejected it - it should not be confirmed. @@ -1707,17 +1696,13 @@ track_delivering_queue(NoAck, QPid, QName, false -> sets:add_element(QRef, DQ) end}. -handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC, - mandatory = Mand}) +handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC}) when ?IS_CLASSIC(QPid) -> - {MMsgs, Mand1} = dtree:take(QPid, Mand), - [basic_return(Msg, State, no_route) || {_, Msg} <- MMsgs], - State1 = State#ch{mandatory = Mand1}, case rabbit_misc:is_abnormal_exit(Reason) of true -> {MXs, UC1} = dtree:take_all(QPid, UC), - record_rejects(MXs, State1#ch{unconfirmed = UC1}); + record_rejects(MXs, State#ch{unconfirmed = UC1}); false -> {MXs, UC1} = dtree:take(QPid, UC), - record_confirms(MXs, State1#ch{unconfirmed = UC1}) + record_confirms(MXs, State#ch{unconfirmed = UC1}) end; handle_publishing_queue_down(QPid, _Reason, _State) when ?IS_QUORUM(QPid) -> @@ -1972,7 +1957,7 @@ foreach_per_queue(F, UAL, Acc) -> consumer_queue_refs(Consumers) -> lists:usort([qpid_to_ref(QPid) || {_Key, {#amqqueue{pid = QPid}, _CParams}} - <- maps:to_list(Consumers)]). + <- maps:to_list(Consumers)]). %% tell the limiter about the number of acks that have been received %% for messages delivered to subscribed consumers, but not acks for @@ -2038,11 +2023,11 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ queue_monitors = QMons1}, %% NB: the order here is important since basic.returns must be %% sent before confirms. - State2 = process_routing_mandatory(Mandatory, AllDeliveredQRefs , MsgSeqNo, - Message, State1), - State3 = process_routing_confirm(Confirm, AllDeliveredQRefs , MsgSeqNo, - XName, State2), - case rabbit_event:stats_level(State3, #ch.stats_timer) of + ok = process_routing_mandatory(Mandatory, AllDeliveredQRefs, + Message, State1), + State2 = process_routing_confirm(Confirm, AllDeliveredQRefs , MsgSeqNo, + XName, State1), + case rabbit_event:stats_level(State, #ch.stats_timer) of fine -> ?INCR_STATS(exchange_stats, XName, 1, publish), [?INCR_STATS(queue_exchange_stats, {QName, XName}, 1, publish) || @@ -2051,16 +2036,13 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ _ -> ok end, - State3#ch{queue_states = QueueStates}. + State2#ch{queue_states = QueueStates}. -process_routing_mandatory(false, _, _, _, State) -> - State; -process_routing_mandatory(true, [], _, Msg, State) -> +process_routing_mandatory(true, [], Msg, State) -> ok = basic_return(Msg, State, no_route), - State; -process_routing_mandatory(true, QRefs, MsgSeqNo, Msg, State) -> - State#ch{mandatory = dtree:insert(MsgSeqNo, QRefs, Msg, - State#ch.mandatory)}. + ok; +process_routing_mandatory(_, _, _, _) -> + ok. process_routing_confirm(false, _, _, _, State) -> State; diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index 94a4e7d709..8f34c01210 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -613,7 +613,9 @@ state_enter(recovered, #state{prefix_msg_counts = PrefixMsgCounts}) when PrefixMsgCounts =/= {0, 0} -> %% TODO: remove assertion? exit({rabbit_fifo, unexpected_prefix_msg_counts, PrefixMsgCounts}); -state_enter(eol, #state{enqueuers = Enqs, consumers = Custs0, waiting_consumers = WaitingConsumers0}) -> +state_enter(eol, #state{enqueuers = Enqs, + consumers = Custs0, + waiting_consumers = WaitingConsumers0}) -> Custs = maps:fold(fun({_, P}, V, S) -> S#{P => V} end, #{}, Custs0), WaitingConsumers1 = lists:foldl(fun({{_, P}, V}, Acc) -> Acc#{P => V} end, #{}, WaitingConsumers0), AllConsumers = maps:merge(Custs, WaitingConsumers1), diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 61f27a4a83..bf0a5a7ed0 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -542,13 +542,6 @@ run_backing_queue(Mod, Fun, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> State #state { backing_queue_state = BQ:invoke(Mod, Fun, BQS) }. -send_mandatory(#delivery{mandatory = false}) -> - ok; -send_mandatory(#delivery{mandatory = true, - sender = SenderPid, - msg_seq_no = MsgSeqNo}) -> - gen_server2:cast(SenderPid, {mandatory_received, MsgSeqNo}). - send_or_record_confirm(_, #delivery{ confirm = false }, MS, _State) -> MS; send_or_record_confirm(published, #delivery { sender = ChPid, @@ -707,13 +700,11 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName }, Q1, rabbit_mirror_queue_master, MasterState, RateTRef, Deliveries, KS1, MTC). -%% We reset mandatory to false here because we will have sent the -%% mandatory_received already as soon as we got the message. We also -%% need to send an ack for these messages since the channel is waiting +%% We need to send an ack for these messages since the channel is waiting %% for one for the via-GM case and we will not now receive one. promote_delivery(Delivery = #delivery{sender = Sender, flow = Flow}) -> maybe_flow_ack(Sender, Flow), - Delivery#delivery{mandatory = false}. + Delivery. noreply(State) -> {NewState, Timeout} = next_state(State), @@ -832,7 +823,6 @@ maybe_enqueue_message( Delivery = #delivery { message = #basic_message { id = MsgId }, sender = ChPid }, State = #state { sender_queues = SQ, msg_id_status = MS }) -> - send_mandatory(Delivery), %% must do this before confirms State1 = ensure_monitoring(ChPid, State), %% We will never see {published, ChPid, MsgSeqNo} here. case maps:find(MsgId, MS) of diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 54f4b8a87d..6131e2f294 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -35,8 +35,7 @@ connection_info/1, connection_info/2, connection_info_all/0, connection_info_all/1, emit_connection_info_all/4, emit_connection_info_local/3, - close_connection/2, accept_ack/2, - handshake/2, tcp_host/1]). + close_connection/2, handshake/2, tcp_host/1]). %% Used by TCP-based transports, e.g. STOMP adapter -export([tcp_listener_addresses/1, tcp_listener_spec/9, @@ -87,7 +86,6 @@ -spec connection_info_all(rabbit_types:info_keys()) -> [rabbit_types:infos()]. -spec close_connection(pid(), string()) -> 'ok'. --spec accept_ack(any(), rabbit_net:socket()) -> ok. -spec on_node_down(node()) -> 'ok'. -spec tcp_listener_addresses(listener_config()) -> [address()]. @@ -363,16 +361,16 @@ handshake(Ref, ProxyProtocol) -> true -> {ok, ProxyInfo} = ranch:recv_proxy_header(Ref, 1000), {ok, Sock} = ranch:handshake(Ref), - tune_buffer_size(Sock), - ok = file_handle_cache:obtain(), + setup_socket(Sock), {ok, {rabbit_proxy_socket, Sock, ProxyInfo}}; false -> - ranch:handshake(Ref) + {ok, Sock} = ranch:handshake(Ref), + setup_socket(Sock), + {ok, Sock} end. -accept_ack(Ref, Sock) -> - ok = ranch:accept_ack(Ref), - tune_buffer_size(Sock), +setup_socket(Sock) -> + ok = tune_buffer_size(Sock), ok = file_handle_cache:obtain(). tune_buffer_size(Sock) -> diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index 0a3a54b052..5c0d1c0070 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -83,6 +83,7 @@ ]). -define(TICK_TIME, 1000). %% the ra server tick time +-define(DELETE_TIMEOUT, 5000). %% the ra server tick time %%---------------------------------------------------------------------------- @@ -307,16 +308,19 @@ delete(#amqqueue{type = quorum, pid = {Name, _}, name = QName, quorum_nodes = QNodes}, _IfUnused, _IfEmpty, ActingUser) -> %% TODO Quorum queue needs to support consumer tracking for IfUnused - Timeout = application:get_env(kernel, net_ticktime, 60000) + 5000, + Timeout = ?DELETE_TIMEOUT, Msgs = quorum_messages(Name), - _ = rabbit_amqqueue:internal_delete(QName, ActingUser), - case ra:delete_cluster([{Name, Node} || Node <- QNodes], Timeout) of + Servers = [{Name, Node} || Node <- QNodes], + case ra:delete_cluster(Servers, Timeout) of {ok, {_, LeaderNode} = Leader} -> MRef = erlang:monitor(process, Leader), receive {'DOWN', MRef, process, _, _} -> ok + after Timeout -> + ok = force_delete_queue(Servers) end, + ok = delete_queue_data(QName, ActingUser), rpc:call(LeaderNode, rabbit_core_metrics, queue_deleted, [QName], ?TICK_TIME), {ok, Msgs}; @@ -327,16 +331,41 @@ delete(#amqqueue{type = quorum, pid = {Name, _}, true -> %% If all ra nodes were already down, the delete %% has succeed - rabbit_core_metrics:queue_deleted(QName), + delete_queue_data(QName, ActingUser), {ok, Msgs}; false -> - rabbit_misc:protocol_error( - internal_error, - "Cannot delete quorum queue '~s', not enough nodes online to reach a quorum: ~255p", - [rabbit_misc:rs(QName), Errs]) + %% attempt forced deletion of all servers + rabbit_log:warning( + "Could not delete quorum queue '~s', not enough nodes " + " online to reach a quorum: ~255p." + " Attempting force delete.", + [rabbit_misc:rs(QName), Errs]), + ok = force_delete_queue(Servers), + delete_queue_data(QName, ActingUser), + {ok, Msgs} end end. + +force_delete_queue(Servers) -> + [begin + case catch(ra:delete_server(S)) of + ok -> ok; + Err -> + rabbit_log:warning( + "Force delete of ~w failed with: ~w" + "This may require manual data clean up~n", + [S, Err]), + ok + end + end || S <- Servers], + ok. + +delete_queue_data(QName, ActingUser) -> + _ = rabbit_amqqueue:internal_delete(QName, ActingUser), + ok. + + delete_immediately(Resource, {_Name, _} = QPid) -> _ = rabbit_amqqueue:internal_delete(Resource, ?INTERNAL_USER), {ok, _} = ra:delete_cluster([QPid]), @@ -472,7 +501,7 @@ requeue(ConsumerTag, MsgIds, QState) -> cleanup_data_dir() -> Names = [Name || #amqqueue{pid = {Name, _}, quorum_nodes = Nodes} - <- rabbit_amqqueue:list_by_type(quorum), + <- rabbit_amqqueue:list_by_type(quorum), lists:member(node(), Nodes)], Registered = ra_directory:list_registered(), _ = [maybe_delete_data_dir(UId) || {Name, UId} <- Registered, |
