diff options
| -rw-r--r-- | src/rabbit_amqqueue.erl | 3 | ||||
| -rw-r--r-- | src/rabbit_fifo.erl | 3 | ||||
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 51 |
3 files changed, 29 insertions, 28 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 40b74c04ac..89913ba408 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -1476,7 +1476,8 @@ deliver(Qs, Delivery = #delivery{flow = Flow, lists:foldl( fun({{Name, _} = Pid, QName}, QStates) -> QState0 = get_quorum_state(Pid, QName, QStates), - case rabbit_quorum_queue:deliver(Confirm, Delivery, QState0) of + case rabbit_quorum_queue:deliver(Confirm, Delivery, + QState0) of {ok, QState} -> maps:put(Name, QState, QStates); {slow, QState} -> diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index 3fceb93654..79d4a3effc 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -392,8 +392,7 @@ apply(_, {down, ConsumerPid, noconnection}, apply(_, {down, Pid, _Info}, Effects0, #state{consumers = Cons0, enqueuers = Enqs0} = State0) -> - % remove any enqueuer for the same pid - % TODO: if there are any pending enqueuers these should be enqueued + % Remove any enqueuer for the same pid and enqueue any pending messages % This should be ok as we won't see any more enqueues from this pid State1 = case maps:take(Pid, Enqs0) of {#enqueuer{pending = Pend}, Enqs} -> diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index 311894dee6..8d7d13aa98 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -104,8 +104,8 @@ init_state({Name, _}, QName) -> fun() -> credit_flow:block(Name), ok end, fun() -> credit_flow:unblock(Name), ok end). -handle_event({ra_event, From, Evt}, FState) -> - rabbit_fifo_client:handle_ra_event(From, Evt, FState). +handle_event({ra_event, From, Evt}, QState) -> + rabbit_fifo_client:handle_ra_event(From, Evt, QState). declare(#amqqueue{name = QName, durable = Durable, @@ -160,9 +160,10 @@ cancel_consumer_handler(QName, {ConsumerTag, ChPid}, _Name) -> % QName = queue_name(Name), case Node == node() of true -> cancel_consumer(QName, ChPid, ConsumerTag); - false -> rabbit_misc:rpc_call(Node, rabbit_quorum_queue, - cancel_consumer, - [QName, ChPid, ConsumerTag]) + false -> + rpc:cast(Node, rabbit_quorum_queue, + cancel_consumer, + [QName, ChPid, ConsumerTag]) end. cancel_consumer(QName, ChPid, ConsumerTag) -> @@ -306,19 +307,19 @@ delete_immediately({Name, _} = QPid) -> rabbit_core_metrics:queue_deleted(QName), ok. -ack(CTag, MsgIds, FState) -> - rabbit_fifo_client:settle(quorum_ctag(CTag), MsgIds, FState). +ack(CTag, MsgIds, QState) -> + rabbit_fifo_client:settle(quorum_ctag(CTag), MsgIds, QState). -reject(true, CTag, MsgIds, FState) -> - rabbit_fifo_client:return(quorum_ctag(CTag), MsgIds, FState); -reject(false, CTag, MsgIds, FState) -> - rabbit_fifo_client:discard(quorum_ctag(CTag), MsgIds, FState). +reject(true, CTag, MsgIds, QState) -> + rabbit_fifo_client:return(quorum_ctag(CTag), MsgIds, QState); +reject(false, CTag, MsgIds, QState) -> + rabbit_fifo_client:discard(quorum_ctag(CTag), MsgIds, QState). credit(CTag, Credit, Drain, QState) -> rabbit_fifo_client:credit(quorum_ctag(CTag), Credit, Drain, QState). basic_get(#amqqueue{name = QName, pid = {Name, _} = Id, type = quorum}, NoAck, - CTag0, FState0) -> + CTag0, QState0) -> CTag = quorum_ctag(CTag0), Settlement = case NoAck of true -> @@ -326,12 +327,12 @@ basic_get(#amqqueue{name = QName, pid = {Name, _} = Id, type = quorum}, NoAck, false -> unsettled end, - case rabbit_fifo_client:dequeue(CTag, Settlement, FState0) of - {ok, empty, FState} -> - {ok, empty, FState}; - {ok, {MsgId, {MsgHeader, Msg}}, FState} -> + case rabbit_fifo_client:dequeue(CTag, Settlement, QState0) of + {ok, empty, QState} -> + {ok, empty, QState}; + {ok, {MsgId, {MsgHeader, Msg}}, QState} -> IsDelivered = maps:is_key(delivery_count, MsgHeader), - {ok, quorum_messages(Name), {QName, Id, MsgId, IsDelivered, Msg}, FState}; + {ok, quorum_messages(Name), {QName, Id, MsgId, IsDelivered, Msg}, QState}; {timeout, _} -> {error, timeout} end. @@ -352,19 +353,19 @@ basic_consume(#amqqueue{name = QName, type = quorum}, NoAck, ChPid, ConsumerPrefetchCount, Args), {ok, QState}. -basic_cancel(ConsumerTag, ChPid, OkMsg, FState0) -> +basic_cancel(ConsumerTag, ChPid, OkMsg, QState0) -> maybe_send_reply(ChPid, OkMsg), - rabbit_fifo_client:cancel_checkout(quorum_ctag(ConsumerTag), FState0). + rabbit_fifo_client:cancel_checkout(quorum_ctag(ConsumerTag), QState0). stateless_deliver(ServerId, Delivery) -> ok = rabbit_fifo_client:untracked_enqueue([ServerId], Delivery#delivery.message). -deliver(false, Delivery, FState0) -> - rabbit_fifo_client:enqueue(Delivery#delivery.message, FState0); -deliver(true, Delivery, FState0) -> +deliver(false, Delivery, QState0) -> + rabbit_fifo_client:enqueue(Delivery#delivery.message, QState0); +deliver(true, Delivery, QState0) -> rabbit_fifo_client:enqueue(Delivery#delivery.msg_seq_no, - Delivery#delivery.message, FState0). + Delivery#delivery.message, QState0). info(Q) -> info(Q, [name, durable, auto_delete, arguments, pid, state, messages, @@ -387,8 +388,8 @@ stat(_Q) -> purge(Node) -> rabbit_fifo_client:purge(Node). -requeue(ConsumerTag, MsgIds, FState) -> - rabbit_fifo_client:return(quorum_ctag(ConsumerTag), MsgIds, FState). +requeue(ConsumerTag, MsgIds, QState) -> + rabbit_fifo_client:return(quorum_ctag(ConsumerTag), MsgIds, QState). cleanup_data_dir() -> Names = [Name || #amqqueue{pid = {Name, _}, quorum_nodes = Nodes} |
