diff options
| author | Michael Klishin <michael@novemberain.com> | 2019-01-15 00:45:17 +0300 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2019-01-15 00:45:17 +0300 |
| commit | 45632289309e80a38348ed936961330a4ae0763b (patch) | |
| tree | 4680ccc9d6631accb723d0cab4306bd454be2bb0 /src | |
| parent | 3c42423dd4a1f445d0be28338b53c5379442195f (diff) | |
| parent | 2ca9c2927949199d099126c6cebdccb4d06bb615 (diff) | |
| download | rabbitmq-server-git-45632289309e80a38348ed936961330a4ae0763b.tar.gz | |
Merge pull request #1819 from rabbitmq/qq-testing
Testing of quorum queues
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue.erl | 14 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 1 | ||||
| -rw-r--r-- | src/rabbit_fifo.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_fifo_client.erl | 10 | ||||
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 11 |
5 files changed, 32 insertions, 10 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index d4301647d3..fe73e760b2 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -1278,6 +1278,9 @@ forget_all_durable(Node) -> %% Try to promote a slave while down - it should recover as a %% master. We try to take the oldest slave here for best chance of %% recovery. +forget_node_for_queue(DeadNode, Q = #amqqueue{type = quorum, + quorum_nodes = QN}) -> + forget_node_for_queue(DeadNode, QN, Q); forget_node_for_queue(DeadNode, Q = #amqqueue{recoverable_slaves = RS}) -> forget_node_for_queue(DeadNode, RS, Q). @@ -1291,11 +1294,12 @@ forget_node_for_queue(_DeadNode, [], #amqqueue{name = Name}) -> forget_node_for_queue(DeadNode, [DeadNode | T], Q) -> forget_node_for_queue(DeadNode, T, Q); -forget_node_for_queue(DeadNode, [H|T], Q) -> - case node_permits_offline_promotion(H) of - false -> forget_node_for_queue(DeadNode, T, Q); - true -> Q1 = Q#amqqueue{pid = rabbit_misc:node_to_fake_pid(H)}, - ok = mnesia:write(rabbit_durable_queue, Q1, write) +forget_node_for_queue(DeadNode, [H|T], #amqqueue{type = Type} = Q) -> + case {node_permits_offline_promotion(H), Type} of + {false, _} -> forget_node_for_queue(DeadNode, T, Q); + {true, classic} -> Q1 = Q#amqqueue{pid = rabbit_misc:node_to_fake_pid(H)}, + ok = mnesia:write(rabbit_durable_queue, Q1, write); + {true, quorum} -> ok end. node_permits_offline_promotion(Node) -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index a4e4638bb2..805d9f538f 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -562,7 +562,6 @@ handle_cast({method, Method, Content, Flow}, flow -> credit_flow:ack(Reader); noflow -> ok end, - try handle_method(rabbit_channel_interceptor:intercept_in( expand_shortcuts(Method, State), Content, IState), State) of diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index fc699111d1..4fe4d954b9 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -42,6 +42,7 @@ query_ra_indexes/1, query_consumer_count/1, query_consumers/1, + query_stat/1, usage/1, zero/1, @@ -721,7 +722,10 @@ query_consumers(#state{consumers = Consumers, waiting_consumers = WaitingConsume Acc) end, #{}, WaitingConsumers), maps:merge(FromConsumers, FromWaitingConsumers). -%% other + +query_stat(#state{messages = M, + consumers = Consumers}) -> + {maps:size(M), maps:size(Consumers)}. -spec usage(atom()) -> float(). usage(Name) when is_atom(Name) -> diff --git a/src/rabbit_fifo_client.erl b/src/rabbit_fifo_client.erl index 5a7eb37b9d..dfbf2f477a 100644 --- a/src/rabbit_fifo_client.erl +++ b/src/rabbit_fifo_client.erl @@ -38,7 +38,8 @@ untracked_enqueue/2, purge/1, cluster_name/1, - update_machine_state/2 + update_machine_state/2, + stat/1 ]). -include_lib("ra/include/ra.hrl"). @@ -398,6 +399,13 @@ purge(Node) -> Err end. +-spec stat(ra_server_id()) -> {ok, {non_neg_integer(), non_neg_integer()}} + | {error | timeout, term()}. +stat(Leader) -> + Query = fun (State) -> rabbit_fifo:query_stat(State) end, + {ok, {_, Stat}, _} = ra:local_query(Leader, Query), + Stat. + %% @doc returns the cluster name -spec cluster_name(state()) -> cluster_name(). cluster_name(#state{cluster_name = ClusterName}) -> diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index 5c8524bdbc..429be39067 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -432,8 +432,15 @@ infos(QName) -> info(Q, Items) -> [{Item, i(Item, Q)} || Item <- Items]. -stat(_Q) -> - {ok, 0, 0}. %% TODO length, consumers count +stat(#amqqueue{pid = Leader}) -> + try + {Ready, Consumers} = rabbit_fifo_client:stat(Leader), + {ok, Ready, Consumers} + catch + _:_ -> + %% Leader is not available, cluster might be in minority + {ok, 0, 0} + end. purge(Node) -> rabbit_fifo_client:purge(Node). |
