diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-06-26 18:30:03 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-06-26 18:30:03 +0100 |
| commit | 10aa354784bb711c8c0329a5623fc9a6a90c40d2 (patch) | |
| tree | 1e6949263afe584afacdd5ac30cabc7279514b0f | |
| parent | 00485709f0b41a09014deb355490036a9a52c062 (diff) | |
| parent | 19b343acfe0e7698eab93539f8d2bd612a4b9570 (diff) | |
| download | rabbitmq-server-git-10aa354784bb711c8c0329a5623fc9a6a90c40d2.tar.gz | |
merging in from 20470
| -rw-r--r-- | src/rabbit_disk_queue.erl | 38 |
1 file changed, 24 insertions, 14 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index db1b314a74..4333f667cd 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -88,6 +88,7 @@ read_file_handles_limit, %% how many file handles can we open? on_sync_froms, %% list of commiters to run on sync (reversed) timer_ref, %% TRef for our interval timer + last_sync_offset, %% current_offset at the last time we sync'd message_cache %% ets message cache }). @@ -405,6 +406,7 @@ init([FileSizeLimit, ReadFileHandlesLimit]) -> read_file_handles_limit = ReadFileHandlesLimit, on_sync_froms = [], timer_ref = undefined, + last_sync_offset = 0, message_cache = ets:new(?CACHE_ETS_NAME, [set, private]) }, @@ -665,13 +667,14 @@ determine_next_read_id(CurrentRead, CurrentWrite, NextWrite) when NextWrite >= CurrentWrite -> CurrentRead. -get_read_handle(File, State = +get_read_handle(File, Offset, State = #dqstate { read_file_handles = {ReadHdls, ReadHdlsAge}, read_file_handles_limit = ReadFileHandlesLimit, current_file_name = CurName, - current_dirty = IsDirty + current_dirty = IsDirty, + last_sync_offset = SyncOffset }) -> - State1 = if CurName =:= File andalso IsDirty -> + State1 = if CurName =:= File andalso IsDirty andalso Offset >= SyncOffset -> sync_current_file_handle(State); true -> State end, @@ -744,15 +747,19 @@ sync_current_file_handle(State = #dqstate { current_dirty = false, State; sync_current_file_handle(State = #dqstate { current_file_handle = CurHdl, current_dirty = IsDirty, - on_sync_froms = Froms + current_offset = CurOffset, + on_sync_froms = Froms, + last_sync_offset = SyncOffset }) -> - ok = case IsDirty of - true -> file:sync(CurHdl); - false -> ok - end, + SyncOffset1 = case IsDirty of + true -> ok = file:sync(CurHdl), + CurOffset; + false -> SyncOffset + end, lists:map(fun (From) -> gen_server2:reply(From, ok) end, lists:reverse(Froms)), - State #dqstate { current_dirty = false, on_sync_froms = [] }. 
+ State #dqstate { current_dirty = false, on_sync_froms = [], + last_sync_offset = SyncOffset1 }. msg_to_bin(Msg = #basic_message { content = Content }) -> ClearedContent = rabbit_binary_parser:clear_decoded_content(Content), @@ -833,7 +840,7 @@ internal_read_message(Q, ReadSeqId, FakeDeliver, ReadMsg, State) -> true -> case fetch_and_increment_cache(MsgId, State) of not_found -> - {FileHdl, State1} = get_read_handle(File, State), + {FileHdl, State1} = get_read_handle(File, Offset, State), {ok, {MsgBody, BodySize}} = read_message_at_offset(FileHdl, Offset, TotalSize), Message = bin_to_msg(MsgBody), @@ -957,7 +964,8 @@ internal_tx_commit(Q, PubMsgSeqIds, AckSeqIds, From, State = #dqstate { sequences = Sequences, current_file_name = CurFile, current_dirty = IsDirty, - on_sync_froms = SyncFroms + on_sync_froms = SyncFroms, + last_sync_offset = SyncOffset }) -> {PubList, PubAcc, ReadSeqId, Length} = case PubMsgSeqIds of @@ -983,7 +991,7 @@ internal_tx_commit(Q, PubMsgSeqIds, AckSeqIds, From, lists:foldl( fun ({{MsgId, SeqId}, {_NextMsgId, NextSeqId}}, {InCurFileAcc, ExpectedSeqId}) -> - [{MsgId, _RefCount, File, _Offset, + [{MsgId, _RefCount, File, Offset, _TotalSize}] = dets_ets_lookup(State, MsgId), SeqId1 = adjust_last_msg_seq_id( Q, ExpectedSeqId, SeqId, write), @@ -998,7 +1006,8 @@ internal_tx_commit(Q, PubMsgSeqIds, AckSeqIds, From, next_seq_id = NextSeqId1 }, write), - {InCurFileAcc orelse File =:= CurFile, + {InCurFileAcc orelse (File =:= CurFile andalso + Offset >= SyncOffset), NextSeqId1} end, {false, PubAcc}, PubList), {ok, State2} = remove_messages(Q, AckSeqIds, txn, State), @@ -1202,7 +1211,8 @@ maybe_roll_to_new_file(Offset, State2 = State1 #dqstate { current_file_name = NextName, current_file_handle = NextHdl, current_file_num = NextNum, - current_offset = 0 + current_offset = 0, + last_sync_offset = 0 }, {ok, compact(sets:from_list([CurName]), State2)}; maybe_roll_to_new_file(_, State) -> |
