diff options
| author | Diana Corbacho <diana@rabbitmq.com> | 2019-02-21 16:53:28 +0000 |
|---|---|---|
| committer | kjnilsson <knilsson@pivotal.io> | 2019-02-22 14:14:31 +0000 |
| commit | e66284166322fce2a22218ecd5941d15f589a330 (patch) | |
| tree | b3f815d8016fdde43cbf37295faa36c07797d181 | |
| parent | 6980462e6d373e567485668420b25356fc884e1d (diff) | |
| download | rabbitmq-server-git-e66284166322fce2a22218ecd5941d15f589a330.tar.gz | |
Remove poison message from indexes
[#163513253]
| -rw-r--r-- | src/rabbit_fifo.erl | 43 | ||||
| -rw-r--r-- | test/quorum_queue_SUITE.erl | 40 |
2 files changed, 45 insertions, 38 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index da9772516c..4ed2bb743a 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -472,7 +472,7 @@ apply(_, {down, ConsumerPid, noconnection}, #consumer{checked_out = Checked0} = C, {Co, St0, Eff}) when (node(P) =:= Node) and (C#consumer.status =/= cancelled)-> - {St, Eff0} = return_all(St0, Checked0, Eff), + {St, Eff0} = return_all(St0, Checked0, Eff, K, C), Credit = increase_credit(C, maps:size(Checked0)), Eff1 = ConsumerUpdateActiveFun(St, K, C, false, suspected_down, Eff0), @@ -965,7 +965,7 @@ maybe_return_all(ConsumerId, #consumer{checked_out = Checked0} = Consumer, Cons1 C0, SQ0, Effects0), {S0#state{consumers = Cons, service_queue = SQ}, Effects1}; down -> - {S1, Effects1} = return_all(S0, Checked0, Effects0), + {S1, Effects1} = return_all(S0, Checked0, Effects0, ConsumerId, Consumer), {S1#state{consumers = Cons1}, Effects1} end. @@ -1089,9 +1089,9 @@ return(Meta, ConsumerId, MsgNumMsgs, Con0, Checked, {Cons, SQ, Effects1} = update_or_remove_sub(ConsumerId, Con, Cons0, SQ0, Effects0), {State1, Effects2} = lists:foldl(fun({'$prefix_msg', _} = Msg, {S0, E0}) -> - return_one(0, Msg, S0, E0); + return_one(0, Msg, S0, E0, ConsumerId, Con); ({MsgNum, Msg}, {S0, E0}) -> - return_one(MsgNum, Msg, S0, E0) + return_one(MsgNum, Msg, S0, E0, ConsumerId, Con) end, {State0, Effects1}, MsgNumMsgs), checkout(Meta, State1#state{consumers = Cons, service_queue = SQ}, @@ -1201,19 +1201,21 @@ find_next_cursor(Smallest, Cursors0, Potential) -> end. return_one(0, {'$prefix_msg', _} = Msg, - #state{returns = Returns} = State0, Effects) -> + #state{returns = Returns} = State0, Effects, _ConsumerId, _Con) -> {add_bytes_return(Msg, State0#state{returns = lqueue:in(Msg, Returns)}), Effects}; return_one(MsgNum, {RaftId, {Header0, RawMsg}}, #state{returns = Returns, - delivery_limit = DeliveryLimit} = State0, Effects0) -> + delivery_limit = DeliveryLimit} = State0, Effects0, ConsumerId, Con) -> Header = maps:update_with(delivery_count, fun (C) -> C+1 end, 1, Header0), case maps:get(delivery_count, Header) of DeliveryCount when DeliveryCount > DeliveryLimit -> Effects = dead_letter_effects(rejected, maps:put(none, {MsgNum, {RaftId, {Header, RawMsg}}}, #{}), State0, Effects0), - {add_bytes_settle(RawMsg, State0), Effects}; + Checked = maps:without([MsgNum], Con#consumer.checked_out), + {State1, Effects1} = complete(ConsumerId, [RaftId], 1, Con, Checked, Effects, State0), + {add_bytes_settle(RawMsg, State1), Effects1}; _ -> Msg = {RaftId, {Header, RawMsg}}, %% this should not affect the release cursor in any way @@ -1221,14 +1223,14 @@ return_one(MsgNum, {RaftId, {Header0, RawMsg}}, State0#state{returns = lqueue:in({MsgNum, Msg}, Returns)}), Effects0} end. -return_all(State0, Checked0, Effects0) -> +return_all(State0, Checked0, Effects0, ConsumerId, Consumer) -> %% need to sort the list so that we return messages in the order %% they were checked out Checked = lists:sort(maps:to_list(Checked0)), lists:foldl(fun ({_, {'$prefix_msg', _} = Msg}, {S, E}) -> - return_one(0, Msg, S, E); + return_one(0, Msg, S, E, ConsumerId, Consumer); ({_, {MsgNum, Msg}}, {S, E}) -> - return_one(MsgNum, Msg, S, E) + return_one(MsgNum, Msg, S, E, ConsumerId, Consumer) end, {State0, Effects0}, Checked). %% checkout new messages to consumers @@ -1868,6 +1870,26 @@ return_checked_out_test() -> apply(meta(3), make_return(Cid, [MsgId]), State1), ok. +return_checked_out_limit_test() -> + Cid = {<<"cid">>, self()}, + Init = init(#{name => test, + queue_resource => rabbit_misc:r("/", queue, + atom_to_binary(test, utf8)), + release_cursor_interval => 0, + delivery_limit => 1}), + {State0, [_, _]} = enq(1, 1, first, Init), + {State1, [_Monitor, + {send_msg, _, {delivery, _, [{MsgId, _}]}, ra_event}, + {aux, active} | _ ]} = check_auto(Cid, 2, State0), + % returning immediately checks out the same message again + {State2, ok, [{send_msg, _, {delivery, _, [{MsgId2, _}]}, ra_event}, + {aux, active}]} = + apply(meta(3), make_return(Cid, [MsgId]), State1), + {#state{ra_indexes = RaIdxs}, ok, []} = + apply(meta(4), make_return(Cid, [MsgId2]), State2), + ?assertEqual(0, rabbit_fifo_index:size(RaIdxs)), + ok. + return_auto_checked_out_test() -> Cid = {<<"cid">>, self()}, {State00, [_, _]} = enq(1, 1, first, test_init(test)), @@ -1886,7 +1908,6 @@ return_auto_checked_out_test() -> Effects), ok. - cancelled_checkout_out_test() -> Cid = {<<"cid">>, self()}, {State00, [_, _]} = enq(1, 1, first, test_init(test)), diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl index bd54d59869..6439a85ba4 100644 --- a/test/quorum_queue_SUITE.erl +++ b/test/quorum_queue_SUITE.erl @@ -1523,7 +1523,7 @@ subscribe_redelivery_count(Config) -> end. subscribe_redelivery_limit(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), Ch = rabbit_ct_client_helpers:open_channel(Config, Server), QQ = ?config(queue_name, Config), @@ -1531,10 +1531,8 @@ subscribe_redelivery_limit(Config) -> declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, {<<"x-delivery-limit">>, long, 1}])), - RaName = ra_name(QQ), publish(Ch, QQ), - wait_for_messages_ready(Servers, RaName, 1), - wait_for_messages_pending_ack(Servers, RaName, 0), + wait_for_messages(Config, [[QQ, <<"1">>, <<"1">>, <<"0">>]]), subscribe(Ch, QQ, false), DTag = <<"x-delivery-count">>, @@ -1548,8 +1546,7 @@ subscribe_redelivery_limit(Config) -> requeue = true}) end, - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 1), + wait_for_messages(Config, [[QQ, <<"1">>, <<"0">>, <<"1">>]]), receive {#'basic.deliver'{delivery_tag = DeliveryTag1, redelivered = true}, @@ -1560,8 +1557,7 @@ subscribe_redelivery_limit(Config) -> requeue = true}) end, - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 0), + wait_for_messages(Config, [[QQ, <<"0">>, <<"0">>, <<"0">>]]), receive {#'basic.deliver'{redelivered = true}, #amqp_msg{}} -> throw(unexpected_redelivery) @@ -1570,7 +1566,7 @@ subscribe_redelivery_limit(Config) -> end. subscribe_redelivery_policy(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), Ch = rabbit_ct_client_helpers:open_channel(Config, Server), QQ = ?config(queue_name, Config), @@ -1581,10 +1577,8 @@ subscribe_redelivery_policy(Config) -> Config, 0, <<"delivery-limit">>, <<".*">>, <<"queues">>, [{<<"delivery-limit">>, 1}]), - RaName = ra_name(QQ), publish(Ch, QQ), - wait_for_messages_ready(Servers, RaName, 1), - wait_for_messages_pending_ack(Servers, RaName, 0), + wait_for_messages(Config, [[QQ, <<"1">>, <<"1">>, <<"0">>]]), subscribe(Ch, QQ, false), DTag = <<"x-delivery-count">>, @@ -1598,8 +1592,7 @@ subscribe_redelivery_policy(Config) -> requeue = true}) end, - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 1), + wait_for_messages(Config, [[QQ, <<"1">>, <<"0">>, <<"1">>]]), receive {#'basic.deliver'{delivery_tag = DeliveryTag1, redelivered = true}, @@ -1610,8 +1603,7 @@ subscribe_redelivery_policy(Config) -> requeue = true}) end, - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 0), + wait_for_messages(Config, [[QQ, <<"0">>, <<"0">>, <<"0">>]]), receive {#'basic.deliver'{redelivered = true}, #amqp_msg{}} -> throw(unexpected_redelivery) @@ -1621,7 +1613,7 @@ subscribe_redelivery_policy(Config) -> ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"delivery-limit">>). subscribe_redelivery_limit_with_dead_letter(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), Ch = rabbit_ct_client_helpers:open_channel(Config, Server), QQ = ?config(queue_name, Config), @@ -1635,11 +1627,8 @@ subscribe_redelivery_limit_with_dead_letter(Config) -> ?assertEqual({'queue.declare_ok', DLX, 0, 0}, declare(Ch, DLX, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - RaName = ra_name(QQ), - RaDlxName = ra_name(DLX), publish(Ch, QQ), - wait_for_messages_ready(Servers, RaName, 1), - wait_for_messages_pending_ack(Servers, RaName, 0), + wait_for_messages(Config, [[QQ, <<"1">>, <<"1">>, <<"0">>]]), subscribe(Ch, QQ, false), DTag = <<"x-delivery-count">>, @@ -1653,8 +1642,7 @@ subscribe_redelivery_limit_with_dead_letter(Config) -> requeue = true}) end, - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 1), + wait_for_messages(Config, [[QQ, <<"1">>, <<"0">>, <<"1">>]]), receive {#'basic.deliver'{delivery_tag = DeliveryTag1, redelivered = true}, @@ -1665,10 +1653,8 @@ subscribe_redelivery_limit_with_dead_letter(Config) -> requeue = true}) end, - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 0), - wait_for_messages_ready(Servers, RaDlxName, 1), - wait_for_messages_pending_ack(Servers, RaDlxName, 0). + wait_for_messages(Config, [[QQ, <<"0">>, <<"0">>, <<"0">>]]), + wait_for_messages(Config, [[DLX, <<"1">>, <<"1">>, <<"0">>]]). consume_redelivery_count(Config) -> [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), |
