summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_queue_index.erl24
1 files changed, 15 insertions, 9 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 4559bb8a1e..7dc694586c 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -162,7 +162,7 @@
%%----------------------------------------------------------------------------
-record(qistate, { dir, segments, journal_handle, dirty_count,
- max_journal_entries, on_sync, unsynced_msg_ids }).
+ max_journal_entries, on_sync, unconfirmed }).
-record(segment, { num, path, journal_entries, unacked }).
@@ -190,7 +190,7 @@
dirty_count :: integer(),
max_journal_entries :: non_neg_integer(),
on_sync :: on_sync_fun(),
- unsynced_msg_ids :: gb_set()
+ unconfirmed :: gb_set()
}).
-type(contains_predicate() :: fun ((rabbit_types:msg_id()) -> boolean())).
-type(walker(A) :: fun ((A) -> 'finished' |
@@ -269,13 +269,16 @@ delete_and_terminate(State) ->
State1.
publish(MsgId, SeqId, MsgProps, IsPersistent,
- State = #qistate { unsynced_msg_ids = UnsyncedMsgIds })
+ State = #qistate { unconfirmed = Unconfirmed })
when is_binary(MsgId) ->
?MSG_ID_BYTES = size(MsgId),
{JournalHdl, State1} =
get_journal_handle(
- State #qistate {
- unsynced_msg_ids = gb_sets:add_element(MsgId, UnsyncedMsgIds) }),
+ case MsgProps#message_properties.needs_confirming of
+ true -> Unconfirmed1 = gb_sets:add_element(MsgId, Unconfirmed),
+ State #qistate { unconfirmed = Unconfirmed1 };
+ false -> State
+ end),
ok = file_handle_cache:append(
JournalHdl, [<<(case IsPersistent of
true -> ?PUB_PERSIST_JPREFIX;
@@ -398,7 +401,7 @@ blank_state_dir(Dir) ->
dirty_count = 0,
max_journal_entries = MaxJournal,
on_sync = fun (_) -> ok end,
- unsynced_msg_ids = gb_sets:new() }.
+ unconfirmed = gb_sets:new() }.
clean_filename(Dir) -> filename:join(Dir, ?CLEAN_FILENAME).
@@ -732,9 +735,12 @@ deliver_or_ack(Kind, SeqIds, State) ->
add_to_journal(SeqId, Kind, StateN)
end, State1, SeqIds)).
-notify_sync(State = #qistate { unsynced_msg_ids = UG, on_sync = OnSyncFun }) ->
- OnSyncFun(UG),
- State #qistate { unsynced_msg_ids = gb_sets:new() }.
+notify_sync(State = #qistate { unconfirmed = UC, on_sync = OnSyncFun }) ->
+ case gb_sets:is_empty(UC) of
+ true -> State;
+ false -> OnSyncFun(UC),
+ State #qistate { unconfirmed = gb_sets:new() }
+ end.
%%----------------------------------------------------------------------------
%% segment manipulation