summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorkjnilsson <knilsson@pivotal.io>2018-11-06 10:12:25 +0000
committerkjnilsson <knilsson@pivotal.io>2018-11-20 10:13:37 +0000
commita0c9dd69fc3acffa62ecc0844a46f2be975c363d (patch)
tree1e21a8aeb69a0fedf27ffa3efbae908c02183d96
parent7c1b567ae4ccd7f39eff7c6ec16c3b83bc81889d (diff)
downloadrabbitmq-server-git-a0c9dd69fc3acffa62ecc0844a46f2be975c363d.tar.gz
Quorum queue: Run cancel_consumer handler async
If cancle_consumer handler needs to run on a remote node use rpc:cast instead of call to avoid potential blocking.
-rw-r--r--src/rabbit_amqqueue.erl3
-rw-r--r--src/rabbit_fifo.erl3
-rw-r--r--src/rabbit_quorum_queue.erl51
3 files changed, 29 insertions, 28 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 40b74c04ac..89913ba408 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -1476,7 +1476,8 @@ deliver(Qs, Delivery = #delivery{flow = Flow,
lists:foldl(
fun({{Name, _} = Pid, QName}, QStates) ->
QState0 = get_quorum_state(Pid, QName, QStates),
- case rabbit_quorum_queue:deliver(Confirm, Delivery, QState0) of
+ case rabbit_quorum_queue:deliver(Confirm, Delivery,
+ QState0) of
{ok, QState} ->
maps:put(Name, QState, QStates);
{slow, QState} ->
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index 3fceb93654..79d4a3effc 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -392,8 +392,7 @@ apply(_, {down, ConsumerPid, noconnection},
apply(_, {down, Pid, _Info}, Effects0,
#state{consumers = Cons0,
enqueuers = Enqs0} = State0) ->
- % remove any enqueuer for the same pid
- % TODO: if there are any pending enqueuers these should be enqueued
+ % Remove any enqueuer for the same pid and enqueue any pending messages
% This should be ok as we won't see any more enqueues from this pid
State1 = case maps:take(Pid, Enqs0) of
{#enqueuer{pending = Pend}, Enqs} ->
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index 311894dee6..8d7d13aa98 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -104,8 +104,8 @@ init_state({Name, _}, QName) ->
fun() -> credit_flow:block(Name), ok end,
fun() -> credit_flow:unblock(Name), ok end).
-handle_event({ra_event, From, Evt}, FState) ->
- rabbit_fifo_client:handle_ra_event(From, Evt, FState).
+handle_event({ra_event, From, Evt}, QState) ->
+ rabbit_fifo_client:handle_ra_event(From, Evt, QState).
declare(#amqqueue{name = QName,
durable = Durable,
@@ -160,9 +160,10 @@ cancel_consumer_handler(QName, {ConsumerTag, ChPid}, _Name) ->
% QName = queue_name(Name),
case Node == node() of
true -> cancel_consumer(QName, ChPid, ConsumerTag);
- false -> rabbit_misc:rpc_call(Node, rabbit_quorum_queue,
- cancel_consumer,
- [QName, ChPid, ConsumerTag])
+ false ->
+ rpc:cast(Node, rabbit_quorum_queue,
+ cancel_consumer,
+ [QName, ChPid, ConsumerTag])
end.
cancel_consumer(QName, ChPid, ConsumerTag) ->
@@ -306,19 +307,19 @@ delete_immediately({Name, _} = QPid) ->
rabbit_core_metrics:queue_deleted(QName),
ok.
-ack(CTag, MsgIds, FState) ->
- rabbit_fifo_client:settle(quorum_ctag(CTag), MsgIds, FState).
+ack(CTag, MsgIds, QState) ->
+ rabbit_fifo_client:settle(quorum_ctag(CTag), MsgIds, QState).
-reject(true, CTag, MsgIds, FState) ->
- rabbit_fifo_client:return(quorum_ctag(CTag), MsgIds, FState);
-reject(false, CTag, MsgIds, FState) ->
- rabbit_fifo_client:discard(quorum_ctag(CTag), MsgIds, FState).
+reject(true, CTag, MsgIds, QState) ->
+ rabbit_fifo_client:return(quorum_ctag(CTag), MsgIds, QState);
+reject(false, CTag, MsgIds, QState) ->
+ rabbit_fifo_client:discard(quorum_ctag(CTag), MsgIds, QState).
credit(CTag, Credit, Drain, QState) ->
rabbit_fifo_client:credit(quorum_ctag(CTag), Credit, Drain, QState).
basic_get(#amqqueue{name = QName, pid = {Name, _} = Id, type = quorum}, NoAck,
- CTag0, FState0) ->
+ CTag0, QState0) ->
CTag = quorum_ctag(CTag0),
Settlement = case NoAck of
true ->
@@ -326,12 +327,12 @@ basic_get(#amqqueue{name = QName, pid = {Name, _} = Id, type = quorum}, NoAck,
false ->
unsettled
end,
- case rabbit_fifo_client:dequeue(CTag, Settlement, FState0) of
- {ok, empty, FState} ->
- {ok, empty, FState};
- {ok, {MsgId, {MsgHeader, Msg}}, FState} ->
+ case rabbit_fifo_client:dequeue(CTag, Settlement, QState0) of
+ {ok, empty, QState} ->
+ {ok, empty, QState};
+ {ok, {MsgId, {MsgHeader, Msg}}, QState} ->
IsDelivered = maps:is_key(delivery_count, MsgHeader),
- {ok, quorum_messages(Name), {QName, Id, MsgId, IsDelivered, Msg}, FState};
+ {ok, quorum_messages(Name), {QName, Id, MsgId, IsDelivered, Msg}, QState};
{timeout, _} ->
{error, timeout}
end.
@@ -352,19 +353,19 @@ basic_consume(#amqqueue{name = QName, type = quorum}, NoAck, ChPid,
ConsumerPrefetchCount, Args),
{ok, QState}.
-basic_cancel(ConsumerTag, ChPid, OkMsg, FState0) ->
+basic_cancel(ConsumerTag, ChPid, OkMsg, QState0) ->
maybe_send_reply(ChPid, OkMsg),
- rabbit_fifo_client:cancel_checkout(quorum_ctag(ConsumerTag), FState0).
+ rabbit_fifo_client:cancel_checkout(quorum_ctag(ConsumerTag), QState0).
stateless_deliver(ServerId, Delivery) ->
ok = rabbit_fifo_client:untracked_enqueue([ServerId],
Delivery#delivery.message).
-deliver(false, Delivery, FState0) ->
- rabbit_fifo_client:enqueue(Delivery#delivery.message, FState0);
-deliver(true, Delivery, FState0) ->
+deliver(false, Delivery, QState0) ->
+ rabbit_fifo_client:enqueue(Delivery#delivery.message, QState0);
+deliver(true, Delivery, QState0) ->
rabbit_fifo_client:enqueue(Delivery#delivery.msg_seq_no,
- Delivery#delivery.message, FState0).
+ Delivery#delivery.message, QState0).
info(Q) ->
info(Q, [name, durable, auto_delete, arguments, pid, state, messages,
@@ -387,8 +388,8 @@ stat(_Q) ->
purge(Node) ->
rabbit_fifo_client:purge(Node).
-requeue(ConsumerTag, MsgIds, FState) ->
- rabbit_fifo_client:return(quorum_ctag(ConsumerTag), MsgIds, FState).
+requeue(ConsumerTag, MsgIds, QState) ->
+ rabbit_fifo_client:return(quorum_ctag(ConsumerTag), MsgIds, QState).
cleanup_data_dir() ->
Names = [Name || #amqqueue{pid = {Name, _}, quorum_nodes = Nodes}