diff options
| -rw-r--r-- | src/unconfirmed_messages.erl | 88 |
1 files changed, 38 insertions, 50 deletions
diff --git a/src/unconfirmed_messages.erl b/src/unconfirmed_messages.erl index 701ca115c2..63a504a239 100644 --- a/src/unconfirmed_messages.erl +++ b/src/unconfirmed_messages.erl @@ -15,24 +15,22 @@ %% %% Unconfirmed messages tracking. - -%% A message should be confirmed only when all queues confirm - +%% +%% A message should be confirmed to the publisher only when all queues confirm. +%% %% Messages are published to multiple queues while each queue may be %% represented by several processes (queue refs). - -%% Queue refs return confirmations, rejections, may fail or disconnect. -%% If a queue ref fails - the messgae should be rejected. -%% If all queue refs for a queue disconnect (not fail) without confirmation - -%% the messge should be rejected. - -%% For simplicity, disconnects do not return reject until all message refs +%% +%% Queue refs return confirmations, rejections, can fail or disconnect. +%% If a queue ref fails, messgae should be rejected. +%% If all queue refs for a queue disconnect (not fail) without confirmation, +%% messge should be rejected. +%% +%% For simplicity, disconnects do not return a reject until all message refs %% confirm or disconnect. - -module(unconfirmed_messages). - -export([new/0, insert/5, confirm_msg_ref/4, @@ -57,22 +55,22 @@ -type exchange_name() :: rabbit_exchange:name(). -type map_set(Type) :: #{Type => ?SET_VALUE}. - -%% refs is a set of refs waiting for confirm -%% queue_status shows which queues had at least one confirmation - --record(msg_status, - {refs = #{} :: map_set(queue_ref()), - queue_status = #{} :: #{queue_name() => confirmed | rejected}, - exchange :: exchange_name()}). - -%% ordered set is needed to get unconfirmed cutoff -%% index contains message statuses of all message IDs -%% reverse index is needed to locate message IDs from queue refs --record(unconfirmed, - {ordered = gb_sets:new() :: gb_sets:set(msg_id()), - index = #{} :: #{msg_id() => #msg_status{}}, - reverse = #{} :: #{queue_ref() => #{msg_id() => ?SET_VALUE}}}). +-record(msg_status, { + %% a set of refs waiting for confirm + refs = #{} :: map_set(queue_ref()), + %% shows which queues had at least one confirmation + queue_status = #{} :: #{queue_name() => confirmed | rejected}, + exchange :: exchange_name() +}). + +-record(unconfirmed, { + %% needed to get unconfirmed cutoff + ordered = gb_sets:new() :: gb_sets:set(msg_id()), + %% contains message statuses of all message IDs + index = #{} :: #{msg_id() => #msg_status{}}, + %% needed to look up message IDs for a queue ref + reverse = #{} :: #{queue_ref() => #{msg_id() => ?SET_VALUE}} +}). -opaque ?MODULE() :: #unconfirmed{}. @@ -81,15 +79,13 @@ -spec new() -> ?MODULE(). new() -> #unconfirmed{}. -%% Insert and entry. Fails if there already is an entry with the given -%% message id. - +%% Insert an entry for the message ID. Fails if there already is +%% an entry with the given ID. -spec insert(msg_id(), [queue_name()], [queue_ref()], exchange_name(), ?MODULE()) -> ?MODULE(). insert(MsgId, QueueNames, QueueRefs, XName, #unconfirmed{ordered = Ordered, index = Index, reverse = Reverse} = UC) -> - case maps:get(MsgId, Index, none) of none -> UC#unconfirmed{ @@ -116,12 +112,8 @@ insert(MsgId, QueueNames, QueueRefs, XName, error({message_already_exists, MsgId, QueueNames, QueueRefs, XName, UC}) end. -%% Standard confirmation. -%% Removes the queue ref from waiting, if it was the last one - -%% return confirmed and cleanup the message id state -%% If the ref was not the last one - update queues status for the queue -%% to confirmed - +%% Confirms a message on behalf of the given queue. If it was the last queue (ref) +%% on the waiting list, returns 'confirmed' and performs the necessary cleanup. -spec confirm_msg_ref(msg_id(), queue_name(), queue_ref(), ?MODULE()) -> {{confirmed | rejected, {msg_id(), exchange_name()}} | not_confirmed, ?MODULE()}. confirm_msg_ref(MsgId, QueueName, QueueRef, @@ -143,12 +135,12 @@ confirm_multiple_msg_ref(MsgIds, QueueName, QueueRef, {[], UC0#unconfirmed{reverse = remove_from_reverse(QueueRef, MsgIds, Reverse)}}, MsgIds). -%% Remove queue ref from waiting for all messages based on reverse index. -%% If there are no more refs left for the message - return either -%% confirmed or rejected. -%% Confirmed is returned if all queues have queue status confirmed, -%% which means that each queue has at least one ref (process) confirmed. +%% Removes all messages for a queue. %% Returns lists of confirmed and rejected messages. +%% +%% If there are no more refs left for the message, either +%% 'confirmed' or 'rejected'. +%% 'confirmed' is returned if all queues have confirmed the message. -spec forget_ref(queue_ref(), ?MODULE()) -> {Confirmed :: [{msg_id(), exchange_name()}], Rejected :: [{msg_id(), exchange_name()}], @@ -165,8 +157,8 @@ forget_ref(QueueRef, #unconfirmed{reverse = Reverse0} = UC0) -> {[], [], UC0#unconfirmed{reverse = maps:remove(QueueRef, Reverse0)}}, MsgIds). -%% Cleanup message id -%% Return rejected if there was a message with +%% Rejects a single message with the given ID. +%% Returns 'rejected' if there was a message with %% such ID. -spec reject_msg(msg_id(), ?MODULE()) -> {{rejected, {msg_id(), exchange_name()}} | not_confirmed, ?MODULE()}. @@ -182,9 +174,7 @@ reject_msg(MsgId, #unconfirmed{ordered = Ordered, index = Index, reverse = Rever reverse = remove_multiple_from_reverse(maps:keys(Refs), [MsgId], Reverse)}} end. -%% Cleanup message ids for all messages referencing ref -%% Based on reverse index. -%% Returns a list of rejected messages +%% Rejects all pending messages for a queue. -spec reject_all_for_queue(queue_ref(), ?MODULE()) -> {Rejected :: [{msg_id(), exchange_name()}], ?MODULE()}. reject_all_for_queue(QueueRef, #unconfirmed{reverse = Reverse0} = UC0) -> @@ -288,5 +278,3 @@ confirm_status(QueueStatus) -> true -> confirmed; false -> rejected end. - - |
