summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorkjnilsson <knilsson@pivotal.io>2020-10-07 11:43:48 +0100
committerkjnilsson <knilsson@pivotal.io>2020-10-07 11:43:48 +0100
commitfe56296805817bc6f6e81dd60ca3cf541e7c2c3d (patch)
tree5261aa29d529bcdb7a7418e8cf7b27d24cfacd78
parent91d6a5d84e377d288d114999542f58bf82b5c1b7 (diff)
downloadrabbitmq-server-git-fe56296805817bc6f6e81dd60ca3cf541e7c2c3d.tar.gz
Move rabbit_misc:confirm_to_sender/3
out of rabbit_common as it is only used by modules in the server.
-rw-r--r--src/rabbit_amqqueue_process.erl20
-rw-r--r--src/rabbit_classic_queue.erl24
-rw-r--r--src/rabbit_mirror_queue_slave.erl15
3 files changed, 30 insertions, 29 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 0aca86c4af..ae4ad6463d 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -806,7 +806,8 @@ send_reject_publish(#delivery{confirm = true,
backing_queue = BQ,
backing_queue_state = BQS,
msg_id_to_channel = MTC}) ->
- ok = send_rejection(SenderPid, amqqueue:get_name(Q), MsgSeqNo),
+ ok = rabbit_classic_queue:send_rejection(SenderPid,
+ amqqueue:get_name(Q), MsgSeqNo),
MTC1 = maps:remove(MsgId, MTC),
BQS1 = BQ:discard(MsgId, SenderPid, Flow, BQS),
@@ -1842,21 +1843,6 @@ update_ha_mode(State) ->
State.
confirm_to_sender(Pid, QName, MsgSeqNos) ->
- %% the stream queue included the queue type refactoring and thus requires
- %% a different message format
- case rabbit_ff_registry:is_enabled(steam_queue) of
- true ->
- rabbit_misc:confirm_to_sender(Pid, QName, MsgSeqNos);
- false ->
- rabbit_misc:confirm_to_sender_compat(Pid, QName, MsgSeqNos)
- end.
+ rabbit_classic_queue:confirm_to_sender(Pid, QName, MsgSeqNos).
-send_rejection(Pid, QName, MsgSeqNo) ->
- case rabbit_ff_registry:is_enabled(steam_queue) of
- true ->
- gen_server2:cast(Pid, {queue_event, QName,
- {reject_publish, MsgSeqNo, self()}});
- false ->
- gen_server2:cast(Pid, {reject_publish, MsgSeqNo, self()})
- end.
diff --git a/src/rabbit_classic_queue.erl b/src/rabbit_classic_queue.erl
index 351b4961b9..5ab32b8632 100644
--- a/src/rabbit_classic_queue.erl
+++ b/src/rabbit_classic_queue.erl
@@ -45,6 +45,9 @@
delete_crashed/2,
delete_crashed_internal/2]).
+-export([confirm_to_sender/3,
+ send_rejection/3]).
+
is_enabled() -> true.
declare(Q, Node) when ?amqqueue_is_classic(Q) ->
@@ -490,3 +493,24 @@ update_msg_status(confirm, Pid, #msg_status{pending = P,
S#msg_status{pending = Rem, confirmed = [Pid | C]};
update_msg_status(down, Pid, #msg_status{pending = P} = S) ->
S#msg_status{pending = lists:delete(Pid, P)}.
+
+%% part of channel <-> queue api
+confirm_to_sender(Pid, QName, MsgSeqNos) ->
+ %% the stream queue included the queue type refactoring and thus requires
+ %% a different message format
+ Evt = case rabbit_ff_registry:is_enabled(stream_queue) of
+ true ->
+ {queue_event, QName, {confirm, MsgSeqNos, self()}};
+ false ->
+ {confirm, MsgSeqNos, self()}
+ end,
+ gen_server2:cast(Pid, Evt).
+
+send_rejection(Pid, QName, MsgSeqNo) ->
+ case rabbit_ff_registry:is_enabled(stream_queue) of
+ true ->
+ gen_server2:cast(Pid, {queue_event, QName,
+ {reject_publish, MsgSeqNo, self()}});
+ false ->
+ gen_server2:cast(Pid, {reject_publish, MsgSeqNo, self()})
+ end.
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 994c2906b5..49e0a13a0e 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -579,7 +579,8 @@ send_or_record_confirm(_Status, #delivery { sender = ChPid,
confirm = true,
msg_seq_no = MsgSeqNo },
MS, #state{q = Q} = _State) ->
- ok = confirm_to_sender(ChPid, amqqueue:get_name(Q), [MsgSeqNo]),
+ ok = rabbit_classic_queue:confirm_to_sender(ChPid,
+ amqqueue:get_name(Q), [MsgSeqNo]),
MS.
confirm_messages(MsgIds, State = #state{q = Q, msg_id_status = MS}) ->
@@ -612,7 +613,7 @@ confirm_messages(MsgIds, State = #state{q = Q, msg_id_status = MS}) ->
end
end, {gb_trees:empty(), MS}, MsgIds),
Fun = fun (Pid, MsgSeqNos) ->
- confirm_to_sender(Pid, QName, MsgSeqNos)
+ rabbit_classic_queue:confirm_to_sender(Pid, QName, MsgSeqNos)
end,
rabbit_misc:gb_trees_foreach(Fun, CMs),
State #state { msg_id_status = MS1 }.
@@ -1090,13 +1091,3 @@ record_synchronised(Q0) when ?is_amqqueue(Q0) ->
ok -> ok;
{ok, Q2} -> rabbit_mirror_queue_misc:maybe_drop_master_after_sync(Q2)
end.
-
-confirm_to_sender(Pid, QName, MsgSeqNos) ->
- %% the stream queue included the queue type refactoring and thus requires
- %% a different message format
- case rabbit_ff_registry:is_enabled(steam_queue) of
- true ->
- rabbit_misc:confirm_to_sender(Pid, QName, MsgSeqNos);
- false ->
- rabbit_misc:confirm_to_sender_compat(Pid, QName, MsgSeqNos)
- end.