summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-06-08 16:26:36 +0100
committerMatthew Sackman <matthew@lshift.net>2009-06-08 16:26:36 +0100
commit30841299c5151220bf4cebc97fe4ee2c7d6245bc (patch)
treed2664393b5b060fb9b0aff84a663a56fc71e15fd
parent0f849b8497806fe72bd2b2441c2d8cdb352a91f6 (diff)
downloadrabbitmq-server-git-30841299c5151220bf4cebc97fe4ee2c7d6245bc.tar.gz
refactorings and code cleanup
-rw-r--r--src/rabbit_disk_queue.erl753
1 files changed, 398 insertions, 355 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index b7eca499d6..a8773af61c 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -344,9 +344,11 @@ init([FileSizeLimit, ReadFileHandlesLimit]) ->
process_flag(trap_exit, true),
Node = node(),
ok =
- case mnesia:change_table_copy_type(rabbit_disk_queue, Node, disc_only_copies) of
+ case mnesia:change_table_copy_type(rabbit_disk_queue, Node,
+ disc_only_copies) of
{atomic, ok} -> ok;
- {aborted, {already_exists, rabbit_disk_queue, Node, disc_only_copies}} -> ok;
+ {aborted, {already_exists, rabbit_disk_queue, Node,
+ disc_only_copies}} -> ok;
E -> E
end,
ok = filelib:ensure_dir(form_filename("nothing")),
@@ -393,12 +395,10 @@ init([FileSizeLimit, ReadFileHandlesLimit]) ->
end,
%% read is only needed so that we can seek
{ok, FileHdl} = file:open(Path, [read, write, raw, binary, delayed_write]),
- ok = if Exists -> ok;
- true -> %% new file, so preallocate
- {ok, FileSizeLimit} = file:position(FileHdl, {bof, FileSizeLimit}),
- file:truncate(FileHdl)
- end,
- {ok, Offset} = file:position(FileHdl, {bof, Offset}),
+ if Exists -> {ok, Offset} = file:position(FileHdl, {bof, Offset});
+ true -> %% new file, so preallocate
+ ok = preallocate(FileHdl, FileSizeLimit, Offset)
+ end,
{ok, State1 #dqstate { current_file_handle = FileHdl }}.
handle_call({deliver, Q}, _From, State) ->
@@ -408,7 +408,7 @@ handle_call({phantom_deliver, Q}, _From, State) ->
{ok, Result, State1} = internal_deliver(Q, false, State),
{reply, Result, State1};
handle_call({tx_commit, Q, PubMsgIds, AckSeqIds}, _From, State) ->
- PubMsgSeqIds = lists:zip(PubMsgIds, lists:duplicate(erlang:length(PubMsgIds), next)),
+ PubMsgSeqIds = zip_with_tail(PubMsgIds, {duplicate, next}),
{ok, State1} = internal_tx_commit(Q, PubMsgSeqIds, AckSeqIds, State),
{reply, ok, State1};
handle_call({tx_commit_with_seqs, Q, PubSeqMsgIds, AckSeqIds}, _From, State) ->
@@ -431,29 +431,33 @@ handle_call(stop_vaporise, _From, State) ->
State1 #dqstate { current_file_handle = undefined,
read_file_handles = {dict:new(), gb_trees:empty()}}};
%% gen_server now calls terminate, which then calls shutdown
-handle_call(to_disk_only_mode, _From, State = #dqstate { operation_mode = disk_only }) ->
+handle_call(to_disk_only_mode, _From,
+ State = #dqstate { operation_mode = disk_only }) ->
{reply, ok, State};
-handle_call(to_disk_only_mode, _From, State = #dqstate { operation_mode = ram_disk,
- msg_location_dets = MsgLocationDets,
- msg_location_ets = MsgLocationEts }) ->
- {atomic, ok} = mnesia:change_table_copy_type(rabbit_disk_queue, node(), disc_only_copies),
+handle_call(to_disk_only_mode, _From,
+ State = #dqstate { operation_mode = ram_disk,
+ msg_location_dets = MsgLocationDets,
+ msg_location_ets = MsgLocationEts }) ->
+ {atomic, ok} = mnesia:change_table_copy_type(rabbit_disk_queue, node(),
+ disc_only_copies),
ok = dets:from_ets(MsgLocationDets, MsgLocationEts),
true = ets:delete_all_objects(MsgLocationEts),
{reply, ok, State #dqstate { operation_mode = disk_only }};
-handle_call(to_ram_disk_mode, _From, State = #dqstate { operation_mode = ram_disk }) ->
+handle_call(to_ram_disk_mode, _From,
+ State = #dqstate { operation_mode = ram_disk }) ->
{reply, ok, State};
-handle_call(to_ram_disk_mode, _From, State = #dqstate { operation_mode = disk_only,
- msg_location_dets = MsgLocationDets,
- msg_location_ets = MsgLocationEts }) ->
- {atomic, ok} = mnesia:change_table_copy_type(rabbit_disk_queue, node(), disc_copies),
+handle_call(to_ram_disk_mode, _From,
+ State = #dqstate { operation_mode = disk_only,
+ msg_location_dets = MsgLocationDets,
+ msg_location_ets = MsgLocationEts }) ->
+ {atomic, ok} = mnesia:change_table_copy_type(rabbit_disk_queue, node(),
+ disc_copies),
true = ets:from_dets(MsgLocationEts, MsgLocationDets),
ok = dets:delete_all_objects(MsgLocationDets),
{reply, ok, State #dqstate { operation_mode = ram_disk }};
handle_call({length, Q}, _From, State = #dqstate { sequences = Sequences }) ->
- case ets:lookup(Sequences, Q) of
- [] -> {reply, 0, State};
- [{Q, _ReadSeqId, _WriteSeqId, Length}] -> {reply, Length, State}
- end;
+ {_ReadSeqId, _WriteSeqId, Length} = sequence_lookup(Sequences, Q),
+ {reply, Length, State};
handle_call({dump_queue, Q}, _From, State) ->
{Result, State1} = internal_dump_queue(Q, State),
{reply, Result, State1};
@@ -477,7 +481,7 @@ handle_cast({tx_cancel, MsgIds}, State) ->
{ok, State1} = internal_tx_cancel(MsgIds, State),
{noreply, State1};
handle_cast({requeue, Q, MsgSeqIds}, State) ->
- MsgSeqSeqIds = lists:zip(MsgSeqIds, lists:duplicate(erlang:length(MsgSeqIds), next)),
+ MsgSeqSeqIds = zip_with_tail(MsgSeqIds, {duplicate, next}),
{ok, State1} = internal_requeue(Q, MsgSeqSeqIds, State),
{noreply, State1};
handle_cast({requeue_with_seqs, Q, MsgSeqSeqIds}, State) ->
@@ -524,40 +528,57 @@ form_filename(Name) ->
base_directory() ->
filename:join(mnesia:system_info(directory), "rabbit_disk_queue/").
-dets_ets_lookup(#dqstate { msg_location_dets = MsgLocationDets, operation_mode = disk_only },
+zip_with_tail(List1, List2) when length(List1) =:= length(List2) ->
+ lists:zip(List1, List2);
+zip_with_tail(List = [_|Tail], {last, E}) ->
+ zip_with_tail(List, Tail ++ [E]);
+zip_with_tail(List, {duplicate, E}) ->
+ zip_with_tail(List, lists:duplicate(erlang:length(List), E)).
+
+dets_ets_lookup(#dqstate { msg_location_dets = MsgLocationDets,
+ operation_mode = disk_only },
Key) ->
dets:lookup(MsgLocationDets, Key);
-dets_ets_lookup(#dqstate { msg_location_ets = MsgLocationEts, operation_mode = ram_disk },
+dets_ets_lookup(#dqstate { msg_location_ets = MsgLocationEts,
+ operation_mode = ram_disk },
Key) ->
ets:lookup(MsgLocationEts, Key).
-dets_ets_delete(#dqstate { msg_location_dets = MsgLocationDets, operation_mode = disk_only },
+dets_ets_delete(#dqstate { msg_location_dets = MsgLocationDets,
+ operation_mode = disk_only },
Key) ->
ok = dets:delete(MsgLocationDets, Key);
-dets_ets_delete(#dqstate { msg_location_ets = MsgLocationEts, operation_mode = ram_disk },
+dets_ets_delete(#dqstate { msg_location_ets = MsgLocationEts,
+ operation_mode = ram_disk },
Key) ->
true = ets:delete(MsgLocationEts, Key),
ok.
-dets_ets_insert(#dqstate { msg_location_dets = MsgLocationDets, operation_mode = disk_only },
+dets_ets_insert(#dqstate { msg_location_dets = MsgLocationDets,
+ operation_mode = disk_only },
Obj) ->
ok = dets:insert(MsgLocationDets, Obj);
-dets_ets_insert(#dqstate { msg_location_ets = MsgLocationEts, operation_mode = ram_disk },
+dets_ets_insert(#dqstate { msg_location_ets = MsgLocationEts,
+ operation_mode = ram_disk },
Obj) ->
true = ets:insert(MsgLocationEts, Obj),
ok.
-dets_ets_insert_new(#dqstate { msg_location_dets = MsgLocationDets, operation_mode = disk_only },
+dets_ets_insert_new(#dqstate { msg_location_dets = MsgLocationDets,
+ operation_mode = disk_only },
Obj) ->
true = dets:insert_new(MsgLocationDets, Obj);
-dets_ets_insert_new(#dqstate { msg_location_ets = MsgLocationEts, operation_mode = ram_disk },
+dets_ets_insert_new(#dqstate { msg_location_ets = MsgLocationEts,
+ operation_mode = ram_disk },
Obj) ->
true = ets:insert_new(MsgLocationEts, Obj).
-dets_ets_match_object(#dqstate { msg_location_dets = MsgLocationDets, operation_mode = disk_only },
+dets_ets_match_object(#dqstate { msg_location_dets = MsgLocationDets,
+ operation_mode = disk_only },
Obj) ->
dets:match_object(MsgLocationDets, Obj);
-dets_ets_match_object(#dqstate { msg_location_ets = MsgLocationEts, operation_mode = ram_disk },
+dets_ets_match_object(#dqstate { msg_location_ets = MsgLocationEts,
+ operation_mode = ram_disk },
Obj) ->
ets:match_object(MsgLocationEts, Obj).
@@ -612,18 +633,28 @@ adjust_last_msg_seq_id(_Q, 0, SuppliedSeqId, _Mode) ->
SuppliedSeqId;
adjust_last_msg_seq_id(_Q, ExpectedSeqId, ExpectedSeqId, _Mode) ->
ExpectedSeqId;
-adjust_last_msg_seq_id(Q, ExpectedSeqId, SuppliedSeqId, dirty) when SuppliedSeqId > ExpectedSeqId ->
+adjust_last_msg_seq_id(Q, ExpectedSeqId, SuppliedSeqId, dirty)
+ when SuppliedSeqId > ExpectedSeqId ->
[Obj] = mnesia:dirty_read(rabbit_disk_queue, {Q, ExpectedSeqId - 1}),
ok = mnesia:dirty_write(rabbit_disk_queue,
Obj #dq_msg_loc { next_seq_id = SuppliedSeqId }),
SuppliedSeqId;
-adjust_last_msg_seq_id(Q, ExpectedSeqId, SuppliedSeqId, Lock) when SuppliedSeqId > ExpectedSeqId ->
+adjust_last_msg_seq_id(Q, ExpectedSeqId, SuppliedSeqId, Lock)
+ when SuppliedSeqId > ExpectedSeqId ->
[Obj] = mnesia:read(rabbit_disk_queue, {Q, ExpectedSeqId - 1}, Lock),
ok = mnesia:write(rabbit_disk_queue,
Obj #dq_msg_loc { next_seq_id = SuppliedSeqId },
Lock),
SuppliedSeqId.
+sequence_lookup(Sequences, Q) ->
+ case ets:lookup(Sequences, Q) of
+ [] ->
+ {0, 0, 0};
+ [{Q, ReadSeqId, WriteSeqId, Length}] ->
+ {ReadSeqId, WriteSeqId, Length}
+ end.
+
%% ---- INTERNAL RAW FUNCTIONS ----
internal_deliver(Q, ReadMsg, State = #dqstate { sequences = Sequences }) ->
@@ -632,14 +663,18 @@ internal_deliver(Q, ReadMsg, State = #dqstate { sequences = Sequences }) ->
[{Q, SeqId, SeqId, 0}] -> {ok, empty, State};
[{Q, ReadSeqId, WriteSeqId, Length}] when Length > 0 ->
Remaining = Length - 1,
- {ok, Result, NextReadSeqId, State1} = internal_read_message(Q, ReadSeqId, false, ReadMsg, State),
- true = ets:insert(Sequences, {Q, NextReadSeqId, WriteSeqId, Remaining}),
- {ok, case Result of
- {MsgId, Delivered, {MsgId, ReadSeqId}} ->
- {MsgId, Delivered, {MsgId, ReadSeqId}, Remaining};
- {MsgId, MsgBody, BodySize, Delivered, {MsgId, ReadSeqId}} ->
- {MsgId, MsgBody, BodySize, Delivered, {MsgId, ReadSeqId}, Remaining}
- end, State1}
+ {ok, Result, NextReadSeqId, State1} =
+ internal_read_message(Q, ReadSeqId, false, ReadMsg, State),
+ true = ets:insert(Sequences,
+ {Q, NextReadSeqId, WriteSeqId, Remaining}),
+ {ok,
+ case Result of
+ {MsgId, Delivered, {MsgId, ReadSeqId}} ->
+ {MsgId, Delivered, {MsgId, ReadSeqId}, Remaining};
+ {MsgId, MsgBody, BodySize, Delivered, {MsgId, ReadSeqId}} ->
+ {MsgId, MsgBody, BodySize, Delivered, {MsgId, ReadSeqId},
+ Remaining}
+ end, State1}
end.
internal_read_message(Q, ReadSeqId, FakeDeliver, ReadMsg, State) ->
@@ -661,7 +696,8 @@ internal_read_message(Q, ReadSeqId, FakeDeliver, ReadMsg, State) ->
read_message_at_offset(FileHdl, Offset, TotalSize),
{ok, {MsgId, MsgBody, BodySize, Delivered, {MsgId, ReadSeqId}},
NextReadSeqId, State1};
- true -> {ok, {MsgId, Delivered, {MsgId, ReadSeqId}}, NextReadSeqId, State}
+ true ->
+ {ok, {MsgId, Delivered, {MsgId, ReadSeqId}}, NextReadSeqId, State}
end.
internal_ack(Q, MsgSeqIds, State) ->
@@ -685,25 +721,30 @@ remove_messages(Q, MsgSeqIds, MnesiaDelete,
Files3 =
if 1 =:= RefCount ->
ok = dets_ets_delete(State, MsgId),
- [{File, ValidTotalSize, ContiguousTop, Left, Right}] =
- ets:lookup(FileSummary, File),
- ContiguousTop1 = lists:min([ContiguousTop, Offset]),
- true = ets:insert(FileSummary,
- {File, (ValidTotalSize - TotalSize
- - ?FILE_PACKING_ADJUSTMENT),
+ [{File, ValidTotalSize, ContiguousTop,
+ Left, Right}] = ets:lookup(FileSummary, File),
+ ContiguousTop1 =
+ lists:min([ContiguousTop, Offset]),
+ true =
+ ets:insert(FileSummary,
+ {File, (ValidTotalSize-TotalSize-
+ ?FILE_PACKING_ADJUSTMENT),
ContiguousTop1, Left, Right}),
if CurName =:= File -> Files2;
true -> sets:add_element(File, Files2)
end;
1 < RefCount ->
- ok = dets_ets_insert(State, {MsgId, RefCount - 1,
- File, Offset, TotalSize}),
+ ok = dets_ets_insert(
+ State, {MsgId, RefCount - 1,
+ File, Offset, TotalSize}),
Files2
end,
ok = if MnesiaDelete ->
- mnesia:dirty_delete(rabbit_disk_queue, {Q, SeqId});
+ mnesia:dirty_delete(rabbit_disk_queue,
+ {Q, SeqId});
MnesiaDelete =:= txn ->
- mnesia:delete(rabbit_disk_queue, {Q, SeqId}, write);
+ mnesia:delete(rabbit_disk_queue,
+ {Q, SeqId}, write);
true -> ok
end,
Files3
@@ -735,8 +776,8 @@ internal_tx_publish(MsgId, MsgBody,
true = ets:insert(FileSummary, {CurName, ValidTotalSize1,
ContiguousTop1, Left, undefined}),
NextOffset = CurOffset + TotalSize + ?FILE_PACKING_ADJUSTMENT,
- maybe_roll_to_new_file(NextOffset,
- State #dqstate {current_offset = NextOffset});
+ maybe_roll_to_new_file(
+ NextOffset, State #dqstate {current_offset = NextOffset});
[{MsgId, RefCount, File, Offset, TotalSize}] ->
%% We already know about it, just update counter
ok = dets_ets_insert(State, {MsgId, RefCount + 1, File,
@@ -753,49 +794,50 @@ internal_tx_commit(Q, PubMsgSeqIds, AckSeqIds,
{PubList, PubAcc, ReadSeqId, Length} =
case PubMsgSeqIds of
[] -> {[], undefined, undefined, undefined};
- [{_, FirstSeqIdTo}|PubMsgSeqIdsTail] ->
+ [{_, FirstSeqIdTo}|_] ->
{InitReadSeqId, InitWriteSeqId, InitLength} =
- case ets:lookup(Sequences, Q) of
- [] -> {0,0,0};
- [{Q, ReadSeqId2, WriteSeqId2, Length2}] ->
- {ReadSeqId2, WriteSeqId2, Length2}
- end,
- InitReadSeqId2 = determine_next_read_id(InitReadSeqId, InitWriteSeqId, FirstSeqIdTo),
- { lists:zip(PubMsgSeqIds, (PubMsgSeqIdsTail ++ [{next, next}])),
+ sequence_lookup(Sequences, Q),
+ InitReadSeqId2 = determine_next_read_id(
+ InitReadSeqId, InitWriteSeqId, FirstSeqIdTo),
+ { zip_with_tail(PubMsgSeqIds, {last, {next, next}}),
InitWriteSeqId, InitReadSeqId2, InitLength}
end,
{atomic, {Sync, WriteSeqId, State2}} =
mnesia:transaction(
- fun() -> ok = mnesia:write_lock_table(rabbit_disk_queue),
- %% must deal with publishes first, if we didn't
- %% then we could end up acking a message before
- %% it's been published, which is clearly
- %% nonsense. I.e. in commit, do not do things in an
- %% order which _could_not_ have happened.
- {Sync2, WriteSeqId3} =
- lists:foldl(
- fun ({{MsgId, SeqId}, {_NextMsgId, NextSeqId}},
- {Acc, ExpectedSeqId}) ->
- [{MsgId, _RefCount, File, _Offset, _TotalSize}] =
- dets_ets_lookup(State, MsgId),
- SeqId2 = adjust_last_msg_seq_id(Q, ExpectedSeqId, SeqId, write),
+ fun() ->
+ ok = mnesia:write_lock_table(rabbit_disk_queue),
+ %% must deal with publishes first, if we didn't
+ %% then we could end up acking a message before
+ %% it's been published, which is clearly
+ %% nonsense. I.e. in commit, do not do things in an
+ %% order which _could_not_ have happened.
+ {Sync2, WriteSeqId3} =
+ lists:foldl(
+ fun ({{MsgId, SeqId}, {_NextMsgId, NextSeqId}},
+ {Acc, ExpectedSeqId}) ->
+ [{MsgId, _RefCount, File, _Offset,
+ _TotalSize}] = dets_ets_lookup(State, MsgId),
+ SeqId2 = adjust_last_msg_seq_id(
+ Q, ExpectedSeqId, SeqId, write),
NextSeqId2 = find_next_seq_id(SeqId2, NextSeqId),
- ok = mnesia:write(rabbit_disk_queue,
- #dq_msg_loc { queue_and_seq_id =
- {Q, SeqId2},
- msg_id = MsgId,
- is_delivered = false,
- next_seq_id = NextSeqId2
- },
- write),
- {Acc or (CurName =:= File), NextSeqId2}
+ ok = mnesia:write(
+ rabbit_disk_queue,
+ #dq_msg_loc { queue_and_seq_id =
+ {Q, SeqId2},
+ msg_id = MsgId,
+ is_delivered = false,
+ next_seq_id = NextSeqId2
+ },
+ write),
+ {Acc orelse (CurName =:= File), NextSeqId2}
end, {false, PubAcc}, PubList),
{ok, State3} = remove_messages(Q, AckSeqIds, txn, State),
{Sync2, WriteSeqId3, State3}
end),
true = if PubList =:= [] -> true;
- true -> ets:insert(Sequences, {Q, ReadSeqId, WriteSeqId, Length + erlang:length(PubList)})
+ true -> ets:insert(Sequences, {Q, ReadSeqId, WriteSeqId,
+ Length + erlang:length(PubList)})
end,
ok = if Sync -> file:sync(CurHdl);
true -> ok
@@ -807,12 +849,7 @@ internal_publish(Q, MsgId, SeqId, MsgBody, State) ->
{ok, State1 = #dqstate { sequences = Sequences }} =
internal_tx_publish(MsgId, MsgBody, State),
{ReadSeqId, WriteSeqId, Length} =
- case ets:lookup(Sequences, Q) of
- [] -> %% previously unseen queue
- {0, 0, 0};
- [{Q, ReadSeqId2, WriteSeqId2, Length2}] ->
- {ReadSeqId2, WriteSeqId2, Length2}
- end,
+ sequence_lookup(Sequences, Q),
ReadSeqId3 = determine_next_read_id(ReadSeqId, WriteSeqId, SeqId),
WriteSeqId3 = adjust_last_msg_seq_id(Q, WriteSeqId, SeqId, dirty),
WriteSeqId3Next = WriteSeqId3 + 1,
@@ -827,12 +864,12 @@ internal_publish(Q, MsgId, SeqId, MsgBody, State) ->
internal_tx_cancel(MsgIds, State) ->
%% we don't need seq ids because we're not touching mnesia,
%% because seqids were never assigned
- MsgSeqIds = lists:zip(MsgIds, lists:duplicate(erlang:length(MsgIds), undefined)),
+ MsgSeqIds = zip_with_tail(MsgIds, {duplicate, undefined}),
remove_messages(undefined, MsgSeqIds, false, State).
internal_requeue(_Q, [], State) ->
{ok, State};
-internal_requeue(Q, MsgSeqIds = [{_, FirstSeqIdTo}|MsgSeqIdsTail],
+internal_requeue(Q, MsgSeqIds = [{_, FirstSeqIdTo}|_],
State = #dqstate { sequences = Sequences }) ->
%% We know that every seq_id in here is less than the ReadSeqId
%% you'll get if you look up this queue in Sequences (i.e. they've
@@ -856,34 +893,39 @@ internal_requeue(Q, MsgSeqIds = [{_, FirstSeqIdTo}|MsgSeqIdsTail],
%% MsgLocation and FileSummary stay put (which makes further sense
%% as they have no concept of sequence id anyway).
- %% the Q _must_ already exist
- [{Q, ReadSeqId, WriteSeqId, Length}] = ets:lookup(Sequences, Q),
+ {ReadSeqId, WriteSeqId, Length} = sequence_lookup(Sequences, Q),
ReadSeqId2 = determine_next_read_id(ReadSeqId, WriteSeqId, FirstSeqIdTo),
- MsgSeqIdsZipped = lists:zip(MsgSeqIds, MsgSeqIdsTail ++ [{next, next}]),
- {atomic, WriteSeqId2} =
+ MsgSeqIdsZipped = zip_with_tail(MsgSeqIds, {last, {next, next}}),
+ {atomic, {WriteSeqId2, Q}} =
mnesia:transaction(
fun() ->
ok = mnesia:write_lock_table(rabbit_disk_queue),
- lists:foldl(
- fun ({{{MsgId, SeqIdOrig}, SeqIdTo},
- {_NextMsgSeqId, NextSeqIdTo}},
- ExpectedSeqIdTo) ->
- SeqIdTo2 = adjust_last_msg_seq_id(Q, ExpectedSeqIdTo, SeqIdTo, write),
- NextSeqIdTo2 = find_next_seq_id(SeqIdTo2, NextSeqIdTo),
- [Obj = #dq_msg_loc { is_delivered = true, msg_id = MsgId }] =
- mnesia:read(rabbit_disk_queue, {Q, SeqIdOrig}, write),
- mnesia:write(rabbit_disk_queue,
- Obj #dq_msg_loc { queue_and_seq_id = {Q, SeqIdTo2},
- next_seq_id = NextSeqIdTo2
- },
- write),
- mnesia:delete(rabbit_disk_queue, {Q, SeqIdOrig}, write),
- NextSeqIdTo2
- end, WriteSeqId, MsgSeqIdsZipped)
+ lists:foldl(fun requeue_message/2, {WriteSeqId, Q},
+ MsgSeqIdsZipped)
end),
- true = ets:insert(Sequences, {Q, ReadSeqId2, WriteSeqId2, Length + erlang:length(MsgSeqIds)}),
+ true = ets:insert(Sequences, {Q, ReadSeqId2, WriteSeqId2,
+ Length + erlang:length(MsgSeqIds)}),
{ok, State}.
+requeue_message({{{MsgId, SeqIdOrig}, SeqIdTo},
+ {_NextMsgSeqId, NextSeqIdTo}},
+ {ExpectedSeqIdTo, Q}) ->
+ SeqIdTo2 = adjust_last_msg_seq_id(Q, ExpectedSeqIdTo, SeqIdTo, write),
+ NextSeqIdTo2 = find_next_seq_id(SeqIdTo2, NextSeqIdTo),
+ [Obj = #dq_msg_loc { is_delivered = true, msg_id = MsgId,
+ next_seq_id = NextSeqIdOrig }] =
+ mnesia:read(rabbit_disk_queue, {Q, SeqIdOrig}, write),
+ if SeqIdTo2 == SeqIdOrig andalso NextSeqIdTo2 == NextSeqIdOrig -> ok;
+ true ->
+ ok = mnesia:write(rabbit_disk_queue,
+ Obj #dq_msg_loc {queue_and_seq_id = {Q, SeqIdTo2},
+ next_seq_id = NextSeqIdTo2
+ },
+ write),
+ ok = mnesia:delete(rabbit_disk_queue, {Q, SeqIdOrig}, write)
+ end,
+ {NextSeqIdTo2, Q}.
+
internal_purge(Q, State = #dqstate { sequences = Sequences }) ->
case ets:lookup(Sequences, Q) of
[] -> {ok, 0, State};
@@ -898,7 +940,8 @@ internal_purge(Q, State = #dqstate { sequences = Sequences }) ->
(SeqId) ->
[#dq_msg_loc { msg_id = MsgId,
next_seq_id = NextSeqId }
- ] = mnesia:read(rabbit_disk_queue, {Q, SeqId}, write),
+ ] = mnesia:read(rabbit_disk_queue,
+ {Q, SeqId}, write),
{true, {MsgId, SeqId}, NextSeqId}
end, ReadSeqId),
remove_messages(Q, MsgSeqIds, txn, State)
@@ -908,23 +951,27 @@ internal_purge(Q, State = #dqstate { sequences = Sequences }) ->
end.
internal_delete_queue(Q, State) ->
- {ok, _Count, State1 = #dqstate { sequences = Sequences }} = internal_purge(Q, State),
+ {ok, _Count, State1 = #dqstate { sequences = Sequences }} =
+ internal_purge(Q, State), %% remove everything undelivered
true = ets:delete(Sequences, Q),
{atomic, {ok, State2}} =
mnesia:transaction(
- fun() ->
+ fun() -> %% now remove everything already delivered
ok = mnesia:write_lock_table(rabbit_disk_queue),
Objs =
- mnesia:match_object(rabbit_disk_queue, #dq_msg_loc { queue_and_seq_id = {Q, '_'},
- msg_id = '_',
- is_delivered = '_',
- next_seq_id = '_'
- }, write),
+ mnesia:match_object(
+ rabbit_disk_queue,
+ #dq_msg_loc { queue_and_seq_id = {Q, '_'},
+ msg_id = '_',
+ is_delivered = '_',
+ next_seq_id = '_'
+ },
+ write),
MsgSeqIds =
lists:map(
- fun (#dq_msg_loc { queue_and_seq_id = {_Q, SeqId}, msg_id = MsgId }) ->
- {MsgId, SeqId}
- end, Objs),
+ fun (#dq_msg_loc { queue_and_seq_id = {_Q, SeqId},
+ msg_id = MsgId }) ->
+ {MsgId, SeqId} end, Objs),
remove_messages(Q, MsgSeqIds, txn, State1)
end),
{ok, State2}.
@@ -938,26 +985,25 @@ internal_dump_queue(Q, State = #dqstate { sequences = Sequences }) ->
fun ({SeqId, _State1}) when SeqId == WriteSeq ->
false;
({SeqId, State1}) ->
- {ok, {MsgId, Msg, Size, Delivered, {MsgId, SeqId}}, NextReadSeqId, State2} =
- internal_read_message(Q, SeqId, true, true, State1),
- {true, {MsgId, Msg, Size, Delivered, SeqId}, {NextReadSeqId, State2}}
+ {ok, {MsgId, Msg, Size, Delivered, {MsgId, SeqId}},
+ NextReadSeqId, State2} =
+ internal_read_message(Q, SeqId, true, true,
+ State1),
+ {true, {MsgId, Msg, Size, Delivered, SeqId},
+ {NextReadSeqId, State2}}
end, {ReadSeq, State}),
{lists:reverse(QList), State3}
end.
-internal_delete_non_durable_queues(DurableQueues, State = #dqstate { sequences = Sequences }) ->
- State3 =
- ets:foldl(
- fun ({Q, _Read, _Write, _Length}, State1) ->
- case sets:is_element(Q, DurableQueues) of
- true ->
- State1;
- false ->
- {ok, State2} = internal_delete_queue(Q, State1),
- State2
- end
- end, State, Sequences),
- {ok, State3}.
+internal_delete_non_durable_queues(
+ DurableQueues, State = #dqstate { sequences = Sequences }) ->
+ ets:foldl(
+ fun ({Q, _Read, _Write, _Length}, {ok, State1}) ->
+ case sets:is_element(Q, DurableQueues) of
+ true -> {ok, State1};
+ false -> internal_delete_queue(Q, State1)
+ end
+ end, {ok, State}, Sequences).
%% ---- ROLLING OVER THE APPEND FILE ----
@@ -975,10 +1021,8 @@ maybe_roll_to_new_file(Offset,
NextName = integer_to_list(NextNum) ++ ?FILE_EXTENSION,
{ok, NextHdl} = file:open(form_filename(NextName),
[write, raw, binary, delayed_write]),
- {ok, FileSizeLimit} = file:position(NextHdl, {bof, FileSizeLimit}),
- ok = file:truncate(NextHdl),
- {ok, 0} = file:position(NextHdl, {bof, 0}),
- true = ets:update_element(FileSummary, CurName, {5, NextName}), %% 5 is Right
+ ok = preallocate(NextHdl, FileSizeLimit, 0),
+ true = ets:update_element(FileSummary, CurName, {5, NextName}),%% 5 is Right
true = ets:insert_new(FileSummary, {NextName, 0, 0, CurName, undefined}),
State1 = State #dqstate { current_file_name = NextName,
current_file_handle = NextHdl,
@@ -989,6 +1033,12 @@ maybe_roll_to_new_file(Offset,
maybe_roll_to_new_file(_, State) ->
{ok, State}.
+preallocate(Hdl, FileSizeLimit, FinalPos) ->
+ {ok, FileSizeLimit} = file:position(Hdl, {bof, FileSizeLimit}),
+ ok = file:truncate(Hdl),
+ {ok, FinalPos} = file:position(Hdl, {bof, FinalPos}),
+ ok.
+
%% ---- GARBAGE COLLECTION / COMPACTION / AGGREGATION ----
compact(FilesSet, State) ->
@@ -1000,70 +1050,59 @@ compact(FilesSet, State) ->
end, [], Files),
lists:foldl(fun combine_file/2, State, lists:reverse(RemainingFiles)).
-combine_file(File, State = #dqstate { file_size_limit = FileSizeLimit,
- file_summary = FileSummary,
- current_file_name = CurName
- }) ->
+combine_file(File, State = #dqstate { file_summary = FileSummary,
+ current_file_name = CurName
+ }) ->
%% the file we're looking at may no longer exist as it may have
%% been deleted within the current GC run
case ets:lookup(FileSummary, File) of
[] -> State;
- [FileObj = {File, ValidData, _ContiguousTop, Left, Right}] ->
+ [FileObj = {File, _ValidData, _ContiguousTop, Left, Right}] ->
GoRight =
fun() ->
case Right of
undefined -> State;
- _ when not(CurName =:= Right) ->
- [RightObj = {Right, RightValidData,
- _RightContiguousTop, File, RightRight}] =
- ets:lookup(FileSummary, Right),
- RightSumData = ValidData + RightValidData,
- if FileSizeLimit >= RightSumData ->
- %% here, Right will be the source and so will be deleted,
- %% File will be the destination
- State1 = combine_files(RightObj, FileObj,
- State),
- %% this could fail if RightRight is undefined
- %% left is the 4th field
- ets:update_element(FileSummary,
- RightRight, {4, File}),
- true = ets:insert(FileSummary, {File,
- RightSumData,
- RightSumData,
- Left,
- RightRight}),
- true = ets:delete(FileSummary, Right),
- State1;
- true -> State
- end;
+ _ when not (CurName == Right) ->
+ [RightObj] = ets:lookup(FileSummary, Right),
+ {_, State1} =
+ adjust_meta_and_combine(FileObj, RightObj,
+ State),
+ State1;
_ -> State
end
end,
case Left of
undefined ->
GoRight();
- _ -> [LeftObj =
- {Left, LeftValidData, _LeftContiguousTop, LeftLeft, File}] =
- ets:lookup(FileSummary, Left),
- LeftSumData = ValidData + LeftValidData,
- if FileSizeLimit >= LeftSumData ->
- %% here, File will be the source and so will be deleted,
- %% Left will be the destination
- State1 = combine_files(FileObj, LeftObj, State),
- %% this could fail if Right is undefined
- %% left is the 4th field
- ets:update_element(FileSummary, Right, {4, Left}),
- true = ets:insert(FileSummary, {Left, LeftSumData,
- LeftSumData,
- LeftLeft, Right}),
- true = ets:delete(FileSummary, File),
- State1;
- true ->
- GoRight()
+ _ -> [LeftObj] = ets:lookup(FileSummary, Left),
+ case adjust_meta_and_combine(LeftObj, FileObj, State) of
+ {true, State1} -> State1;
+ {false, State} -> GoRight()
end
end
end.
+adjust_meta_and_combine(
+ LeftObj = {LeftFile, LeftValidData, _LeftContigTop, LeftLeft, RightFile},
+ RightObj = {RightFile, RightValidData, _RightContigTop, LeftFile, RightRight},
+ State = #dqstate { file_size_limit = FileSizeLimit,
+ file_summary = FileSummary
+ }) ->
+ TotalValidData = LeftValidData + RightValidData,
+ if FileSizeLimit >= TotalValidData ->
+ State1 = combine_files(RightObj, LeftObj, State),
+ %% this could fail if RightRight is undefined
+ %% left is the 4th field
+ ets:update_element(FileSummary, RightRight, {4, LeftFile}),
+ true = ets:insert(FileSummary, {LeftFile,
+ TotalValidData, TotalValidData,
+ LeftLeft,
+ RightRight}),
+ true = ets:delete(FileSummary, RightFile),
+ {true, State1};
+ true -> {false, State}
+ end.
+
sort_msg_locations_by_offset(Asc, List) ->
Comp = if Asc -> fun erlang:'<'/2;
true -> fun erlang:'>'/2
@@ -1093,92 +1132,72 @@ combine_files({Source, SourceValid, _SourceContiguousTop,
file:open(form_filename(Destination),
[read, write, raw, binary, read_ahead, delayed_write]),
ExpectedSize = SourceValid + DestinationValid,
- %% if DestinationValid =:= DestinationContiguousTop then we don't need a tmp file
- %% if they're not equal, then we need to write out everything past the DestinationContiguousTop to a tmp file
- %% then truncate, copy back in, and then copy over from Source
+ %% if DestinationValid =:= DestinationContiguousTop then we don't
+ %% need a tmp file
+ %% if they're not equal, then we need to write out everything past
+ %% the DestinationContiguousTop to a tmp file then truncate,
+ %% copy back in, and then copy over from Source
%% otherwise we just truncate straight away and copy over from Source
if DestinationContiguousTop =:= DestinationValid ->
ok = truncate_and_extend_file(DestinationHdl,
- DestinationValid, ExpectedSize);
+ DestinationValid, ExpectedSize);
true ->
Tmp = filename:rootname(Destination) ++ ?FILE_EXTENSION_TMP,
{ok, TmpHdl} =
file:open(form_filename(Tmp),
- [read, write, raw, binary, read_ahead, delayed_write]),
+ [read, write, raw, binary,
+ read_ahead, delayed_write]),
Worklist =
lists:dropwhile(
fun ({_, _, _, Offset, _})
when Offset /= DestinationContiguousTop ->
- %% it cannot be that Offset == DestinationContiguousTop
- %% because if it was then DestinationContiguousTop would have been
- %% extended by TotalSize
+ %% it cannot be that Offset ==
+ %% DestinationContiguousTop because if it
+ %% was then DestinationContiguousTop would
+ %% have been extended by TotalSize
Offset < DestinationContiguousTop
- %% Given expected access patterns, I suspect that the list should be
- %% naturally sorted as we require, however, we need to enforce it anyway
- end, sort_msg_locations_by_offset(true,
- dets_ets_match_object(State,
- {'_', '_',
- Destination,
- '_', '_'}))),
+ %% Given expected access patterns, I suspect
+ %% that the list should be naturally sorted
+ %% as we require, however, we need to
+ %% enforce it anyway
+ end, sort_msg_locations_by_offset(
+ true, dets_ets_match_object(State,
+ {'_', '_', Destination,
+ '_', '_'}))),
+ ok = copy_messages(
+ Worklist, DestinationContiguousTop, DestinationValid,
+ DestinationHdl, TmpHdl, Destination, State),
TmpSize = DestinationValid - DestinationContiguousTop,
- {TmpSize, BlockStart1, BlockEnd1} =
- lists:foldl(
- fun ({MsgId, RefCount, _Destination, Offset, TotalSize},
- {CurOffset, BlockStart, BlockEnd}) ->
- %% CurOffset is in the TmpFile.
- %% Offset, BlockStart and BlockEnd are in the DestinationFile (which is currently the source!)
- Size = TotalSize + ?FILE_PACKING_ADJUSTMENT,
- %% this message is going to end up back in
- %% Destination, at DestinationContiguousTop
- %% + CurOffset
- FinalOffset = DestinationContiguousTop + CurOffset,
- ok = dets_ets_insert(State, {MsgId, RefCount, Destination,
- FinalOffset, TotalSize}),
- NextOffset = CurOffset + Size,
- if BlockStart =:= undefined ->
- %% base case, called only for the
- %% first list elem
- {NextOffset, Offset, Offset + Size};
- Offset =:= BlockEnd ->
- %% extend the current block because
- %% the next msg follows straight on
- {NextOffset, BlockStart, BlockEnd + Size};
- true ->
- %% found a gap, so actually do the
- %% work for the previous block
- BSize = BlockEnd - BlockStart,
- {ok, BlockStart} =
- file:position(DestinationHdl,
- {bof, BlockStart}),
- {ok, BSize} = file:copy(DestinationHdl,
- TmpHdl, BSize),
- {NextOffset, Offset, Offset + Size}
- end
- end, {0, undefined, undefined}, Worklist),
- %% do the last remaining block
- BSize1 = BlockEnd1 - BlockStart1,
- {ok, BlockStart1} = file:position(DestinationHdl, {bof, BlockStart1}),
- {ok, BSize1} = file:copy(DestinationHdl, TmpHdl, BSize1),
%% so now Tmp contains everything we need to salvage from
%% Destination, and MsgLocationDets has been updated to
%% reflect compaction of Destination so truncate
%% Destination and copy from Tmp back to the end
{ok, 0} = file:position(TmpHdl, {bof, 0}),
- ok = truncate_and_extend_file(DestinationHdl,
- DestinationContiguousTop, ExpectedSize),
+ ok = truncate_and_extend_file(
+ DestinationHdl, DestinationContiguousTop, ExpectedSize),
{ok, TmpSize} = file:copy(TmpHdl, DestinationHdl, TmpSize),
- %% position in DestinationHdl should now be
- %% DestinationValid
+ %% position in DestinationHdl should now be DestinationValid
ok = file:sync(DestinationHdl),
ok = file:close(TmpHdl),
ok = file:delete(form_filename(Tmp))
end,
SourceWorkList =
- sort_msg_locations_by_offset(true,
- dets_ets_match_object(State,
- {'_', '_', Source,
- '_', '_'})),
- {ExpectedSize, BlockStart2, BlockEnd2} =
+ sort_msg_locations_by_offset(
+ true, dets_ets_match_object(State,
+ {'_', '_', Source,
+ '_', '_'})),
+ ok = copy_messages(SourceWorkList, DestinationValid, ExpectedSize,
+ SourceHdl, DestinationHdl, Destination, State),
+ %% tidy up
+ ok = file:sync(DestinationHdl),
+ ok = file:close(SourceHdl),
+ ok = file:close(DestinationHdl),
+ ok = file:delete(form_filename(Source)),
+ State.
+
+copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl,
+ Destination, State) ->
+ {FinalOffset, BlockStart2, BlockEnd2} =
lists:foldl(
fun ({MsgId, RefCount, _Source, Offset, TotalSize},
{CurOffset, BlockStart, BlockEnd}) ->
@@ -1190,8 +1209,7 @@ combine_files({Source, SourceValid, _SourceContiguousTop,
CurOffset, TotalSize}),
NextOffset = CurOffset + Size,
if BlockStart =:= undefined ->
- %% base case, called only for the first list
- %% elem
+ %% base case, called only for the first list elem
{NextOffset, Offset, Offset + Size};
Offset =:= BlockEnd ->
%% extend the current block because the next
@@ -1207,17 +1225,12 @@ combine_files({Source, SourceValid, _SourceContiguousTop,
file:copy(SourceHdl, DestinationHdl, BSize),
{NextOffset, Offset, Offset + Size}
end
- end, {DestinationValid, undefined, undefined}, SourceWorkList),
+ end, {InitOffset, undefined, undefined}, WorkList),
%% do the last remaining block
BSize2 = BlockEnd2 - BlockStart2,
{ok, BlockStart2} = file:position(SourceHdl, {bof, BlockStart2}),
{ok, BSize2} = file:copy(SourceHdl, DestinationHdl, BSize2),
- %% tidy up
- ok = file:sync(DestinationHdl),
- ok = file:close(SourceHdl),
- ok = file:close(DestinationHdl),
- ok = file:delete(form_filename(Source)),
- State.
+ ok.
close_file(File, State = #dqstate { read_file_handles =
{ReadHdls, ReadHdlsAge} }) ->
@@ -1237,20 +1250,22 @@ delete_empty_files(File, Acc, #dqstate { file_summary = FileSummary }) ->
case ValidData of
%% we should NEVER find the current file in here hence right
%% should always be a file, not undefined
- 0 -> case {Left, Right} of
- {undefined, _} when not(is_atom(Right)) ->
- %% the eldest file is empty. YAY!
- %% left is the 4th field
- true = ets:update_element(FileSummary, Right, {4, undefined});
- {_, _} when not(is_atom(Right)) ->
- %% left is the 4th field
- true = ets:update_element(FileSummary, Right, {4, Left}),
- %% right is the 5th field
- true = ets:update_element(FileSummary, Left, {5, Right})
- end,
- true = ets:delete(FileSummary, File),
- ok = file:delete(form_filename(File)),
- Acc;
+ 0 ->
+ case {Left, Right} of
+ {undefined, _} when not (is_atom(Right)) ->
+ %% the eldest file is empty. YAY!
+ %% left is the 4th field
+ true =
+ ets:update_element(FileSummary, Right, {4, undefined});
+ {_, _} when not (is_atom(Right)) ->
+ %% left is the 4th field
+ true = ets:update_element(FileSummary, Right, {4, Left}),
+ %% right is the 5th field
+ true = ets:update_element(FileSummary, Left, {5, Right})
+ end,
+ true = ets:delete(FileSummary, File),
+ ok = file:delete(form_filename(File)),
+ Acc;
_ -> [File|Acc]
end.
@@ -1268,7 +1283,7 @@ del_index() ->
{atomic, ok} -> ok;
%% hmm, something weird must be going on, but it's probably
%% not the end of the world
- {aborted,{no_exists,rabbit_disk_queue,_}} -> ok;
+ {aborted, {no_exists, rabbit_disk_queue,_}} -> ok;
E2 -> E2
end.
@@ -1286,13 +1301,18 @@ load_from_disk(State) ->
{atomic, true} = mnesia:transaction(
fun() ->
ok = mnesia:read_lock_table(rabbit_disk_queue),
- mnesia:foldl(fun (#dq_msg_loc { msg_id = MsgId, queue_and_seq_id = {Q, SeqId} }, true) ->
- case erlang:length(dets_ets_lookup(State1, MsgId)) of
- 0 -> ok == mnesia:delete(rabbit_disk_queue, {Q, SeqId}, write);
- 1 -> true
- end
- end,
- true, rabbit_disk_queue)
+ mnesia:foldl(
+ fun (#dq_msg_loc { msg_id = MsgId,
+ queue_and_seq_id = {Q, SeqId} },
+ true) ->
+ case erlang:length(dets_ets_lookup(
+ State1, MsgId)) of
+ 0 -> ok == mnesia:delete(rabbit_disk_queue,
+ {Q, SeqId}, write);
+ 1 -> true
+ end
+ end,
+ true, rabbit_disk_queue)
end),
State2 = extract_sequence_numbers(State1),
ok = del_index(),
@@ -1306,9 +1326,9 @@ extract_sequence_numbers(State = #dqstate { sequences = Sequences }) ->
fun (#dq_msg_loc { queue_and_seq_id = {Q, SeqId} }, true) ->
NextWrite = SeqId + 1,
case ets:lookup(Sequences, Q) of
- [] ->
- true = ets:insert_new(Sequences,
- {Q, SeqId, NextWrite, -1});
+ [] -> true =
+ ets:insert_new(Sequences,
+ {Q, SeqId, NextWrite, -1});
[Orig = {Q, Read, Write, Length}] ->
Repl = {Q, lists:min([Read, SeqId]),
%% Length is wrong here, but
@@ -1345,10 +1365,12 @@ remove_gaps_in_sequences(#dqstate { sequences = Sequences }) ->
ok = mnesia:write_lock_table(rabbit_disk_queue),
lists:foreach(
fun ({Q, ReadSeqId, WriteSeqId, _Length}) ->
- Gap = shuffle_up(Q, ReadSeqId - 1, WriteSeqId - 1, 0),
+ Gap = shuffle_up(Q, ReadSeqId-1, WriteSeqId-1, 0),
ReadSeqId2 = ReadSeqId + Gap,
Length = WriteSeqId - ReadSeqId2,
- true = ets:insert(Sequences, {Q, ReadSeqId2, WriteSeqId, Length})
+ true =
+ ets:insert(Sequences,
+ {Q, ReadSeqId2, WriteSeqId, Length})
end, ets:match_object(Sequences, '_'))
end).
@@ -1361,8 +1383,9 @@ shuffle_up(Q, BaseSeqId, SeqId, Gap) ->
[Obj] ->
if Gap =:= 0 -> ok;
true -> mnesia:write(rabbit_disk_queue,
- Obj #dq_msg_loc { queue_and_seq_id = {Q, SeqId + Gap },
- next_seq_id = SeqId + Gap + 1
+ Obj #dq_msg_loc {
+ queue_and_seq_id = {Q, SeqId + Gap },
+ next_seq_id = SeqId + Gap + 1
},
write),
mnesia:delete(rabbit_disk_queue, {Q, SeqId}, write)
@@ -1371,8 +1394,9 @@ shuffle_up(Q, BaseSeqId, SeqId, Gap) ->
end,
shuffle_up(Q, BaseSeqId, SeqId - 1, Gap + GapInc).
-load_messages(undefined, [], State = #dqstate { file_summary = FileSummary,
- current_file_name = CurName }) ->
+load_messages(undefined, [],
+ State = #dqstate { file_summary = FileSummary,
+ current_file_name = CurName }) ->
true = ets:insert_new(FileSummary, {CurName, 0, 0, undefined, undefined}),
State;
load_messages(Left, [], State) ->
@@ -1401,8 +1425,9 @@ load_messages(Left, [File|Files],
msg_id)) of
0 -> {VMAcc, VTSAcc};
RefCount ->
- true = dets_ets_insert_new(State, {MsgId, RefCount, File,
- Offset, TotalSize}),
+ true =
+ dets_ets_insert_new(State, {MsgId, RefCount, File,
+ Offset, TotalSize}),
{[{MsgId, TotalSize, Offset}|VMAcc],
VTSAcc + TotalSize + ?FILE_PACKING_ADJUSTMENT
}
@@ -1410,21 +1435,37 @@ load_messages(Left, [File|Files],
end, {[], 0}, Messages),
%% foldl reverses lists and find_contiguous_block_prefix needs
%% elems in the same order as from scan_file_for_valid_messages
- {ContiguousTop, _} = find_contiguous_block_prefix(lists:reverse(ValidMessagesRev)),
+ {ContiguousTop, _} = find_contiguous_block_prefix(
+ lists:reverse(ValidMessagesRev)),
Right = case Files of
[] -> undefined;
[F|_] -> F
end,
- true = ets:insert_new(FileSummary, {File, ValidTotalSize, ContiguousTop, Left, Right}),
+ true = ets:insert_new(FileSummary,
+ {File, ValidTotalSize, ContiguousTop, Left, Right}),
load_messages(File, Files, State).
%% ---- DISK RECOVERY OF FAILED COMPACTION ----
recover_crashed_compactions(Files, TmpFiles) ->
- lists:foreach(fun (TmpFile) -> ok = recover_crashed_compactions1(Files, TmpFile) end,
+ lists:foreach(fun (TmpFile) ->
+ ok = recover_crashed_compactions1(Files, TmpFile) end,
TmpFiles),
ok.
+verify_messages_in_mnesia(MsgIds) ->
+ lists:foreach(
+ fun (MsgId) ->
+ true = 0 < erlang:length(mnesia:dirty_index_match_object
+ (rabbit_disk_queue,
+ #dq_msg_loc { msg_id = MsgId,
+ queue_and_seq_id = '_',
+ is_delivered = '_',
+ next_seq_id = '_'
+ },
+ msg_id))
+ end, MsgIds).
+
recover_crashed_compactions1(Files, TmpFile) ->
GrabMsgId = fun ({MsgId, _TotalSize, _FileOffset}) -> MsgId end,
NonTmpRelatedFile = filename:rootname(TmpFile) ++ ?FILE_EXTENSION,
@@ -1435,37 +1476,35 @@ recover_crashed_compactions1(Files, TmpFile) ->
MsgIdsTmp = lists:map(GrabMsgId, UncorruptedMessagesTmp),
%% all of these messages should appear in the mnesia table,
%% otherwise they wouldn't have been copied out
- lists:foreach(fun (MsgId) ->
- true = 0 < erlang:length(mnesia:dirty_index_match_object
- (rabbit_disk_queue,
- #dq_msg_loc { msg_id = MsgId,
- queue_and_seq_id = '_',
- is_delivered = '_',
- next_seq_id = '_'
- },
- msg_id))
- end, MsgIdsTmp),
+ verify_messages_in_mnesia(MsgIdsTmp),
{ok, UncorruptedMessages} =
scan_file_for_valid_messages(form_filename(NonTmpRelatedFile)),
MsgIds = lists:map(GrabMsgId, UncorruptedMessages),
- %% 1) It's possible that everything in the tmp file is also in the main file
- %% such that the main file is (prefix ++ tmpfile). This means that compaction
- %% failed immediately prior to the final step of deleting the tmp file.
- %% Plan: just delete the tmp file
- %% 2) It's possible that everything in the tmp file is also in the main file
- %% but with holes throughout (or just somthing like main = (prefix ++ hole ++ tmpfile)).
- %% This means that compaction wrote out the tmp file successfully and then failed.
- %% Plan: just delete the tmp file and allow the compaction to eventually be triggered later
- %% 3) It's possible that everything in the tmp file is also in the main file
- %% but such that the main file does not end with tmp file (and there are valid messages
- %% in the suffix; main = (prefix ++ tmpfile[with extra holes?] ++ suffix)).
- %% This means that compaction failed as we were writing out the tmp file.
- %% Plan: just delete the tmp file and allow the compaction to eventually be triggered later
- %% 4) It's possible that there are messages in the tmp file which are not in the main file.
- %% This means that writing out the tmp file succeeded, but then we failed as we
- %% were copying them back over to the main file, after truncating the main file.
- %% As the main file has already been truncated, it should consist only of valid messages
- %% Plan: Truncate the main file back to before any of the files in the tmp file and copy
+ %% 1) It's possible that everything in the tmp file is also in the
+ %% main file such that the main file is (prefix ++
+ %% tmpfile). This means that compaction failed immediately
+ %% prior to the final step of deleting the tmp file. Plan: just
+ %% delete the tmp file
+ %% 2) It's possible that everything in the tmp file is also in the
+ %% main file but with holes throughout (or just somthing like
+ %% main = (prefix ++ hole ++ tmpfile)). This means that
+ %% compaction wrote out the tmp file successfully and then
+ %% failed. Plan: just delete the tmp file and allow the
+ %% compaction to eventually be triggered later
+ %% 3) It's possible that everything in the tmp file is also in the
+ %% main file but such that the main file does not end with tmp
+ %% file (and there are valid messages in the suffix; main =
+ %% (prefix ++ tmpfile[with extra holes?] ++ suffix)). This
+ %% means that compaction failed as we were writing out the tmp
+ %% file. Plan: just delete the tmp file and allow the
+ %% compaction to eventually be triggered later
+ %% 4) It's possible that there are messages in the tmp file which
+ %% are not in the main file. This means that writing out the
+ %% tmp file succeeded, but then we failed as we were copying
+ %% them back over to the main file, after truncating the main
+ %% file. As the main file has already been truncated, it should
+ %% consist only of valid messages. Plan: Truncate the main file
+ %% back to before any of the files in the tmp file and copy
%% them over again
case lists:all(fun (MsgId) -> lists:member(MsgId, MsgIds) end, MsgIdsTmp) of
true -> %% we're in case 1, 2 or 3 above. Just delete the tmp file
@@ -1473,29 +1512,21 @@ recover_crashed_compactions1(Files, TmpFile) ->
%% is empty
ok = file:delete(TmpFile);
_False ->
- %% we're in case 4 above.
- %% check that everything in the main file is a valid message in mnesia
- lists:foreach(fun (MsgId) ->
- true = 0 < erlang:length(mnesia:dirty_index_match_object
- (rabbit_disk_queue,
- #dq_msg_loc { msg_id = MsgId,
- queue_and_seq_id = '_',
- is_delivered = '_',
- next_seq_id = '_'
- },
- msg_id))
- end, MsgIds),
+ %% we're in case 4 above. Check that everything in the
+ %% main file is a valid message in mnesia
+ verify_messages_in_mnesia(MsgIds),
%% The main file should be contiguous
{Top, MsgIds} = find_contiguous_block_prefix(UncorruptedMessages),
%% we should have that none of the messages in the prefix
%% are in the tmp file
- true = lists:all(fun (MsgId) -> not(lists:member(MsgId, MsgIdsTmp)) end,
- MsgIds),
-
+ true = lists:all(fun (MsgId) ->
+ not (lists:member(MsgId, MsgIdsTmp))
+ end, MsgIds),
{ok, MainHdl} = file:open(form_filename(NonTmpRelatedFile),
[write, raw, binary, delayed_write]),
{ok, Top} = file:position(MainHdl, Top),
- ok = file:truncate(MainHdl), %% wipe out any rubbish at the end of the file
+ %% wipe out any rubbish at the end of the file
+ ok = file:truncate(MainHdl),
%% there really could be rubbish at the end of the file -
%% we could have failed after the extending truncate.
%% Remember the head of the list will be the highest entry
@@ -1596,7 +1627,8 @@ read_message_at_offset(FileHdl, Offset, TotalSize) ->
scan_file_for_valid_messages(File) ->
{ok, Hdl} = file:open(File, [raw, binary, read]),
Valid = scan_file_for_valid_messages(Hdl, 0, []),
- _ = file:close(Hdl), %% if something really bad's happened, the close could fail, but ignore
+ %% if something really bad's happened, the close could fail, but ignore
+ file:close(Hdl),
Valid.
scan_file_for_valid_messages(FileHdl, Offset, Acc) ->
@@ -1607,21 +1639,28 @@ scan_file_for_valid_messages(FileHdl, Offset, Acc) ->
{ok, {ok, MsgId, TotalSize, NextOffset}} ->
scan_file_for_valid_messages(FileHdl, NextOffset,
[{MsgId, TotalSize, Offset}|Acc]);
- _KO -> {ok, Acc} %% bad message, but we may still have recovered some valid messages
+ _KO ->
+ %% bad message, but we may still have recovered some valid messages
+ {ok, Acc}
end.
read_next_file_entry(FileHdl, Offset) ->
TwoIntegers = 2 * ?INTEGER_SIZE_BYTES,
case file:read(FileHdl, TwoIntegers) of
- {ok, <<TotalSize:?INTEGER_SIZE_BITS, MsgIdBinSize:?INTEGER_SIZE_BITS>>} ->
+ {ok,
+ <<TotalSize:?INTEGER_SIZE_BITS, MsgIdBinSize:?INTEGER_SIZE_BITS>>} ->
case {TotalSize =:= 0, MsgIdBinSize =:= 0} of
{true, _} -> {ok, eof}; %% Nothing we can do other than stop
- {false, true} -> %% current message corrupted, try skipping past it
- ExpectedAbsPos = Offset + ?FILE_PACKING_ADJUSTMENT + TotalSize,
+ {false, true} ->
+ %% current message corrupted, try skipping past it
+ ExpectedAbsPos =
+ Offset + ?FILE_PACKING_ADJUSTMENT + TotalSize,
case file:position(FileHdl, {cur, TotalSize + 1}) of
- {ok, ExpectedAbsPos} -> {ok, {corrupted, ExpectedAbsPos}};
- {ok, _SomeOtherPos} -> {ok, eof}; %% seek failed, so give up
+ {ok, ExpectedAbsPos} ->
+ {ok, {corrupted, ExpectedAbsPos}};
+ {ok, _SomeOtherPos} ->
+ {ok, eof}; %% seek failed, so give up
KO -> KO
end;
{false, false} -> %% all good, let's continue
@@ -1629,19 +1668,23 @@ read_next_file_entry(FileHdl, Offset) ->
{ok, <<MsgId:MsgIdBinSize/binary>>} ->
ExpectedAbsPos = Offset + TwoIntegers + TotalSize,
case file:position(FileHdl,
- {cur, TotalSize - MsgIdBinSize}) of
+ {cur, TotalSize - MsgIdBinSize}
+ ) of
{ok, ExpectedAbsPos} ->
NextOffset = Offset + TotalSize +
?FILE_PACKING_ADJUSTMENT,
case file:read(FileHdl, 1) of
- {ok, <<?WRITE_OK:?WRITE_OK_SIZE_BITS>>} ->
+ {ok,
+ <<?WRITE_OK:?WRITE_OK_SIZE_BITS>>} ->
{ok, {ok, binary_to_term(MsgId),
TotalSize, NextOffset}};
{ok, _SomeOtherData} ->
{ok, {corrupted, NextOffset}};
KO -> KO
end;
- {ok, _SomeOtherPos} -> {ok, eof}; %% seek failed, so give up
+ {ok, _SomeOtherPos} ->
+ %% seek failed, so give up
+ {ok, eof};
KO -> KO
end;
eof -> {ok, eof};