diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-05-27 11:43:06 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-05-27 11:43:06 +0100 |
| commit | 884f90a00a1d2e15ca52e640a9156be713c67a5c (patch) | |
| tree | 12008bfab1e663bdc79368f7f2fab14187b4bc29 | |
| parent | 22ebfeefb80c1bd9584e420aa803ec0a72c45e1a (diff) | |
| download | rabbitmq-server-git-884f90a00a1d2e15ca52e640a9156be713c67a5c.tar.gz | |
preemptive refactoring
| -rw-r--r-- | src/rabbit_disk_queue.erl | 25 |
1 files changed, 14 insertions, 11 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index 7710a0a226..c7ef117717 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -705,10 +705,7 @@ internal_tx_commit(Q, PubMsgSeqIds, AckSeqIds, [{Q, ReadSeqId2, WriteSeqId2, Length2}] -> {ReadSeqId2, WriteSeqId2, Length2} end, - InitReadSeqId2 = if InitReadSeqId == InitWriteSeqId andalso FirstSeqIdTo > InitWriteSeqId andalso FirstSeqIdTo /= next -> - FirstSeqIdTo; - true -> InitReadSeqId - end, + InitReadSeqId2 = determine_next_read_id(InitReadSeqId, InitWriteSeqId, FirstSeqIdTo), { lists:zip(PubMsgSeqIds, (PubMsgSeqIdsTail ++ [{next, next}])), InitWriteSeqId, InitReadSeqId2, InitLength} end, @@ -764,11 +761,9 @@ internal_publish(Q, MsgId, SeqId, MsgBody, State) -> [{Q, ReadSeqId2, WriteSeqId2, Length2}] -> {ReadSeqId2, WriteSeqId2, Length2} end, + ReadSeqId3 = determine_next_read_id(ReadSeqId, WriteSeqId, SeqId), WriteSeqId3 = adjust_last_msg_seq_id(Q, WriteSeqId, SeqId, dirty), WriteSeqId3Next = WriteSeqId3 + 1, - ReadSeqId3 = if ReadSeqId == WriteSeqId andalso WriteSeqId3 > WriteSeqId -> WriteSeqId3; - true -> ReadSeqId - end, ok = mnesia:dirty_write(rabbit_disk_queue, #dq_msg_loc { queue_and_seq_id = {Q, WriteSeqId3}, msg_id = MsgId, @@ -783,6 +778,17 @@ internal_tx_cancel(MsgIds, State) -> MsgSeqIds = lists:zip(MsgIds, lists:duplicate(erlang:length(MsgIds), undefined)), remove_messages(undefined, MsgSeqIds, false, State). +determine_next_read_id(CurrentReadWrite, CurrentReadWrite, CurrentReadWrite) -> + CurrentReadWrite; +determine_next_read_id(CurrentRead, _CurrentWrite, next) -> + CurrentRead; +determine_next_read_id(CurrentReadWrite, CurrentReadWrite, NextWrite) + when NextWrite > CurrentReadWrite -> + NextWrite; +determine_next_read_id(CurrentRead, CurrentWrite, NextWrite) + when NextWrite >= CurrentWrite -> + CurrentRead. + internal_requeue(_Q, [], State) -> {ok, State}; internal_requeue(Q, MsgSeqIds = [{_, FirstSeqIdTo}|MsgSeqIdsTail], @@ -811,10 +817,7 @@ internal_requeue(Q, MsgSeqIds = [{_, FirstSeqIdTo}|MsgSeqIdsTail], %% the Q _must_ already exist [{Q, ReadSeqId, WriteSeqId, Length}] = ets:lookup(Sequences, Q), - ReadSeqId2 = - if ReadSeqId == WriteSeqId andalso FirstSeqIdTo > WriteSeqId andalso FirstSeqIdTo /= next -> FirstSeqIdTo; - true -> ReadSeqId - end, + ReadSeqId2 = determine_next_read_id(ReadSeqId, WriteSeqId, FirstSeqIdTo), MsgSeqIdsZipped = lists:zip(MsgSeqIds, MsgSeqIdsTail ++ [{next, next}]), {atomic, WriteSeqId2} = mnesia:transaction( |
