summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMichael Klishin <michael@novemberain.com>2017-11-15 14:52:46 +0000
committerGitHub <noreply@github.com>2017-11-15 14:52:46 +0000
commit0d576e23bc83d8c8047ed7d90737fc5e720bf6e3 (patch)
tree61f216c5a7426a765bc8ab5e76480d53fe7e031a /src
parentdf25c1301da8d3d25503f5bdfa15fad6b1433eb8 (diff)
parent635c518f0c4a94f46ebd309da5baf0b6f78b00d1 (diff)
downloadrabbitmq-server-git-0d576e23bc83d8c8047ed7d90737fc5e720bf6e3.tar.gz
Merge pull request #1374 from rabbitmq/rabbitmq-server-995v3.7.0-rc.2
An option to discard messages if queue is full.
Diffstat (limited to 'src')
-rw-r--r--src/dtree.erl23
-rw-r--r--src/rabbit_amqqueue.erl9
-rw-r--r--src/rabbit_amqqueue_process.erl75
-rw-r--r--src/rabbit_channel.erl44
-rw-r--r--src/rabbit_mirror_queue_slave.erl2
-rw-r--r--src/rabbit_policies.erl9
6 files changed, 143 insertions, 19 deletions
diff --git a/src/dtree.erl b/src/dtree.erl
index 466ec88f33..e8b3481b36 100644
--- a/src/dtree.erl
+++ b/src/dtree.erl
@@ -32,7 +32,7 @@
-module(dtree).
--export([empty/0, insert/4, take/3, take/2, take_all/2, drop/2,
+-export([empty/0, insert/4, take/3, take/2, take_one/2, take_all/2, drop/2,
is_defined/2, is_empty/1, smallest/1, size/1]).
%%----------------------------------------------------------------------------
@@ -50,6 +50,7 @@
-spec insert(pk(), [sk()], val(), ?MODULE()) -> ?MODULE().
-spec take([pk()], sk(), ?MODULE()) -> {[kv()], ?MODULE()}.
-spec take(sk(), ?MODULE()) -> {[kv()], ?MODULE()}.
+-spec take_one(pk(), ?MODULE()) -> {[{pk(), val()}], ?MODULE()}.
-spec take_all(sk(), ?MODULE()) -> {[kv()], ?MODULE()}.
-spec drop(pk(), ?MODULE()) -> ?MODULE().
-spec is_defined(sk(), ?MODULE()) -> boolean().
@@ -107,6 +108,26 @@ take(SK, {P, S}) ->
{KVs, {P1, gb_trees:delete(SK, S)}}
end.
+%% Drop an entry with the primary key and clears secondary keys for this key,
+%% returning a list with a key-value pair as a result.
+%% If the primary key does not exist, returns an empty list.
+take_one(PK, {P, S}) ->
+ case gb_trees:lookup(PK, P) of
+ {value, {SKS, Value}} ->
+ P1 = gb_trees:delete(PK, P),
+ S1 = gb_sets:fold(
+ fun(SK, Acc) ->
+ {value, PKS} = gb_trees:lookup(SK, Acc),
+ PKS1 = gb_sets:delete(PK, PKS),
+ case gb_sets:is_empty(PKS1) of
+ true -> gb_trees:delete(SK, Acc);
+ false -> gb_trees:update(SK, PKS1, Acc)
+ end
+ end, S, SKS),
+ {[{PK, Value}], {P1, S1}};
+ none -> {[], {P, S}}
+ end.
+
%% Drop all entries which contain the given secondary key, returning
%% the primary-key/value pairs of these entries. It is ok for the
%% given secondary key to not exist.
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 378f3cbb76..ce73460603 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -576,6 +576,7 @@ declare_args() ->
{<<"x-max-length">>, fun check_non_neg_int_arg/2},
{<<"x-max-length-bytes">>, fun check_non_neg_int_arg/2},
{<<"x-max-priority">>, fun check_non_neg_int_arg/2},
+ {<<"x-overflow">>, fun check_overflow/2},
{<<"x-queue-mode">>, fun check_queue_mode/2}].
consume_args() -> [{<<"x-priority">>, fun check_int_arg/2},
@@ -623,6 +624,14 @@ check_dlxrk_arg({longstr, _}, Args) ->
check_dlxrk_arg({Type, _}, _Args) ->
{error, {unacceptable_type, Type}}.
+check_overflow({longstr, Val}, _Args) ->
+ case lists:member(Val, [<<"drop-head">>, <<"reject-publish">>]) of
+ true -> ok;
+ false -> {error, invalid_overflow}
+ end;
+check_overflow({Type, _}, _Args) ->
+ {error, {unacceptable_type, Type}}.
+
check_queue_mode({longstr, Val}, _Args) ->
case lists:member(Val, [<<"default">>, <<"lazy">>]) of
true -> ok;
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 84ba0174ab..a3c8f99519 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -80,6 +80,9 @@
max_length,
%% max length in bytes, if configured
max_bytes,
+ %% an action to perform if queue is to be over a limit,
+ %% can be either drop-head (default) or reject-publish
+ overflow,
%% when policies change, this version helps queue
%% determine what previously scheduled/set up state to ignore,
%% e.g. message expiration messages from previously set up timers
@@ -159,7 +162,8 @@ init_state(Q) ->
senders = pmon:new(delegate),
msg_id_to_channel = gb_trees:empty(),
status = running,
- args_policy_version = 0},
+ args_policy_version = 0,
+ overflow = 'drop-head'},
rabbit_event:init_stats_timer(State, #q.stats_timer).
init_it(Recover, From, State = #q{q = #amqqueue{exclusive_owner = none}}) ->
@@ -260,7 +264,7 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
msg_id_to_channel = MTC},
State2 = process_args_policy(State1),
State3 = lists:foldl(fun (Delivery, StateN) ->
- deliver_or_enqueue(Delivery, true, StateN)
+ maybe_deliver_or_enqueue(Delivery, true, StateN)
end, State2, Deliveries),
notify_decorators(startup, State3),
State3.
@@ -378,6 +382,7 @@ process_args_policy(State = #q{q = Q,
{<<"message-ttl">>, fun res_min/2, fun init_ttl/2},
{<<"max-length">>, fun res_min/2, fun init_max_length/2},
{<<"max-length-bytes">>, fun res_min/2, fun init_max_bytes/2},
+ {<<"overflow">>, fun res_arg/2, fun init_overflow/2},
{<<"queue-mode">>, fun res_arg/2, fun init_queue_mode/2}],
drop_expired_msgs(
lists:foldl(fun({Name, Resolve, Fun}, StateN) ->
@@ -421,6 +426,18 @@ init_max_bytes(MaxBytes, State) ->
{_Dropped, State1} = maybe_drop_head(State#q{max_bytes = MaxBytes}),
State1.
+init_overflow(undefined, State) ->
+ State;
+init_overflow(Overflow, State) ->
+ OverflowVal = binary_to_existing_atom(Overflow, utf8),
+ case OverflowVal of
+ 'drop-head' ->
+ {_Dropped, State1} = maybe_drop_head(State#q{overflow = OverflowVal}),
+ State1;
+ _ ->
+ State#q{overflow = OverflowVal}
+ end.
+
init_queue_mode(undefined, State) ->
State;
init_queue_mode(Mode, State = #q {backing_queue = BQ,
@@ -621,12 +638,22 @@ attempt_delivery(Delivery = #delivery{sender = SenderPid,
State#q{consumers = Consumers})}
end.
+maybe_deliver_or_enqueue(Delivery, Delivered, State = #q{overflow = Overflow}) ->
+ send_mandatory(Delivery), %% must do this before confirms
+ case {will_overflow(Delivery, State), Overflow} of
+ {true, 'reject-publish'} ->
+ %% Drop publish and nack to publisher
+ send_reject_publish(Delivery, Delivered, State);
+ _ ->
+ %% Enqueue and maybe drop head later
+ deliver_or_enqueue(Delivery, Delivered, State)
+ end.
+
deliver_or_enqueue(Delivery = #delivery{message = Message,
sender = SenderPid,
flow = Flow},
Delivered, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
- send_mandatory(Delivery), %% must do this before confirms
{Confirm, State1} = send_or_record_confirm(Delivery, State),
Props = message_properties(Message, Confirm, State1),
{IsDuplicate, BQS1} = BQ:is_duplicate(Message, BQS),
@@ -644,6 +671,7 @@ deliver_or_enqueue(Delivery = #delivery{message = Message,
{BQS3, MTC1} = discard(Delivery, BQ, BQS2, MTC),
State3#q{backing_queue_state = BQS3, msg_id_to_channel = MTC1};
{undelivered, State3 = #q{backing_queue_state = BQS2}} ->
+
BQS3 = BQ:publish(Message, Props, Delivered, SenderPid, Flow, BQS2),
{Dropped, State4 = #q{backing_queue_state = BQS4}} =
maybe_drop_head(State3#q{backing_queue_state = BQS3}),
@@ -665,7 +693,9 @@ deliver_or_enqueue(Delivery = #delivery{message = Message,
maybe_drop_head(State = #q{max_length = undefined,
max_bytes = undefined}) ->
{false, State};
-maybe_drop_head(State) ->
+maybe_drop_head(State = #q{overflow = 'reject-publish'}) ->
+ {false, State};
+maybe_drop_head(State = #q{overflow = 'drop-head'}) ->
maybe_drop_head(false, State).
maybe_drop_head(AlreadyDropped, State = #q{backing_queue = BQ,
@@ -684,6 +714,35 @@ maybe_drop_head(AlreadyDropped, State = #q{backing_queue = BQ,
{AlreadyDropped, State}
end.
+send_reject_publish(#delivery{confirm = true,
+ sender = SenderPid,
+ msg_seq_no = MsgSeqNo} = Delivery,
+ _Delivered,
+ State = #q{ backing_queue = BQ,
+ backing_queue_state = BQS,
+ msg_id_to_channel = MTC}) ->
+ {BQS1, MTC1} = discard(Delivery, BQ, BQS, MTC),
+ gen_server2:cast(SenderPid, {reject_publish, MsgSeqNo, self()}),
+ State#q{ backing_queue_state = BQS1, msg_id_to_channel = MTC1 };
+send_reject_publish(#delivery{confirm = false},
+ _Delivered, State) ->
+ State.
+
+will_overflow(_, #q{max_length = undefined,
+ max_bytes = undefined}) -> false;
+will_overflow(#delivery{message = Message},
+ #q{max_length = MaxLen,
+ max_bytes = MaxBytes,
+ backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ ExpectedQueueLength = BQ:len(BQS) + 1,
+
+ #basic_message{content = #content{payload_fragments_rev = PFR}} = Message,
+ MessageSize = iolist_size(PFR),
+ ExpectedQueueSizeBytes = BQ:info(message_bytes_ready, BQS) + MessageSize,
+
+ ExpectedQueueLength > MaxLen orelse ExpectedQueueSizeBytes > MaxBytes.
+
over_max_length(#q{max_length = MaxLen,
max_bytes = MaxBytes,
backing_queue = BQ,
@@ -1254,8 +1313,10 @@ handle_cast({run_backing_queue, Mod, Fun},
State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
noreply(State#q{backing_queue_state = BQ:invoke(Mod, Fun, BQS)});
-handle_cast({deliver, Delivery = #delivery{sender = Sender,
- flow = Flow}, SlaveWhenPublished},
+handle_cast({deliver,
+ Delivery = #delivery{sender = Sender,
+ flow = Flow},
+ SlaveWhenPublished},
State = #q{senders = Senders}) ->
Senders1 = case Flow of
%% In both credit_flow:ack/1 we are acking messages to the channel
@@ -1270,7 +1331,7 @@ handle_cast({deliver, Delivery = #delivery{sender = Sender,
noflow -> Senders
end,
State1 = State#q{senders = Senders1},
- noreply(deliver_or_enqueue(Delivery, SlaveWhenPublished, State1));
+ noreply(maybe_deliver_or_enqueue(Delivery, SlaveWhenPublished, State1));
%% [0] The second ack is since the channel thought we were a slave at
%% the time it published this message, so it used two credits (see
%% rabbit_amqqueue:deliver/2).
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 563415c459..c671438ce8 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -136,6 +136,9 @@
%% a list of tags for published messages that were
%% delivered but are yet to be confirmed to the client
confirmed,
+ %% a list of tags for published messages that were
+ %% rejected but are yet to be sent to the client
+ rejected,
%% a dtree used to track oustanding notifications
%% for messages published as mandatory
mandatory,
@@ -419,6 +422,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
confirm_enabled = false,
publish_seqno = 1,
unconfirmed = dtree:empty(),
+ rejected = [],
confirmed = [],
mandatory = dtree:empty(),
capabilities = Capabilities,
@@ -449,6 +453,7 @@ prioritise_call(Msg, _From, _Len, _State) ->
prioritise_cast(Msg, _Len, _State) ->
case Msg of
{confirm, _MsgSeqNos, _QPid} -> 5;
+ {reject_publish, _MsgSeqNos, _QPid} -> 5;
{mandatory_received, _MsgSeqNo, _QPid} -> 5;
_ -> 0
end.
@@ -598,6 +603,13 @@ handle_cast({mandatory_received, MsgSeqNo}, State = #ch{mandatory = Mand}) ->
%% NB: don't call noreply/1 since we don't want to send confirms.
noreply_coalesce(State#ch{mandatory = dtree:drop(MsgSeqNo, Mand)});
+handle_cast({reject_publish, MsgSeqNo, _QPid}, State = #ch{unconfirmed = UC}) ->
+ %% It does not matter which queue rejected the message,
+ %% if any queue rejected it - it should not be confirmed.
+ {MXs, UC1} = dtree:take_one(MsgSeqNo, UC),
+ %% NB: don't call noreply/1 since we don't want to send confirms.
+ noreply_coalesce(record_rejects(MXs, State#ch{unconfirmed = UC1}));
+
handle_cast({confirm, MsgSeqNos, QPid}, State = #ch{unconfirmed = UC}) ->
{MXs, UC1} = dtree:take(MsgSeqNos, QPid, UC),
%% NB: don't call noreply/1 since we don't want to send confirms.
@@ -621,7 +633,7 @@ handle_info(emit_stats, State) ->
State1 = rabbit_event:reset_stats_timer(State, #ch.stats_timer),
%% NB: don't call noreply/1 since we don't want to kick off the
%% stats timer.
- {noreply, send_confirms(State1), hibernate};
+ {noreply, send_confirms_and_nacks(State1), hibernate};
handle_info({'DOWN', _MRef, process, QPid, Reason}, State) ->
State1 = handle_publishing_queue_down(QPid, Reason, State),
@@ -681,10 +693,10 @@ reply(Reply, NewState) -> {reply, Reply, next_state(NewState), hibernate}.
noreply(NewState) -> {noreply, next_state(NewState), hibernate}.
-next_state(State) -> ensure_stats_timer(send_confirms(State)).
+next_state(State) -> ensure_stats_timer(send_confirms_and_nacks(State)).
-noreply_coalesce(State = #ch{confirmed = C}) ->
- Timeout = case C of [] -> hibernate; _ -> 0 end,
+noreply_coalesce(State = #ch{confirmed = C, rejected = R}) ->
+ Timeout = case {C, R} of {[], []} -> hibernate; _ -> 0 end,
{noreply, ensure_stats_timer(State), Timeout}.
ensure_stats_timer(State) ->
@@ -818,7 +830,7 @@ check_topic_authorisation(#exchange{name = Name = #resource{virtual_host = VHost
RoutingKey,
Permission) ->
Resource = Name#resource{kind = topic},
- Timeout = get_operation_timeout(),
+ Timeout = get_operation_timeout(),
AmqpParams = case ConnPid of
none ->
%% Called from outside the channel by mgmt API
@@ -962,6 +974,15 @@ maybe_set_fast_reply_to(
maybe_set_fast_reply_to(C, _State) ->
C.
+record_rejects([], State) ->
+ State;
+record_rejects(MXs, State = #ch{rejected = R, tx = Tx}) ->
+ Tx1 = case Tx of
+ none -> none;
+ _ -> failed
+ end,
+ State#ch{rejected = [MXs | R], tx = Tx1}.
+
record_confirms([], State) ->
State;
record_confirms(MXs, State = #ch{confirmed = C}) ->
@@ -1874,21 +1895,24 @@ send_nacks(_MXs, State = #ch{state = closing}) -> %% optimisation
send_nacks(_, State) ->
maybe_complete_tx(State#ch{tx = failed}).
-send_confirms(State = #ch{tx = none, confirmed = []}) ->
+send_confirms_and_nacks(State = #ch{tx = none, confirmed = [], rejected = []}) ->
State;
-send_confirms(State = #ch{tx = none, confirmed = C}) ->
+send_confirms_and_nacks(State = #ch{tx = none, confirmed = C, rejected = R}) ->
case rabbit_node_monitor:pause_partition_guard() of
- ok -> MsgSeqNos =
+ ok -> ConfirmMsgSeqNos =
lists:foldl(
fun ({MsgSeqNo, XName}, MSNs) ->
?INCR_STATS(exchange_stats, XName, 1,
confirm, State),
[MsgSeqNo | MSNs]
end, [], lists:append(C)),
- send_confirms(MsgSeqNos, State#ch{confirmed = []});
+ State1 = send_confirms(ConfirmMsgSeqNos, State#ch{confirmed = []}),
+ %% TODO: msg seq nos, same as for confirms. Need to implement
+ %% nack rates first.
+ send_nacks(lists:append(R), State1#ch{rejected = []});
pausing -> State
end;
-send_confirms(State) ->
+send_confirms_and_nacks(State) ->
case rabbit_node_monitor:pause_partition_guard() of
ok -> maybe_complete_tx(State);
pausing -> State
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index b70b183044..65a13f03c0 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -302,6 +302,8 @@ handle_cast({deliver, Delivery = #delivery{sender = Sender, flow = Flow}, true},
%% We are acking messages to the channel process that sent us
%% the message delivery. See
%% rabbit_amqqueue_process:handle_ch_down for more info.
+ %% If message is rejected by the master, the publish will be nacked
+ %% even if slaves confirm it. No need to check for length here.
maybe_flow_ack(Sender, Flow),
noreply(maybe_enqueue_message(Delivery, State));
diff --git a/src/rabbit_policies.erl b/src/rabbit_policies.erl
index be7d3dcd76..f48189b210 100644
--- a/src/rabbit_policies.erl
+++ b/src/rabbit_policies.erl
@@ -42,6 +42,7 @@ register() ->
{policy_validator, <<"max-length">>},
{policy_validator, <<"max-length-bytes">>},
{policy_validator, <<"queue-mode">>},
+ {policy_validator, <<"overflow">>},
{operator_policy_validator, <<"expires">>},
{operator_policy_validator, <<"message-ttl">>},
{operator_policy_validator, <<"max-length">>},
@@ -104,7 +105,13 @@ validate_policy0(<<"queue-mode">>, <<"default">>) ->
validate_policy0(<<"queue-mode">>, <<"lazy">>) ->
ok;
validate_policy0(<<"queue-mode">>, Value) ->
- {error, "~p is not a valid queue-mode value", [Value]}.
+ {error, "~p is not a valid queue-mode value", [Value]};
+validate_policy0(<<"overflow">>, <<"drop-head">>) ->
+ ok;
+validate_policy0(<<"overflow">>, <<"reject-publish">>) ->
+ ok;
+validate_policy0(<<"overflow">>, Value) ->
+ {error, "~p is not a valid overflow value", [Value]}.
merge_policy_value(<<"message-ttl">>, Val, OpVal) -> min(Val, OpVal);
merge_policy_value(<<"max-length">>, Val, OpVal) -> min(Val, OpVal);