diff options
| author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-09-29 16:07:48 +0100 |
|---|---|---|
| committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-09-29 16:07:48 +0100 |
| commit | c335a1a3522846ca3d415d9772aa37f10b692775 (patch) | |
| tree | 6b617b5566bc6429d7a9f8ae2348f18a8095afef | |
| parent | 21c8e31bb846bc8ac7fa10938cd2d88d8af02412 (diff) | |
| download | rabbitmq-server-git-c335a1a3522846ca3d415d9772aa37f10b692775.tar.gz | |
more refactoring
| -rw-r--r-- | src/rabbit_channel.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_msg_store.erl | 9 | ||||
| -rw-r--r-- | src/rabbit_queue_index.erl | 32 |
3 files changed, 21 insertions, 22 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 1f8455cc25..a6472e7aa1 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -1003,8 +1003,6 @@ handle_method(#'confirm.select'{}, _, #ch{transaction_id = TxId}) handle_method(#'confirm.select'{multiple = Multiple, nowait = NoWait}, _, State = #ch{confirm_enabled = false}) -> - rabbit_log:info("got confirm.select{multiple = ~p, nowait = ~p}~n", - [Multiple, NoWait]), State1 = State#ch{confirm_enabled = true, confirm_multiple = Multiple}, case NoWait of diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 7d0af80a47..84853227d8 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -848,12 +848,9 @@ internal_sync(State = #msstate { current_file_handle = CurHdl, lists:foreach(fun (K) -> K() end, lists:reverse(Syncs)), State1 #msstate { on_sync = [] } end, - dict:map(fun(CRef, Guids) -> - case dict:find(CRef, CODC) of - {ok, Fun} -> Fun(Guids); - error -> ok %% shouldn't happen - end - end, CTG), + dict:map(fun(CRef, Guids) -> Fun = dict:fetch(CRef, CODC), + Fun(Guids) end, + CTG), State2 #msstate { cref_to_guids = dict:new() }. diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 9b4255da01..88b6e83217 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -256,17 +256,17 @@ delete_and_terminate(State) -> ok = rabbit_misc:recursive_delete([Dir]), State1. -publish(Guid, SeqId, IsPersistent, State) when is_binary(Guid) -> +publish(Guid, SeqId, IsPersistent, + State = #qistate { unsynced_guids = UnsyncedGuids }) when is_binary(Guid) -> ?GUID_BYTES = size(Guid), - {JournalHdl, State1} = get_journal_handle(State), + {JournalHdl, State1} = + get_journal_handle(State #qistate { unsynced_guids = [Guid | UnsyncedGuids] }), ok = file_handle_cache:append( JournalHdl, [<<(case IsPersistent of true -> ?PUB_PERSIST_JPREFIX; false -> ?PUB_TRANS_JPREFIX end):?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Guid]), - State2 = State1 #qistate { unsynced_guids = - [Guid | State1#qistate.unsynced_guids] }, - maybe_flush_journal(add_to_journal(SeqId, {Guid, IsPersistent}, State2)). + maybe_flush_journal(add_to_journal(SeqId, {Guid, IsPersistent}, State1)). deliver(SeqIds, State) -> deliver_or_ack(del, SeqIds, State). @@ -278,8 +278,7 @@ sync([], State) -> State; sync(_SeqIds, State = #qistate { journal_handle = undefined }) -> State; -sync(_SeqIds, State = #qistate { journal_handle = JournalHdl, - on_sync = OnSyncFun }) -> +sync(_SeqIds, State = #qistate { journal_handle = JournalHdl }) -> %% The SeqIds here contains the SeqId of every publish and ack in %% the transaction. Ideally we should go through these seqids and %% only sync the journal if the pubs or acks appear in the @@ -289,8 +288,7 @@ sync(_SeqIds, State = #qistate { journal_handle = JournalHdl, %% seqids not being in the journal, provided the transaction isn't %% emptied (handled above anyway). ok = file_handle_cache:sync(JournalHdl), - OnSyncFun(State#qistate.unsynced_guids), - State#qistate { unsynced_guids = [] }. + notify_sync(State). flush(State = #qistate { dirty_count = 0 }) -> State; flush(State) -> flush_journal(State). @@ -570,9 +568,7 @@ maybe_flush_journal(State = #qistate { dirty_count = DCount, maybe_flush_journal(State) -> State. -flush_journal(State = #qistate { segments = Segments, - on_sync = OnSyncFun, - unsynced_guids = UGs }) -> +flush_journal(State = #qistate { segments = Segments }) -> Segments1 = segment_fold( fun (#segment { unacked = 0, path = Path }, SegmentsN) -> @@ -587,8 +583,7 @@ flush_journal(State = #qistate { segments = Segments, {JournalHdl, State1} = get_journal_handle(State #qistate { segments = Segments1 }), ok = file_handle_cache:clear(JournalHdl), - OnSyncFun(UGs), - State1 #qistate { dirty_count = 0, unsynced_guids = [] }. + notify_sync(State1 #qistate { dirty_count = 0 }). append_journal_to_segment(#segment { journal_entries = JEntries, path = Path } = Segment) -> @@ -947,3 +942,12 @@ journal_minus_segment1({no_pub, del, ack}, {?PUB, del, no_ack}) -> {{no_pub, no_del, ack}, 0}; journal_minus_segment1({no_pub, del, ack}, {?PUB, del, ack}) -> {undefined, -1}. + +%%---------------------------------------------------------------------------- +%% misc +%%---------------------------------------------------------------------------- + +notify_sync(State = #qistate { unsynced_guids = UG, + on_sync = OnSyncFun }) -> + OnSyncFun(UG), + State #qistate { unsynced_guids = [] }. |
