summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDaniil Fedotov <dfedotov@pivotal.io>2017-09-12 13:34:16 +0100
committerDaniil Fedotov <dfedotov@pivotal.io>2017-10-02 14:34:47 +0100
commitbf531fd017cbec756ee979299723adce76828c96 (patch)
treee81ffa4d3656f15826f95983ab2484ed494797b5
parent7e64d485e196c1791df6eff07940a6c5f368a7a0 (diff)
downloadrabbitmq-server-git-bf531fd017cbec756ee979299723adce76828c96.tar.gz
Add configurable queue overflow strategy
If a queue is to be overflowed by a delivery it can reject the delivery or drop messages from the head. To reject delivery overflow can be configured to `reject_publish`, to drop head it's `drop_head` (default setting). Messages which will be rejected should still confirm being routed, so mandatory expectations are not accumulated on the channel side. Slave nodes will only confirm if a message was published or discarded. To drop confirms from slaves, all confirms for a message are cleared when the message is rejected. When promoting a new master, left-behind deliveries should be rejected if the queue is full, just like normal deliveries. Fixes #995 [#151294447]
-rw-r--r--src/dtree.erl23
-rw-r--r--src/rabbit_amqqueue.erl9
-rw-r--r--src/rabbit_amqqueue_process.erl69
-rw-r--r--src/rabbit_channel.erl43
-rw-r--r--src/rabbit_mirror_queue_slave.erl1
-rw-r--r--src/rabbit_policies.erl9
-rw-r--r--test/clustering_management_SUITE.erl2
-rw-r--r--test/priority_queue_SUITE.erl15
-rw-r--r--test/rabbit_ha_test_producer.erl53
-rw-r--r--test/simple_ha_SUITE.erl39
-rw-r--r--test/unit_inbroker_parallel_SUITE.erl185
11 files changed, 410 insertions, 38 deletions
diff --git a/src/dtree.erl b/src/dtree.erl
index 466ec88f33..e8b3481b36 100644
--- a/src/dtree.erl
+++ b/src/dtree.erl
@@ -32,7 +32,7 @@
-module(dtree).
--export([empty/0, insert/4, take/3, take/2, take_all/2, drop/2,
+-export([empty/0, insert/4, take/3, take/2, take_one/2, take_all/2, drop/2,
is_defined/2, is_empty/1, smallest/1, size/1]).
%%----------------------------------------------------------------------------
@@ -50,6 +50,7 @@
-spec insert(pk(), [sk()], val(), ?MODULE()) -> ?MODULE().
-spec take([pk()], sk(), ?MODULE()) -> {[kv()], ?MODULE()}.
-spec take(sk(), ?MODULE()) -> {[kv()], ?MODULE()}.
+-spec take_one(pk(), ?MODULE()) -> {[{pk(), val()}], ?MODULE()}.
-spec take_all(sk(), ?MODULE()) -> {[kv()], ?MODULE()}.
-spec drop(pk(), ?MODULE()) -> ?MODULE().
-spec is_defined(sk(), ?MODULE()) -> boolean().
@@ -107,6 +108,26 @@ take(SK, {P, S}) ->
{KVs, {P1, gb_trees:delete(SK, S)}}
end.
+%% Drop an entry with the primary key and clears secondary keys for this key,
+%% returning a list with a key-value pair as a result.
+%% If the primary key does not exist, returns an empty list.
+take_one(PK, {P, S}) ->
+ case gb_trees:lookup(PK, P) of
+ {value, {SKS, Value}} ->
+ P1 = gb_trees:delete(PK, P),
+ S1 = gb_sets:fold(
+ fun(SK, Acc) ->
+ {value, PKS} = gb_trees:lookup(SK, Acc),
+ PKS1 = gb_sets:delete(PK, PKS),
+ case gb_sets:is_empty(PKS1) of
+ true -> gb_trees:delete(SK, Acc);
+ false -> gb_trees:update(SK, PKS1, Acc)
+ end
+ end, S, SKS),
+ {[{PK, Value}], {P1, S1}};
+ none -> {[], {P, S}}
+ end.
+
%% Drop all entries which contain the given secondary key, returning
%% the primary-key/value pairs of these entries. It is ok for the
%% given secondary key to not exist.
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 378f3cbb76..592de77c97 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -576,6 +576,7 @@ declare_args() ->
{<<"x-max-length">>, fun check_non_neg_int_arg/2},
{<<"x-max-length-bytes">>, fun check_non_neg_int_arg/2},
{<<"x-max-priority">>, fun check_non_neg_int_arg/2},
+ {<<"x-overflow">>, fun check_overflow/2},
{<<"x-queue-mode">>, fun check_queue_mode/2}].
consume_args() -> [{<<"x-priority">>, fun check_int_arg/2},
@@ -623,6 +624,14 @@ check_dlxrk_arg({longstr, _}, Args) ->
check_dlxrk_arg({Type, _}, _Args) ->
{error, {unacceptable_type, Type}}.
+check_overflow({longstr, Val}, _Args) ->
+ case lists:member(Val, [<<"drop_head">>, <<"reject_publish">>]) of
+ true -> ok;
+ false -> {error, invalid_overflow}
+ end;
+check_overflow({Type, _}, _Args) ->
+ {error, {unacceptable_type, Type}}.
+
check_queue_mode({longstr, Val}, _Args) ->
case lists:member(Val, [<<"default">>, <<"lazy">>]) of
true -> ok;
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 9b7a569e94..3cd9f4195d 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -79,6 +79,9 @@
max_length,
%% max length in bytes, if configured
max_bytes,
+ %% an action to perform if queue is to be over a limit,
+ %% can be either drop_head (default) or reject_publish
+ overflow,
%% when policies change, this version helps queue
%% determine what previously scheduled/set up state to ignore,
%% e.g. message expiration messages from previously set up timers
@@ -158,7 +161,8 @@ init_state(Q) ->
senders = pmon:new(delegate),
msg_id_to_channel = gb_trees:empty(),
status = running,
- args_policy_version = 0},
+ args_policy_version = 0,
+ overflow = drop_head},
rabbit_event:init_stats_timer(State, #q.stats_timer).
init_it(Recover, From, State = #q{q = #amqqueue{exclusive_owner = none}}) ->
@@ -259,7 +263,7 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
msg_id_to_channel = MTC},
State2 = process_args_policy(State1),
State3 = lists:foldl(fun (Delivery, StateN) ->
- deliver_or_enqueue(Delivery, true, StateN)
+ maybe_deliver_or_enqueue(Delivery, true, StateN)
end, State2, Deliveries),
notify_decorators(startup, State3),
State3.
@@ -377,6 +381,7 @@ process_args_policy(State = #q{q = Q,
{<<"message-ttl">>, fun res_min/2, fun init_ttl/2},
{<<"max-length">>, fun res_min/2, fun init_max_length/2},
{<<"max-length-bytes">>, fun res_min/2, fun init_max_bytes/2},
+ {<<"overflow">>, fun res_arg/2, fun init_overflow/2},
{<<"queue-mode">>, fun res_arg/2, fun init_queue_mode/2}],
drop_expired_msgs(
lists:foldl(fun({Name, Resolve, Fun}, StateN) ->
@@ -420,6 +425,12 @@ init_max_bytes(MaxBytes, State) ->
{_Dropped, State1} = maybe_drop_head(State#q{max_bytes = MaxBytes}),
State1.
+init_overflow(undefined, State) ->
+ State;
+init_overflow(Overflow, State) ->
+ %% TODO maybe drop head
+ State#q{overflow = binary_to_existing_atom(Overflow, utf8)}.
+
init_queue_mode(undefined, State) ->
State;
init_queue_mode(Mode, State = #q {backing_queue = BQ,
@@ -620,12 +631,22 @@ attempt_delivery(Delivery = #delivery{sender = SenderPid,
State#q{consumers = Consumers})}
end.
+maybe_deliver_or_enqueue(Delivery, Delivered, State = #q{overflow = Overflow}) ->
+ send_mandatory(Delivery), %% must do this before confirms
+ case {will_overflow(Delivery, State), Overflow} of
+ {true, reject_publish} ->
+ %% Drop publish and nack to publisher
+ nack_publish_no_space(Delivery, Delivered, State);
+ _ ->
+ %% Enqueue and maybe drop head later
+ deliver_or_enqueue(Delivery, Delivered, State)
+ end.
+
deliver_or_enqueue(Delivery = #delivery{message = Message,
sender = SenderPid,
flow = Flow},
Delivered, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
- send_mandatory(Delivery), %% must do this before confirms
{Confirm, State1} = send_or_record_confirm(Delivery, State),
Props = message_properties(Message, Confirm, State1),
{IsDuplicate, BQS1} = BQ:is_duplicate(Message, BQS),
@@ -643,6 +664,7 @@ deliver_or_enqueue(Delivery = #delivery{message = Message,
{BQS3, MTC1} = discard(Delivery, BQ, BQS2, MTC),
State3#q{backing_queue_state = BQS3, msg_id_to_channel = MTC1};
{undelivered, State3 = #q{backing_queue_state = BQS2}} ->
+
BQS3 = BQ:publish(Message, Props, Delivered, SenderPid, Flow, BQS2),
{Dropped, State4 = #q{backing_queue_state = BQS4}} =
maybe_drop_head(State3#q{backing_queue_state = BQS3}),
@@ -664,7 +686,9 @@ deliver_or_enqueue(Delivery = #delivery{message = Message,
maybe_drop_head(State = #q{max_length = undefined,
max_bytes = undefined}) ->
{false, State};
-maybe_drop_head(State) ->
+maybe_drop_head(State = #q{overflow = reject_publish}) ->
+ {false, State};
+maybe_drop_head(State = #q{overflow = drop_head}) ->
maybe_drop_head(false, State).
maybe_drop_head(AlreadyDropped, State = #q{backing_queue = BQ,
@@ -683,6 +707,35 @@ maybe_drop_head(AlreadyDropped, State = #q{backing_queue = BQ,
{AlreadyDropped, State}
end.
+nack_publish_no_space(#delivery{confirm = true,
+ sender = SenderPid,
+ msg_seq_no = MsgSeqNo} = Delivery,
+ _Delivered,
+ State = #q{ backing_queue = BQ,
+ backing_queue_state = BQS,
+ msg_id_to_channel = MTC}) ->
+ {BQS1, MTC1} = discard(Delivery, BQ, BQS, MTC),
+ gen_server2:cast(SenderPid, {reject_publish, MsgSeqNo, self()}),
+ State#q{ backing_queue_state = BQS1, msg_id_to_channel = MTC1 };
+nack_publish_no_space(#delivery{confirm = false},
+ _Delivered, State) ->
+ State.
+
+will_overflow(_, #q{max_length = undefined,
+ max_bytes = undefined}) -> false;
+will_overflow(#delivery{message = Message},
+ #q{max_length = MaxLen,
+ max_bytes = MaxBytes,
+ backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ ExpectedQueueLength = BQ:len(BQS) + 1,
+
+ #basic_message{content = #content{payload_fragments_rev = PFR}} = Message,
+ MessageSize = iolist_size(PFR),
+ ExpectedQueueSizeBytes = BQ:info(message_bytes_ready, BQS) + MessageSize,
+
+ ExpectedQueueLength > MaxLen orelse ExpectedQueueSizeBytes > MaxBytes.
+
over_max_length(#q{max_length = MaxLen,
max_bytes = MaxBytes,
backing_queue = BQ,
@@ -1242,8 +1295,10 @@ handle_cast({run_backing_queue, Mod, Fun},
State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
noreply(State#q{backing_queue_state = BQ:invoke(Mod, Fun, BQS)});
-handle_cast({deliver, Delivery = #delivery{sender = Sender,
- flow = Flow}, SlaveWhenPublished},
+handle_cast({deliver,
+ Delivery = #delivery{sender = Sender,
+ flow = Flow},
+ SlaveWhenPublished},
State = #q{senders = Senders}) ->
Senders1 = case Flow of
%% In both credit_flow:ack/1 we are acking messages to the channel
@@ -1258,7 +1313,7 @@ handle_cast({deliver, Delivery = #delivery{sender = Sender,
noflow -> Senders
end,
State1 = State#q{senders = Senders1},
- noreply(deliver_or_enqueue(Delivery, SlaveWhenPublished, State1));
+ noreply(maybe_deliver_or_enqueue(Delivery, SlaveWhenPublished, State1));
%% [0] The second ack is since the channel thought we were a slave at
%% the time it published this message, so it used two credits (see
%% rabbit_amqqueue:deliver/2).
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index c69a27d57c..345f90d5e7 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -136,6 +136,9 @@
%% a list of tags for published messages that were
%% delivered but are yet to be confirmed to the client
confirmed,
+ %% a list of tags for published messages that were
+ %% rejected but are yet to be sent to the client
+ rejected,
%% a dtree used to track oustanding notifications
%% for messages published as mandatory
mandatory,
@@ -399,6 +402,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
confirm_enabled = false,
publish_seqno = 1,
unconfirmed = dtree:empty(),
+ rejected = [],
confirmed = [],
mandatory = dtree:empty(),
capabilities = Capabilities,
@@ -429,6 +433,7 @@ prioritise_call(Msg, _From, _Len, _State) ->
prioritise_cast(Msg, _Len, _State) ->
case Msg of
{confirm, _MsgSeqNos, _QPid} -> 5;
+ {reject_publish, _MsgSeqNos, _QPid} -> 5;
{mandatory_received, _MsgSeqNo, _QPid} -> 5;
_ -> 0
end.
@@ -578,6 +583,13 @@ handle_cast({mandatory_received, MsgSeqNo}, State = #ch{mandatory = Mand}) ->
%% NB: don't call noreply/1 since we don't want to send confirms.
noreply_coalesce(State#ch{mandatory = dtree:drop(MsgSeqNo, Mand)});
+handle_cast({reject_publish, MsgSeqNo, _QPid}, State = #ch{unconfirmed = UC}) ->
+ %% It does not matter which queue rejected the message,
+ %% if any queue rejected it - it should not be confirmed.
+ {MXs, UC1} = dtree:take_one(MsgSeqNo, UC),
+ %% NB: don't call noreply/1 since we don't want to send confirms.
+ noreply_coalesce(record_rejects(MXs, State#ch{unconfirmed = UC1}));
+
handle_cast({confirm, MsgSeqNos, QPid}, State = #ch{unconfirmed = UC}) ->
{MXs, UC1} = dtree:take(MsgSeqNos, QPid, UC),
%% NB: don't call noreply/1 since we don't want to send confirms.
@@ -601,7 +613,7 @@ handle_info(emit_stats, State) ->
State1 = rabbit_event:reset_stats_timer(State, #ch.stats_timer),
%% NB: don't call noreply/1 since we don't want to kick off the
%% stats timer.
- {noreply, send_confirms(State1), hibernate};
+ {noreply, send_confirms_and_nacks(State1), hibernate};
handle_info({'DOWN', _MRef, process, QPid, Reason}, State) ->
State1 = handle_publishing_queue_down(QPid, Reason, State),
@@ -661,10 +673,10 @@ reply(Reply, NewState) -> {reply, Reply, next_state(NewState), hibernate}.
noreply(NewState) -> {noreply, next_state(NewState), hibernate}.
-next_state(State) -> ensure_stats_timer(send_confirms(State)).
+next_state(State) -> ensure_stats_timer(send_confirms_and_nacks(State)).
-noreply_coalesce(State = #ch{confirmed = C}) ->
- Timeout = case C of [] -> hibernate; _ -> 0 end,
+noreply_coalesce(State = #ch{confirmed = C, rejected = R}) ->
+ Timeout = case {C, R} of {[], []} -> hibernate; _ -> 0 end,
{noreply, ensure_stats_timer(State), Timeout}.
ensure_stats_timer(State) ->
@@ -798,7 +810,7 @@ check_topic_authorisation(#exchange{name = Name = #resource{virtual_host = VHost
RoutingKey,
Permission) ->
Resource = Name#resource{kind = topic},
- Timeout = get_operation_timeout(),
+ Timeout = get_operation_timeout(),
AmqpParams = case ConnPid of
none ->
%% Called from outside the channel by mgmt API
@@ -942,6 +954,15 @@ maybe_set_fast_reply_to(
maybe_set_fast_reply_to(C, _State) ->
C.
+record_rejects([], State) ->
+ State;
+record_rejects(MXs, State = #ch{rejected = R, tx = Tx}) ->
+ Tx1 = case Tx of
+ none -> none;
+ _ -> failed
+ end,
+ State#ch{rejected = [MXs | R], tx = Tx1}.
+
record_confirms([], State) ->
State;
record_confirms(MXs, State = #ch{confirmed = C}) ->
@@ -1846,21 +1867,23 @@ send_nacks(_MXs, State = #ch{state = closing}) -> %% optimisation
send_nacks(_, State) ->
maybe_complete_tx(State#ch{tx = failed}).
-send_confirms(State = #ch{tx = none, confirmed = []}) ->
+send_confirms_and_nacks(State = #ch{tx = none, confirmed = [], rejected = []}) ->
State;
-send_confirms(State = #ch{tx = none, confirmed = C}) ->
+send_confirms_and_nacks(State = #ch{tx = none, confirmed = C, rejected = R}) ->
case rabbit_node_monitor:pause_partition_guard() of
- ok -> MsgSeqNos =
+ ok -> ConfirmMsgSeqNos =
lists:foldl(
fun ({MsgSeqNo, XName}, MSNs) ->
?INCR_STATS([{exchange_stats, XName, 1}],
confirm, State),
[MsgSeqNo | MSNs]
end, [], lists:append(C)),
- send_confirms(MsgSeqNos, State#ch{confirmed = []});
+ State1 = send_confirms(ConfirmMsgSeqNos, State#ch{confirmed = []}),
+ %% TODO: msg seq nos, same as for confirms.
+ send_nacks(lists:append(R), State1#ch{rejected = []});
pausing -> State
end;
-send_confirms(State) ->
+send_confirms_and_nacks(State) ->
case rabbit_node_monitor:pause_partition_guard() of
ok -> maybe_complete_tx(State);
pausing -> State
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index ee697be501..4a6e077f24 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -302,6 +302,7 @@ handle_cast({deliver, Delivery = #delivery{sender = Sender, flow = Flow}, true},
%% We are acking messages to the channel process that sent us
%% the message delivery. See
%% rabbit_amqqueue_process:handle_ch_down for more info.
+ %% TODO: reject publishes
maybe_flow_ack(Sender, Flow),
noreply(maybe_enqueue_message(Delivery, State));
diff --git a/src/rabbit_policies.erl b/src/rabbit_policies.erl
index be7d3dcd76..3fadfc82e1 100644
--- a/src/rabbit_policies.erl
+++ b/src/rabbit_policies.erl
@@ -42,6 +42,7 @@ register() ->
{policy_validator, <<"max-length">>},
{policy_validator, <<"max-length-bytes">>},
{policy_validator, <<"queue-mode">>},
+ {policy_validator, <<"overflow">>},
{operator_policy_validator, <<"expires">>},
{operator_policy_validator, <<"message-ttl">>},
{operator_policy_validator, <<"max-length">>},
@@ -104,7 +105,13 @@ validate_policy0(<<"queue-mode">>, <<"default">>) ->
validate_policy0(<<"queue-mode">>, <<"lazy">>) ->
ok;
validate_policy0(<<"queue-mode">>, Value) ->
- {error, "~p is not a valid queue-mode value", [Value]}.
+ {error, "~p is not a valid queue-mode value", [Value]};
+validate_policy0(<<"overflow">>, <<"drop_head">>) ->
+ ok;
+validate_policy0(<<"overflow">>, <<"reject_publish">>) ->
+ ok;
+validate_policy0(<<"overflow">>, Value) ->
+ {error, "~p is not a valid overflow value", [Value]}.
merge_policy_value(<<"message-ttl">>, Val, OpVal) -> min(Val, OpVal);
merge_policy_value(<<"max-length">>, Val, OpVal) -> min(Val, OpVal);
diff --git a/test/clustering_management_SUITE.erl b/test/clustering_management_SUITE.erl
index 2a23c4997e..8bf8a9a8b8 100644
--- a/test/clustering_management_SUITE.erl
+++ b/test/clustering_management_SUITE.erl
@@ -704,7 +704,7 @@ assert_failure(Fun) ->
{error_string, Reason} -> Reason;
{badrpc, {'EXIT', Reason}} -> Reason;
{badrpc_multi, Reason, _Nodes} -> Reason;
- Other -> exit({expected_failure, Other})
+ Other -> error({expected_failure, Other})
end.
stop_app(Node) ->
diff --git a/test/priority_queue_SUITE.erl b/test/priority_queue_SUITE.erl
index eecd59b879..eb781ffedf 100644
--- a/test/priority_queue_SUITE.erl
+++ b/test/priority_queue_SUITE.erl
@@ -32,6 +32,7 @@ groups() ->
{cluster_size_2, [], [
ackfold,
drop,
+ reject,
dropwhile_fetchwhile,
info_head_message_timestamp,
matching,
@@ -306,6 +307,20 @@ drop(Config) ->
rabbit_ct_client_helpers:close_connection(Conn),
passed.
+reject(Config) ->
+ {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ Q = <<"reject-queue">>,
+ declare(Ch, Q, [{<<"x-max-length">>, long, 4},
+ {<<"x-overflow">>, longstr, <<"reject_publish">>}
+ | arguments(3)]),
+ publish(Ch, Q, [1, 2, 3, 1, 2, 3, 1, 2, 3]),
+ %% First 4 messages are published, all others are discarded.
+ get_all(Ch, Q, do_ack, [3, 2, 1, 1]),
+ delete(Ch, Q),
+ rabbit_ct_client_helpers:close_channel(Ch),
+ rabbit_ct_client_helpers:close_connection(Conn),
+ passed.
+
purge(Config) ->
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
Q = <<"purge-queue">>,
diff --git a/test/rabbit_ha_test_producer.erl b/test/rabbit_ha_test_producer.erl
index fe2d15ed9a..b6c301cd0c 100644
--- a/test/rabbit_ha_test_producer.erl
+++ b/test/rabbit_ha_test_producer.erl
@@ -15,7 +15,7 @@
%%
-module(rabbit_ha_test_producer).
--export([await_response/1, start/5, create/5]).
+-export([await_response/1, start/6, create/5, create/6]).
-include_lib("amqp_client/include/amqp_client.hrl").
@@ -28,13 +28,20 @@ await_response(ProducerPid) ->
end.
create(Channel, Queue, TestPid, Confirm, MsgsToSend) ->
+ create(Channel, Queue, TestPid, Confirm, MsgsToSend, acks).
+
+create(Channel, Queue, TestPid, Confirm, MsgsToSend, Mode) ->
+ AckNackMsgs = case Mode of
+ acks -> {ok, {error, received_nacks}};
+ nacks -> {{error, received_acks}, ok}
+ end,
ProducerPid = spawn_link(?MODULE, start, [Channel, Queue, TestPid,
- Confirm, MsgsToSend]),
+ Confirm, MsgsToSend, AckNackMsgs]),
receive
{ProducerPid, started} -> ProducerPid
end.
-start(Channel, Queue, TestPid, Confirm, MsgsToSend) ->
+start(Channel, Queue, TestPid, Confirm, MsgsToSend, AckNackMsgs) ->
ConfirmState =
case Confirm of
true -> amqp_channel:register_confirm_handler(Channel, self()),
@@ -45,25 +52,27 @@ start(Channel, Queue, TestPid, Confirm, MsgsToSend) ->
end,
TestPid ! {self(), started},
error_logger:info_msg("publishing ~w msgs on ~p~n", [MsgsToSend, Channel]),
- producer(Channel, Queue, TestPid, ConfirmState, MsgsToSend).
+ producer(Channel, Queue, TestPid, ConfirmState, MsgsToSend, AckNackMsgs).
%%
%% Private API
%%
-producer(_Channel, _Queue, TestPid, none, 0) ->
+producer(_Channel, _Queue, TestPid, none, 0, _AckNackMsgs) ->
TestPid ! {self(), ok};
-producer(Channel, _Queue, TestPid, ConfirmState, 0) ->
+producer(Channel, _Queue, TestPid, ConfirmState, 0, {AckMsg, NackMsg}) ->
error_logger:info_msg("awaiting confirms on channel ~p~n", [Channel]),
- Msg = case drain_confirms(no_nacks, ConfirmState) of
- no_nacks -> ok;
- nacks -> {error, received_nacks};
+ Msg = case drain_confirms(none, ConfirmState) of
+ %% No acks or nacks
+ acks -> AckMsg;
+ nacks -> NackMsg;
+ mix -> {error, received_both_acks_and_nacks};
{Nacks, CS} -> {error, {missing_confirms, Nacks,
lists:sort(gb_trees:keys(CS))}}
end,
TestPid ! {self(), Msg};
-producer(Channel, Queue, TestPid, ConfirmState, MsgsToSend) ->
+producer(Channel, Queue, TestPid, ConfirmState, MsgsToSend, AckNackMsgs) ->
Method = #'basic.publish'{exchange = <<"">>,
routing_key = Queue,
mandatory = false,
@@ -76,7 +85,7 @@ producer(Channel, Queue, TestPid, ConfirmState, MsgsToSend) ->
payload = list_to_binary(
integer_to_list(MsgsToSend))}),
- producer(Channel, Queue, TestPid, ConfirmState1, MsgsToSend - 1).
+ producer(Channel, Queue, TestPid, ConfirmState1, MsgsToSend - 1, AckNackMsgs).
maybe_record_confirm(none, _, _) ->
none;
@@ -84,22 +93,34 @@ maybe_record_confirm(ConfirmState, Channel, MsgsToSend) ->
SeqNo = amqp_channel:next_publish_seqno(Channel),
gb_trees:insert(SeqNo, MsgsToSend, ConfirmState).
-drain_confirms(Nacks, ConfirmState) ->
+drain_confirms(Collected, ConfirmState) ->
case gb_trees:is_empty(ConfirmState) of
- true -> Nacks;
+ true -> Collected;
false -> receive
#'basic.ack'{delivery_tag = DeliveryTag,
multiple = IsMulti} ->
- drain_confirms(Nacks,
+ Collected1 = case Collected of
+ none -> acks;
+ acks -> acks;
+ nacks -> mix;
+ mix -> mix
+ end,
+ drain_confirms(Collected1,
delete_confirms(DeliveryTag, IsMulti,
ConfirmState));
#'basic.nack'{delivery_tag = DeliveryTag,
multiple = IsMulti} ->
- drain_confirms(nacks,
+ Collected1 = case Collected of
+ none -> nacks;
+ nacks -> nacks;
+ acks -> mix;
+ mix -> mix
+ end,
+ drain_confirms(Collected1,
delete_confirms(DeliveryTag, IsMulti,
ConfirmState))
after
- 60000 -> {Nacks, ConfirmState}
+ 60000 -> {Collected, ConfirmState}
end
end.
diff --git a/test/simple_ha_SUITE.erl b/test/simple_ha_SUITE.erl
index a0499b9d59..297b53b713 100644
--- a/test/simple_ha_SUITE.erl
+++ b/test/simple_ha_SUITE.erl
@@ -44,7 +44,8 @@ groups() ->
auto_resume_no_ccn_client,
confirms_survive_stop,
confirms_survive_sigkill,
- confirms_survive_policy
+ confirms_survive_policy,
+ rejects_survive_stop
]}
].
@@ -156,6 +157,10 @@ confirms_survive_stop(Cf) -> confirms_survive(Cf, fun stop/2).
confirms_survive_sigkill(Cf) -> confirms_survive(Cf, fun sigkill/2).
confirms_survive_policy(Cf) -> confirms_survive(Cf, fun policy/2).
+rejects_survive_stop(Cf) -> rejecets_survive(Cf, fun stop/2).
+rejects_survive_sigkill(Cf) -> rejecets_survive(Cf, fun sigkill/2).
+rejects_survive_policy(Cf) -> rejecets_survive(Cf, fun policy/2).
+
%%----------------------------------------------------------------------------
consume_survives(Config, DeathFun, CancelOnFailover) ->
@@ -213,6 +218,38 @@ confirms_survive(Config, DeathFun) ->
rabbit_ha_test_producer:await_response(ProducerPid),
ok.
+rejecets_survive(Config, DeathFun) ->
+ [A, B, _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ Msgs = rabbit_ct_helpers:cover_work_factor(Config, 20000),
+ Node1Channel = rabbit_ct_client_helpers:open_channel(Config, A),
+ Node2Channel = rabbit_ct_client_helpers:open_channel(Config, B),
+
+ %% declare the queue on the master, mirrored to the two slaves
+ Queue = <<"test_rejects">>,
+ amqp_channel:call(Node1Channel,#'queue.declare'{queue = Queue,
+ auto_delete = false,
+ durable = true,
+ arguments = [{<<"x-max-length">>, long, 1},
+ {<<"x-overflow">>, longstr, <<"reject_publish">>}]}),
+ Payload = <<"there can be only one">>,
+ amqp_channel:call(Node1Channel,
+ #'basic.publish'{routing_key = Queue},
+ #amqp_msg{payload = Payload}),
+
+ %% send a bunch of messages from the producer. Tolerating nacks.
+ ProducerPid = rabbit_ha_test_producer:create(Node2Channel, Queue,
+ self(), true, Msgs, nacks),
+ DeathFun(Config, A),
+ rabbit_ha_test_producer:await_response(ProducerPid),
+
+ {#'basic.get_ok'{}, #amqp_msg{payload = Payload}} =
+ amqp_channel:call(Node2Channel, #'basic.get'{queue = Queue}),
+ %% There is only one message.
+ #'basic.get_empty'{} = amqp_channel:call(Node2Channel, #'basic.get'{queue = Queue}),
+ ok.
+
+
+
stop(Config, Node) ->
rabbit_ct_broker_helpers:stop_node_after(Config, Node, 50).
diff --git a/test/unit_inbroker_parallel_SUITE.erl b/test/unit_inbroker_parallel_SUITE.erl
index dd8cd48b5a..951cf6f213 100644
--- a/test/unit_inbroker_parallel_SUITE.erl
+++ b/test/unit_inbroker_parallel_SUITE.erl
@@ -33,6 +33,9 @@ all() ->
].
groups() ->
+ MaxLengthTests = [max_length_drop_head,
+ max_length_reject_confirm,
+ max_length_drop_publish],
[
{parallel_tests, [parallel], [
amqp_connection_refusal,
@@ -48,7 +51,10 @@ groups() ->
]},
set_disk_free_limit_command,
set_vm_memory_high_watermark_command,
- topic_matching
+ topic_matching,
+ {queue_max_length, [], [
+ {max_length_simple, [], MaxLengthTests},
+ {max_length_mirrored, [], MaxLengthTests}]}
]}
].
@@ -63,6 +69,11 @@ init_per_suite(Config) ->
end_per_suite(Config) ->
rabbit_ct_helpers:run_teardown_steps(Config).
+init_per_group(max_length_mirrored, Config) ->
+ rabbit_ct_broker_helpers:set_ha_policy(Config, 0, <<"^max_length.*queue">>,
+ <<"all">>, [{<<"ha-sync-mode">>, <<"automatic">>}]),
+ Config1 = rabbit_ct_helpers:set_config(Config, [{is_mirrored, true}]),
+ rabbit_ct_helpers:run_steps(Config1, []);
init_per_group(Group, Config) ->
case lists:member({group, Group}, all()) of
true ->
@@ -91,6 +102,18 @@ setup_file_handle_cache1() ->
ok = file_handle_cache:set_limit(10),
ok.
+end_per_group(max_length_mirrored, Config) ->
+ rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"^max_length.*queue">>),
+ Config1 = rabbit_ct_helpers:set_config(Config, [{is_mirrored, false}]),
+ Config1;
+end_per_group(queue_max_length, Config) ->
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ amqp_channel:call(Ch, #'queue.delete'{queue = <<"max_length_reject_queue">>}),
+ amqp_channel:call(Ch, #'queue.delete'{queue = <<"max_length_drop_publish_queue">>}),
+ amqp_channel:call(Ch, #'queue.delete'{queue = <<"max_length_drop_head_queue">>}),
+ amqp_channel:call(Ch, #'queue.delete'{queue = <<"max_length_default_drop_head_queue">>}),
+ rabbit_ct_client_helpers:close_channels_and_connection(Config, 0),
+ Config;
end_per_group(Group, Config) ->
case lists:member({group, Group}, all()) of
true ->
@@ -1018,6 +1041,161 @@ set_vm_memory_high_watermark_command1(_Config) ->
)
end.
+max_length_drop_head(Config) ->
+ {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = <<"max_length_drop_head_queue">>,
+ QNameDefault = <<"max_length_default_drop_head_queue">>,
+ QNameBytes = <<"max_length_bytes_drop_head_queue">>,
+ QNameDefaultBytes = <<"max_length_bytes_default_drop_head_queue">>,
+
+ MaxLengthArgs = [{<<"x-max-length">>, long, 1}],
+ MaxLengthBytesArgs = [{<<"x-max-length-bytes">>, long, 100}],
+ OverflowArgs = [{<<"x-overflow">>, longstr, <<"drop_head">>}],
+ amqp_channel:call(Ch, #'queue.delete'{queue = QName}),
+ amqp_channel:call(Ch, #'queue.delete'{queue = QNameDefault}),
+ amqp_channel:call(Ch, #'queue.delete'{queue = QNameBytes}),
+ amqp_channel:call(Ch, #'queue.delete'{queue = QNameDefaultBytes}),
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = MaxLengthArgs ++ OverflowArgs}),
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QNameDefault, arguments = MaxLengthArgs}),
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QNameBytes, arguments = MaxLengthBytesArgs ++ OverflowArgs}),
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QNameDefaultBytes, arguments = MaxLengthBytesArgs}),
+
+ check_max_length_drops_head(Config, QName, Ch, <<"1">>, <<"2">>, <<"3">>),
+ check_max_length_drops_head(Config, QNameDefault, Ch, <<"1">>, <<"2">>, <<"3">>),
+
+ %% 80 bytes payload
+ Payload1 = << <<"1">> || _ <- lists:seq(1, 80) >>,
+ Payload2 = << <<"2">> || _ <- lists:seq(1, 80) >>,
+ Payload3 = << <<"3">> || _ <- lists:seq(1, 80) >>,
+ check_max_length_drops_head(Config, QName, Ch, Payload1, Payload2, Payload3),
+ check_max_length_drops_head(Config, QNameDefault, Ch, Payload1, Payload2, Payload3).
+
+max_length_reject_confirm(Config) ->
+ {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = <<"max_length_reject_queue">>,
+ QNameBytes = <<"max_length_bytes_reject_queue">>,
+ MaxLengthArgs = [{<<"x-max-length">>, long, 1}],
+ MaxLengthBytesArgs = [{<<"x-max-length-bytes">>, long, 100}],
+ OverflowArgs = [{<<"x-overflow">>, longstr, <<"reject_publish">>}],
+ amqp_channel:call(Ch, #'queue.delete'{queue = QName}),
+ amqp_channel:call(Ch, #'queue.delete'{queue = QNameBytes}),
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = MaxLengthArgs ++ OverflowArgs}),
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QNameBytes, arguments = MaxLengthBytesArgs ++ OverflowArgs}),
+ #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
+ check_max_length_drops_publish(Config, QName, Ch, <<"1">>, <<"2">>, <<"3">>),
+ check_max_length_rejects(Config, QName, Ch, <<"1">>, <<"2">>, <<"3">>),
+
+ %% 80 bytes payload
+ Payload1 = << <<"1">> || _ <- lists:seq(1, 80) >>,
+ Payload2 = << <<"2">> || _ <- lists:seq(1, 80) >>,
+ Payload3 = << <<"3">> || _ <- lists:seq(1, 80) >>,
+
+ check_max_length_drops_publish(Config, QNameBytes, Ch, Payload1, Payload2, Payload3),
+ check_max_length_rejects(Config, QNameBytes, Ch, Payload1, Payload2, Payload3).
+
+max_length_drop_publish(Config) ->
+ {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = <<"max_length_drop_publish_queue">>,
+ QNameBytes = <<"max_length_bytes_drop_publish_queue">>,
+ MaxLengthArgs = [{<<"x-max-length">>, long, 1}],
+ MaxLengthBytesArgs = [{<<"x-max-length-bytes">>, long, 100}],
+ OverflowArgs = [{<<"x-overflow">>, longstr, <<"reject_publish">>}],
+ amqp_channel:call(Ch, #'queue.delete'{queue = QName}),
+ amqp_channel:call(Ch, #'queue.delete'{queue = QNameBytes}),
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = MaxLengthArgs ++ OverflowArgs}),
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QNameBytes, arguments = MaxLengthBytesArgs ++ OverflowArgs}),
+ %% If confirms are not enable, publishes will still be dropped in reject_publish mode.
+ check_max_length_drops_publish(Config, QName, Ch, <<"1">>, <<"2">>, <<"3">>),
+
+ %% 80 bytes payload
+ Payload1 = << <<"1">> || _ <- lists:seq(1, 80) >>,
+ Payload2 = << <<"2">> || _ <- lists:seq(1, 80) >>,
+ Payload3 = << <<"3">> || _ <- lists:seq(1, 80) >>,
+
+ check_max_length_drops_publish(Config, QNameBytes, Ch, Payload1, Payload2, Payload3).
+
+check_max_length_drops_publish(Config, QName, Ch, Payload1, Payload2, Payload3) ->
+ sync_mirrors(QName, Config),
+ #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
+ %% A single message is published and consumed
+ amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}),
+ {#'basic.get_ok'{}, #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
+ #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
+
+ %% Message 2 is dropped, message 1 stays
+ amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}),
+ amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}),
+ {#'basic.get_ok'{}, #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
+ #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
+
+ %% Messages 2 and 3 are dropped, message 1 stays
+ amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}),
+ amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}),
+ amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload3}),
+ {#'basic.get_ok'{}, #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
+ #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}).
+
+check_max_length_rejects(Config, QName, Ch, Payload1, Payload2, Payload3) ->
+ sync_mirrors(QName, Config),
+ amqp_channel:register_confirm_handler(Ch, self()),
+ flush(),
+ #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
+ %% First message can be enqueued and acks
+ amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}),
+ receive #'basic.ack'{} -> ok
+ after 1000 -> error(expected_ack)
+ end,
+
+ %% The message cannot be enqueued and nacks
+ amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}),
+ receive #'basic.nack'{} -> ok
+ after 1000 -> error(expected_nack)
+ end,
+
+ %% The message cannot be enqueued and nacks
+ amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload3}),
+ receive #'basic.nack'{} -> ok
+ after 1000 -> error(expected_nack)
+ end,
+
+ {#'basic.get_ok'{}, #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
+
+ %% Now we can publish message 2.
+ amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}),
+ receive #'basic.ack'{} -> ok
+ after 1000 -> error(expected_ack)
+ end,
+
+ {#'basic.get_ok'{}, #amqp_msg{payload = Payload2}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}).
+
+check_max_length_drops_head(Config, QName, Ch, Payload1, Payload2, Payload3) ->
+ sync_mirrors(QName, Config),
+ #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
+ %% A single message is published and consumed
+ amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}),
+ {#'basic.get_ok'{}, #amqp_msg{payload = Payload1}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
+ #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
+
+ %% Message 1 is replaced by message 2
+ amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}),
+ amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}),
+ {#'basic.get_ok'{}, #amqp_msg{payload = Payload2}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
+ #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
+
+ %% Messages 1 and 2 are replaced
+ amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload1}),
+ amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload2}),
+ amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload3}),
+ {#'basic.get_ok'{}, #amqp_msg{payload = Payload3}} = amqp_channel:call(Ch, #'basic.get'{queue = QName}),
+ #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}).
+
+sync_mirrors(QName, Config) ->
+ case ?config(is_mirrored, Config) of
+ true ->
+ rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, [<<"sync_queue">>, QName]);
+ _ -> ok
+ end.
+
%% ---------------------------------------------------------------------------
%% rabbitmqctl helpers.
%% ---------------------------------------------------------------------------
@@ -1031,3 +1209,8 @@ expand_options(As, Bs) ->
false -> [A | R]
end
end, Bs, As).
+
+flush() ->
+ receive _ -> flush()
+ after 10 -> ok
+ end. \ No newline at end of file