summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorkjnilsson <knilsson@pivotal.io>2019-01-28 16:00:07 +0000
committerkjnilsson <knilsson@pivotal.io>2019-01-28 17:51:29 +0000
commit649b8c95e6e138b8bfeb9bbd724225c0e2693791 (patch)
tree5e61b70fd6b5f51fcdae45c4c2427e97b8bd4f05
parenta4b602567081b28c4bc53ac5995b5c054a305da9 (diff)
downloadrabbitmq-server-git-649b8c95e6e138b8bfeb9bbd724225c0e2693791.tar.gz
Fix basic get message ready count
Ensure the messages is ready is returned from basic get. Also fix message count when using basic.delete. [#162502929]
-rw-r--r--src/rabbit_fifo.erl35
-rw-r--r--src/rabbit_fifo_client.erl25
-rw-r--r--src/rabbit_quorum_queue.erl30
-rw-r--r--test/rabbit_fifo_SUITE.erl26
4 files changed, 65 insertions, 51 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index c3b5e66355..6b2971a02a 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -404,21 +404,22 @@ apply(Meta, #checkout{spec = {dequeue, Settlement},
_ when Exists ->
%% a dequeue using the same consumer_id isn't possible at this point
{State0, {dequeue, empty}};
- _ ->
+ Ready ->
State1 = update_consumer(ConsumerId, ConsumerMeta,
{once, 1, simple_prefetch}, State0),
{success, _, MsgId, Msg, State2} = checkout_one(State1),
case Settlement of
unsettled ->
{_, Pid} = ConsumerId,
- {State2, {dequeue, {MsgId, Msg}},
+ {State2, {dequeue, {MsgId, Msg}, Ready-1},
[{monitor, process, Pid}]};
settled ->
%% immediately settle the checkout
{State, _, Effects} = apply(Meta,
- make_settle(ConsumerId, [MsgId]),
+ make_settle(ConsumerId,
+ [MsgId]),
State2),
- {State, {dequeue, {MsgId, Msg}}, Effects}
+ {State, {dequeue, {MsgId, Msg}, Ready-1}, Effects}
end
end;
apply(Meta, #checkout{spec = cancel, consumer_id = ConsumerId}, State0) ->
@@ -761,9 +762,8 @@ query_single_active_consumer(#state{consumer_strategy = single_active,
query_single_active_consumer(_) ->
disabled.
-query_stat(#state{messages = M,
- consumers = Consumers}) ->
- {maps:size(M), maps:size(Consumers)}.
+query_stat(#state{consumers = Consumers} = State) ->
+ {messages_ready(State), maps:size(Consumers)}.
-spec usage(atom()) -> float().
usage(Name) when is_atom(Name) ->
@@ -1632,7 +1632,8 @@ enq_enq_deq_test() ->
{State1, _} = enq(1, 1, first, test_init(test)),
{State2, _} = enq(2, 2, second, State1),
% get returns a reply value
- {_State3, {dequeue, {0, {_, first}}}, [{monitor, _, _}]} =
+ NumReady = 1,
+ {_State3, {dequeue, {0, {_, first}}, NumReady}, [{monitor, _, _}]} =
apply(meta(3), make_checkout(Cid, {dequeue, unsettled}, #{}),
State2),
ok.
@@ -1642,7 +1643,7 @@ enq_enq_deq_deq_settle_test() ->
{State1, _} = enq(1, 1, first, test_init(test)),
{State2, _} = enq(2, 2, second, State1),
% get returns a reply value
- {State3, {dequeue, {0, {_, first}}}, [{monitor, _, _}]} =
+ {State3, {dequeue, {0, {_, first}}, 1}, [{monitor, _, _}]} =
apply(meta(3), make_checkout(Cid, {dequeue, unsettled}, #{}),
State2),
{_State4, {dequeue, empty}} =
@@ -1654,7 +1655,7 @@ enq_enq_checkout_get_settled_test() ->
Cid = {?FUNCTION_NAME, self()},
{State1, _} = enq(1, 1, first, test_init(test)),
% get returns a reply value
- {_State2, {dequeue, {0, {_, first}}}, _Effs} =
+ {_State2, {dequeue, {0, {_, first}}, _}, _Effs} =
apply(meta(3), make_checkout(Cid, {dequeue, settled}, #{}),
State1),
ok.
@@ -1672,7 +1673,7 @@ untracked_enq_deq_test() ->
{State1, _, _} = apply(meta(1),
make_enqueue(undefined, undefined, first),
State0),
- {_State2, {dequeue, {0, {_, first}}}, _} =
+ {_State2, {dequeue, {0, {_, first}}, _}, _} =
apply(meta(3), make_checkout(Cid, {dequeue, settled}, #{}), State1),
ok.
@@ -1789,9 +1790,9 @@ cancelled_checkout_out_test() ->
?assertEqual(1, maps:size(State2#state.messages)),
?assertEqual(1, lqueue:len(State2#state.returns)),
- {State3, {dequeue, {0, {_, first}}}, _} =
+ {State3, {dequeue, {0, {_, first}}, _}, _} =
apply(meta(3), make_checkout(Cid, {dequeue, settled}, #{}), State2),
- {_State, {dequeue, {_, {_, second}}}, _} =
+ {_State, {dequeue, {_, {_, second}}, _}, _} =
apply(meta(4), make_checkout(Cid, {dequeue, settled}, #{}), State3),
ok.
@@ -2107,7 +2108,7 @@ pending_enqueue_is_enqueued_on_down_test() ->
Pid = self(),
{State0, _} = enq(1, 2, first, test_init(test)),
{State1, _, _} = apply(meta(2), {down, Pid, noproc}, State0),
- {_State2, {dequeue, {0, {_, first}}}, _} =
+ {_State2, {dequeue, {0, {_, first}}, 0}, _} =
apply(meta(3), make_checkout(Cid, {dequeue, settled}, #{}), State1),
ok.
@@ -2156,7 +2157,7 @@ purge_test() ->
{State2, {purge, 1}, _} = apply(meta(2), make_purge(), State1),
{State3, _} = enq(3, 2, second, State2),
% get returns a reply value
- {_State4, {dequeue, {0, {_, second}}}, [{monitor, _, _}]} =
+ {_State4, {dequeue, {0, {_, second}}, _}, [{monitor, _, _}]} =
apply(meta(4), make_checkout(Cid, {dequeue, unsettled}, #{}), State3),
ok.
@@ -2458,11 +2459,11 @@ enq(Idx, MsgSeq, Msg, State) ->
apply(meta(Idx), make_enqueue(self(), MsgSeq, Msg), State)).
deq(Idx, Cid, Settlement, State0) ->
- {State, {dequeue, Msg}, _} =
+ {State, {dequeue, {MsgId, Msg}, _}, _} =
apply(meta(Idx),
make_checkout(Cid, {dequeue, Settlement}, #{}),
State0),
- {State, Msg}.
+ {State, {MsgId, Msg}}.
check_n(Cid, Idx, N, State) ->
strip_reply(
diff --git a/src/rabbit_fifo_client.erl b/src/rabbit_fifo_client.erl
index dfbf2f477a..5cc73b5d63 100644
--- a/src/rabbit_fifo_client.erl
+++ b/src/rabbit_fifo_client.erl
@@ -177,7 +177,8 @@ enqueue(Msg, State) ->
%% @returns `{ok, IdMsg, State}' or `{error | timeout, term()}'
-spec dequeue(rabbit_fifo:consumer_tag(),
Settlement :: settled | unsettled, state()) ->
- {ok, rabbit_fifo:delivery_msg() | empty, state()} | {error | timeout, term()}.
+ {ok, {rabbit_fifo:delivery_msg(), non_neg_integer()}
+ | empty, state()} | {error | timeout, term()}.
dequeue(ConsumerTag, Settlement, #state{timeout = Timeout} = State0) ->
Node = pick_node(State0),
ConsumerId = consumer_id(ConsumerTag),
@@ -186,8 +187,10 @@ dequeue(ConsumerTag, Settlement, #state{timeout = Timeout} = State0) ->
{dequeue, Settlement},
#{}),
Timeout) of
- {ok, {dequeue, Reply}, Leader} ->
- {ok, Reply, State0#state{leader = Leader}};
+ {ok, {dequeue, empty}, Leader} ->
+ {ok, empty, State0#state{leader = Leader}};
+ {ok, {dequeue, Msg, NumReady}, Leader} ->
+ {ok, {Msg, NumReady}, State0#state{leader = Leader}};
Err ->
Err
end.
@@ -399,12 +402,18 @@ purge(Node) ->
Err
end.
--spec stat(ra_server_id()) -> {ok, {non_neg_integer(), non_neg_integer()}}
- | {error | timeout, term()}.
+-spec stat(ra_server_id()) ->
+ {ok, non_neg_integer(), non_neg_integer()}
+ | {error | timeout, term()}.
stat(Leader) ->
- Query = fun (State) -> rabbit_fifo:query_stat(State) end,
- {ok, {_, Stat}, _} = ra:local_query(Leader, Query),
- Stat.
+ %% short timeout as we don't want to spend too long if it is going to
+ %% fail anyway
+ case ra:local_query(Leader, fun rabbit_fifo:query_stat/1, 250) of
+ {ok, {_, {R, C}}, _} ->
+ {ok, R, C};
+ Err ->
+ Err
+ end.
%% @doc returns the cluster name
-spec cluster_name(state()) -> cluster_name().
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index 1f3aa1f38b..cb625e9036 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -310,11 +310,11 @@ stop(VHost) ->
rabbit_types:username()) ->
{ok, QLen :: non_neg_integer()}.
delete(#amqqueue{type = quorum, pid = {Name, _},
- name = QName, quorum_nodes = QNodes},
+ name = QName, quorum_nodes = QNodes} = Q,
_IfUnused, _IfEmpty, ActingUser) ->
%% TODO Quorum queue needs to support consumer tracking for IfUnused
Timeout = ?DELETE_TIMEOUT,
- Msgs = quorum_messages(Name),
+ {ok, ReadyMsgs, _} = stat(Q),
Servers = [{Name, Node} || Node <- QNodes],
case ra:delete_cluster(Servers, Timeout) of
{ok, {_, LeaderNode} = Leader} ->
@@ -328,7 +328,7 @@ delete(#amqqueue{type = quorum, pid = {Name, _},
ok = delete_queue_data(QName, ActingUser),
rpc:call(LeaderNode, rabbit_core_metrics, queue_deleted, [QName],
?TICK_TIME),
- {ok, Msgs};
+ {ok, ReadyMsgs};
{error, {no_more_servers_to_try, Errs}} ->
case lists:all(fun({{error, noproc}, _}) -> true;
(_) -> false
@@ -337,7 +337,7 @@ delete(#amqqueue{type = quorum, pid = {Name, _},
%% If all ra nodes were already down, the delete
%% has succeed
delete_queue_data(QName, ActingUser),
- {ok, Msgs};
+ {ok, ReadyMsgs};
false ->
%% attempt forced deletion of all servers
rabbit_log:warning(
@@ -347,14 +347,14 @@ delete(#amqqueue{type = quorum, pid = {Name, _},
[rabbit_misc:rs(QName), Errs]),
ok = force_delete_queue(Servers),
delete_queue_data(QName, ActingUser),
- {ok, Msgs}
+ {ok, ReadyMsgs}
end
end.
force_delete_queue(Servers) ->
[begin
- case catch(ra:delete_server(S)) of
+ case catch(ra:force_delete_server(S)) of
ok -> ok;
Err ->
rabbit_log:warning(
@@ -390,9 +390,9 @@ credit(CTag, Credit, Drain, QState) ->
-spec basic_get(#amqqueue{}, NoAck :: boolean(), rabbit_types:ctag(),
rabbit_fifo_client:state()) ->
- {'ok', 'empty', rabbit_fifo_client:state()} |
- {'ok', QLen :: non_neg_integer(), qmsg(), rabbit_fifo_client:state()}.
-basic_get(#amqqueue{name = QName, pid = {Name, _} = Id, type = quorum}, NoAck,
+ {'ok', 'empty', rabbit_fifo_client:state()} |
+ {'ok', QLen :: non_neg_integer(), qmsg(), rabbit_fifo_client:state()}.
+basic_get(#amqqueue{name = QName, pid = Id, type = quorum}, NoAck,
CTag0, QState0) ->
CTag = quorum_ctag(CTag0),
Settlement = case NoAck of
@@ -404,11 +404,11 @@ basic_get(#amqqueue{name = QName, pid = {Name, _} = Id, type = quorum}, NoAck,
case rabbit_fifo_client:dequeue(CTag, Settlement, QState0) of
{ok, empty, QState} ->
{ok, empty, QState};
- {ok, {MsgId, {MsgHeader, Msg0}}, QState} ->
+ {ok, {{MsgId, {MsgHeader, Msg0}}, MsgsReady}, QState} ->
Count = maps:get(delivery_count, MsgHeader, 0),
IsDelivered = Count > 0,
Msg = rabbit_basic:add_header(<<"x-delivery-count">>, long, Count, Msg0),
- {ok, quorum_messages(Name), {QName, Id, MsgId, IsDelivered, Msg}, QState};
+ {ok, MsgsReady, {QName, Id, MsgId, IsDelivered, Msg}, QState};
{timeout, _} ->
{error, timeout}
end.
@@ -490,8 +490,12 @@ info(Q, Items) ->
stat(#amqqueue{pid = Leader}) ->
try
- {Ready, Consumers} = rabbit_fifo_client:stat(Leader),
- {ok, Ready, Consumers}
+ case rabbit_fifo_client:stat(Leader) of
+ {ok, _, _} = Stat ->
+ Stat;
+ _ ->
+ {ok, 0, 0}
+ end
catch
_:_ ->
%% Leader is not available, cluster might be in minority
diff --git a/test/rabbit_fifo_SUITE.erl b/test/rabbit_fifo_SUITE.erl
index 6df61d4288..b8f6a110b3 100644
--- a/test/rabbit_fifo_SUITE.erl
+++ b/test/rabbit_fifo_SUITE.erl
@@ -145,7 +145,7 @@ return(Config) ->
{ok, F0} = rabbit_fifo_client:enqueue(1, msg1, F00),
{ok, F1} = rabbit_fifo_client:enqueue(2, msg2, F0),
{_, _, F2} = process_ra_events(F1, 100),
- {ok, {MsgId, _}, F} = rabbit_fifo_client:dequeue(<<"tag">>, unsettled, F2),
+ {ok, {{MsgId, _}, _}, F} = rabbit_fifo_client:dequeue(<<"tag">>, unsettled, F2),
{ok, _F2} = rabbit_fifo_client:return(<<"tag">>, [MsgId], F),
ra:stop_server(ServerId),
@@ -237,9 +237,9 @@ resends_lost_command(Config) ->
meck:unload(ra),
{ok, F3} = rabbit_fifo_client:enqueue(msg3, F2),
{_, _, F4} = process_ra_events(F3, 500),
- {ok, {_, {_, msg1}}, F5} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F4),
- {ok, {_, {_, msg2}}, F6} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F5),
- {ok, {_, {_, msg3}}, _F7} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F6),
+ {ok, {{_, {_, msg1}}, _}, F5} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F4),
+ {ok, {{_, {_, msg2}}, _}, F6} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F5),
+ {ok, {{_, {_, msg3}}, _}, _F7} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F6),
ra:stop_server(ServerId),
ok.
@@ -299,7 +299,7 @@ returns_after_down(Config) ->
receive checkout_done -> ok after 1000 -> exit(checkout_done_timeout) end,
timer:sleep(1000),
% message should be available for dequeue
- {ok, {_, {_, msg1}}, _} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F2),
+ {ok, {{_, {_, msg1}}, _}, _} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F2),
ra:stop_server(ServerId),
ok.
@@ -322,9 +322,9 @@ resends_after_lost_applied(Config) ->
% send another message
{ok, F3} = rabbit_fifo_client:enqueue(msg3, F2),
{_, _, F4} = process_ra_events(F3, 500),
- {ok, {_, {_, msg1}}, F5} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F4),
- {ok, {_, {_, msg2}}, F6} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F5),
- {ok, {_, {_, msg3}}, _F7} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F6),
+ {ok, {{_, {_, msg1}}, _}, F5} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F4),
+ {ok, {{_, {_, msg2}}, _}, F6} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F5),
+ {ok, {{_, {_, msg3}}, _}, _F7} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F6),
ra:stop_server(ServerId),
ok.
@@ -395,7 +395,7 @@ cancel_checkout(Config) ->
{ok, F2} = rabbit_fifo_client:checkout(<<"tag">>, 10, undefined, F1),
{_, _, F3} = process_ra_events0(F2, [], [], 250, fun (_, S) -> S end),
{ok, F4} = rabbit_fifo_client:cancel_checkout(<<"tag">>, F3),
- {ok, {_, {_, m1}}, _} = rabbit_fifo_client:dequeue(<<"d1">>, settled, F4),
+ {ok, {{_, {_, m1}}, _}, _} = rabbit_fifo_client:dequeue(<<"d1">>, settled, F4),
ok.
credit(Config) ->
@@ -445,7 +445,7 @@ untracked_enqueue(Config) ->
ok = rabbit_fifo_client:untracked_enqueue([ServerId], msg1),
timer:sleep(100),
F0 = rabbit_fifo_client:init(ClusterName, [ServerId]),
- {ok, {_, {_, msg1}}, _} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F0),
+ {ok, {{_, {_, msg1}}, _}, _} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F0),
ra:stop_server(ServerId),
ok.
@@ -504,10 +504,10 @@ dequeue(Config) ->
{ok, F2_} = rabbit_fifo_client:enqueue(msg1, F1b),
{_, _, F2} = process_ra_events(F2_, 100),
- {ok, {0, {_, msg1}}, F3} = rabbit_fifo_client:dequeue(Tag, settled, F2),
+ {ok, {{0, {_, msg1}}, _}, F3} = rabbit_fifo_client:dequeue(Tag, settled, F2),
{ok, F4_} = rabbit_fifo_client:enqueue(msg2, F3),
{_, _, F4} = process_ra_events(F4_, 100),
- {ok, {MsgId, {_, msg2}}, F5} = rabbit_fifo_client:dequeue(Tag, unsettled, F4),
+ {ok, {{MsgId, {_, msg2}}, _}, F5} = rabbit_fifo_client:dequeue(Tag, unsettled, F4),
{ok, _F6} = rabbit_fifo_client:settle(Tag, [MsgId], F5),
ra:stop_server(ServerId),
ok.
@@ -521,7 +521,7 @@ enq_deq_n(0, F0, Acc) ->
enq_deq_n(N, F, Acc) ->
{ok, F1} = rabbit_fifo_client:enqueue(N, F),
{_, _, F2} = process_ra_events(F1, 10),
- {ok, {_, {_, Deq}}, F3} = rabbit_fifo_client:dequeue(term_to_binary(N), settled, F2),
+ {ok, {{_, {_, Deq}}, _}, F3} = rabbit_fifo_client:dequeue(term_to_binary(N), settled, F2),
{_, _, F4} = process_ra_events(F3, 5),
enq_deq_n(N-1, F4, [Deq | Acc]).