diff options
| author | kjnilsson <knilsson@pivotal.io> | 2019-01-28 16:00:07 +0000 |
|---|---|---|
| committer | kjnilsson <knilsson@pivotal.io> | 2019-01-28 17:51:29 +0000 |
| commit | 649b8c95e6e138b8bfeb9bbd724225c0e2693791 (patch) | |
| tree | 5e61b70fd6b5f51fcdae45c4c2427e97b8bd4f05 | |
| parent | a4b602567081b28c4bc53ac5995b5c054a305da9 (diff) | |
| download | rabbitmq-server-git-649b8c95e6e138b8bfeb9bbd724225c0e2693791.tar.gz | |
Fix basic get message ready count
Ensure the messages is ready is returned from basic get. Also fix
message count when using basic.delete.
[#162502929]
| -rw-r--r-- | src/rabbit_fifo.erl | 35 | ||||
| -rw-r--r-- | src/rabbit_fifo_client.erl | 25 | ||||
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 30 | ||||
| -rw-r--r-- | test/rabbit_fifo_SUITE.erl | 26 |
4 files changed, 65 insertions, 51 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index c3b5e66355..6b2971a02a 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -404,21 +404,22 @@ apply(Meta, #checkout{spec = {dequeue, Settlement}, _ when Exists -> %% a dequeue using the same consumer_id isn't possible at this point {State0, {dequeue, empty}}; - _ -> + Ready -> State1 = update_consumer(ConsumerId, ConsumerMeta, {once, 1, simple_prefetch}, State0), {success, _, MsgId, Msg, State2} = checkout_one(State1), case Settlement of unsettled -> {_, Pid} = ConsumerId, - {State2, {dequeue, {MsgId, Msg}}, + {State2, {dequeue, {MsgId, Msg}, Ready-1}, [{monitor, process, Pid}]}; settled -> %% immediately settle the checkout {State, _, Effects} = apply(Meta, - make_settle(ConsumerId, [MsgId]), + make_settle(ConsumerId, + [MsgId]), State2), - {State, {dequeue, {MsgId, Msg}}, Effects} + {State, {dequeue, {MsgId, Msg}, Ready-1}, Effects} end end; apply(Meta, #checkout{spec = cancel, consumer_id = ConsumerId}, State0) -> @@ -761,9 +762,8 @@ query_single_active_consumer(#state{consumer_strategy = single_active, query_single_active_consumer(_) -> disabled. -query_stat(#state{messages = M, - consumers = Consumers}) -> - {maps:size(M), maps:size(Consumers)}. +query_stat(#state{consumers = Consumers} = State) -> + {messages_ready(State), maps:size(Consumers)}. -spec usage(atom()) -> float(). usage(Name) when is_atom(Name) -> @@ -1632,7 +1632,8 @@ enq_enq_deq_test() -> {State1, _} = enq(1, 1, first, test_init(test)), {State2, _} = enq(2, 2, second, State1), % get returns a reply value - {_State3, {dequeue, {0, {_, first}}}, [{monitor, _, _}]} = + NumReady = 1, + {_State3, {dequeue, {0, {_, first}}, NumReady}, [{monitor, _, _}]} = apply(meta(3), make_checkout(Cid, {dequeue, unsettled}, #{}), State2), ok. @@ -1642,7 +1643,7 @@ enq_enq_deq_deq_settle_test() -> {State1, _} = enq(1, 1, first, test_init(test)), {State2, _} = enq(2, 2, second, State1), % get returns a reply value - {State3, {dequeue, {0, {_, first}}}, [{monitor, _, _}]} = + {State3, {dequeue, {0, {_, first}}, 1}, [{monitor, _, _}]} = apply(meta(3), make_checkout(Cid, {dequeue, unsettled}, #{}), State2), {_State4, {dequeue, empty}} = @@ -1654,7 +1655,7 @@ enq_enq_checkout_get_settled_test() -> Cid = {?FUNCTION_NAME, self()}, {State1, _} = enq(1, 1, first, test_init(test)), % get returns a reply value - {_State2, {dequeue, {0, {_, first}}}, _Effs} = + {_State2, {dequeue, {0, {_, first}}, _}, _Effs} = apply(meta(3), make_checkout(Cid, {dequeue, settled}, #{}), State1), ok. @@ -1672,7 +1673,7 @@ untracked_enq_deq_test() -> {State1, _, _} = apply(meta(1), make_enqueue(undefined, undefined, first), State0), - {_State2, {dequeue, {0, {_, first}}}, _} = + {_State2, {dequeue, {0, {_, first}}, _}, _} = apply(meta(3), make_checkout(Cid, {dequeue, settled}, #{}), State1), ok. @@ -1789,9 +1790,9 @@ cancelled_checkout_out_test() -> ?assertEqual(1, maps:size(State2#state.messages)), ?assertEqual(1, lqueue:len(State2#state.returns)), - {State3, {dequeue, {0, {_, first}}}, _} = + {State3, {dequeue, {0, {_, first}}, _}, _} = apply(meta(3), make_checkout(Cid, {dequeue, settled}, #{}), State2), - {_State, {dequeue, {_, {_, second}}}, _} = + {_State, {dequeue, {_, {_, second}}, _}, _} = apply(meta(4), make_checkout(Cid, {dequeue, settled}, #{}), State3), ok. @@ -2107,7 +2108,7 @@ pending_enqueue_is_enqueued_on_down_test() -> Pid = self(), {State0, _} = enq(1, 2, first, test_init(test)), {State1, _, _} = apply(meta(2), {down, Pid, noproc}, State0), - {_State2, {dequeue, {0, {_, first}}}, _} = + {_State2, {dequeue, {0, {_, first}}, 0}, _} = apply(meta(3), make_checkout(Cid, {dequeue, settled}, #{}), State1), ok. @@ -2156,7 +2157,7 @@ purge_test() -> {State2, {purge, 1}, _} = apply(meta(2), make_purge(), State1), {State3, _} = enq(3, 2, second, State2), % get returns a reply value - {_State4, {dequeue, {0, {_, second}}}, [{monitor, _, _}]} = + {_State4, {dequeue, {0, {_, second}}, _}, [{monitor, _, _}]} = apply(meta(4), make_checkout(Cid, {dequeue, unsettled}, #{}), State3), ok. @@ -2458,11 +2459,11 @@ enq(Idx, MsgSeq, Msg, State) -> apply(meta(Idx), make_enqueue(self(), MsgSeq, Msg), State)). deq(Idx, Cid, Settlement, State0) -> - {State, {dequeue, Msg}, _} = + {State, {dequeue, {MsgId, Msg}, _}, _} = apply(meta(Idx), make_checkout(Cid, {dequeue, Settlement}, #{}), State0), - {State, Msg}. + {State, {MsgId, Msg}}. check_n(Cid, Idx, N, State) -> strip_reply( diff --git a/src/rabbit_fifo_client.erl b/src/rabbit_fifo_client.erl index dfbf2f477a..5cc73b5d63 100644 --- a/src/rabbit_fifo_client.erl +++ b/src/rabbit_fifo_client.erl @@ -177,7 +177,8 @@ enqueue(Msg, State) -> %% @returns `{ok, IdMsg, State}' or `{error | timeout, term()}' -spec dequeue(rabbit_fifo:consumer_tag(), Settlement :: settled | unsettled, state()) -> - {ok, rabbit_fifo:delivery_msg() | empty, state()} | {error | timeout, term()}. + {ok, {rabbit_fifo:delivery_msg(), non_neg_integer()} + | empty, state()} | {error | timeout, term()}. dequeue(ConsumerTag, Settlement, #state{timeout = Timeout} = State0) -> Node = pick_node(State0), ConsumerId = consumer_id(ConsumerTag), @@ -186,8 +187,10 @@ dequeue(ConsumerTag, Settlement, #state{timeout = Timeout} = State0) -> {dequeue, Settlement}, #{}), Timeout) of - {ok, {dequeue, Reply}, Leader} -> - {ok, Reply, State0#state{leader = Leader}}; + {ok, {dequeue, empty}, Leader} -> + {ok, empty, State0#state{leader = Leader}}; + {ok, {dequeue, Msg, NumReady}, Leader} -> + {ok, {Msg, NumReady}, State0#state{leader = Leader}}; Err -> Err end. @@ -399,12 +402,18 @@ purge(Node) -> Err end. --spec stat(ra_server_id()) -> {ok, {non_neg_integer(), non_neg_integer()}} - | {error | timeout, term()}. +-spec stat(ra_server_id()) -> + {ok, non_neg_integer(), non_neg_integer()} + | {error | timeout, term()}. stat(Leader) -> - Query = fun (State) -> rabbit_fifo:query_stat(State) end, - {ok, {_, Stat}, _} = ra:local_query(Leader, Query), - Stat. + %% short timeout as we don't want to spend too long if it is going to + %% fail anyway + case ra:local_query(Leader, fun rabbit_fifo:query_stat/1, 250) of + {ok, {_, {R, C}}, _} -> + {ok, R, C}; + Err -> + Err + end. %% @doc returns the cluster name -spec cluster_name(state()) -> cluster_name(). diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index 1f3aa1f38b..cb625e9036 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -310,11 +310,11 @@ stop(VHost) -> rabbit_types:username()) -> {ok, QLen :: non_neg_integer()}. delete(#amqqueue{type = quorum, pid = {Name, _}, - name = QName, quorum_nodes = QNodes}, + name = QName, quorum_nodes = QNodes} = Q, _IfUnused, _IfEmpty, ActingUser) -> %% TODO Quorum queue needs to support consumer tracking for IfUnused Timeout = ?DELETE_TIMEOUT, - Msgs = quorum_messages(Name), + {ok, ReadyMsgs, _} = stat(Q), Servers = [{Name, Node} || Node <- QNodes], case ra:delete_cluster(Servers, Timeout) of {ok, {_, LeaderNode} = Leader} -> @@ -328,7 +328,7 @@ delete(#amqqueue{type = quorum, pid = {Name, _}, ok = delete_queue_data(QName, ActingUser), rpc:call(LeaderNode, rabbit_core_metrics, queue_deleted, [QName], ?TICK_TIME), - {ok, Msgs}; + {ok, ReadyMsgs}; {error, {no_more_servers_to_try, Errs}} -> case lists:all(fun({{error, noproc}, _}) -> true; (_) -> false @@ -337,7 +337,7 @@ delete(#amqqueue{type = quorum, pid = {Name, _}, %% If all ra nodes were already down, the delete %% has succeed delete_queue_data(QName, ActingUser), - {ok, Msgs}; + {ok, ReadyMsgs}; false -> %% attempt forced deletion of all servers rabbit_log:warning( @@ -347,14 +347,14 @@ delete(#amqqueue{type = quorum, pid = {Name, _}, [rabbit_misc:rs(QName), Errs]), ok = force_delete_queue(Servers), delete_queue_data(QName, ActingUser), - {ok, Msgs} + {ok, ReadyMsgs} end end. force_delete_queue(Servers) -> [begin - case catch(ra:delete_server(S)) of + case catch(ra:force_delete_server(S)) of ok -> ok; Err -> rabbit_log:warning( @@ -390,9 +390,9 @@ credit(CTag, Credit, Drain, QState) -> -spec basic_get(#amqqueue{}, NoAck :: boolean(), rabbit_types:ctag(), rabbit_fifo_client:state()) -> - {'ok', 'empty', rabbit_fifo_client:state()} | - {'ok', QLen :: non_neg_integer(), qmsg(), rabbit_fifo_client:state()}. -basic_get(#amqqueue{name = QName, pid = {Name, _} = Id, type = quorum}, NoAck, + {'ok', 'empty', rabbit_fifo_client:state()} | + {'ok', QLen :: non_neg_integer(), qmsg(), rabbit_fifo_client:state()}. +basic_get(#amqqueue{name = QName, pid = Id, type = quorum}, NoAck, CTag0, QState0) -> CTag = quorum_ctag(CTag0), Settlement = case NoAck of @@ -404,11 +404,11 @@ basic_get(#amqqueue{name = QName, pid = {Name, _} = Id, type = quorum}, NoAck, case rabbit_fifo_client:dequeue(CTag, Settlement, QState0) of {ok, empty, QState} -> {ok, empty, QState}; - {ok, {MsgId, {MsgHeader, Msg0}}, QState} -> + {ok, {{MsgId, {MsgHeader, Msg0}}, MsgsReady}, QState} -> Count = maps:get(delivery_count, MsgHeader, 0), IsDelivered = Count > 0, Msg = rabbit_basic:add_header(<<"x-delivery-count">>, long, Count, Msg0), - {ok, quorum_messages(Name), {QName, Id, MsgId, IsDelivered, Msg}, QState}; + {ok, MsgsReady, {QName, Id, MsgId, IsDelivered, Msg}, QState}; {timeout, _} -> {error, timeout} end. @@ -490,8 +490,12 @@ info(Q, Items) -> stat(#amqqueue{pid = Leader}) -> try - {Ready, Consumers} = rabbit_fifo_client:stat(Leader), - {ok, Ready, Consumers} + case rabbit_fifo_client:stat(Leader) of + {ok, _, _} = Stat -> + Stat; + _ -> + {ok, 0, 0} + end catch _:_ -> %% Leader is not available, cluster might be in minority diff --git a/test/rabbit_fifo_SUITE.erl b/test/rabbit_fifo_SUITE.erl index 6df61d4288..b8f6a110b3 100644 --- a/test/rabbit_fifo_SUITE.erl +++ b/test/rabbit_fifo_SUITE.erl @@ -145,7 +145,7 @@ return(Config) -> {ok, F0} = rabbit_fifo_client:enqueue(1, msg1, F00), {ok, F1} = rabbit_fifo_client:enqueue(2, msg2, F0), {_, _, F2} = process_ra_events(F1, 100), - {ok, {MsgId, _}, F} = rabbit_fifo_client:dequeue(<<"tag">>, unsettled, F2), + {ok, {{MsgId, _}, _}, F} = rabbit_fifo_client:dequeue(<<"tag">>, unsettled, F2), {ok, _F2} = rabbit_fifo_client:return(<<"tag">>, [MsgId], F), ra:stop_server(ServerId), @@ -237,9 +237,9 @@ resends_lost_command(Config) -> meck:unload(ra), {ok, F3} = rabbit_fifo_client:enqueue(msg3, F2), {_, _, F4} = process_ra_events(F3, 500), - {ok, {_, {_, msg1}}, F5} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F4), - {ok, {_, {_, msg2}}, F6} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F5), - {ok, {_, {_, msg3}}, _F7} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F6), + {ok, {{_, {_, msg1}}, _}, F5} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F4), + {ok, {{_, {_, msg2}}, _}, F6} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F5), + {ok, {{_, {_, msg3}}, _}, _F7} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F6), ra:stop_server(ServerId), ok. @@ -299,7 +299,7 @@ returns_after_down(Config) -> receive checkout_done -> ok after 1000 -> exit(checkout_done_timeout) end, timer:sleep(1000), % message should be available for dequeue - {ok, {_, {_, msg1}}, _} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F2), + {ok, {{_, {_, msg1}}, _}, _} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F2), ra:stop_server(ServerId), ok. @@ -322,9 +322,9 @@ resends_after_lost_applied(Config) -> % send another message {ok, F3} = rabbit_fifo_client:enqueue(msg3, F2), {_, _, F4} = process_ra_events(F3, 500), - {ok, {_, {_, msg1}}, F5} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F4), - {ok, {_, {_, msg2}}, F6} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F5), - {ok, {_, {_, msg3}}, _F7} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F6), + {ok, {{_, {_, msg1}}, _}, F5} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F4), + {ok, {{_, {_, msg2}}, _}, F6} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F5), + {ok, {{_, {_, msg3}}, _}, _F7} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F6), ra:stop_server(ServerId), ok. @@ -395,7 +395,7 @@ cancel_checkout(Config) -> {ok, F2} = rabbit_fifo_client:checkout(<<"tag">>, 10, undefined, F1), {_, _, F3} = process_ra_events0(F2, [], [], 250, fun (_, S) -> S end), {ok, F4} = rabbit_fifo_client:cancel_checkout(<<"tag">>, F3), - {ok, {_, {_, m1}}, _} = rabbit_fifo_client:dequeue(<<"d1">>, settled, F4), + {ok, {{_, {_, m1}}, _}, _} = rabbit_fifo_client:dequeue(<<"d1">>, settled, F4), ok. credit(Config) -> @@ -445,7 +445,7 @@ untracked_enqueue(Config) -> ok = rabbit_fifo_client:untracked_enqueue([ServerId], msg1), timer:sleep(100), F0 = rabbit_fifo_client:init(ClusterName, [ServerId]), - {ok, {_, {_, msg1}}, _} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F0), + {ok, {{_, {_, msg1}}, _}, _} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F0), ra:stop_server(ServerId), ok. @@ -504,10 +504,10 @@ dequeue(Config) -> {ok, F2_} = rabbit_fifo_client:enqueue(msg1, F1b), {_, _, F2} = process_ra_events(F2_, 100), - {ok, {0, {_, msg1}}, F3} = rabbit_fifo_client:dequeue(Tag, settled, F2), + {ok, {{0, {_, msg1}}, _}, F3} = rabbit_fifo_client:dequeue(Tag, settled, F2), {ok, F4_} = rabbit_fifo_client:enqueue(msg2, F3), {_, _, F4} = process_ra_events(F4_, 100), - {ok, {MsgId, {_, msg2}}, F5} = rabbit_fifo_client:dequeue(Tag, unsettled, F4), + {ok, {{MsgId, {_, msg2}}, _}, F5} = rabbit_fifo_client:dequeue(Tag, unsettled, F4), {ok, _F6} = rabbit_fifo_client:settle(Tag, [MsgId], F5), ra:stop_server(ServerId), ok. @@ -521,7 +521,7 @@ enq_deq_n(0, F0, Acc) -> enq_deq_n(N, F, Acc) -> {ok, F1} = rabbit_fifo_client:enqueue(N, F), {_, _, F2} = process_ra_events(F1, 10), - {ok, {_, {_, Deq}}, F3} = rabbit_fifo_client:dequeue(term_to_binary(N), settled, F2), + {ok, {{_, {_, Deq}}, _}, F3} = rabbit_fifo_client:dequeue(term_to_binary(N), settled, F2), {_, _, F4} = process_ra_events(F3, 5), enq_deq_n(N-1, F4, [Deq | Acc]). |
