summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/unconfirmed_messages.erl88
1 files changed, 38 insertions, 50 deletions
diff --git a/src/unconfirmed_messages.erl b/src/unconfirmed_messages.erl
index 701ca115c2..63a504a239 100644
--- a/src/unconfirmed_messages.erl
+++ b/src/unconfirmed_messages.erl
@@ -15,24 +15,22 @@
%%
%% Unconfirmed messages tracking.
-
-%% A message should be confirmed only when all queues confirm
-
+%%
+%% A message should be confirmed to the publisher only when all queues confirm.
+%%
%% Messages are published to multiple queues while each queue may be
%% represented by several processes (queue refs).
-
-%% Queue refs return confirmations, rejections, may fail or disconnect.
-%% If a queue ref fails - the messgae should be rejected.
-%% If all queue refs for a queue disconnect (not fail) without confirmation -
-%% the messge should be rejected.
-
-%% For simplicity, disconnects do not return reject until all message refs
+%%
+%% Queue refs return confirmations, rejections, can fail or disconnect.
+%% If a queue ref fails, messgae should be rejected.
+%% If all queue refs for a queue disconnect (not fail) without confirmation,
+%% messge should be rejected.
+%%
+%% For simplicity, disconnects do not return a reject until all message refs
%% confirm or disconnect.
-
-module(unconfirmed_messages).
-
-export([new/0,
insert/5,
confirm_msg_ref/4,
@@ -57,22 +55,22 @@
-type exchange_name() :: rabbit_exchange:name().
-type map_set(Type) :: #{Type => ?SET_VALUE}.
-
-%% refs is a set of refs waiting for confirm
-%% queue_status shows which queues had at least one confirmation
-
--record(msg_status,
- {refs = #{} :: map_set(queue_ref()),
- queue_status = #{} :: #{queue_name() => confirmed | rejected},
- exchange :: exchange_name()}).
-
-%% ordered set is needed to get unconfirmed cutoff
-%% index contains message statuses of all message IDs
-%% reverse index is needed to locate message IDs from queue refs
--record(unconfirmed,
- {ordered = gb_sets:new() :: gb_sets:set(msg_id()),
- index = #{} :: #{msg_id() => #msg_status{}},
- reverse = #{} :: #{queue_ref() => #{msg_id() => ?SET_VALUE}}}).
+-record(msg_status, {
+ %% a set of refs waiting for confirm
+ refs = #{} :: map_set(queue_ref()),
+ %% shows which queues had at least one confirmation
+ queue_status = #{} :: #{queue_name() => confirmed | rejected},
+ exchange :: exchange_name()
+}).
+
+-record(unconfirmed, {
+ %% needed to get unconfirmed cutoff
+ ordered = gb_sets:new() :: gb_sets:set(msg_id()),
+ %% contains message statuses of all message IDs
+ index = #{} :: #{msg_id() => #msg_status{}},
+ %% needed to look up message IDs for a queue ref
+ reverse = #{} :: #{queue_ref() => #{msg_id() => ?SET_VALUE}}
+}).
-opaque ?MODULE() :: #unconfirmed{}.
@@ -81,15 +79,13 @@
-spec new() -> ?MODULE().
new() -> #unconfirmed{}.
-%% Insert and entry. Fails if there already is an entry with the given
-%% message id.
-
+%% Insert an entry for the message ID. Fails if there already is
+%% an entry with the given ID.
-spec insert(msg_id(), [queue_name()], [queue_ref()], exchange_name(), ?MODULE()) -> ?MODULE().
insert(MsgId, QueueNames, QueueRefs, XName,
#unconfirmed{ordered = Ordered,
index = Index,
reverse = Reverse} = UC) ->
-
case maps:get(MsgId, Index, none) of
none ->
UC#unconfirmed{
@@ -116,12 +112,8 @@ insert(MsgId, QueueNames, QueueRefs, XName,
error({message_already_exists, MsgId, QueueNames, QueueRefs, XName, UC})
end.
-%% Standard confirmation.
-%% Removes the queue ref from waiting, if it was the last one -
-%% return confirmed and cleanup the message id state
-%% If the ref was not the last one - update queues status for the queue
-%% to confirmed
-
+%% Confirms a message on behalf of the given queue. If it was the last queue (ref)
+%% on the waiting list, returns 'confirmed' and performs the necessary cleanup.
-spec confirm_msg_ref(msg_id(), queue_name(), queue_ref(), ?MODULE()) ->
{{confirmed | rejected, {msg_id(), exchange_name()}} | not_confirmed, ?MODULE()}.
confirm_msg_ref(MsgId, QueueName, QueueRef,
@@ -143,12 +135,12 @@ confirm_multiple_msg_ref(MsgIds, QueueName, QueueRef,
{[], UC0#unconfirmed{reverse = remove_from_reverse(QueueRef, MsgIds, Reverse)}},
MsgIds).
-%% Remove queue ref from waiting for all messages based on reverse index.
-%% If there are no more refs left for the message - return either
-%% confirmed or rejected.
-%% Confirmed is returned if all queues have queue status confirmed,
-%% which means that each queue has at least one ref (process) confirmed.
+%% Removes all messages for a queue.
%% Returns lists of confirmed and rejected messages.
+%%
+%% If there are no more refs left for the message, either
+%% 'confirmed' or 'rejected'.
+%% 'confirmed' is returned if all queues have confirmed the message.
-spec forget_ref(queue_ref(), ?MODULE()) ->
{Confirmed :: [{msg_id(), exchange_name()}],
Rejected :: [{msg_id(), exchange_name()}],
@@ -165,8 +157,8 @@ forget_ref(QueueRef, #unconfirmed{reverse = Reverse0} = UC0) ->
{[], [], UC0#unconfirmed{reverse = maps:remove(QueueRef, Reverse0)}},
MsgIds).
-%% Cleanup message id
-%% Return rejected if there was a message with
+%% Rejects a single message with the given ID.
+%% Returns 'rejected' if there was a message with
%% such ID.
-spec reject_msg(msg_id(), ?MODULE()) ->
{{rejected, {msg_id(), exchange_name()}} | not_confirmed, ?MODULE()}.
@@ -182,9 +174,7 @@ reject_msg(MsgId, #unconfirmed{ordered = Ordered, index = Index, reverse = Rever
reverse = remove_multiple_from_reverse(maps:keys(Refs), [MsgId], Reverse)}}
end.
-%% Cleanup message ids for all messages referencing ref
-%% Based on reverse index.
-%% Returns a list of rejected messages
+%% Rejects all pending messages for a queue.
-spec reject_all_for_queue(queue_ref(), ?MODULE()) ->
{Rejected :: [{msg_id(), exchange_name()}], ?MODULE()}.
reject_all_for_queue(QueueRef, #unconfirmed{reverse = Reverse0} = UC0) ->
@@ -288,5 +278,3 @@ confirm_status(QueueStatus) ->
true -> confirmed;
false -> rejected
end.
-
-