summaryrefslogtreecommitdiff
path: root/src/dtree.erl
diff options
context:
space:
mode:
authorDaniil Fedotov <dfedotov@pivotal.io>2017-09-12 13:34:16 +0100
committerDaniil Fedotov <dfedotov@pivotal.io>2017-10-02 14:34:47 +0100
commitbf531fd017cbec756ee979299723adce76828c96 (patch)
treee81ffa4d3656f15826f95983ab2484ed494797b5 /src/dtree.erl
parent7e64d485e196c1791df6eff07940a6c5f368a7a0 (diff)
downloadrabbitmq-server-git-bf531fd017cbec756ee979299723adce76828c96.tar.gz
Add configurable queue overflow strategy
If a queue is to be overflowed by a delivery it can reject the delivery or drop messages from the head. To reject delivery overflow can be configured to `reject_publish`, to drop head it's `drop_head` (default setting). Messages which will be rejected should still confirm being routed, so mandatory expectations are not accumulated on the channel side. Slave nodes will only confirm if a message was published or discarded. To drop confirms from slaves, all confirms for a message are cleared when the message is rejected. When promoting a new master, left-behind deliveries should be rejected if the queue is full, just like normal deliveries. Fixes #995 [#151294447]
Diffstat (limited to 'src/dtree.erl')
-rw-r--r--src/dtree.erl23
1 files changed, 22 insertions, 1 deletions
diff --git a/src/dtree.erl b/src/dtree.erl
index 466ec88f33..e8b3481b36 100644
--- a/src/dtree.erl
+++ b/src/dtree.erl
@@ -32,7 +32,7 @@
-module(dtree).
--export([empty/0, insert/4, take/3, take/2, take_all/2, drop/2,
+-export([empty/0, insert/4, take/3, take/2, take_one/2, take_all/2, drop/2,
is_defined/2, is_empty/1, smallest/1, size/1]).
%%----------------------------------------------------------------------------
@@ -50,6 +50,7 @@
-spec insert(pk(), [sk()], val(), ?MODULE()) -> ?MODULE().
-spec take([pk()], sk(), ?MODULE()) -> {[kv()], ?MODULE()}.
-spec take(sk(), ?MODULE()) -> {[kv()], ?MODULE()}.
+-spec take_one(pk(), ?MODULE()) -> {[{pk(), val()}], ?MODULE()}.
-spec take_all(sk(), ?MODULE()) -> {[kv()], ?MODULE()}.
-spec drop(pk(), ?MODULE()) -> ?MODULE().
-spec is_defined(sk(), ?MODULE()) -> boolean().
@@ -107,6 +108,26 @@ take(SK, {P, S}) ->
{KVs, {P1, gb_trees:delete(SK, S)}}
end.
+%% Drop an entry with the primary key and clears secondary keys for this key,
+%% returning a list with a key-value pair as a result.
+%% If the primary key does not exist, returns an empty list.
+take_one(PK, {P, S}) ->
+ case gb_trees:lookup(PK, P) of
+ {value, {SKS, Value}} ->
+ P1 = gb_trees:delete(PK, P),
+ S1 = gb_sets:fold(
+ fun(SK, Acc) ->
+ {value, PKS} = gb_trees:lookup(SK, Acc),
+ PKS1 = gb_sets:delete(PK, PKS),
+ case gb_sets:is_empty(PKS1) of
+ true -> gb_trees:delete(SK, Acc);
+ false -> gb_trees:update(SK, PKS1, Acc)
+ end
+ end, S, SKS),
+ {[{PK, Value}], {P1, S1}};
+ none -> {[], {P, S}}
+ end.
+
%% Drop all entries which contain the given secondary key, returning
%% the primary-key/value pairs of these entries. It is ok for the
%% given secondary key to not exist.