diff options
| author | kjnilsson <knilsson@pivotal.io> | 2020-10-07 11:43:48 +0100 |
|---|---|---|
| committer | kjnilsson <knilsson@pivotal.io> | 2020-10-07 11:43:48 +0100 |
| commit | fe56296805817bc6f6e81dd60ca3cf541e7c2c3d (patch) | |
| tree | 5261aa29d529bcdb7a7418e8cf7b27d24cfacd78 | |
| parent | 91d6a5d84e377d288d114999542f58bf82b5c1b7 (diff) | |
| download | rabbitmq-server-git-fe56296805817bc6f6e81dd60ca3cf541e7c2c3d.tar.gz | |
Move rabbit_misc:confirm_to_sender/3
out of rabbit_common as it is only used by modules in the server.
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 20 | ||||
| -rw-r--r-- | src/rabbit_classic_queue.erl | 24 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 15 |
3 files changed, 30 insertions, 29 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 0aca86c4af..ae4ad6463d 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -806,7 +806,8 @@ send_reject_publish(#delivery{confirm = true, backing_queue = BQ, backing_queue_state = BQS, msg_id_to_channel = MTC}) -> - ok = send_rejection(SenderPid, amqqueue:get_name(Q), MsgSeqNo), + ok = rabbit_classic_queue:send_rejection(SenderPid, + amqqueue:get_name(Q), MsgSeqNo), MTC1 = maps:remove(MsgId, MTC), BQS1 = BQ:discard(MsgId, SenderPid, Flow, BQS), @@ -1842,21 +1843,6 @@ update_ha_mode(State) -> State. confirm_to_sender(Pid, QName, MsgSeqNos) -> - %% the stream queue included the queue type refactoring and thus requires - %% a different message format - case rabbit_ff_registry:is_enabled(steam_queue) of - true -> - rabbit_misc:confirm_to_sender(Pid, QName, MsgSeqNos); - false -> - rabbit_misc:confirm_to_sender_compat(Pid, QName, MsgSeqNos) - end. + rabbit_classic_queue:confirm_to_sender(Pid, QName, MsgSeqNos). -send_rejection(Pid, QName, MsgSeqNo) -> - case rabbit_ff_registry:is_enabled(steam_queue) of - true -> - gen_server2:cast(Pid, {queue_event, QName, - {reject_publish, MsgSeqNo, self()}}); - false -> - gen_server2:cast(Pid, {reject_publish, MsgSeqNo, self()}) - end. diff --git a/src/rabbit_classic_queue.erl b/src/rabbit_classic_queue.erl index 351b4961b9..5ab32b8632 100644 --- a/src/rabbit_classic_queue.erl +++ b/src/rabbit_classic_queue.erl @@ -45,6 +45,9 @@ delete_crashed/2, delete_crashed_internal/2]). +-export([confirm_to_sender/3, + send_rejection/3]). + is_enabled() -> true. declare(Q, Node) when ?amqqueue_is_classic(Q) -> @@ -490,3 +493,24 @@ update_msg_status(confirm, Pid, #msg_status{pending = P, S#msg_status{pending = Rem, confirmed = [Pid | C]}; update_msg_status(down, Pid, #msg_status{pending = P} = S) -> S#msg_status{pending = lists:delete(Pid, P)}. + +%% part of channel <-> queue api +confirm_to_sender(Pid, QName, MsgSeqNos) -> + %% the stream queue included the queue type refactoring and thus requires + %% a different message format + Evt = case rabbit_ff_registry:is_enabled(stream_queue) of + true -> + {queue_event, QName, {confirm, MsgSeqNos, self()}}; + false -> + {confirm, MsgSeqNos, self()} + end, + gen_server2:cast(Pid, Evt). + +send_rejection(Pid, QName, MsgSeqNo) -> + case rabbit_ff_registry:is_enabled(stream_queue) of + true -> + gen_server2:cast(Pid, {queue_event, QName, + {reject_publish, MsgSeqNo, self()}}); + false -> + gen_server2:cast(Pid, {reject_publish, MsgSeqNo, self()}) + end. diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 994c2906b5..49e0a13a0e 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -579,7 +579,8 @@ send_or_record_confirm(_Status, #delivery { sender = ChPid, confirm = true, msg_seq_no = MsgSeqNo }, MS, #state{q = Q} = _State) -> - ok = confirm_to_sender(ChPid, amqqueue:get_name(Q), [MsgSeqNo]), + ok = rabbit_classic_queue:confirm_to_sender(ChPid, + amqqueue:get_name(Q), [MsgSeqNo]), MS. confirm_messages(MsgIds, State = #state{q = Q, msg_id_status = MS}) -> @@ -612,7 +613,7 @@ confirm_messages(MsgIds, State = #state{q = Q, msg_id_status = MS}) -> end end, {gb_trees:empty(), MS}, MsgIds), Fun = fun (Pid, MsgSeqNos) -> - confirm_to_sender(Pid, QName, MsgSeqNos) + rabbit_classic_queue:confirm_to_sender(Pid, QName, MsgSeqNos) end, rabbit_misc:gb_trees_foreach(Fun, CMs), State #state { msg_id_status = MS1 }. @@ -1090,13 +1091,3 @@ record_synchronised(Q0) when ?is_amqqueue(Q0) -> ok -> ok; {ok, Q2} -> rabbit_mirror_queue_misc:maybe_drop_master_after_sync(Q2) end. - -confirm_to_sender(Pid, QName, MsgSeqNos) -> - %% the stream queue included the queue type refactoring and thus requires - %% a different message format - case rabbit_ff_registry:is_enabled(steam_queue) of - true -> - rabbit_misc:confirm_to_sender(Pid, QName, MsgSeqNos); - false -> - rabbit_misc:confirm_to_sender_compat(Pid, QName, MsgSeqNos) - end. |
