summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorkjnilsson <knilsson@pivotal.io>2020-09-21 16:25:13 +0100
committerkjnilsson <knilsson@pivotal.io>2020-09-25 10:05:24 +0100
commit4a916b0057e8daf319fea4e020171aaadd4333d1 (patch)
tree547ee0a9781ed42e3c475918aa03ced0ef7cc445 /src
parent8468954a87b9287a64d2936afed3a36b521462c4 (diff)
downloadrabbitmq-server-git-4a916b0057e8daf319fea4e020171aaadd4333d1.tar.gz
Quorum queue consumer prioritiesqq-consumer-priorities
This switches the service queue inside rabbit_fifo from a normal queue to a priority queue such that consumers with a higher priority are favoured for service.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_fifo.erl65
-rw-r--r--src/rabbit_fifo.hrl5
2 files changed, 45 insertions, 25 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index 79b2bb4f72..d56f21a477 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -305,7 +305,7 @@ apply(#{index := Index,
{State0, {dequeue, empty}};
Ready ->
State1 = update_consumer(ConsumerId, ConsumerMeta,
- {once, 1, simple_prefetch},
+ {once, 1, simple_prefetch}, 0,
State0),
{success, _, MsgId, Msg, State2} = checkout_one(Meta, State1),
{State4, Effects1} = case Settlement of
@@ -345,7 +345,8 @@ apply(Meta, #checkout{spec = cancel, consumer_id = ConsumerId}, State0) ->
apply(Meta, #checkout{spec = Spec, meta = ConsumerMeta,
consumer_id = {_, Pid} = ConsumerId},
State0) ->
- State1 = update_consumer(ConsumerId, ConsumerMeta, Spec, State0),
+ Priority = get_priority_from_args(ConsumerMeta),
+ State1 = update_consumer(ConsumerId, ConsumerMeta, Spec, Priority, State0),
checkout(Meta, State0, State1, [{monitor, process, Pid}]);
apply(#{index := Index}, #purge{},
#?MODULE{ra_indexes = Indexes0,
@@ -524,6 +525,14 @@ convert_v0_to_v1(V0State0) ->
pending = element(3, E),
status = element(4, E)}
end, V0Enqs),
+ V0Cons = rabbit_fifo_v0:get_field(consumers, V0State),
+ V1Cons = maps:map(
+ fun (_CId, C0) ->
+ %% add the priority field
+ list_to_tuple(tuple_to_list(C0) ++ [0])
+ end, V0Cons),
+ V0SQ = rabbit_fifo_v0:get_field(service_queue, V0State),
+ V1SQ = priority_queue:from_list(queue:to_list(V0SQ)),
Cfg = #cfg{name = rabbit_fifo_v0:get_cfg_field(name, V0State),
resource = rabbit_fifo_v0:get_cfg_field(resource, V0State),
release_cursor_interval = rabbit_fifo_v0:get_cfg_field(release_cursor_interval, V0State),
@@ -547,8 +556,8 @@ convert_v0_to_v1(V0State0) ->
enqueuers = V1Enqs,
ra_indexes = rabbit_fifo_v0:get_field(ra_indexes, V0State),
release_cursors = rabbit_fifo_v0:get_field(release_cursors, V0State),
- consumers = rabbit_fifo_v0:get_field(consumers, V0State),
- service_queue = rabbit_fifo_v0:get_field(service_queue, V0State),
+ consumers = V1Cons,
+ service_queue = V1SQ,
prefix_msgs = rabbit_fifo_v0:get_field(prefix_msgs, V0State),
msg_bytes_enqueue = rabbit_fifo_v0:get_field(msg_bytes_enqueue, V0State),
msg_bytes_checkout = rabbit_fifo_v0:get_field(msg_bytes_checkout, V0State),
@@ -1670,13 +1679,12 @@ reply_log_effect(RaftIdx, MsgId, Header, Ready, From) ->
end}.
checkout_one(Meta, #?MODULE{service_queue = SQ0,
- messages = Messages0,
- consumers = Cons0} = InitState) ->
- case queue:peek(SQ0) of
- {value, ConsumerId} ->
+ messages = Messages0,
+ consumers = Cons0} = InitState) ->
+ case priority_queue:out(SQ0) of
+ {{value, ConsumerId}, SQ1} ->
case take_next_msg(InitState) of
{ConsumerMsg, State0} ->
- SQ1 = queue:drop(SQ0),
%% there are consumers waiting to be serviced
%% process consumer checkout
case maps:find(ConsumerId, Cons0) of
@@ -1727,7 +1735,7 @@ checkout_one(Meta, #?MODULE{service_queue = SQ0,
empty ->
{nochange, InitState}
end;
- empty ->
+ {empty, _} ->
case lqueue:len(Messages0) of
0 -> {nochange, InitState};
_ -> {inactive, InitState}
@@ -1742,7 +1750,7 @@ update_or_remove_sub(_Meta, ConsumerId, #consumer{lifetime = auto} = Con,
#?MODULE{consumers = Cons,
service_queue = ServiceQueue} = State) ->
State#?MODULE{consumers = maps:put(ConsumerId, Con, Cons),
- service_queue = uniq_queue_in(ConsumerId, ServiceQueue)};
+ service_queue = uniq_queue_in(ConsumerId, Con, ServiceQueue)};
update_or_remove_sub(#{system_time := Ts},
ConsumerId, #consumer{lifetime = once,
checked_out = Checked,
@@ -1761,43 +1769,45 @@ update_or_remove_sub(_Meta, ConsumerId, #consumer{lifetime = once} = Con,
#?MODULE{consumers = Cons,
service_queue = ServiceQueue} = State) ->
State#?MODULE{consumers = maps:put(ConsumerId, Con, Cons),
- service_queue = uniq_queue_in(ConsumerId, ServiceQueue)}.
+ service_queue = uniq_queue_in(ConsumerId, Con, ServiceQueue)}.
-uniq_queue_in(Key, Queue) ->
+uniq_queue_in(Key, #consumer{priority = P}, Queue) ->
% TODO: queue:member could surely be quite expensive, however the practical
% number of unique consumers may not be large enough for it to matter
- case queue:member(Key, Queue) of
+ case priority_queue:member(Key, Queue) of
true ->
Queue;
false ->
- queue:in(Key, Queue)
+ priority_queue:in(Key, P, Queue)
end.
-update_consumer(ConsumerId, Meta, Spec,
+update_consumer(ConsumerId, Meta, Spec, Priority,
#?MODULE{cfg = #cfg{consumer_strategy = competing}} = State0) ->
%% general case, single active consumer off
- update_consumer0(ConsumerId, Meta, Spec, State0);
-update_consumer(ConsumerId, Meta, Spec,
+ update_consumer0(ConsumerId, Meta, Spec, Priority, State0);
+update_consumer(ConsumerId, Meta, Spec, Priority,
#?MODULE{consumers = Cons0,
cfg = #cfg{consumer_strategy = single_active}} = State0)
when map_size(Cons0) == 0 ->
%% single active consumer on, no one is consuming yet
- update_consumer0(ConsumerId, Meta, Spec, State0);
-update_consumer(ConsumerId, Meta, {Life, Credit, Mode},
+ update_consumer0(ConsumerId, Meta, Spec, Priority, State0);
+update_consumer(ConsumerId, Meta, {Life, Credit, Mode}, Priority,
#?MODULE{cfg = #cfg{consumer_strategy = single_active},
waiting_consumers = WaitingConsumers0} = State0) ->
%% single active consumer on and one active consumer already
%% adding the new consumer to the waiting list
Consumer = #consumer{lifetime = Life, meta = Meta,
+ priority = Priority,
credit = Credit, credit_mode = Mode},
WaitingConsumers1 = WaitingConsumers0 ++ [{ConsumerId, Consumer}],
State0#?MODULE{waiting_consumers = WaitingConsumers1}.
-update_consumer0(ConsumerId, Meta, {Life, Credit, Mode},
+update_consumer0(ConsumerId, Meta, {Life, Credit, Mode}, Priority,
#?MODULE{consumers = Cons0,
service_queue = ServiceQueue0} = State0) ->
%% TODO: this logic may not be correct for updating a pre-existing consumer
Init = #consumer{lifetime = Life, meta = Meta,
+ priority = Priority,
credit = Credit, credit_mode = Mode},
Cons = maps:update_with(ConsumerId,
fun(S) ->
@@ -1811,12 +1821,12 @@ update_consumer0(ConsumerId, Meta, {Life, Credit, Mode},
ServiceQueue0),
State0#?MODULE{consumers = Cons, service_queue = ServiceQueue}.
-maybe_queue_consumer(ConsumerId, #consumer{credit = Credit},
+maybe_queue_consumer(ConsumerId, #consumer{credit = Credit} = Con,
ServiceQueue0) ->
case Credit > 0 of
true ->
% consumerect needs service - check if already on service queue
- uniq_queue_in(ConsumerId, ServiceQueue0);
+ uniq_queue_in(ConsumerId, Con, ServiceQueue0);
false ->
ServiceQueue0
end.
@@ -2101,3 +2111,12 @@ is_expired(Ts, #?MODULE{cfg = #cfg{expires = Expires},
Ts > (LastActive + Expires) andalso maps:size(Active) == 0;
is_expired(_Ts, _State) ->
false.
+
+get_priority_from_args(#{args := Args}) ->
+ case rabbit_misc:table_lookup(Args, <<"x-priority">>) of
+ {_Key, Value} ->
+ Value;
+ _ -> 0
+ end;
+get_priority_from_args(_) ->
+ 0.
diff --git a/src/rabbit_fifo.hrl b/src/rabbit_fifo.hrl
index 4bd88fa705..a44a2bc04d 100644
--- a/src/rabbit_fifo.hrl
+++ b/src/rabbit_fifo.hrl
@@ -94,7 +94,8 @@
%% command: `{consumer_credit, ReceiverDeliveryCount, Credit}'
credit_mode = simple_prefetch :: credit_mode(), % part of snapshot data
lifetime = once :: once | auto,
- status = up :: up | suspected_down | cancelled
+ status = up :: up | suspected_down | cancelled,
+ priority = 0 :: non_neg_integer()
}).
-type consumer() :: #consumer{}.
@@ -169,7 +170,7 @@
consumers = #{} :: #{consumer_id() => #consumer{}},
% consumers that require further service are queued here
% needs to be part of snapshot
- service_queue = queue:new() :: queue:queue(consumer_id()),
+ service_queue = priority_queue:new() :: priority_queue:queue(consumer_id()),
%% This is a special field that is only used for snapshots
%% It represents the queued messages at the time the
%% dehydrated snapshot state was cached.