diff options
| author | Matthias Radestock <matthias@rabbitmq.com> | 2013-01-30 13:41:28 +0000 |
|---|---|---|
| committer | Matthias Radestock <matthias@rabbitmq.com> | 2013-01-30 13:41:28 +0000 |
| commit | 3f15c30196f250ebb2d76e604a0bce7d72b2a056 (patch) | |
| tree | 9e830dd4e8479e8509d539f1e0975e245440749b | |
| parent | 6c0546e98e8637c9eb0a3f4a74bdb902b58235d4 (diff) | |
| download | rabbitmq-server-git-3f15c30196f250ebb2d76e604a0bce7d72b2a056.tar.gz | |
only keep track of *unconfirmed* unsync'ed messages ids in qi
...so we don't bloat memory with stuff nobody cares about, and reduce
the qi->queue callbacks (to zero when no confirms/tx are used)
This does expose qi to a bit more messaging semantics, but imo to an
acceptable degree.
related changes:
- rename the state var to better capture its (revised) meaning
- only invoke the OnSyncFun when we have something to say
| -rw-r--r-- | src/rabbit_queue_index.erl | 24 |
1 files changed, 15 insertions, 9 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 4559bb8a1e..7dc694586c 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -162,7 +162,7 @@ %%---------------------------------------------------------------------------- -record(qistate, { dir, segments, journal_handle, dirty_count, - max_journal_entries, on_sync, unsynced_msg_ids }). + max_journal_entries, on_sync, unconfirmed }). -record(segment, { num, path, journal_entries, unacked }). @@ -190,7 +190,7 @@ dirty_count :: integer(), max_journal_entries :: non_neg_integer(), on_sync :: on_sync_fun(), - unsynced_msg_ids :: gb_set() + unconfirmed :: gb_set() }). -type(contains_predicate() :: fun ((rabbit_types:msg_id()) -> boolean())). -type(walker(A) :: fun ((A) -> 'finished' | @@ -269,13 +269,16 @@ delete_and_terminate(State) -> State1. publish(MsgId, SeqId, MsgProps, IsPersistent, - State = #qistate { unsynced_msg_ids = UnsyncedMsgIds }) + State = #qistate { unconfirmed = Unconfirmed }) when is_binary(MsgId) -> ?MSG_ID_BYTES = size(MsgId), {JournalHdl, State1} = get_journal_handle( - State #qistate { - unsynced_msg_ids = gb_sets:add_element(MsgId, UnsyncedMsgIds) }), + case MsgProps#message_properties.needs_confirming of + true -> Unconfirmed1 = gb_sets:add_element(MsgId, Unconfirmed), + State #qistate { unconfirmed = Unconfirmed1 }; + false -> State + end), ok = file_handle_cache:append( JournalHdl, [<<(case IsPersistent of true -> ?PUB_PERSIST_JPREFIX; @@ -398,7 +401,7 @@ blank_state_dir(Dir) -> dirty_count = 0, max_journal_entries = MaxJournal, on_sync = fun (_) -> ok end, - unsynced_msg_ids = gb_sets:new() }. + unconfirmed = gb_sets:new() }. clean_filename(Dir) -> filename:join(Dir, ?CLEAN_FILENAME). @@ -732,9 +735,12 @@ deliver_or_ack(Kind, SeqIds, State) -> add_to_journal(SeqId, Kind, StateN) end, State1, SeqIds)). -notify_sync(State = #qistate { unsynced_msg_ids = UG, on_sync = OnSyncFun }) -> - OnSyncFun(UG), - State #qistate { unsynced_msg_ids = gb_sets:new() }. +notify_sync(State = #qistate { unconfirmed = UC, on_sync = OnSyncFun }) -> + case gb_sets:is_empty(UC) of + true -> State; + false -> OnSyncFun(UC), + State #qistate { unconfirmed = gb_sets:new() } + end. %%---------------------------------------------------------------------------- %% segment manipulation |
