diff options
| -rw-r--r-- | src/rabbit_queue_index.erl | 24 |
1 files changed, 15 insertions, 9 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 4559bb8a1e..7dc694586c 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -162,7 +162,7 @@ %%---------------------------------------------------------------------------- -record(qistate, { dir, segments, journal_handle, dirty_count, - max_journal_entries, on_sync, unsynced_msg_ids }). + max_journal_entries, on_sync, unconfirmed }). -record(segment, { num, path, journal_entries, unacked }). @@ -190,7 +190,7 @@ dirty_count :: integer(), max_journal_entries :: non_neg_integer(), on_sync :: on_sync_fun(), - unsynced_msg_ids :: gb_set() + unconfirmed :: gb_set() }). -type(contains_predicate() :: fun ((rabbit_types:msg_id()) -> boolean())). -type(walker(A) :: fun ((A) -> 'finished' | @@ -269,13 +269,16 @@ delete_and_terminate(State) -> State1. publish(MsgId, SeqId, MsgProps, IsPersistent, - State = #qistate { unsynced_msg_ids = UnsyncedMsgIds }) + State = #qistate { unconfirmed = Unconfirmed }) when is_binary(MsgId) -> ?MSG_ID_BYTES = size(MsgId), {JournalHdl, State1} = get_journal_handle( - State #qistate { - unsynced_msg_ids = gb_sets:add_element(MsgId, UnsyncedMsgIds) }), + case MsgProps#message_properties.needs_confirming of + true -> Unconfirmed1 = gb_sets:add_element(MsgId, Unconfirmed), + State #qistate { unconfirmed = Unconfirmed1 }; + false -> State + end), ok = file_handle_cache:append( JournalHdl, [<<(case IsPersistent of true -> ?PUB_PERSIST_JPREFIX; @@ -398,7 +401,7 @@ blank_state_dir(Dir) -> dirty_count = 0, max_journal_entries = MaxJournal, on_sync = fun (_) -> ok end, - unsynced_msg_ids = gb_sets:new() }. + unconfirmed = gb_sets:new() }. clean_filename(Dir) -> filename:join(Dir, ?CLEAN_FILENAME). @@ -732,9 +735,12 @@ deliver_or_ack(Kind, SeqIds, State) -> add_to_journal(SeqId, Kind, StateN) end, State1, SeqIds)). -notify_sync(State = #qistate { unsynced_msg_ids = UG, on_sync = OnSyncFun }) -> - OnSyncFun(UG), - State #qistate { unsynced_msg_ids = gb_sets:new() }. +notify_sync(State = #qistate { unconfirmed = UC, on_sync = OnSyncFun }) -> + case gb_sets:is_empty(UC) of + true -> State; + false -> OnSyncFun(UC), + State #qistate { unconfirmed = gb_sets:new() } + end. %%---------------------------------------------------------------------------- %% segment manipulation |
