summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorD Corbacho <diana@rabbitmq.com>2018-12-19 09:28:36 +0000
committerGitHub <noreply@github.com>2018-12-19 09:28:36 +0000
commite8fb108131762a5511c7c6b091642f19cd29994b (patch)
tree329afae94009e0db36884da2fc722d96cf4f52e6 /src
parent6b73cadf47b2a23be04c3c25fed216f3e8240458 (diff)
parent78eedc2323bdf318d0c340f0479ae8ea1065a7b2 (diff)
downloadrabbitmq-server-git-e8fb108131762a5511c7c6b091642f19cd29994b.tar.gz
Merge pull request #1803 from rabbitmq/vanlightly-bugs
Bugfixes
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_channel.erl7
-rw-r--r--src/rabbit_fifo.erl108
-rw-r--r--src/rabbit_fifo_client.erl33
-rw-r--r--src/rabbit_quorum_memory_manager.erl2
-rw-r--r--src/rabbit_quorum_queue.erl67
5 files changed, 134 insertions, 83 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index f749d9f30e..996774fe35 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -654,6 +654,7 @@ handle_info({ra_event, {Name, _} = From, _} = Evt,
consumer_mapping = ConsumerMapping} = State0) ->
case QueueStates of
#{Name := QState0} ->
+ QName = rabbit_quorum_queue:queue_name(QState0),
case rabbit_quorum_queue:handle_event(Evt, QState0) of
{{delivery, CTag, Msgs}, QState1} ->
AckRequired = case maps:find(CTag, ConsumerMapping) of
@@ -670,7 +671,6 @@ handle_info({ra_event, {Name, _} = From, _} = Evt,
true ->
QState1
end,
- QName = rabbit_quorum_queue:queue_name(QState2),
State = lists:foldl(
fun({MsgId, {MsgHeader, Msg}}, Acc) ->
IsDelivered = maps:is_key(delivery_count, MsgHeader),
@@ -702,10 +702,7 @@ handle_info({ra_event, {Name, _} = From, _} = Evt,
%% TODO: this should use dtree:take/3
{MXs, UC1} = dtree:take(Name, State2#ch.unconfirmed),
State3 = record_confirms(MXs, State1#ch{unconfirmed = UC1}),
- case maps:find(Name, QNames) of
- {ok, QName} -> erase_queue_stats(QName);
- error -> ok
- end,
+ erase_queue_stats(QName),
noreply_coalesce(
State3#ch{queue_states = maps:remove(Name, QueueStates),
queue_names = maps:remove(Name, QNames)})
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index d365d41a96..8c7b208855 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -56,7 +56,7 @@
make_discard/2,
make_credit/4,
make_purge/0,
- make_update_state/1
+ make_update_config/1
]).
-type raw_msg() :: term().
@@ -131,7 +131,7 @@
delivery_count :: non_neg_integer(),
drain :: boolean()}).
-record(purge, {}).
--record(update_state, {config :: config()}).
+-record(update_config, {config :: config()}).
@@ -143,7 +143,7 @@
#discard{} |
#credit{} |
#purge{} |
- #update_state{}.
+ #update_config{}.
-type command() :: protocol() | ra_machine:builtin_command().
%% all the command types supported by ra fifo
@@ -260,10 +260,10 @@
-spec init(config()) -> state().
init(#{name := Name,
queue_resource := Resource} = Conf) ->
- update_state(Conf, #state{name = Name,
+ update_config(Conf, #state{name = Name,
queue_resource = Resource}).
-update_state(Conf, State) ->
+update_config(Conf, State) ->
DLH = maps:get(dead_letter_handler, Conf, undefined),
BLH = maps:get(become_leader_handler, Conf, undefined),
SHI = maps:get(shadow_copy_interval, Conf, ?SHADOW_COPY_INTERVAL),
@@ -435,18 +435,24 @@ apply(_, {down, ConsumerPid, noconnection},
Effects0, #state{consumers = Cons0,
enqueuers = Enqs0} = State0) ->
Node = node(ConsumerPid),
- % mark all consumers and enqueuers as suspect
- % and monitor the node
- {Cons, State} = maps:fold(fun({_, P} = K, #consumer{checked_out = Checked0} = C,
- {Co, St0}) when node(P) =:= Node ->
- St = return_all(St0, Checked0),
- {maps:put(K, C#consumer{suspected_down = true,
- checked_out = #{}},
- Co),
- St};
- (K, C, {Co, St}) ->
- {maps:put(K, C, Co), St}
- end, {#{}, State0}, Cons0),
+ % mark all consumers and enqueuers as suspected down
+ % and monitor the node so that we can find out the final state of the
+ % process at some later point
+ {Cons, State} = maps:fold(
+ fun({_, P} = K,
+ #consumer{checked_out = Checked0} = C,
+ {Co, St0}) when node(P) =:= Node ->
+ St = return_all(St0, Checked0),
+ %% the checked out messages were returned above, so credit
+ %% the consumer with the size of the Checked map
+ Credit = increase_credit(C, maps:size(Checked0)),
+ {maps:put(K, C#consumer{suspected_down = true,
+ credit = Credit,
+ checked_out = #{}}, Co),
+ St};
+ (K, C, {Co, St}) ->
+ {maps:put(K, C, Co), St}
+ end, {#{}, State0}, Cons0),
Enqs = maps:map(fun(P, E) when node(P) =:= Node ->
E#enqueuer{suspected_down = true};
(_, E) -> E
@@ -515,8 +521,8 @@ apply(_, {nodeup, Node}, Effects0,
service_queue = SQ}, Monitors ++ Effects);
apply(_, {nodedown, _Node}, Effects, State) ->
{State, Effects, ok};
-apply(_, #update_state{config = Conf}, Effects, State) ->
- {update_state(Conf, State), Effects, ok}.
+apply(_, #update_config{config = Conf}, Effects, State) ->
+ {update_config(Conf, State), Effects, ok}.
-spec state_enter(ra_server:ra_state(), state()) -> ra_machine:effects().
state_enter(leader, #state{consumers = Cons,
@@ -587,7 +593,9 @@ overview(#state{consumers = Cons,
get_checked_out(Cid, From, To, #state{consumers = Consumers}) ->
case Consumers of
#{Cid := #consumer{checked_out = Checked}} ->
- [{K, snd(snd(maps:get(K, Checked)))} || K <- lists:seq(From, To)];
+ [{K, snd(snd(maps:get(K, Checked)))}
+ || K <- lists:seq(From, To),
+ maps:is_key(K, Checked)];
_ ->
[]
end.
@@ -769,16 +777,10 @@ maybe_enqueue(RaftIdx, From, MsgSeqNo, RawMsg, Effects0,
snd(T) ->
element(2, T).
-return(ConsumerId, MsgNumMsgs, #consumer{lifetime = Life} = Con0, Checked,
+return(ConsumerId, MsgNumMsgs, Con0, Checked,
Effects0, #state{consumers = Cons0, service_queue = SQ0} = State0) ->
- Con = case Life of
- auto ->
- Num = length(MsgNumMsgs),
- Con0#consumer{checked_out = Checked,
- credit = increase_credit(Con0, Num)};
- once ->
- Con0#consumer{checked_out = Checked}
- end,
+ Con = Con0#consumer{checked_out = Checked,
+ credit = increase_credit(Con0, length(MsgNumMsgs))},
{Cons, SQ, Effects} = update_or_remove_sub(ConsumerId, Con, Cons0,
SQ0, Effects0),
State1 = lists:foldl(fun('$prefix_msg' = Msg, S0) ->
@@ -900,12 +902,15 @@ return_one(MsgNum, {RaftId, {Header0, RawMsg}},
State0#state{messages = maps:put(MsgNum, Msg, Messages),
returns = lqueue:in(MsgNum, Returns)}).
-return_all(State, Checked) ->
- maps:fold(fun (_, '$prefix_msg', S) ->
- return_one(0, '$prefix_msg', S);
- (_, {MsgNum, Msg}, S) ->
- return_one(MsgNum, Msg, S)
- end, State, Checked).
+return_all(State, Checked0) ->
+ %% need to sort the list so that we return messages in the order
+ %% they were checked out
+ Checked = lists:sort(maps:to_list(Checked0)),
+ lists:foldl(fun ({_, '$prefix_msg'}, S) ->
+ return_one(0, '$prefix_msg', S);
+ ({_, {MsgNum, Msg}}, S) ->
+ return_one(MsgNum, Msg, S)
+ end, State, Checked).
checkout(State, Effects) ->
checkout0(checkout_one(State), Effects, #{}).
@@ -1170,9 +1175,9 @@ make_credit(ConsumerId, Credit, DeliveryCount, Drain) ->
-spec make_purge() -> protocol().
make_purge() -> #purge{}.
--spec make_update_state(config()) -> protocol().
-make_update_state(Config) ->
- #update_state{config = Config}.
+-spec make_update_config(config()) -> protocol().
+make_update_config(Config) ->
+ #update_config{config = Config}.
add_bytes_enqueue(Msg, #state{msg_bytes_enqueue = Enqueue} = State) ->
Bytes = message_size(Msg),
@@ -1502,11 +1507,14 @@ down_with_noconnection_marks_suspect_and_node_is_monitored_test() ->
Node = node(Pid),
{State0, Effects0} = enq(1, 1, second, test_init(test)),
?ASSERT_EFF({monitor, process, P}, P =:= Self, Effects0),
- {State1, Effects1} = check(Cid, 2, State0),
+ {State1, Effects1} = check_auto(Cid, 2, State0),
+ #consumer{credit = 0} = maps:get(Cid, State1#state.consumers),
?ASSERT_EFF({monitor, process, P}, P =:= Pid, Effects1),
% monitor both enqueuer and consumer
% because we received a noconnection we now need to monitor the node
{State2a, _Effects2a, _} = apply(meta(3), {down, Pid, noconnection}, [], State1),
+ #consumer{credit = 1} = maps:get(Cid, State2a#state.consumers),
+ %% validate consumer has credit
{State2, Effects2, _} = apply(meta(3), {down, Self, noconnection}, [], State2a),
?ASSERT_EFF({monitor, node, _}, Effects2),
?assertNoEffect({demonitor, process, _}, Effects2),
@@ -1865,6 +1873,26 @@ purge_with_checkout_test() ->
?assertEqual(0, maps:size(Checked)),
ok.
+down_returns_checked_out_in_order_test() ->
+ S0 = test_init(?FUNCTION_NAME),
+ %% enqueue 100
+ S1 = lists:foldl(fun (Num, FS0) ->
+ {FS, _} = enq(Num, Num, Num, FS0),
+ FS
+ end, S0, lists:seq(1, 100)),
+ ?assertEqual(100, maps:size(S1#state.messages)),
+ Cid = {<<"cid">>, self()},
+ {S2, _} = check(Cid, 101, 1000, S1),
+ #consumer{checked_out = Checked} = maps:get(Cid, S2#state.consumers),
+ ?assertEqual(100, maps:size(Checked)),
+ %% simulate down
+ {S, _, _} = apply(meta(102), {down, self(), noproc}, [], S2),
+ Returns = lqueue:to_list(S#state.returns),
+ ?assertEqual(100, length(Returns)),
+ %% validate returns are in order
+ ?assertEqual(lists:sort(Returns), Returns),
+ ok.
+
meta(Idx) ->
#{index => Idx, term => 1}.
@@ -1900,7 +1928,7 @@ check_auto(Cid, Idx, State) ->
check(Cid, Idx, Num, State) ->
strip_reply(
apply(meta(Idx),
- make_checkout(Cid, {once, Num, simple_prefetch}, #{}),
+ make_checkout(Cid, {auto, Num, simple_prefetch}, #{}),
[], State)).
settle(Cid, Idx, MsgId, State) ->
diff --git a/src/rabbit_fifo_client.erl b/src/rabbit_fifo_client.erl
index 9cdb1dfbe7..955c0e4d9d 100644
--- a/src/rabbit_fifo_client.erl
+++ b/src/rabbit_fifo_client.erl
@@ -52,10 +52,12 @@
{rabbit_fifo:consumer_tag(), non_neg_integer()}}.
-type actions() :: [action()].
+-type cluster_name() :: rabbit_types:r(queue).
+
-record(consumer, {last_msg_id :: seq() | -1,
delivery_count = 0 :: non_neg_integer()}).
--record(state, {cluster_name :: ra_cluster_name(),
+-record(state, {cluster_name :: cluster_name(),
servers = [] :: [ra_server_id()],
leader :: maybe(ra_server_id()),
next_seq = 0 :: seq(),
@@ -88,7 +90,7 @@
%% @param ClusterName the id of the cluster to interact with
%% @param Servers The known servers of the queue. If the current leader is known
%% ensure the leader node is at the head of the list.
--spec init(ra_cluster_name(), [ra_server_id()]) -> state().
+-spec init(cluster_name(), [ra_server_id()]) -> state().
init(ClusterName, Servers) ->
init(ClusterName, Servers, ?SOFT_LIMIT).
@@ -98,7 +100,7 @@ init(ClusterName, Servers) ->
%% @param Servers The known servers of the queue. If the current leader is known
%% ensure the leader node is at the head of the list.
%% @param MaxPending size defining the max number of pending commands.
--spec init(ra_cluster_name(), [ra_server_id()], non_neg_integer()) -> state().
+-spec init(cluster_name(), [ra_server_id()], non_neg_integer()) -> state().
init(ClusterName = #resource{}, Servers, SoftLimit) ->
Timeout = application:get_env(kernel, net_ticktime, 60000) + 5000,
#state{cluster_name = ClusterName,
@@ -106,7 +108,7 @@ init(ClusterName = #resource{}, Servers, SoftLimit) ->
soft_limit = SoftLimit,
timeout = Timeout}.
--spec init(ra_cluster_name(), [ra_server_id()], non_neg_integer(), fun(() -> ok),
+-spec init(cluster_name(), [ra_server_id()], non_neg_integer(), fun(() -> ok),
fun(() -> ok)) -> state().
init(ClusterName = #resource{}, Servers, SoftLimit, BlockFun, UnblockFun) ->
Timeout = application:get_env(kernel, net_ticktime, 60000) + 5000,
@@ -397,12 +399,12 @@ purge(Node) ->
end.
%% @doc returns the cluster name
--spec cluster_name(state()) -> ra_cluster_name().
+-spec cluster_name(state()) -> cluster_name().
cluster_name(#state{cluster_name = ClusterName}) ->
ClusterName.
update_machine_state(Node, Conf) ->
- case ra:process_command(Node, rabbit_fifo:make_update_state(Conf)) of
+ case ra:process_command(Node, rabbit_fifo:make_update_config(Conf)) of
{ok, ok, _} ->
ok;
Err ->
@@ -620,11 +622,18 @@ handle_delivery(Leader, {delivery, Tag, [{FstId, _} | _] = IdMsgs} = Del0,
CDels0)}};
#consumer{last_msg_id = Prev} = C
when FstId > Prev+1 ->
+ NumMissing = FstId - Prev + 1,
+ %% There may actually be fewer missing messages returned than expected.
+ %% This can happen when the node the channel is on gets disconnected
+ %% from the node the leader is on and is then reconnected afterwards.
+ %% When the node is disconnected the leader will return all checked
+ %% out messages to the main queue to ensure they don't get stuck in
+ %% case the node never comes back.
+ %% NOTE(review): the ids requested below span Prev+1..FstId-1, i.e.
+ %% FstId - Prev - 1 messages, so NumMissing (FstId - Prev + 1) looks
+ %% two too high -- confirm.
Missing = get_missing_deliveries(Leader, Prev+1, FstId-1, Tag),
Del = {delivery, Tag, Missing ++ IdMsgs},
{Del, State0#state{consumer_deliveries =
update_consumer(Tag, LastId,
- length(IdMsgs) + length(Missing),
+ length(IdMsgs) + NumMissing,
C, CDels0)}};
#consumer{last_msg_id = Prev}
when FstId =< Prev ->
@@ -714,7 +723,11 @@ resend_command(Node, Correlation, Command,
ok = ra:pipeline_command(Node, Command, Seq),
State#state{pending = Pending#{Seq => {Correlation, Command}}}.
-add_command(_Cid, _Tag, [], Acc) ->
+%% Prepends to Acc the rabbit_fifo command matching the tag
+%% (settle | return | discard) for the given consumer id and message ids.
+%% An empty message id list adds nothing and returns Acc unchanged.
+%% Each tag must build its own command type: building a settle command for
+%% return/discard would settle the messages instead of returning or
+%% discarding them.
+%% NOTE(review): assumes rabbit_fifo exports make_return/2 next to
+%% make_settle/2 and make_discard/2 -- confirm against the export list.
+add_command(_, _, [], Acc) ->
Acc;
-add_command(Cid, Tag, MsgIds, Acc) ->
- [{Tag, MsgIds, Cid} | Acc].
+add_command(Cid, settle, MsgIds, Acc) ->
+ [rabbit_fifo:make_settle(Cid, MsgIds) | Acc];
+add_command(Cid, return, MsgIds, Acc) ->
+ [rabbit_fifo:make_return(Cid, MsgIds) | Acc];
+add_command(Cid, discard, MsgIds, Acc) ->
+ [rabbit_fifo:make_discard(Cid, MsgIds) | Acc].
diff --git a/src/rabbit_quorum_memory_manager.erl b/src/rabbit_quorum_memory_manager.erl
index 347f7f205e..f567561f31 100644
--- a/src/rabbit_quorum_memory_manager.erl
+++ b/src/rabbit_quorum_memory_manager.erl
@@ -15,7 +15,7 @@
%%
-module(rabbit_quorum_memory_manager).
--include("rabbit.hrl").
+-include_lib("rabbit_common/include/rabbit.hrl").
-export([init/1, handle_call/2, handle_event/2, handle_info/2,
terminate/2, code_change/3]).
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index 2394822763..53dd2e2a7d 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -54,10 +54,6 @@
{'ok', rabbit_fifo_client:state()}.
-spec reject(Confirm :: boolean(), rabbit_types:ctag(), [msg_id()], rabbit_fifo_client:state()) ->
{'ok', rabbit_fifo_client:state()}.
--spec basic_get(rabbit_types:amqqueue(), NoAck :: boolean(), rabbit_types:ctag(),
- rabbit_fifo_client:state()) ->
- {'ok', 'empty', rabbit_fifo_client:state()} |
- {'ok', QLen :: non_neg_integer(), qmsg(), rabbit_fifo_client:state()}.
-spec basic_cancel(rabbit_types:ctag(), ChPid :: pid(), any(), rabbit_fifo_client:state()) ->
{'ok', rabbit_fifo_client:state()}.
-spec stateless_deliver(ra_server_id(), rabbit_types:delivery()) -> 'ok'.
@@ -82,6 +78,8 @@
open_files
]).
+-define(TICK_TIME, 1000). %% the ra server tick time
+
%%----------------------------------------------------------------------------
-spec init_state(ra_server_id(), rabbit_types:r('queue')) ->
@@ -147,9 +145,11 @@ declare(#amqqueue{name = QName,
ra_machine(Q) ->
{module, rabbit_fifo, ra_machine_config(Q)}.
-ra_machine_config(Q = #amqqueue{name = QName}) ->
- #{dead_letter_handler => dlx_mfa(Q),
+ra_machine_config(Q = #amqqueue{name = QName,
+ pid = {Name, _}}) ->
+ #{name => Name,
queue_resource => QName,
+ dead_letter_handler => dlx_mfa(Q),
become_leader_handler => {?MODULE, become_leader, [QName]}}.
cancel_consumer_handler(QName, {ConsumerTag, ChPid}) ->
@@ -185,7 +185,8 @@ become_leader(QName, Name) ->
end),
case rabbit_amqqueue:lookup(QName) of
{ok, #amqqueue{quorum_nodes = Nodes}} ->
- [rpc:call(Node, ?MODULE, rpc_delete_metrics, [QName])
+ [rpc:call(Node, ?MODULE, rpc_delete_metrics,
+ [QName], ?TICK_TIME)
|| Node <- Nodes, Node =/= node()];
_ ->
ok
@@ -198,22 +199,29 @@ rpc_delete_metrics(QName) ->
ok.
update_metrics(QName, {Name, MR, MU, M, C, MsgBytesReady, MsgBytesUnack}) ->
- R = reductions(Name),
- rabbit_core_metrics:queue_stats(QName, MR, MU, M, R),
- Util = case C of
- 0 -> 0;
- _ -> rabbit_fifo:usage(Name)
- end,
- Infos = [{consumers, C}, {consumer_utilisation, Util},
- {message_bytes_ready, MsgBytesReady},
- {message_bytes_unacknowledged, MsgBytesUnack},
- {message_bytes, MsgBytesReady + MsgBytesUnack} | infos(QName)],
- rabbit_core_metrics:queue_stats(QName, Infos),
- rabbit_event:notify(queue_stats, Infos ++ [{name, QName},
- {messages, M},
- {messages_ready, MR},
- {messages_unacknowledged, MU},
- {reductions, R}]).
+ %% this makes calls to remote processes so cannot be run inside the
+ %% ra server
+ _ = spawn(fun() ->
+ R = reductions(Name),
+ rabbit_core_metrics:queue_stats(QName, MR, MU, M, R),
+ Util = case C of
+ 0 -> 0;
+ _ -> rabbit_fifo:usage(Name)
+ end,
+ Infos = [{consumers, C}, {consumer_utilisation, Util},
+ {message_bytes_ready, MsgBytesReady},
+ {message_bytes_unacknowledged, MsgBytesUnack},
+ {message_bytes, MsgBytesReady + MsgBytesUnack}
+ | infos(QName)],
+ rabbit_core_metrics:queue_stats(QName, Infos),
+ rabbit_event:notify(queue_stats,
+ Infos ++ [{name, QName},
+ {messages, M},
+ {messages_ready, MR},
+ {messages_unacknowledged, MU},
+ {reductions, R}])
+ end),
+ ok.
reductions(Name) ->
try
@@ -268,7 +276,7 @@ stop(VHost) ->
_ = [ra:stop_server(Pid) || #amqqueue{pid = Pid} <- find_quorum_queues(VHost)],
ok.
--spec delete(rabbit_types:amqqueue(),
+-spec delete(#amqqueue{},
boolean(), boolean(),
rabbit_types:username()) ->
{ok, QLen :: non_neg_integer()}.
@@ -286,7 +294,8 @@ delete(#amqqueue{type = quorum, pid = {Name, _},
{'DOWN', MRef, process, _, _} ->
ok
end,
- rpc:call(LeaderNode, rabbit_core_metrics, queue_deleted, [QName]),
+ rpc:call(LeaderNode, rabbit_core_metrics, queue_deleted, [QName],
+ ?TICK_TIME),
{ok, Msgs};
{error, {no_more_servers_to_try, Errs}} ->
case lists:all(fun({{error, noproc}, _}) -> true;
@@ -322,6 +331,10 @@ reject(false, CTag, MsgIds, QState) ->
credit(CTag, Credit, Drain, QState) ->
rabbit_fifo_client:credit(quorum_ctag(CTag), Credit, Drain, QState).
+-spec basic_get(#amqqueue{}, NoAck :: boolean(), rabbit_types:ctag(),
+ rabbit_fifo_client:state()) ->
+ {'ok', 'empty', rabbit_fifo_client:state()} |
+ {'ok', QLen :: non_neg_integer(), qmsg(), rabbit_fifo_client:state()}.
basic_get(#amqqueue{name = QName, pid = {Name, _} = Id, type = quorum}, NoAck,
CTag0, QState0) ->
CTag = quorum_ctag(CTag0),
@@ -657,7 +670,7 @@ i(memory, #amqqueue{pid = {Name, _}}) ->
end;
i(state, #amqqueue{pid = {Name, Node}}) ->
%% Check against the leader or last known leader
- case rpc:call(Node, ?MODULE, cluster_state, [Name]) of
+ case rpc:call(Node, ?MODULE, cluster_state, [Name], ?TICK_TIME) of
{badrpc, _} -> down;
State -> State
end;
@@ -706,7 +719,7 @@ format(#amqqueue{quorum_nodes = Nodes} = Q) ->
[{members, Nodes}, {online, online(Q)}, {leader, leader(Q)}].
is_process_alive(Name, Node) ->
- erlang:is_pid(rpc:call(Node, erlang, whereis, [Name])).
+ erlang:is_pid(rpc:call(Node, erlang, whereis, [Name], ?TICK_TIME)).
quorum_messages(QName) ->
case ets:lookup(queue_coarse_metrics, QName) of