summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorArnaud Cogoluègnes <acogoluegnes@gmail.com>2019-01-16 12:57:28 +0100
committerArnaud Cogoluègnes <acogoluegnes@gmail.com>2019-01-16 12:57:28 +0100
commit2cc1fa0b3301079ed034ffc1662d3c260c85efa4 (patch)
tree188599efc615d2e631beb448a20a75194e511da5 /src
parentcaef9d20782b30a44efa3d2707446018f4c32e7b (diff)
parent5a390d37a3bd2412d16306a07657f45c1e10e0d0 (diff)
downloadrabbitmq-server-git-2cc1fa0b3301079ed034ffc1662d3c260c85efa4.tar.gz
Merge branch 'master' into rabbitmq-management-649-single-active-consumer
Conflicts: src/rabbit_fifo.erl
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl12
-rw-r--r--src/rabbit_channel.erl46
-rw-r--r--src/rabbit_fifo.erl4
-rw-r--r--src/rabbit_mirror_queue_slave.erl14
-rw-r--r--src/rabbit_networking.erl16
-rw-r--r--src/rabbit_quorum_queue.erl47
6 files changed, 66 insertions, 73 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 5eb72bfffd..af42d68359 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -15,8 +15,8 @@
%%
-module(rabbit_amqqueue_process).
--include("rabbit.hrl").
--include("rabbit_framing.hrl").
+-include_lib("rabbit_common/include/rabbit.hrl").
+-include_lib("rabbit_common/include/rabbit_framing.hrl").
-behaviour(gen_server2).
@@ -606,13 +606,6 @@ send_or_record_confirm(#delivery{confirm = true,
rabbit_misc:confirm_to_sender(SenderPid, [MsgSeqNo]),
{immediately, State}.
-send_mandatory(#delivery{mandatory = false}) ->
- ok;
-send_mandatory(#delivery{mandatory = true,
- sender = SenderPid,
- msg_seq_no = MsgSeqNo}) ->
- gen_server2:cast(SenderPid, {mandatory_received, MsgSeqNo}).
-
discard(#delivery{confirm = Confirm,
sender = SenderPid,
flow = Flow,
@@ -676,7 +669,6 @@ maybe_deliver_or_enqueue(Delivery = #delivery{message = Message},
State = #q{overflow = Overflow,
backing_queue = BQ,
backing_queue_state = BQS}) ->
- send_mandatory(Delivery), %% must do this before confirms
case {will_overflow(Delivery, State), Overflow} of
{true, 'reject-publish'} ->
%% Drop publish and nack to publisher
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 805d9f538f..634789adab 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -34,8 +34,6 @@
%% * Keeping track of consumers
%% * Keeping track of unacknowledged deliveries to consumers
%% * Keeping track of publisher confirms
-%% * Keeping track of mandatory message routing confirmations
-%% and returns
%% * Transaction management
%% * Authorisation (enforcing permissions)
%% * Publishing trace events if tracing is enabled
@@ -143,9 +141,6 @@
%% a list of tags for published messages that were
%% rejected but are yet to be sent to the client
rejected,
- %% a dtree used to track oustanding notifications
- %% for messages published as mandatory
- mandatory,
%% same as capabilities in the reader
capabilities,
%% tracing exchange resource if tracing is enabled,
@@ -469,7 +464,6 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
unconfirmed = dtree:empty(),
rejected = [],
confirmed = [],
- mandatory = dtree:empty(),
capabilities = Capabilities,
trace_state = rabbit_trace:init(VHost),
consumer_prefetch = Prefetch,
@@ -502,7 +496,6 @@ prioritise_cast(Msg, _Len, _State) ->
case Msg of
{confirm, _MsgSeqNos, _QPid} -> 5;
{reject_publish, _MsgSeqNos, _QPid} -> 5;
- {mandatory_received, _MsgSeqNo, _QPid} -> 5;
_ -> 0
end.
@@ -637,10 +630,6 @@ handle_cast({send_drained, CTagCredit}, State = #ch{writer_pid = WriterPid}) ->
|| {ConsumerTag, CreditDrained} <- CTagCredit],
noreply(State);
-handle_cast({mandatory_received, MsgSeqNo}, State = #ch{mandatory = Mand}) ->
- %% NB: don't call noreply/1 since we don't want to send confirms.
- noreply_coalesce(State#ch{mandatory = dtree:drop(MsgSeqNo, Mand)});
-
handle_cast({reject_publish, MsgSeqNo, _QPid}, State = #ch{unconfirmed = UC}) ->
%% It does not matter which queue rejected the message,
%% if any queue rejected it - it should not be confirmed.
@@ -1707,17 +1696,13 @@ track_delivering_queue(NoAck, QPid, QName,
false -> sets:add_element(QRef, DQ)
end}.
-handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC,
- mandatory = Mand})
+handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC})
when ?IS_CLASSIC(QPid) ->
- {MMsgs, Mand1} = dtree:take(QPid, Mand),
- [basic_return(Msg, State, no_route) || {_, Msg} <- MMsgs],
- State1 = State#ch{mandatory = Mand1},
case rabbit_misc:is_abnormal_exit(Reason) of
true -> {MXs, UC1} = dtree:take_all(QPid, UC),
- record_rejects(MXs, State1#ch{unconfirmed = UC1});
+ record_rejects(MXs, State#ch{unconfirmed = UC1});
false -> {MXs, UC1} = dtree:take(QPid, UC),
- record_confirms(MXs, State1#ch{unconfirmed = UC1})
+ record_confirms(MXs, State#ch{unconfirmed = UC1})
end;
handle_publishing_queue_down(QPid, _Reason, _State) when ?IS_QUORUM(QPid) ->
@@ -1972,7 +1957,7 @@ foreach_per_queue(F, UAL, Acc) ->
consumer_queue_refs(Consumers) ->
lists:usort([qpid_to_ref(QPid) || {_Key, {#amqqueue{pid = QPid}, _CParams}}
- <- maps:to_list(Consumers)]).
+ <- maps:to_list(Consumers)]).
%% tell the limiter about the number of acks that have been received
%% for messages delivered to subscribed consumers, but not acks for
@@ -2038,11 +2023,11 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
queue_monitors = QMons1},
%% NB: the order here is important since basic.returns must be
%% sent before confirms.
- State2 = process_routing_mandatory(Mandatory, AllDeliveredQRefs , MsgSeqNo,
- Message, State1),
- State3 = process_routing_confirm(Confirm, AllDeliveredQRefs , MsgSeqNo,
- XName, State2),
- case rabbit_event:stats_level(State3, #ch.stats_timer) of
+ ok = process_routing_mandatory(Mandatory, AllDeliveredQRefs,
+ Message, State1),
+ State2 = process_routing_confirm(Confirm, AllDeliveredQRefs , MsgSeqNo,
+ XName, State1),
+ case rabbit_event:stats_level(State, #ch.stats_timer) of
fine ->
?INCR_STATS(exchange_stats, XName, 1, publish),
[?INCR_STATS(queue_exchange_stats, {QName, XName}, 1, publish) ||
@@ -2051,16 +2036,13 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
_ ->
ok
end,
- State3#ch{queue_states = QueueStates}.
+ State2#ch{queue_states = QueueStates}.
-process_routing_mandatory(false, _, _, _, State) ->
- State;
-process_routing_mandatory(true, [], _, Msg, State) ->
+process_routing_mandatory(true, [], Msg, State) ->
ok = basic_return(Msg, State, no_route),
- State;
-process_routing_mandatory(true, QRefs, MsgSeqNo, Msg, State) ->
- State#ch{mandatory = dtree:insert(MsgSeqNo, QRefs, Msg,
- State#ch.mandatory)}.
+ ok;
+process_routing_mandatory(_, _, _, _) ->
+ ok.
process_routing_confirm(false, _, _, _, State) ->
State;
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index 94a4e7d709..8f34c01210 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -613,7 +613,9 @@ state_enter(recovered, #state{prefix_msg_counts = PrefixMsgCounts})
when PrefixMsgCounts =/= {0, 0} ->
%% TODO: remove assertion?
exit({rabbit_fifo, unexpected_prefix_msg_counts, PrefixMsgCounts});
-state_enter(eol, #state{enqueuers = Enqs, consumers = Custs0, waiting_consumers = WaitingConsumers0}) ->
+state_enter(eol, #state{enqueuers = Enqs,
+ consumers = Custs0,
+ waiting_consumers = WaitingConsumers0}) ->
Custs = maps:fold(fun({_, P}, V, S) -> S#{P => V} end, #{}, Custs0),
WaitingConsumers1 = lists:foldl(fun({{_, P}, V}, Acc) -> Acc#{P => V} end, #{}, WaitingConsumers0),
AllConsumers = maps:merge(Custs, WaitingConsumers1),
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 61f27a4a83..bf0a5a7ed0 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -542,13 +542,6 @@ run_backing_queue(Mod, Fun, State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
State #state { backing_queue_state = BQ:invoke(Mod, Fun, BQS) }.
-send_mandatory(#delivery{mandatory = false}) ->
- ok;
-send_mandatory(#delivery{mandatory = true,
- sender = SenderPid,
- msg_seq_no = MsgSeqNo}) ->
- gen_server2:cast(SenderPid, {mandatory_received, MsgSeqNo}).
-
send_or_record_confirm(_, #delivery{ confirm = false }, MS, _State) ->
MS;
send_or_record_confirm(published, #delivery { sender = ChPid,
@@ -707,13 +700,11 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
Q1, rabbit_mirror_queue_master, MasterState, RateTRef, Deliveries, KS1,
MTC).
-%% We reset mandatory to false here because we will have sent the
-%% mandatory_received already as soon as we got the message. We also
-%% need to send an ack for these messages since the channel is waiting
+%% We need to send an ack for these messages since the channel is waiting
%% for one for the via-GM case and we will not now receive one.
promote_delivery(Delivery = #delivery{sender = Sender, flow = Flow}) ->
maybe_flow_ack(Sender, Flow),
- Delivery#delivery{mandatory = false}.
+ Delivery.
noreply(State) ->
{NewState, Timeout} = next_state(State),
@@ -832,7 +823,6 @@ maybe_enqueue_message(
Delivery = #delivery { message = #basic_message { id = MsgId },
sender = ChPid },
State = #state { sender_queues = SQ, msg_id_status = MS }) ->
- send_mandatory(Delivery), %% must do this before confirms
State1 = ensure_monitoring(ChPid, State),
%% We will never see {published, ChPid, MsgSeqNo} here.
case maps:find(MsgId, MS) of
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 54f4b8a87d..6131e2f294 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -35,8 +35,7 @@
connection_info/1, connection_info/2,
connection_info_all/0, connection_info_all/1,
emit_connection_info_all/4, emit_connection_info_local/3,
- close_connection/2, accept_ack/2,
- handshake/2, tcp_host/1]).
+ close_connection/2, handshake/2, tcp_host/1]).
%% Used by TCP-based transports, e.g. STOMP adapter
-export([tcp_listener_addresses/1, tcp_listener_spec/9,
@@ -87,7 +86,6 @@
-spec connection_info_all(rabbit_types:info_keys()) ->
[rabbit_types:infos()].
-spec close_connection(pid(), string()) -> 'ok'.
--spec accept_ack(any(), rabbit_net:socket()) -> ok.
-spec on_node_down(node()) -> 'ok'.
-spec tcp_listener_addresses(listener_config()) -> [address()].
@@ -363,16 +361,16 @@ handshake(Ref, ProxyProtocol) ->
true ->
{ok, ProxyInfo} = ranch:recv_proxy_header(Ref, 1000),
{ok, Sock} = ranch:handshake(Ref),
- tune_buffer_size(Sock),
- ok = file_handle_cache:obtain(),
+ setup_socket(Sock),
{ok, {rabbit_proxy_socket, Sock, ProxyInfo}};
false ->
- ranch:handshake(Ref)
+ {ok, Sock} = ranch:handshake(Ref),
+ setup_socket(Sock),
+ {ok, Sock}
end.
-accept_ack(Ref, Sock) ->
- ok = ranch:accept_ack(Ref),
- tune_buffer_size(Sock),
+setup_socket(Sock) ->
+ ok = tune_buffer_size(Sock),
ok = file_handle_cache:obtain().
tune_buffer_size(Sock) ->
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index 0a3a54b052..5c0d1c0070 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -83,6 +83,7 @@
]).
-define(TICK_TIME, 1000). %% the ra server tick time
+-define(DELETE_TIMEOUT, 5000). %% the ra server tick time
%%----------------------------------------------------------------------------
@@ -307,16 +308,19 @@ delete(#amqqueue{type = quorum, pid = {Name, _},
name = QName, quorum_nodes = QNodes},
_IfUnused, _IfEmpty, ActingUser) ->
%% TODO Quorum queue needs to support consumer tracking for IfUnused
- Timeout = application:get_env(kernel, net_ticktime, 60000) + 5000,
+ Timeout = ?DELETE_TIMEOUT,
Msgs = quorum_messages(Name),
- _ = rabbit_amqqueue:internal_delete(QName, ActingUser),
- case ra:delete_cluster([{Name, Node} || Node <- QNodes], Timeout) of
+ Servers = [{Name, Node} || Node <- QNodes],
+ case ra:delete_cluster(Servers, Timeout) of
{ok, {_, LeaderNode} = Leader} ->
MRef = erlang:monitor(process, Leader),
receive
{'DOWN', MRef, process, _, _} ->
ok
+ after Timeout ->
+ ok = force_delete_queue(Servers)
end,
+ ok = delete_queue_data(QName, ActingUser),
rpc:call(LeaderNode, rabbit_core_metrics, queue_deleted, [QName],
?TICK_TIME),
{ok, Msgs};
@@ -327,16 +331,41 @@ delete(#amqqueue{type = quorum, pid = {Name, _},
true ->
%% If all ra nodes were already down, the delete
%% has succeed
- rabbit_core_metrics:queue_deleted(QName),
+ delete_queue_data(QName, ActingUser),
{ok, Msgs};
false ->
- rabbit_misc:protocol_error(
- internal_error,
- "Cannot delete quorum queue '~s', not enough nodes online to reach a quorum: ~255p",
- [rabbit_misc:rs(QName), Errs])
+ %% attempt forced deletion of all servers
+ rabbit_log:warning(
+ "Could not delete quorum queue '~s', not enough nodes "
+ " online to reach a quorum: ~255p."
+ " Attempting force delete.",
+ [rabbit_misc:rs(QName), Errs]),
+ ok = force_delete_queue(Servers),
+ delete_queue_data(QName, ActingUser),
+ {ok, Msgs}
end
end.
+
+force_delete_queue(Servers) ->
+ [begin
+ case catch(ra:delete_server(S)) of
+ ok -> ok;
+ Err ->
+ rabbit_log:warning(
+ "Force delete of ~w failed with: ~w"
+ "This may require manual data clean up~n",
+ [S, Err]),
+ ok
+ end
+ end || S <- Servers],
+ ok.
+
+delete_queue_data(QName, ActingUser) ->
+ _ = rabbit_amqqueue:internal_delete(QName, ActingUser),
+ ok.
+
+
delete_immediately(Resource, {_Name, _} = QPid) ->
_ = rabbit_amqqueue:internal_delete(Resource, ?INTERNAL_USER),
{ok, _} = ra:delete_cluster([QPid]),
@@ -472,7 +501,7 @@ requeue(ConsumerTag, MsgIds, QState) ->
cleanup_data_dir() ->
Names = [Name || #amqqueue{pid = {Name, _}, quorum_nodes = Nodes}
- <- rabbit_amqqueue:list_by_type(quorum),
+ <- rabbit_amqqueue:list_by_type(quorum),
lists:member(node(), Nodes)],
Registered = ra_directory:list_registered(),
_ = [maybe_delete_data_dir(UId) || {Name, UId} <- Registered,