summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDiana Corbacho <diana@rabbitmq.com>2019-02-21 16:53:28 +0000
committerkjnilsson <knilsson@pivotal.io>2019-02-22 14:14:31 +0000
commite66284166322fce2a22218ecd5941d15f589a330 (patch)
treeb3f815d8016fdde43cbf37295faa36c07797d181
parent6980462e6d373e567485668420b25356fc884e1d (diff)
downloadrabbitmq-server-git-e66284166322fce2a22218ecd5941d15f589a330.tar.gz
Remove poison message from indexes
[#163513253]
-rw-r--r--src/rabbit_fifo.erl43
-rw-r--r--test/quorum_queue_SUITE.erl40
2 files changed, 45 insertions, 38 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index da9772516c..4ed2bb743a 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -472,7 +472,7 @@ apply(_, {down, ConsumerPid, noconnection},
#consumer{checked_out = Checked0} = C,
{Co, St0, Eff}) when (node(P) =:= Node) and
(C#consumer.status =/= cancelled)->
- {St, Eff0} = return_all(St0, Checked0, Eff),
+ {St, Eff0} = return_all(St0, Checked0, Eff, K, C),
Credit = increase_credit(C, maps:size(Checked0)),
Eff1 = ConsumerUpdateActiveFun(St, K, C, false,
suspected_down, Eff0),
@@ -965,7 +965,7 @@ maybe_return_all(ConsumerId, #consumer{checked_out = Checked0} = Consumer, Cons1
C0, SQ0, Effects0),
{S0#state{consumers = Cons, service_queue = SQ}, Effects1};
down ->
- {S1, Effects1} = return_all(S0, Checked0, Effects0),
+ {S1, Effects1} = return_all(S0, Checked0, Effects0, ConsumerId, Consumer),
{S1#state{consumers = Cons1}, Effects1}
end.
@@ -1089,9 +1089,9 @@ return(Meta, ConsumerId, MsgNumMsgs, Con0, Checked,
{Cons, SQ, Effects1} = update_or_remove_sub(ConsumerId, Con, Cons0,
SQ0, Effects0),
{State1, Effects2} = lists:foldl(fun({'$prefix_msg', _} = Msg, {S0, E0}) ->
- return_one(0, Msg, S0, E0);
+ return_one(0, Msg, S0, E0, ConsumerId, Con);
({MsgNum, Msg}, {S0, E0}) ->
- return_one(MsgNum, Msg, S0, E0)
+ return_one(MsgNum, Msg, S0, E0, ConsumerId, Con)
end, {State0, Effects1}, MsgNumMsgs),
checkout(Meta, State1#state{consumers = Cons,
service_queue = SQ},
@@ -1201,19 +1201,21 @@ find_next_cursor(Smallest, Cursors0, Potential) ->
end.
return_one(0, {'$prefix_msg', _} = Msg,
- #state{returns = Returns} = State0, Effects) ->
+ #state{returns = Returns} = State0, Effects, _ConsumerId, _Con) ->
{add_bytes_return(Msg,
State0#state{returns = lqueue:in(Msg, Returns)}), Effects};
return_one(MsgNum, {RaftId, {Header0, RawMsg}},
#state{returns = Returns,
- delivery_limit = DeliveryLimit} = State0, Effects0) ->
+ delivery_limit = DeliveryLimit} = State0, Effects0, ConsumerId, Con) ->
Header = maps:update_with(delivery_count,
fun (C) -> C+1 end,
1, Header0),
case maps:get(delivery_count, Header) of
DeliveryCount when DeliveryCount > DeliveryLimit ->
Effects = dead_letter_effects(rejected, maps:put(none, {MsgNum, {RaftId, {Header, RawMsg}}}, #{}), State0, Effects0),
- {add_bytes_settle(RawMsg, State0), Effects};
+ Checked = maps:without([MsgNum], Con#consumer.checked_out),
+ {State1, Effects1} = complete(ConsumerId, [RaftId], 1, Con, Checked, Effects, State0),
+ {add_bytes_settle(RawMsg, State1), Effects1};
_ ->
Msg = {RaftId, {Header, RawMsg}},
%% this should not affect the release cursor in any way
@@ -1221,14 +1223,14 @@ return_one(MsgNum, {RaftId, {Header0, RawMsg}},
State0#state{returns = lqueue:in({MsgNum, Msg}, Returns)}), Effects0}
end.
-return_all(State0, Checked0, Effects0) ->
+return_all(State0, Checked0, Effects0, ConsumerId, Consumer) ->
%% need to sort the list so that we return messages in the order
%% they were checked out
Checked = lists:sort(maps:to_list(Checked0)),
lists:foldl(fun ({_, {'$prefix_msg', _} = Msg}, {S, E}) ->
- return_one(0, Msg, S, E);
+ return_one(0, Msg, S, E, ConsumerId, Consumer);
({_, {MsgNum, Msg}}, {S, E}) ->
- return_one(MsgNum, Msg, S, E)
+ return_one(MsgNum, Msg, S, E, ConsumerId, Consumer)
end, {State0, Effects0}, Checked).
%% checkout new messages to consumers
@@ -1868,6 +1870,26 @@ return_checked_out_test() ->
apply(meta(3), make_return(Cid, [MsgId]), State1),
ok.
+return_checked_out_limit_test() ->
+ Cid = {<<"cid">>, self()},
+ Init = init(#{name => test,
+ queue_resource => rabbit_misc:r("/", queue,
+ atom_to_binary(test, utf8)),
+ release_cursor_interval => 0,
+ delivery_limit => 1}),
+ {State0, [_, _]} = enq(1, 1, first, Init),
+ {State1, [_Monitor,
+ {send_msg, _, {delivery, _, [{MsgId, _}]}, ra_event},
+ {aux, active} | _ ]} = check_auto(Cid, 2, State0),
+ % returning immediately checks out the same message again
+ {State2, ok, [{send_msg, _, {delivery, _, [{MsgId2, _}]}, ra_event},
+ {aux, active}]} =
+ apply(meta(3), make_return(Cid, [MsgId]), State1),
+ {#state{ra_indexes = RaIdxs}, ok, []} =
+ apply(meta(4), make_return(Cid, [MsgId2]), State2),
+ ?assertEqual(0, rabbit_fifo_index:size(RaIdxs)),
+ ok.
+
return_auto_checked_out_test() ->
Cid = {<<"cid">>, self()},
{State00, [_, _]} = enq(1, 1, first, test_init(test)),
@@ -1886,7 +1908,6 @@ return_auto_checked_out_test() ->
Effects),
ok.
-
cancelled_checkout_out_test() ->
Cid = {<<"cid">>, self()},
{State00, [_, _]} = enq(1, 1, first, test_init(test)),
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl
index bd54d59869..6439a85ba4 100644
--- a/test/quorum_queue_SUITE.erl
+++ b/test/quorum_queue_SUITE.erl
@@ -1523,7 +1523,7 @@ subscribe_redelivery_count(Config) ->
end.
subscribe_redelivery_limit(Config) ->
- [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
QQ = ?config(queue_name, Config),
@@ -1531,10 +1531,8 @@ subscribe_redelivery_limit(Config) ->
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
{<<"x-delivery-limit">>, long, 1}])),
- RaName = ra_name(QQ),
publish(Ch, QQ),
- wait_for_messages_ready(Servers, RaName, 1),
- wait_for_messages_pending_ack(Servers, RaName, 0),
+ wait_for_messages(Config, [[QQ, <<"1">>, <<"1">>, <<"0">>]]),
subscribe(Ch, QQ, false),
DTag = <<"x-delivery-count">>,
@@ -1548,8 +1546,7 @@ subscribe_redelivery_limit(Config) ->
requeue = true})
end,
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 1),
+ wait_for_messages(Config, [[QQ, <<"1">>, <<"0">>, <<"1">>]]),
receive
{#'basic.deliver'{delivery_tag = DeliveryTag1,
redelivered = true},
@@ -1560,8 +1557,7 @@ subscribe_redelivery_limit(Config) ->
requeue = true})
end,
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 0),
+ wait_for_messages(Config, [[QQ, <<"0">>, <<"0">>, <<"0">>]]),
receive
{#'basic.deliver'{redelivered = true}, #amqp_msg{}} ->
throw(unexpected_redelivery)
@@ -1570,7 +1566,7 @@ subscribe_redelivery_limit(Config) ->
end.
subscribe_redelivery_policy(Config) ->
- [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
QQ = ?config(queue_name, Config),
@@ -1581,10 +1577,8 @@ subscribe_redelivery_policy(Config) ->
Config, 0, <<"delivery-limit">>, <<".*">>, <<"queues">>,
[{<<"delivery-limit">>, 1}]),
- RaName = ra_name(QQ),
publish(Ch, QQ),
- wait_for_messages_ready(Servers, RaName, 1),
- wait_for_messages_pending_ack(Servers, RaName, 0),
+ wait_for_messages(Config, [[QQ, <<"1">>, <<"1">>, <<"0">>]]),
subscribe(Ch, QQ, false),
DTag = <<"x-delivery-count">>,
@@ -1598,8 +1592,7 @@ subscribe_redelivery_policy(Config) ->
requeue = true})
end,
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 1),
+ wait_for_messages(Config, [[QQ, <<"1">>, <<"0">>, <<"1">>]]),
receive
{#'basic.deliver'{delivery_tag = DeliveryTag1,
redelivered = true},
@@ -1610,8 +1603,7 @@ subscribe_redelivery_policy(Config) ->
requeue = true})
end,
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 0),
+ wait_for_messages(Config, [[QQ, <<"0">>, <<"0">>, <<"0">>]]),
receive
{#'basic.deliver'{redelivered = true}, #amqp_msg{}} ->
throw(unexpected_redelivery)
@@ -1621,7 +1613,7 @@ subscribe_redelivery_policy(Config) ->
ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"delivery-limit">>).
subscribe_redelivery_limit_with_dead_letter(Config) ->
- [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
QQ = ?config(queue_name, Config),
@@ -1635,11 +1627,8 @@ subscribe_redelivery_limit_with_dead_letter(Config) ->
?assertEqual({'queue.declare_ok', DLX, 0, 0},
declare(Ch, DLX, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
- RaName = ra_name(QQ),
- RaDlxName = ra_name(DLX),
publish(Ch, QQ),
- wait_for_messages_ready(Servers, RaName, 1),
- wait_for_messages_pending_ack(Servers, RaName, 0),
+ wait_for_messages(Config, [[QQ, <<"1">>, <<"1">>, <<"0">>]]),
subscribe(Ch, QQ, false),
DTag = <<"x-delivery-count">>,
@@ -1653,8 +1642,7 @@ subscribe_redelivery_limit_with_dead_letter(Config) ->
requeue = true})
end,
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 1),
+ wait_for_messages(Config, [[QQ, <<"1">>, <<"0">>, <<"1">>]]),
receive
{#'basic.deliver'{delivery_tag = DeliveryTag1,
redelivered = true},
@@ -1665,10 +1653,8 @@ subscribe_redelivery_limit_with_dead_letter(Config) ->
requeue = true})
end,
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 0),
- wait_for_messages_ready(Servers, RaDlxName, 1),
- wait_for_messages_pending_ack(Servers, RaDlxName, 0).
+ wait_for_messages(Config, [[QQ, <<"0">>, <<"0">>, <<"0">>]]),
+ wait_for_messages(Config, [[DLX, <<"1">>, <<"1">>, <<"0">>]]).
consume_redelivery_count(Config) ->
[Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),