summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-06-26 18:30:03 +0100
committerMatthew Sackman <matthew@lshift.net>2009-06-26 18:30:03 +0100
commit10aa354784bb711c8c0329a5623fc9a6a90c40d2 (patch)
tree1e6949263afe584afacdd5ac30cabc7279514b0f
parent00485709f0b41a09014deb355490036a9a52c062 (diff)
parent19b343acfe0e7698eab93539f8d2bd612a4b9570 (diff)
downloadrabbitmq-server-git-10aa354784bb711c8c0329a5623fc9a6a90c40d2.tar.gz
merging in from 20470
-rw-r--r--src/rabbit_disk_queue.erl38
1 files changed, 24 insertions, 14 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index db1b314a74..4333f667cd 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -88,6 +88,7 @@
read_file_handles_limit, %% how many file handles can we open?
on_sync_froms, %% list of commiters to run on sync (reversed)
timer_ref, %% TRef for our interval timer
+ last_sync_offset, %% current_offset at the last time we sync'd
message_cache %% ets message cache
}).
@@ -405,6 +406,7 @@ init([FileSizeLimit, ReadFileHandlesLimit]) ->
read_file_handles_limit = ReadFileHandlesLimit,
on_sync_froms = [],
timer_ref = undefined,
+ last_sync_offset = 0,
message_cache = ets:new(?CACHE_ETS_NAME,
[set, private])
},
@@ -665,13 +667,14 @@ determine_next_read_id(CurrentRead, CurrentWrite, NextWrite)
when NextWrite >= CurrentWrite ->
CurrentRead.
-get_read_handle(File, State =
+get_read_handle(File, Offset, State =
#dqstate { read_file_handles = {ReadHdls, ReadHdlsAge},
read_file_handles_limit = ReadFileHandlesLimit,
current_file_name = CurName,
- current_dirty = IsDirty
+ current_dirty = IsDirty,
+ last_sync_offset = SyncOffset
}) ->
- State1 = if CurName =:= File andalso IsDirty ->
+ State1 = if CurName =:= File andalso IsDirty andalso Offset >= SyncOffset ->
sync_current_file_handle(State);
true -> State
end,
@@ -744,15 +747,19 @@ sync_current_file_handle(State = #dqstate { current_dirty = false,
State;
sync_current_file_handle(State = #dqstate { current_file_handle = CurHdl,
current_dirty = IsDirty,
- on_sync_froms = Froms
+ current_offset = CurOffset,
+ on_sync_froms = Froms,
+ last_sync_offset = SyncOffset
}) ->
- ok = case IsDirty of
- true -> file:sync(CurHdl);
- false -> ok
- end,
+ SyncOffset1 = case IsDirty of
+ true -> ok = file:sync(CurHdl),
+ CurOffset;
+ false -> SyncOffset
+ end,
lists:map(fun (From) -> gen_server2:reply(From, ok) end,
lists:reverse(Froms)),
- State #dqstate { current_dirty = false, on_sync_froms = [] }.
+ State #dqstate { current_dirty = false, on_sync_froms = [],
+ last_sync_offset = SyncOffset1 }.
msg_to_bin(Msg = #basic_message { content = Content }) ->
ClearedContent = rabbit_binary_parser:clear_decoded_content(Content),
@@ -833,7 +840,7 @@ internal_read_message(Q, ReadSeqId, FakeDeliver, ReadMsg, State) ->
true ->
case fetch_and_increment_cache(MsgId, State) of
not_found ->
- {FileHdl, State1} = get_read_handle(File, State),
+ {FileHdl, State1} = get_read_handle(File, Offset, State),
{ok, {MsgBody, BodySize}} =
read_message_at_offset(FileHdl, Offset, TotalSize),
Message = bin_to_msg(MsgBody),
@@ -957,7 +964,8 @@ internal_tx_commit(Q, PubMsgSeqIds, AckSeqIds, From,
State = #dqstate { sequences = Sequences,
current_file_name = CurFile,
current_dirty = IsDirty,
- on_sync_froms = SyncFroms
+ on_sync_froms = SyncFroms,
+ last_sync_offset = SyncOffset
}) ->
{PubList, PubAcc, ReadSeqId, Length} =
case PubMsgSeqIds of
@@ -983,7 +991,7 @@ internal_tx_commit(Q, PubMsgSeqIds, AckSeqIds, From,
lists:foldl(
fun ({{MsgId, SeqId}, {_NextMsgId, NextSeqId}},
{InCurFileAcc, ExpectedSeqId}) ->
- [{MsgId, _RefCount, File, _Offset,
+ [{MsgId, _RefCount, File, Offset,
_TotalSize}] = dets_ets_lookup(State, MsgId),
SeqId1 = adjust_last_msg_seq_id(
Q, ExpectedSeqId, SeqId, write),
@@ -998,7 +1006,8 @@ internal_tx_commit(Q, PubMsgSeqIds, AckSeqIds, From,
next_seq_id = NextSeqId1
},
write),
- {InCurFileAcc orelse File =:= CurFile,
+ {InCurFileAcc orelse (File =:= CurFile andalso
+ Offset >= SyncOffset),
NextSeqId1}
end, {false, PubAcc}, PubList),
{ok, State2} = remove_messages(Q, AckSeqIds, txn, State),
@@ -1202,7 +1211,8 @@ maybe_roll_to_new_file(Offset,
State2 = State1 #dqstate { current_file_name = NextName,
current_file_handle = NextHdl,
current_file_num = NextNum,
- current_offset = 0
+ current_offset = 0,
+ last_sync_offset = 0
},
{ok, compact(sets:from_list([CurName]), State2)};
maybe_roll_to_new_file(_, State) ->