summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2010-09-29 16:07:48 +0100
committerAlexandru Scvortov <alexandru@rabbitmq.com>2010-09-29 16:07:48 +0100
commitc335a1a3522846ca3d415d9772aa37f10b692775 (patch)
tree6b617b5566bc6429d7a9f8ae2348f18a8095afef
parent21c8e31bb846bc8ac7fa10938cd2d88d8af02412 (diff)
downloadrabbitmq-server-git-c335a1a3522846ca3d415d9772aa37f10b692775.tar.gz
more refactoring
-rw-r--r--src/rabbit_channel.erl2
-rw-r--r--src/rabbit_msg_store.erl9
-rw-r--r--src/rabbit_queue_index.erl32
3 files changed, 21 insertions, 22 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 1f8455cc25..a6472e7aa1 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -1003,8 +1003,6 @@ handle_method(#'confirm.select'{}, _, #ch{transaction_id = TxId})
handle_method(#'confirm.select'{multiple = Multiple, nowait = NoWait},
_,
State = #ch{confirm_enabled = false}) ->
- rabbit_log:info("got confirm.select{multiple = ~p, nowait = ~p}~n",
- [Multiple, NoWait]),
State1 = State#ch{confirm_enabled = true,
confirm_multiple = Multiple},
case NoWait of
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 7d0af80a47..84853227d8 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -848,12 +848,9 @@ internal_sync(State = #msstate { current_file_handle = CurHdl,
lists:foreach(fun (K) -> K() end, lists:reverse(Syncs)),
State1 #msstate { on_sync = [] }
end,
- dict:map(fun(CRef, Guids) ->
- case dict:find(CRef, CODC) of
- {ok, Fun} -> Fun(Guids);
- error -> ok %% shouldn't happen
- end
- end, CTG),
+ dict:map(fun(CRef, Guids) -> Fun = dict:fetch(CRef, CODC),
+ Fun(Guids) end,
+ CTG),
State2 #msstate { cref_to_guids = dict:new() }.
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 9b4255da01..88b6e83217 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -256,17 +256,17 @@ delete_and_terminate(State) ->
ok = rabbit_misc:recursive_delete([Dir]),
State1.
-publish(Guid, SeqId, IsPersistent, State) when is_binary(Guid) ->
+publish(Guid, SeqId, IsPersistent,
+ State = #qistate { unsynced_guids = UnsyncedGuids }) when is_binary(Guid) ->
?GUID_BYTES = size(Guid),
- {JournalHdl, State1} = get_journal_handle(State),
+ {JournalHdl, State1} =
+ get_journal_handle(State #qistate { unsynced_guids = [Guid | UnsyncedGuids] }),
ok = file_handle_cache:append(
JournalHdl, [<<(case IsPersistent of
true -> ?PUB_PERSIST_JPREFIX;
false -> ?PUB_TRANS_JPREFIX
end):?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Guid]),
- State2 = State1 #qistate { unsynced_guids =
- [Guid | State1#qistate.unsynced_guids] },
- maybe_flush_journal(add_to_journal(SeqId, {Guid, IsPersistent}, State2)).
+ maybe_flush_journal(add_to_journal(SeqId, {Guid, IsPersistent}, State1)).
deliver(SeqIds, State) ->
deliver_or_ack(del, SeqIds, State).
@@ -278,8 +278,7 @@ sync([], State) ->
State;
sync(_SeqIds, State = #qistate { journal_handle = undefined }) ->
State;
-sync(_SeqIds, State = #qistate { journal_handle = JournalHdl,
- on_sync = OnSyncFun }) ->
+sync(_SeqIds, State = #qistate { journal_handle = JournalHdl }) ->
%% The SeqIds here contains the SeqId of every publish and ack in
%% the transaction. Ideally we should go through these seqids and
%% only sync the journal if the pubs or acks appear in the
@@ -289,8 +288,7 @@ sync(_SeqIds, State = #qistate { journal_handle = JournalHdl,
%% seqids not being in the journal, provided the transaction isn't
%% emptied (handled above anyway).
ok = file_handle_cache:sync(JournalHdl),
- OnSyncFun(State#qistate.unsynced_guids),
- State#qistate { unsynced_guids = [] }.
+ notify_sync(State).
flush(State = #qistate { dirty_count = 0 }) -> State;
flush(State) -> flush_journal(State).
@@ -570,9 +568,7 @@ maybe_flush_journal(State = #qistate { dirty_count = DCount,
maybe_flush_journal(State) ->
State.
-flush_journal(State = #qistate { segments = Segments,
- on_sync = OnSyncFun,
- unsynced_guids = UGs }) ->
+flush_journal(State = #qistate { segments = Segments }) ->
Segments1 =
segment_fold(
fun (#segment { unacked = 0, path = Path }, SegmentsN) ->
@@ -587,8 +583,7 @@ flush_journal(State = #qistate { segments = Segments,
{JournalHdl, State1} =
get_journal_handle(State #qistate { segments = Segments1 }),
ok = file_handle_cache:clear(JournalHdl),
- OnSyncFun(UGs),
- State1 #qistate { dirty_count = 0, unsynced_guids = [] }.
+ notify_sync(State1 #qistate { dirty_count = 0 }).
append_journal_to_segment(#segment { journal_entries = JEntries,
path = Path } = Segment) ->
@@ -947,3 +942,12 @@ journal_minus_segment1({no_pub, del, ack}, {?PUB, del, no_ack}) ->
{{no_pub, no_del, ack}, 0};
journal_minus_segment1({no_pub, del, ack}, {?PUB, del, ack}) ->
{undefined, -1}.
+
+%%----------------------------------------------------------------------------
+%% misc
+%%----------------------------------------------------------------------------
+
+notify_sync(State = #qistate { unsynced_guids = UG,
+ on_sync = OnSyncFun }) ->
+ OnSyncFun(UG),
+ State #qistate { unsynced_guids = [] }.