summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMichael Klishin <michael@novemberain.com>2019-01-15 00:45:17 +0300
committerGitHub <noreply@github.com>2019-01-15 00:45:17 +0300
commit45632289309e80a38348ed936961330a4ae0763b (patch)
tree4680ccc9d6631accb723d0cab4306bd454be2bb0 /src
parent3c42423dd4a1f445d0be28338b53c5379442195f (diff)
parent2ca9c2927949199d099126c6cebdccb4d06bb615 (diff)
downloadrabbitmq-server-git-45632289309e80a38348ed936961330a4ae0763b.tar.gz
Merge pull request #1819 from rabbitmq/qq-testing
Testing of quorum queues
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue.erl14
-rw-r--r--src/rabbit_channel.erl1
-rw-r--r--src/rabbit_fifo.erl6
-rw-r--r--src/rabbit_fifo_client.erl10
-rw-r--r--src/rabbit_quorum_queue.erl11
5 files changed, 32 insertions, 10 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index d4301647d3..fe73e760b2 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -1278,6 +1278,9 @@ forget_all_durable(Node) ->
%% Try to promote a slave while down - it should recover as a
%% master. We try to take the oldest slave here for best chance of
%% recovery.
+forget_node_for_queue(DeadNode, Q = #amqqueue{type = quorum,
+ quorum_nodes = QN}) ->
+ forget_node_for_queue(DeadNode, QN, Q);
forget_node_for_queue(DeadNode, Q = #amqqueue{recoverable_slaves = RS}) ->
forget_node_for_queue(DeadNode, RS, Q).
@@ -1291,11 +1294,12 @@ forget_node_for_queue(_DeadNode, [], #amqqueue{name = Name}) ->
forget_node_for_queue(DeadNode, [DeadNode | T], Q) ->
forget_node_for_queue(DeadNode, T, Q);
-forget_node_for_queue(DeadNode, [H|T], Q) ->
- case node_permits_offline_promotion(H) of
- false -> forget_node_for_queue(DeadNode, T, Q);
- true -> Q1 = Q#amqqueue{pid = rabbit_misc:node_to_fake_pid(H)},
- ok = mnesia:write(rabbit_durable_queue, Q1, write)
+forget_node_for_queue(DeadNode, [H|T], #amqqueue{type = Type} = Q) ->
+ case {node_permits_offline_promotion(H), Type} of
+ {false, _} -> forget_node_for_queue(DeadNode, T, Q);
+ {true, classic} -> Q1 = Q#amqqueue{pid = rabbit_misc:node_to_fake_pid(H)},
+ ok = mnesia:write(rabbit_durable_queue, Q1, write);
+ {true, quorum} -> ok
end.
node_permits_offline_promotion(Node) ->
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index a4e4638bb2..805d9f538f 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -562,7 +562,6 @@ handle_cast({method, Method, Content, Flow},
flow -> credit_flow:ack(Reader);
noflow -> ok
end,
-
try handle_method(rabbit_channel_interceptor:intercept_in(
expand_shortcuts(Method, State), Content, IState),
State) of
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index fc699111d1..4fe4d954b9 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -42,6 +42,7 @@
query_ra_indexes/1,
query_consumer_count/1,
query_consumers/1,
+ query_stat/1,
usage/1,
zero/1,
@@ -721,7 +722,10 @@ query_consumers(#state{consumers = Consumers, waiting_consumers = WaitingConsume
Acc)
end, #{}, WaitingConsumers),
maps:merge(FromConsumers, FromWaitingConsumers).
-%% other
+
+query_stat(#state{messages = M,
+ consumers = Consumers}) ->
+ {maps:size(M), maps:size(Consumers)}.
-spec usage(atom()) -> float().
usage(Name) when is_atom(Name) ->
diff --git a/src/rabbit_fifo_client.erl b/src/rabbit_fifo_client.erl
index 5a7eb37b9d..dfbf2f477a 100644
--- a/src/rabbit_fifo_client.erl
+++ b/src/rabbit_fifo_client.erl
@@ -38,7 +38,8 @@
untracked_enqueue/2,
purge/1,
cluster_name/1,
- update_machine_state/2
+ update_machine_state/2,
+ stat/1
]).
-include_lib("ra/include/ra.hrl").
@@ -398,6 +399,13 @@ purge(Node) ->
Err
end.
+-spec stat(ra_server_id()) -> {ok, {non_neg_integer(), non_neg_integer()}}
+ | {error | timeout, term()}.
+stat(Leader) ->
+ Query = fun (State) -> rabbit_fifo:query_stat(State) end,
+ {ok, {_, Stat}, _} = ra:local_query(Leader, Query),
+ Stat.
+
%% @doc returns the cluster name
-spec cluster_name(state()) -> cluster_name().
cluster_name(#state{cluster_name = ClusterName}) ->
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index 5c8524bdbc..429be39067 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -432,8 +432,15 @@ infos(QName) ->
info(Q, Items) ->
[{Item, i(Item, Q)} || Item <- Items].
-stat(_Q) ->
- {ok, 0, 0}. %% TODO length, consumers count
+stat(#amqqueue{pid = Leader}) ->
+ try
+ {Ready, Consumers} = rabbit_fifo_client:stat(Leader),
+ {ok, Ready, Consumers}
+ catch
+ _:_ ->
+ %% Leader is not available, cluster might be in minority
+ {ok, 0, 0}
+ end.
purge(Node) ->
rabbit_fifo_client:purge(Node).