summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-05-21 11:37:02 +0100
committerMatthew Sackman <matthew@lshift.net>2009-05-21 11:37:02 +0100
commit9c18c3d9d2809bc419b50d6eb0f3a64aee310a40 (patch)
tree5dcfefabbabad8d76f8ea7f10077d943342078eb /src
parente322ba099c7db411dbe1c39c2d9f6496f2968c15 (diff)
downloadrabbitmq-server-git-9c18c3d9d2809bc419b50d6eb0f3a64aee310a40.tar.gz
Formatting only. Only just realised emacs was using tabs. Fixed.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_disk_queue.erl1432
-rw-r--r--src/rabbit_mixed_queue.erl123
-rw-r--r--src/rabbit_tests.erl126
3 files changed, 845 insertions, 836 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 37c91a855b..8c602b5310 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -196,7 +196,7 @@
%% +-------+ +-------+ +-------+
%% | B | | X | | B |
%% +-------+ +-------+ +-------+
-%% | A | | E | | A |
+%% | A | | E | | A |
%% +-------+ +-------+ +-------+
%% left right left
%%
@@ -224,19 +224,19 @@
-type(seq_id() :: non_neg_integer()).
-spec(start_link/1 :: (non_neg_integer()) ->
- {'ok', pid()} | 'ignore' | {'error', any()}).
+ {'ok', pid()} | 'ignore' | {'error', any()}).
-spec(publish/3 :: (queue_name(), msg_id(), binary()) -> 'ok').
-spec(publish_with_seq/4 :: (queue_name(), msg_id(), seq_id(), binary()) -> 'ok').
-spec(deliver/1 :: (queue_name()) ->
- {'empty' | {msg_id(), binary(), non_neg_integer(),
- bool(), {msg_id(), seq_id()}}}).
+ {'empty' | {msg_id(), binary(), non_neg_integer(),
+ bool(), {msg_id(), seq_id()}}}).
-spec(phantom_deliver/1 :: (queue_name()) ->
- { 'empty' | {msg_id(), bool(), {msg_id(), seq_id()}}}).
+ { 'empty' | {msg_id(), bool(), {msg_id(), seq_id()}}}).
-spec(ack/2 :: (queue_name(), [{msg_id(), seq_id()}]) -> 'ok').
-spec(tx_publish/2 :: (msg_id(), binary()) -> 'ok').
-spec(tx_commit/3 :: (queue_name(), [msg_id()], [{msg_id(), seq_id()}]) -> 'ok').
-spec(tx_commit_with_seqs/3 :: (queue_name(), [{msg_id(), seq_id()}],
- [{msg_id(), seq_id()}]) -> 'ok').
+ [{msg_id(), seq_id()}]) -> 'ok').
-spec(tx_cancel/1 :: ([msg_id()]) -> 'ok').
-spec(requeue/2 :: (queue_name(), [{msg_id(), seq_id()}]) -> 'ok').
-spec(requeue_with_seqs/2 :: (queue_name(), [{{msg_id(), seq_id()}, seq_id()}]) -> 'ok').
@@ -252,7 +252,7 @@
start_link(FileSizeLimit) ->
gen_server:start_link({local, ?SERVER}, ?MODULE,
- [FileSizeLimit, ?MAX_READ_FILE_HANDLES], []).
+ [FileSizeLimit, ?MAX_READ_FILE_HANDLES], []).
publish(Q, MsgId, Msg) when is_binary(Msg) ->
gen_server:cast(?SERVER, {publish, Q, MsgId, Msg}).
@@ -317,57 +317,57 @@ init([FileSizeLimit, ReadFileHandlesLimit]) ->
process_flag(trap_exit, true),
Node = node(),
ok =
- case mnesia:change_table_copy_type(rabbit_disk_queue, Node, disc_only_copies) of
- {atomic, ok} -> ok;
- {aborted, {already_exists, rabbit_disk_queue, Node, disc_only_copies}} -> ok;
- E -> E
- end,
+ case mnesia:change_table_copy_type(rabbit_disk_queue, Node, disc_only_copies) of
+ {atomic, ok} -> ok;
+ {aborted, {already_exists, rabbit_disk_queue, Node, disc_only_copies}} -> ok;
+ E -> E
+ end,
ok = filelib:ensure_dir(form_filename("nothing")),
InitName = "0" ++ ?FILE_EXTENSION,
{ok, MsgLocationDets} =
- dets:open_file(?MSG_LOC_NAME,
- [{file, form_filename(atom_to_list(?MSG_LOC_NAME) ++
- ?FILE_EXTENSION_DETS)},
- {min_no_slots, 1024*1024},
- %% man says this should be <= 32M. But it works...
- {max_no_slots, 1024*1024*1024},
- {type, set}
- ]),
+ dets:open_file(?MSG_LOC_NAME,
+ [{file, form_filename(atom_to_list(?MSG_LOC_NAME) ++
+ ?FILE_EXTENSION_DETS)},
+ {min_no_slots, 1024*1024},
+ %% man says this should be <= 32M. But it works...
+ {max_no_slots, 1024*1024*1024},
+ {type, set}
+ ]),
%% it would be better to have this as private, but dets:from_ets/2
%% seems to blow up if it is set private
MsgLocationEts = ets:new(?MSG_LOC_NAME, [set, protected]),
State =
- #dqstate { msg_location_dets = MsgLocationDets,
- msg_location_ets = MsgLocationEts,
- operation_mode = disk_only,
- file_summary = ets:new(?FILE_SUMMARY_ETS_NAME,
- [set, private]),
- sequences = ets:new(?SEQUENCE_ETS_NAME,
- [set, private]),
- current_file_num = 0,
- current_file_name = InitName,
- current_file_handle = undefined,
- current_offset = 0,
- file_size_limit = FileSizeLimit,
- read_file_handles = {dict:new(), gb_trees:empty()},
- read_file_handles_limit = ReadFileHandlesLimit
- },
+ #dqstate { msg_location_dets = MsgLocationDets,
+ msg_location_ets = MsgLocationEts,
+ operation_mode = disk_only,
+ file_summary = ets:new(?FILE_SUMMARY_ETS_NAME,
+ [set, private]),
+ sequences = ets:new(?SEQUENCE_ETS_NAME,
+ [set, private]),
+ current_file_num = 0,
+ current_file_name = InitName,
+ current_file_handle = undefined,
+ current_offset = 0,
+ file_size_limit = FileSizeLimit,
+ read_file_handles = {dict:new(), gb_trees:empty()},
+ read_file_handles_limit = ReadFileHandlesLimit
+ },
{ok, State1 = #dqstate { current_file_name = CurrentName,
- current_offset = Offset } } =
- load_from_disk(State),
+ current_offset = Offset } } =
+ load_from_disk(State),
Path = form_filename(CurrentName),
Exists = case file:read_file_info(Path) of
- {error,enoent} -> false;
- {ok, _} -> true
- end,
+ {error,enoent} -> false;
+ {ok, _} -> true
+ end,
%% read is only needed so that we can seek
{ok, FileHdl} = file:open(Path, [read, write, raw, binary, delayed_write]),
ok = if Exists -> ok;
- true -> %% new file, so preallocate
- {ok, FileSizeLimit} = file:position(FileHdl, {bof, FileSizeLimit}),
- file:truncate(FileHdl)
- end,
+ true -> %% new file, so preallocate
+ {ok, FileSizeLimit} = file:position(FileHdl, {bof, FileSizeLimit}),
+ file:truncate(FileHdl)
+ end,
{ok, Offset} = file:position(FileHdl, {bof, Offset}),
{ok, State1 #dqstate { current_file_handle = FileHdl }}.
@@ -391,21 +391,21 @@ handle_call(stop, _From, State) ->
{stop, normal, ok, State}; %% gen_server now calls terminate
handle_call(stop_vaporise, _From, State) ->
State1 = #dqstate { file_summary = FileSummary,
- sequences = Sequences } =
- shutdown(State), %% tidy up file handles early
+ sequences = Sequences } =
+ shutdown(State), %% tidy up file handles early
{atomic, ok} = mnesia:clear_table(rabbit_disk_queue),
true = ets:delete(FileSummary),
true = ets:delete(Sequences),
lists:foreach(fun file:delete/1, filelib:wildcard(form_filename("*"))),
{stop, normal, ok,
State1 #dqstate { current_file_handle = undefined,
- read_file_handles = {dict:new(), gb_trees:empty()}}};
+ read_file_handles = {dict:new(), gb_trees:empty()}}};
%% gen_server now calls terminate, which then calls shutdown
handle_call(to_disk_only_mode, _From, State = #dqstate { operation_mode = disk_only }) ->
{reply, ok, State};
handle_call(to_disk_only_mode, _From, State = #dqstate { operation_mode = ram_disk,
- msg_location_dets = MsgLocationDets,
- msg_location_ets = MsgLocationEts }) ->
+ msg_location_dets = MsgLocationDets,
+ msg_location_ets = MsgLocationEts }) ->
{atomic, ok} = mnesia:change_table_copy_type(rabbit_disk_queue, node(), disc_only_copies),
ok = dets:from_ets(MsgLocationDets, MsgLocationEts),
true = ets:delete_all_objects(MsgLocationEts),
@@ -413,8 +413,8 @@ handle_call(to_disk_only_mode, _From, State = #dqstate { operation_mode = ram_di
handle_call(to_ram_disk_mode, _From, State = #dqstate { operation_mode = ram_disk }) ->
{reply, ok, State};
handle_call(to_ram_disk_mode, _From, State = #dqstate { operation_mode = disk_only,
- msg_location_dets = MsgLocationDets,
- msg_location_ets = MsgLocationEts }) ->
+ msg_location_dets = MsgLocationDets,
+ msg_location_ets = MsgLocationEts }) ->
{atomic, ok} = mnesia:change_table_copy_type(rabbit_disk_queue, node(), disc_copies),
true = ets:from_dets(MsgLocationEts, MsgLocationDets),
ok = dets:delete_all_objects(MsgLocationDets),
@@ -450,24 +450,24 @@ terminate(_Reason, State) ->
shutdown(State).
shutdown(State = #dqstate { msg_location_dets = MsgLocationDets,
- msg_location_ets = MsgLocationEts,
- current_file_handle = FileHdl,
- read_file_handles = {ReadHdls, _ReadHdlsAge}
- }) ->
+ msg_location_ets = MsgLocationEts,
+ current_file_handle = FileHdl,
+ read_file_handles = {ReadHdls, _ReadHdlsAge}
+ }) ->
%% deliberately ignoring return codes here
dets:close(MsgLocationDets),
file:delete(form_filename(atom_to_list(?MSG_LOC_NAME) ++
- ?FILE_EXTENSION_DETS)),
+ ?FILE_EXTENSION_DETS)),
true = ets:delete_all_objects(MsgLocationEts),
if FileHdl =:= undefined -> ok;
true -> file:sync(FileHdl),
- file:close(FileHdl)
+ file:close(FileHdl)
end,
dict:fold(fun (_File, Hdl, _Acc) ->
- file:close(Hdl)
- end, ok, ReadHdls),
+ file:close(Hdl)
+ end, ok, ReadHdls),
State #dqstate { current_file_handle = undefined,
- read_file_handles = {dict:new(), gb_trees:empty()}}.
+ read_file_handles = {dict:new(), gb_trees:empty()}}.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
@@ -481,97 +481,97 @@ base_directory() ->
filename:join(mnesia:system_info(directory), "rabbit_disk_queue/").
dets_ets_lookup(#dqstate { msg_location_dets = MsgLocationDets, operation_mode = disk_only },
- Key) ->
+ Key) ->
dets:lookup(MsgLocationDets, Key);
dets_ets_lookup(#dqstate { msg_location_ets = MsgLocationEts, operation_mode = ram_disk },
- Key) ->
+ Key) ->
ets:lookup(MsgLocationEts, Key).
dets_ets_delete(#dqstate { msg_location_dets = MsgLocationDets, operation_mode = disk_only },
- Key) ->
+ Key) ->
ok = dets:delete(MsgLocationDets, Key);
dets_ets_delete(#dqstate { msg_location_ets = MsgLocationEts, operation_mode = ram_disk },
- Key) ->
+ Key) ->
true = ets:delete(MsgLocationEts, Key),
ok.
dets_ets_insert(#dqstate { msg_location_dets = MsgLocationDets, operation_mode = disk_only },
- Obj) ->
+ Obj) ->
ok = dets:insert(MsgLocationDets, Obj);
dets_ets_insert(#dqstate { msg_location_ets = MsgLocationEts, operation_mode = ram_disk },
- Obj) ->
+ Obj) ->
true = ets:insert(MsgLocationEts, Obj),
ok.
dets_ets_insert_new(#dqstate { msg_location_dets = MsgLocationDets, operation_mode = disk_only },
- Obj) ->
+ Obj) ->
true = dets:insert_new(MsgLocationDets, Obj);
dets_ets_insert_new(#dqstate { msg_location_ets = MsgLocationEts, operation_mode = ram_disk },
- Obj) ->
+ Obj) ->
true = ets:insert_new(MsgLocationEts, Obj).
dets_ets_match_object(#dqstate { msg_location_dets = MsgLocationDets, operation_mode = disk_only },
- Obj) ->
+ Obj) ->
dets:match_object(MsgLocationDets, Obj);
dets_ets_match_object(#dqstate { msg_location_ets = MsgLocationEts, operation_mode = ram_disk },
- Obj) ->
+ Obj) ->
ets:match_object(MsgLocationEts, Obj).
%% ---- INTERNAL RAW FUNCTIONS ----
internal_deliver(Q, ReadMsg, State = #dqstate { sequences = Sequences }) ->
case ets:lookup(Sequences, Q) of
- [] -> {ok, empty, State};
- [{Q, ReadSeqId, WriteSeqId}] ->
- case mnesia:dirty_read(rabbit_disk_queue, {Q, ReadSeqId}) of
- [] -> {ok, empty, State};
- [Obj =
- #dq_msg_loc {is_delivered = Delivered, msg_id = MsgId,
- next_seq_id = ReadSeqId2}] ->
- [{MsgId, _RefCount, File, Offset, TotalSize}] =
- dets_ets_lookup(State, MsgId),
- true = ets:insert(Sequences, {Q, ReadSeqId2, WriteSeqId}),
- ok =
- if Delivered -> ok;
- true ->
- mnesia:dirty_write(rabbit_disk_queue,
- Obj #dq_msg_loc {is_delivered = true})
- end,
- if ReadMsg ->
- {FileHdl, State1} = get_read_handle(File, State),
- {ok, {MsgBody, BodySize}} =
- read_message_at_offset(FileHdl, Offset, TotalSize),
- {ok, {MsgId, MsgBody, BodySize, Delivered, {MsgId, ReadSeqId}},
- State1};
- true -> {ok, {MsgId, Delivered, {MsgId, ReadSeqId}}, State}
- end
- end
+ [] -> {ok, empty, State};
+ [{Q, ReadSeqId, WriteSeqId}] ->
+ case mnesia:dirty_read(rabbit_disk_queue, {Q, ReadSeqId}) of
+ [] -> {ok, empty, State};
+ [Obj =
+ #dq_msg_loc {is_delivered = Delivered, msg_id = MsgId,
+ next_seq_id = ReadSeqId2}] ->
+ [{MsgId, _RefCount, File, Offset, TotalSize}] =
+ dets_ets_lookup(State, MsgId),
+ true = ets:insert(Sequences, {Q, ReadSeqId2, WriteSeqId}),
+ ok =
+ if Delivered -> ok;
+ true ->
+ mnesia:dirty_write(rabbit_disk_queue,
+ Obj #dq_msg_loc {is_delivered = true})
+ end,
+ if ReadMsg ->
+ {FileHdl, State1} = get_read_handle(File, State),
+ {ok, {MsgBody, BodySize}} =
+ read_message_at_offset(FileHdl, Offset, TotalSize),
+ {ok, {MsgId, MsgBody, BodySize, Delivered, {MsgId, ReadSeqId}},
+ State1};
+ true -> {ok, {MsgId, Delivered, {MsgId, ReadSeqId}}, State}
+ end
+ end
end.
get_read_handle(File, State =
- #dqstate { read_file_handles = {ReadHdls, ReadHdlsAge},
- read_file_handles_limit = ReadFileHandlesLimit }) ->
+ #dqstate { read_file_handles = {ReadHdls, ReadHdlsAge},
+ read_file_handles_limit = ReadFileHandlesLimit }) ->
Now = now(),
{FileHdl, ReadHdls1, ReadHdlsAge1} =
- case dict:find(File, ReadHdls) of
- error ->
- {ok, Hdl} = file:open(form_filename(File),
- [read, raw, binary,
- read_ahead]),
- case dict:size(ReadHdls) < ReadFileHandlesLimit of
- true ->
- {Hdl, ReadHdls, ReadHdlsAge};
- _False ->
- {Then, OldFile, ReadHdlsAge2} =
- gb_trees:take_smallest(ReadHdlsAge),
- {ok, {OldHdl, Then}} =
- dict:find(OldFile, ReadHdls),
- ok = file:close(OldHdl),
- {Hdl, dict:erase(OldFile, ReadHdls), ReadHdlsAge2}
- end;
- {ok, {Hdl, Then}} ->
- {Hdl, ReadHdls, gb_trees:delete(Then, ReadHdlsAge)}
- end,
+ case dict:find(File, ReadHdls) of
+ error ->
+ {ok, Hdl} = file:open(form_filename(File),
+ [read, raw, binary,
+ read_ahead]),
+ case dict:size(ReadHdls) < ReadFileHandlesLimit of
+ true ->
+ {Hdl, ReadHdls, ReadHdlsAge};
+ _False ->
+ {Then, OldFile, ReadHdlsAge2} =
+ gb_trees:take_smallest(ReadHdlsAge),
+ {ok, {OldHdl, Then}} =
+ dict:find(OldFile, ReadHdls),
+ ok = file:close(OldHdl),
+ {Hdl, dict:erase(OldFile, ReadHdls), ReadHdlsAge2}
+ end;
+ {ok, {Hdl, Then}} ->
+ {Hdl, ReadHdls, gb_trees:delete(Then, ReadHdlsAge)}
+ end,
ReadHdls3 = dict:store(File, {FileHdl, Now}, ReadHdls1),
ReadHdlsAge3 = gb_trees:enter(Now, File, ReadHdlsAge1),
{FileHdl, State #dqstate {read_file_handles = {ReadHdls3, ReadHdlsAge3}}}.
@@ -585,74 +585,74 @@ internal_ack(Q, MsgSeqIds, State) ->
%% called from ack with MnesiaDelete = true
%% called from purge with MnesiaDelete = txn
remove_messages(Q, MsgSeqIds, MnesiaDelete,
- State = #dqstate { file_summary = FileSummary,
- current_file_name = CurName
- }) ->
+ State = #dqstate { file_summary = FileSummary,
+ current_file_name = CurName
+ }) ->
Files =
- lists:foldl(
- fun ({MsgId, SeqId}, Files2) ->
- [{MsgId, RefCount, File, Offset, TotalSize}] =
- dets_ets_lookup(State, MsgId),
- Files3 =
- if 1 =:= RefCount ->
- ok = dets_ets_delete(State, MsgId),
- [{File, ValidTotalSize, ContiguousTop, Left, Right}] =
- ets:lookup(FileSummary, File),
- ContiguousTop1 = lists:min([ContiguousTop, Offset]),
- true = ets:insert(FileSummary,
- {File, (ValidTotalSize - TotalSize
- - ?FILE_PACKING_ADJUSTMENT),
- ContiguousTop1, Left, Right}),
- if CurName =:= File -> Files2;
- true -> sets:add_element(File, Files2)
- end;
- 1 < RefCount ->
- ok = dets_ets_insert(State, {MsgId, RefCount - 1,
- File, Offset, TotalSize}),
- Files2
- end,
- ok = if MnesiaDelete ->
- mnesia:dirty_delete(rabbit_disk_queue, {Q, SeqId});
- MnesiaDelete =:= txn ->
- mnesia:delete(rabbit_disk_queue, {Q, SeqId}, write);
- true -> ok
- end,
- Files3
- end, sets:new(), MsgSeqIds),
+ lists:foldl(
+ fun ({MsgId, SeqId}, Files2) ->
+ [{MsgId, RefCount, File, Offset, TotalSize}] =
+ dets_ets_lookup(State, MsgId),
+ Files3 =
+ if 1 =:= RefCount ->
+ ok = dets_ets_delete(State, MsgId),
+ [{File, ValidTotalSize, ContiguousTop, Left, Right}] =
+ ets:lookup(FileSummary, File),
+ ContiguousTop1 = lists:min([ContiguousTop, Offset]),
+ true = ets:insert(FileSummary,
+ {File, (ValidTotalSize - TotalSize
+ - ?FILE_PACKING_ADJUSTMENT),
+ ContiguousTop1, Left, Right}),
+ if CurName =:= File -> Files2;
+ true -> sets:add_element(File, Files2)
+ end;
+ 1 < RefCount ->
+ ok = dets_ets_insert(State, {MsgId, RefCount - 1,
+ File, Offset, TotalSize}),
+ Files2
+ end,
+ ok = if MnesiaDelete ->
+ mnesia:dirty_delete(rabbit_disk_queue, {Q, SeqId});
+ MnesiaDelete =:= txn ->
+ mnesia:delete(rabbit_disk_queue, {Q, SeqId}, write);
+ true -> ok
+ end,
+ Files3
+ end, sets:new(), MsgSeqIds),
State2 = compact(Files, State),
{ok, State2}.
internal_tx_publish(MsgId, MsgBody,
- State = #dqstate { current_file_handle = CurHdl,
- current_file_name = CurName,
- current_offset = CurOffset,
- file_summary = FileSummary
- }) ->
+ State = #dqstate { current_file_handle = CurHdl,
+ current_file_name = CurName,
+ current_offset = CurOffset,
+ file_summary = FileSummary
+ }) ->
case dets_ets_lookup(State, MsgId) of
- [] ->
- %% New message, lots to do
- {ok, TotalSize} = append_message(CurHdl, MsgId, MsgBody),
- true = dets_ets_insert_new(State, {MsgId, 1, CurName,
- CurOffset, TotalSize}),
- [{CurName, ValidTotalSize, ContiguousTop, Left, undefined}] =
- ets:lookup(FileSummary, CurName),
- ValidTotalSize1 = ValidTotalSize + TotalSize +
- ?FILE_PACKING_ADJUSTMENT,
- ContiguousTop1 = if CurOffset =:= ContiguousTop ->
- %% can't be any holes in this file
- ValidTotalSize1;
- true -> ContiguousTop
- end,
- true = ets:insert(FileSummary, {CurName, ValidTotalSize1,
- ContiguousTop1, Left, undefined}),
- NextOffset = CurOffset + TotalSize + ?FILE_PACKING_ADJUSTMENT,
- maybe_roll_to_new_file(NextOffset,
- State #dqstate {current_offset = NextOffset});
- [{MsgId, RefCount, File, Offset, TotalSize}] ->
- %% We already know about it, just update counter
- ok = dets_ets_insert(State, {MsgId, RefCount + 1, File,
- Offset, TotalSize}),
- {ok, State}
+ [] ->
+ %% New message, lots to do
+ {ok, TotalSize} = append_message(CurHdl, MsgId, MsgBody),
+ true = dets_ets_insert_new(State, {MsgId, 1, CurName,
+ CurOffset, TotalSize}),
+ [{CurName, ValidTotalSize, ContiguousTop, Left, undefined}] =
+ ets:lookup(FileSummary, CurName),
+ ValidTotalSize1 = ValidTotalSize + TotalSize +
+ ?FILE_PACKING_ADJUSTMENT,
+ ContiguousTop1 = if CurOffset =:= ContiguousTop ->
+ %% can't be any holes in this file
+ ValidTotalSize1;
+ true -> ContiguousTop
+ end,
+ true = ets:insert(FileSummary, {CurName, ValidTotalSize1,
+ ContiguousTop1, Left, undefined}),
+ NextOffset = CurOffset + TotalSize + ?FILE_PACKING_ADJUSTMENT,
+ maybe_roll_to_new_file(NextOffset,
+ State #dqstate {current_offset = NextOffset});
+ [{MsgId, RefCount, File, Offset, TotalSize}] ->
+ %% We already know about it, just update counter
+ ok = dets_ets_insert(State, {MsgId, RefCount + 1, File,
+ Offset, TotalSize}),
+ {ok, State}
end.
adjust_last_msg_seq_id(_Q, ExpectedSeqId, next) ->
@@ -664,87 +664,87 @@ adjust_last_msg_seq_id(_Q, ExpectedSeqId, ExpectedSeqId) ->
adjust_last_msg_seq_id(Q, ExpectedSeqId, SuppliedSeqId) when SuppliedSeqId > ExpectedSeqId ->
[Obj] = mnesia:dirty_read(rabbit_disk_queue, {Q, ExpectedSeqId - 1}),
ok = mnesia:dirty_write(rabbit_disk_queue,
- Obj #dq_msg_loc { next_seq_id = SuppliedSeqId }),
+ Obj #dq_msg_loc { next_seq_id = SuppliedSeqId }),
SuppliedSeqId.
%% can call this with PubMsgSeqIds as zip(PubMsgIds, duplicate(N, next))
internal_tx_commit(Q, PubMsgSeqIds, AckSeqIds,
- State = #dqstate { current_file_handle = CurHdl,
- current_file_name = CurName,
- sequences = Sequences
- }) ->
+ State = #dqstate { current_file_handle = CurHdl,
+ current_file_name = CurName,
+ sequences = Sequences
+ }) ->
{PubList, PubAcc, ReadSeqId} =
- case PubMsgSeqIds of
- [] -> {[], undefined, undefined};
- [_|PubMsgSeqIdsTail] ->
- {InitReadSeqId, InitWriteSeqId} =
- case ets:lookup(Sequences, Q) of
- [] -> {0,0};
- [{Q, ReadSeqId2, WriteSeqId2}] -> {ReadSeqId2, WriteSeqId2}
- end,
- { lists:zip(PubMsgSeqIds, (PubMsgSeqIdsTail ++ [{next, next}])),
- InitWriteSeqId, InitReadSeqId}
- end,
+ case PubMsgSeqIds of
+ [] -> {[], undefined, undefined};
+ [_|PubMsgSeqIdsTail] ->
+ {InitReadSeqId, InitWriteSeqId} =
+ case ets:lookup(Sequences, Q) of
+ [] -> {0,0};
+ [{Q, ReadSeqId2, WriteSeqId2}] -> {ReadSeqId2, WriteSeqId2}
+ end,
+ { lists:zip(PubMsgSeqIds, (PubMsgSeqIdsTail ++ [{next, next}])),
+ InitWriteSeqId, InitReadSeqId}
+ end,
{atomic, {Sync, WriteSeqId, State2}} =
- mnesia:transaction(
- fun() -> ok = mnesia:write_lock_table(rabbit_disk_queue),
- %% must deal with publishes first, if we didn't
- %% then we could end up acking a message before
- %% it's been published, which is clearly
- %% nonsense. I.e. in commit, do not do things in an
- %% order which _could_not_ have happened.
- {Sync2, WriteSeqId3} =
- lists:foldl(
- fun ({{MsgId, SeqId}, {_NextMsgId, NextSeqId}},
- {Acc, ExpectedSeqId}) ->
- [{MsgId, _RefCount, File, _Offset, _TotalSize}] =
- dets_ets_lookup(State, MsgId),
- SeqId2 = adjust_last_msg_seq_id(Q, ExpectedSeqId, SeqId),
- NextSeqId2 = if NextSeqId =:= next -> SeqId2 + 1;
- true -> NextSeqId
- end,
- true = NextSeqId2 > SeqId2,
- ok = mnesia:write(rabbit_disk_queue,
- #dq_msg_loc { queue_and_seq_id =
- {Q, SeqId2},
- msg_id = MsgId,
- is_delivered = false,
- next_seq_id = NextSeqId2
- },
- write),
- {Acc or (CurName =:= File), NextSeqId2}
- end, {false, PubAcc}, PubList),
-
- {ok, State3} = remove_messages(Q, AckSeqIds, txn, State),
- {Sync2, WriteSeqId3, State3}
- end),
+ mnesia:transaction(
+ fun() -> ok = mnesia:write_lock_table(rabbit_disk_queue),
+ %% must deal with publishes first, if we didn't
+ %% then we could end up acking a message before
+ %% it's been published, which is clearly
+ %% nonsense. I.e. in commit, do not do things in an
+ %% order which _could_not_ have happened.
+ {Sync2, WriteSeqId3} =
+ lists:foldl(
+ fun ({{MsgId, SeqId}, {_NextMsgId, NextSeqId}},
+ {Acc, ExpectedSeqId}) ->
+ [{MsgId, _RefCount, File, _Offset, _TotalSize}] =
+ dets_ets_lookup(State, MsgId),
+ SeqId2 = adjust_last_msg_seq_id(Q, ExpectedSeqId, SeqId),
+ NextSeqId2 = if NextSeqId =:= next -> SeqId2 + 1;
+ true -> NextSeqId
+ end,
+ true = NextSeqId2 > SeqId2,
+ ok = mnesia:write(rabbit_disk_queue,
+ #dq_msg_loc { queue_and_seq_id =
+ {Q, SeqId2},
+ msg_id = MsgId,
+ is_delivered = false,
+ next_seq_id = NextSeqId2
+ },
+ write),
+ {Acc or (CurName =:= File), NextSeqId2}
+ end, {false, PubAcc}, PubList),
+
+ {ok, State3} = remove_messages(Q, AckSeqIds, txn, State),
+ {Sync2, WriteSeqId3, State3}
+ end),
true = if PubList =:= [] -> true;
- true -> ets:insert(Sequences, {Q, ReadSeqId, WriteSeqId})
- end,
+ true -> ets:insert(Sequences, {Q, ReadSeqId, WriteSeqId})
+ end,
ok = if Sync -> file:sync(CurHdl);
- true -> ok
- end,
+ true -> ok
+ end,
{ok, State2}.
%% SeqId can be 'next'
internal_publish(Q, MsgId, SeqId, MsgBody, State) ->
{ok, State1 = #dqstate { sequences = Sequences }} =
- internal_tx_publish(MsgId, MsgBody, State),
+ internal_tx_publish(MsgId, MsgBody, State),
{ReadSeqId, WriteSeqId} =
- case ets:lookup(Sequences, Q) of
- [] -> %% previously unseen queue
- {0, 0};
- [{Q, ReadSeqId2, WriteSeqId2}] ->
- {ReadSeqId2, WriteSeqId2}
- end,
+ case ets:lookup(Sequences, Q) of
+ [] -> %% previously unseen queue
+ {0, 0};
+ [{Q, ReadSeqId2, WriteSeqId2}] ->
+ {ReadSeqId2, WriteSeqId2}
+ end,
WriteSeqId3 = adjust_last_msg_seq_id(Q, WriteSeqId, SeqId),
WriteSeqId3Next = WriteSeqId3 + 1,
true = ets:insert(Sequences, {Q, ReadSeqId, WriteSeqId3Next}),
ok = mnesia:dirty_write(rabbit_disk_queue,
- #dq_msg_loc { queue_and_seq_id = {Q, WriteSeqId3},
- msg_id = MsgId,
- next_seq_id = WriteSeqId3Next,
- is_delivered = false}),
+ #dq_msg_loc { queue_and_seq_id = {Q, WriteSeqId3},
+ msg_id = MsgId,
+ next_seq_id = WriteSeqId3Next,
+ is_delivered = false}),
{ok, State1}.
internal_tx_cancel(MsgIds, State) ->
@@ -756,7 +756,7 @@ internal_tx_cancel(MsgIds, State) ->
internal_requeue(_Q, [], State) ->
{ok, State};
internal_requeue(Q, MsgSeqIds = [_|MsgSeqIdsTail],
- State = #dqstate { sequences = Sequences }) ->
+ State = #dqstate { sequences = Sequences }) ->
%% We know that every seq_id in here is less than the ReadSeqId
%% you'll get if you look up this queue in Sequences (i.e. they've
%% already been delivered). We also know that the rows for these
@@ -783,78 +783,78 @@ internal_requeue(Q, MsgSeqIds = [_|MsgSeqIdsTail],
[{Q, ReadSeqId, WriteSeqId}] = ets:lookup(Sequences, Q),
MsgSeqIdsZipped = lists:zip(MsgSeqIds, MsgSeqIdsTail ++ [{next, next}]),
{atomic, WriteSeqId2} =
- mnesia:transaction(
- fun() ->
- ok = mnesia:write_lock_table(rabbit_disk_queue),
- lists:foldl(
- fun ({{{MsgId, SeqIdOrig}, SeqIdTo},
- {_NextMsgSeqId, NextSeqIdTo}},
- ExpectedSeqIdTo) ->
- SeqIdTo2 = adjust_last_msg_seq_id(Q, ExpectedSeqIdTo, SeqIdTo),
- NextSeqIdTo2 = if NextSeqIdTo =:= next -> SeqIdTo2 + 1;
- true -> NextSeqIdTo
- end,
- true = NextSeqIdTo2 > SeqIdTo2,
- [Obj = #dq_msg_loc { is_delivered = true, msg_id = MsgId }] =
- mnesia:read(rabbit_disk_queue, {Q, SeqIdOrig}, write),
- mnesia:write(rabbit_disk_queue,
- Obj #dq_msg_loc { queue_and_seq_id = {Q, SeqIdTo2},
- next_seq_id = NextSeqIdTo2
- },
- write),
- mnesia:delete(rabbit_disk_queue, {Q, SeqIdOrig}, write),
- NextSeqIdTo2
- end, WriteSeqId, MsgSeqIdsZipped)
- end),
+ mnesia:transaction(
+ fun() ->
+ ok = mnesia:write_lock_table(rabbit_disk_queue),
+ lists:foldl(
+ fun ({{{MsgId, SeqIdOrig}, SeqIdTo},
+ {_NextMsgSeqId, NextSeqIdTo}},
+ ExpectedSeqIdTo) ->
+ SeqIdTo2 = adjust_last_msg_seq_id(Q, ExpectedSeqIdTo, SeqIdTo),
+ NextSeqIdTo2 = if NextSeqIdTo =:= next -> SeqIdTo2 + 1;
+ true -> NextSeqIdTo
+ end,
+ true = NextSeqIdTo2 > SeqIdTo2,
+ [Obj = #dq_msg_loc { is_delivered = true, msg_id = MsgId }] =
+ mnesia:read(rabbit_disk_queue, {Q, SeqIdOrig}, write),
+ mnesia:write(rabbit_disk_queue,
+ Obj #dq_msg_loc { queue_and_seq_id = {Q, SeqIdTo2},
+ next_seq_id = NextSeqIdTo2
+ },
+ write),
+ mnesia:delete(rabbit_disk_queue, {Q, SeqIdOrig}, write),
+ NextSeqIdTo2
+ end, WriteSeqId, MsgSeqIdsZipped)
+ end),
true = ets:insert(Sequences, {Q, ReadSeqId, WriteSeqId2}),
{ok, State}.
internal_purge(Q, State = #dqstate { sequences = Sequences }) ->
case ets:lookup(Sequences, Q) of
- [] -> {ok, 0, State};
- [{Q, ReadSeqId, WriteSeqId}] ->
- {atomic, {ok, State2}} =
- mnesia:transaction(
- fun() ->
- ok = mnesia:write_lock_table(rabbit_disk_queue),
- MsgSeqIds = lists:foldl(
- fun (SeqId, Acc) ->
- [#dq_msg_loc { is_delivered = false, msg_id = MsgId }] =
- mnesia:read(rabbit_disk_queue, {Q, SeqId}, write),
- [{MsgId, SeqId} | Acc]
- end, [], lists:seq(ReadSeqId, WriteSeqId - 1)),
- remove_messages(Q, MsgSeqIds, txn, State)
- end),
- true = ets:insert(Sequences, {Q, WriteSeqId, WriteSeqId}),
- {ok, WriteSeqId - ReadSeqId, State2}
+ [] -> {ok, 0, State};
+ [{Q, ReadSeqId, WriteSeqId}] ->
+ {atomic, {ok, State2}} =
+ mnesia:transaction(
+ fun() ->
+ ok = mnesia:write_lock_table(rabbit_disk_queue),
+ MsgSeqIds = lists:foldl(
+ fun (SeqId, Acc) ->
+ [#dq_msg_loc { is_delivered = false, msg_id = MsgId }] =
+ mnesia:read(rabbit_disk_queue, {Q, SeqId}, write),
+ [{MsgId, SeqId} | Acc]
+ end, [], lists:seq(ReadSeqId, WriteSeqId - 1)),
+ remove_messages(Q, MsgSeqIds, txn, State)
+ end),
+ true = ets:insert(Sequences, {Q, WriteSeqId, WriteSeqId}),
+ {ok, WriteSeqId - ReadSeqId, State2}
end.
%% ---- ROLLING OVER THE APPEND FILE ----
maybe_roll_to_new_file(Offset,
- State = #dqstate { file_size_limit = FileSizeLimit,
- current_file_name = CurName,
- current_file_handle = CurHdl,
- current_file_num = CurNum,
- file_summary = FileSummary
- }
- ) when Offset >= FileSizeLimit ->
+ State = #dqstate { file_size_limit = FileSizeLimit,
+ current_file_name = CurName,
+ current_file_handle = CurHdl,
+ current_file_num = CurNum,
+ file_summary = FileSummary
+ }
+ ) when Offset >= FileSizeLimit ->
ok = file:sync(CurHdl),
ok = file:close(CurHdl),
NextNum = CurNum + 1,
NextName = integer_to_list(NextNum) ++ ?FILE_EXTENSION,
{ok, NextHdl} = file:open(form_filename(NextName),
- [write, raw, binary, delayed_write]),
+ [write, raw, binary, delayed_write]),
{ok, FileSizeLimit} = file:position(NextHdl, {bof, FileSizeLimit}),
ok = file:truncate(NextHdl),
{ok, 0} = file:position(NextHdl, {bof, 0}),
true = ets:update_element(FileSummary, CurName, {5, NextName}), %% 5 is Right
true = ets:insert_new(FileSummary, {NextName, 0, 0, CurName, undefined}),
State1 = State #dqstate { current_file_name = NextName,
- current_file_handle = NextHdl,
- current_file_num = NextNum,
- current_offset = 0
- },
+ current_file_handle = NextHdl,
+ current_file_num = NextNum,
+ current_offset = 0
+ },
{ok, compact(sets:from_list([CurName]), State1)};
maybe_roll_to_new_file(_, State) ->
{ok, State}.
@@ -866,81 +866,81 @@ compact(FilesSet, State) ->
Files = lists:sort(sets:to_list(FilesSet)),
%% foldl reverses, so now youngest/right-most first
RemainingFiles = lists:foldl(fun (File, Acc) ->
- delete_empty_files(File, Acc, State)
- end, [], Files),
+ delete_empty_files(File, Acc, State)
+ end, [], Files),
lists:foldl(fun combine_file/2, State, lists:reverse(RemainingFiles)).
combine_file(File, State = #dqstate { file_size_limit = FileSizeLimit,
- file_summary = FileSummary,
- current_file_name = CurName
- }) ->
+ file_summary = FileSummary,
+ current_file_name = CurName
+ }) ->
%% the file we're looking at may no longer exist as it may have
%% been deleted within the current GC run
case ets:lookup(FileSummary, File) of
- [] -> State;
- [FileObj = {File, ValidData, _ContiguousTop, Left, Right}] ->
- GoRight =
- fun() ->
- case Right of
- undefined -> State;
- _ when not(CurName =:= Right) ->
- [RightObj = {Right, RightValidData,
- _RightContiguousTop, File, RightRight}] =
- ets:lookup(FileSummary, Right),
- RightSumData = ValidData + RightValidData,
- if FileSizeLimit >= RightSumData ->
- %% here, Right will be the source and so will be deleted,
- %% File will be the destination
- State1 = combine_files(RightObj, FileObj,
- State),
- %% this could fail if RightRight is undefined
- %% left is the 4th field
- ets:update_element(FileSummary,
- RightRight, {4, File}),
- true = ets:insert(FileSummary, {File,
- RightSumData,
- RightSumData,
- Left,
- RightRight}),
- true = ets:delete(FileSummary, Right),
- State1;
- true -> State
- end;
- _ -> State
- end
- end,
- case Left of
- undefined ->
- GoRight();
- _ -> [LeftObj =
- {Left, LeftValidData, _LeftContiguousTop, LeftLeft, File}] =
- ets:lookup(FileSummary, Left),
- LeftSumData = ValidData + LeftValidData,
- if FileSizeLimit >= LeftSumData ->
- %% here, File will be the source and so will be deleted,
- %% Left will be the destination
- State1 = combine_files(FileObj, LeftObj, State),
- %% this could fail if Right is undefined
- %% left is the 4th field
- ets:update_element(FileSummary, Right, {4, Left}),
- true = ets:insert(FileSummary, {Left, LeftSumData,
- LeftSumData,
- LeftLeft, Right}),
- true = ets:delete(FileSummary, File),
- State1;
- true ->
- GoRight()
- end
- end
+ [] -> State;
+ [FileObj = {File, ValidData, _ContiguousTop, Left, Right}] ->
+ GoRight =
+ fun() ->
+ case Right of
+ undefined -> State;
+ _ when not(CurName =:= Right) ->
+ [RightObj = {Right, RightValidData,
+ _RightContiguousTop, File, RightRight}] =
+ ets:lookup(FileSummary, Right),
+ RightSumData = ValidData + RightValidData,
+ if FileSizeLimit >= RightSumData ->
+ %% here, Right will be the source and so will be deleted,
+ %% File will be the destination
+ State1 = combine_files(RightObj, FileObj,
+ State),
+ %% this could fail if RightRight is undefined
+ %% left is the 4th field
+ ets:update_element(FileSummary,
+ RightRight, {4, File}),
+ true = ets:insert(FileSummary, {File,
+ RightSumData,
+ RightSumData,
+ Left,
+ RightRight}),
+ true = ets:delete(FileSummary, Right),
+ State1;
+ true -> State
+ end;
+ _ -> State
+ end
+ end,
+ case Left of
+ undefined ->
+ GoRight();
+ _ -> [LeftObj =
+ {Left, LeftValidData, _LeftContiguousTop, LeftLeft, File}] =
+ ets:lookup(FileSummary, Left),
+ LeftSumData = ValidData + LeftValidData,
+ if FileSizeLimit >= LeftSumData ->
+ %% here, File will be the source and so will be deleted,
+ %% Left will be the destination
+ State1 = combine_files(FileObj, LeftObj, State),
+ %% this could fail if Right is undefined
+ %% left is the 4th field
+ ets:update_element(FileSummary, Right, {4, Left}),
+ true = ets:insert(FileSummary, {Left, LeftSumData,
+ LeftSumData,
+ LeftLeft, Right}),
+ true = ets:delete(FileSummary, File),
+ State1;
+ true ->
+ GoRight()
+ end
+ end
end.
sort_msg_locations_by_offset(Asc, List) ->
Comp = if Asc -> fun erlang:'<'/2;
- true -> fun erlang:'>'/2
- end,
+ true -> fun erlang:'>'/2
+ end,
lists:sort(fun ({_, _, _, OffA, _}, {_, _, _, OffB, _}) ->
- Comp(OffA, OffB)
- end, List).
+ Comp(OffA, OffB)
+ end, List).
truncate_and_extend_file(FileHdl, Lowpoint, Highpoint) ->
{ok, Lowpoint} = file:position(FileHdl, {bof, Lowpoint}),
@@ -951,133 +951,133 @@ truncate_and_extend_file(FileHdl, Lowpoint, Highpoint) ->
ok.
combine_files({Source, SourceValid, _SourceContiguousTop,
- _SourceLeft, _SourceRight},
- {Destination, DestinationValid, DestinationContiguousTop,
- _DestinationLeft, _DestinationRight},
- State1) ->
+ _SourceLeft, _SourceRight},
+ {Destination, DestinationValid, DestinationContiguousTop,
+ _DestinationLeft, _DestinationRight},
+ State1) ->
State = close_file(Source, close_file(Destination, State1)),
{ok, SourceHdl} =
- file:open(form_filename(Source),
- [read, write, raw, binary, delayed_write, read_ahead]),
+ file:open(form_filename(Source),
+ [read, write, raw, binary, delayed_write, read_ahead]),
{ok, DestinationHdl} =
- file:open(form_filename(Destination),
- [read, write, raw, binary, delayed_write, read_ahead]),
+ file:open(form_filename(Destination),
+ [read, write, raw, binary, delayed_write, read_ahead]),
ExpectedSize = SourceValid + DestinationValid,
%% if DestinationValid =:= DestinationContiguousTop then we don't need a tmp file
%% if they're not equal, then we need to write out everything past the DestinationContiguousTop to a tmp file
%% then truncate, copy back in, and then copy over from Source
%% otherwise we just truncate straight away and copy over from Source
if DestinationContiguousTop =:= DestinationValid ->
- ok = truncate_and_extend_file(DestinationHdl,
- DestinationValid, ExpectedSize);
+ ok = truncate_and_extend_file(DestinationHdl,
+ DestinationValid, ExpectedSize);
true ->
- Tmp = filename:rootname(Destination) ++ ?FILE_EXTENSION_TMP,
- {ok, TmpHdl} =
- file:open(form_filename(Tmp),
- [read, write, raw, binary, delayed_write, read_ahead]),
- Worklist =
- lists:dropwhile(
- fun ({_, _, _, Offset, _})
- when Offset /= DestinationContiguousTop ->
- %% it cannot be that Offset == DestinationContiguousTop
- %% because if it was then DestinationContiguousTop would have been
- %% extended by TotalSize
- Offset < DestinationContiguousTop
- %% Given expected access patterns, I suspect that the list should be
- %% naturally sorted as we require, however, we need to enforce it anyway
- end, sort_msg_locations_by_offset(true,
- dets_ets_match_object(State,
- {'_', '_',
- Destination,
- '_', '_'}))),
- TmpSize = DestinationValid - DestinationContiguousTop,
- {TmpSize, BlockStart1, BlockEnd1} =
- lists:foldl(
- fun ({MsgId, RefCount, _Destination, Offset, TotalSize},
- {CurOffset, BlockStart, BlockEnd}) ->
- %% CurOffset is in the TmpFile.
- %% Offset, BlockStart and BlockEnd are in the DestinationFile (which is currently the source!)
- Size = TotalSize + ?FILE_PACKING_ADJUSTMENT,
- %% this message is going to end up back in
- %% Destination, at DestinationContiguousTop
- %% + CurOffset
- FinalOffset = DestinationContiguousTop + CurOffset,
- ok = dets_ets_insert(State, {MsgId, RefCount, Destination,
- FinalOffset, TotalSize}),
- NextOffset = CurOffset + Size,
- if BlockStart =:= undefined ->
- %% base case, called only for the
- %% first list elem
- {NextOffset, Offset, Offset + Size};
- Offset =:= BlockEnd ->
- %% extend the current block because
- %% the next msg follows straight on
- {NextOffset, BlockStart, BlockEnd + Size};
- true ->
- %% found a gap, so actually do the
- %% work for the previous block
- BSize = BlockEnd - BlockStart,
- {ok, BlockStart} =
- file:position(DestinationHdl,
- {bof, BlockStart}),
- {ok, BSize} = file:copy(DestinationHdl,
- TmpHdl, BSize),
- {NextOffset, Offset, Offset + Size}
- end
- end, {0, undefined, undefined}, Worklist),
- %% do the last remaining block
- BSize1 = BlockEnd1 - BlockStart1,
- {ok, BlockStart1} = file:position(DestinationHdl, {bof, BlockStart1}),
- {ok, BSize1} = file:copy(DestinationHdl, TmpHdl, BSize1),
- %% so now Tmp contains everything we need to salvage from
- %% Destination, and MsgLocationDets has been updated to
- %% reflect compaction of Destination so truncate
- %% Destination and copy from Tmp back to the end
- {ok, 0} = file:position(TmpHdl, {bof, 0}),
- ok = truncate_and_extend_file(DestinationHdl,
- DestinationContiguousTop, ExpectedSize),
- {ok, TmpSize} = file:copy(TmpHdl, DestinationHdl, TmpSize),
- %% position in DestinationHdl should now be
- %% DestinationValid
- ok = file:sync(DestinationHdl),
- ok = file:close(TmpHdl),
- ok = file:delete(form_filename(Tmp))
+ Tmp = filename:rootname(Destination) ++ ?FILE_EXTENSION_TMP,
+ {ok, TmpHdl} =
+ file:open(form_filename(Tmp),
+ [read, write, raw, binary, delayed_write, read_ahead]),
+ Worklist =
+ lists:dropwhile(
+ fun ({_, _, _, Offset, _})
+ when Offset /= DestinationContiguousTop ->
+ %% it cannot be that Offset == DestinationContiguousTop
+ %% because if it was then DestinationContiguousTop would have been
+ %% extended by TotalSize
+ Offset < DestinationContiguousTop
+ %% Given expected access patterns, I suspect that the list should be
+ %% naturally sorted as we require, however, we need to enforce it anyway
+ end, sort_msg_locations_by_offset(true,
+ dets_ets_match_object(State,
+ {'_', '_',
+ Destination,
+ '_', '_'}))),
+ TmpSize = DestinationValid - DestinationContiguousTop,
+ {TmpSize, BlockStart1, BlockEnd1} =
+ lists:foldl(
+ fun ({MsgId, RefCount, _Destination, Offset, TotalSize},
+ {CurOffset, BlockStart, BlockEnd}) ->
+ %% CurOffset is in the TmpFile.
+ %% Offset, BlockStart and BlockEnd are in the DestinationFile (which is currently the source!)
+ Size = TotalSize + ?FILE_PACKING_ADJUSTMENT,
+ %% this message is going to end up back in
+ %% Destination, at DestinationContiguousTop
+ %% + CurOffset
+ FinalOffset = DestinationContiguousTop + CurOffset,
+ ok = dets_ets_insert(State, {MsgId, RefCount, Destination,
+ FinalOffset, TotalSize}),
+ NextOffset = CurOffset + Size,
+ if BlockStart =:= undefined ->
+ %% base case, called only for the
+ %% first list elem
+ {NextOffset, Offset, Offset + Size};
+ Offset =:= BlockEnd ->
+ %% extend the current block because
+ %% the next msg follows straight on
+ {NextOffset, BlockStart, BlockEnd + Size};
+ true ->
+ %% found a gap, so actually do the
+ %% work for the previous block
+ BSize = BlockEnd - BlockStart,
+ {ok, BlockStart} =
+ file:position(DestinationHdl,
+ {bof, BlockStart}),
+ {ok, BSize} = file:copy(DestinationHdl,
+ TmpHdl, BSize),
+ {NextOffset, Offset, Offset + Size}
+ end
+ end, {0, undefined, undefined}, Worklist),
+ %% do the last remaining block
+ BSize1 = BlockEnd1 - BlockStart1,
+ {ok, BlockStart1} = file:position(DestinationHdl, {bof, BlockStart1}),
+ {ok, BSize1} = file:copy(DestinationHdl, TmpHdl, BSize1),
+ %% so now Tmp contains everything we need to salvage from
+ %% Destination, and MsgLocationDets has been updated to
+ %% reflect compaction of Destination so truncate
+ %% Destination and copy from Tmp back to the end
+ {ok, 0} = file:position(TmpHdl, {bof, 0}),
+ ok = truncate_and_extend_file(DestinationHdl,
+ DestinationContiguousTop, ExpectedSize),
+ {ok, TmpSize} = file:copy(TmpHdl, DestinationHdl, TmpSize),
+ %% position in DestinationHdl should now be
+ %% DestinationValid
+ ok = file:sync(DestinationHdl),
+ ok = file:close(TmpHdl),
+ ok = file:delete(form_filename(Tmp))
end,
SourceWorkList =
- sort_msg_locations_by_offset(true,
- dets_ets_match_object(State,
- {'_', '_', Source,
- '_', '_'})),
+ sort_msg_locations_by_offset(true,
+ dets_ets_match_object(State,
+ {'_', '_', Source,
+ '_', '_'})),
{ExpectedSize, BlockStart2, BlockEnd2} =
- lists:foldl(
- fun ({MsgId, RefCount, _Source, Offset, TotalSize},
- {CurOffset, BlockStart, BlockEnd}) ->
- %% CurOffset is in the DestinationFile.
- %% Offset, BlockStart and BlockEnd are in the SourceFile
- Size = TotalSize + ?FILE_PACKING_ADJUSTMENT,
- %% update MsgLocationDets to reflect change of file and offset
- ok = dets_ets_insert(State, {MsgId, RefCount, Destination,
- CurOffset, TotalSize}),
- NextOffset = CurOffset + Size,
- if BlockStart =:= undefined ->
- %% base case, called only for the first list
- %% elem
- {NextOffset, Offset, Offset + Size};
- Offset =:= BlockEnd ->
- %% extend the current block because the next
- %% msg follows straight on
- {NextOffset, BlockStart, BlockEnd + Size};
- true ->
- %% found a gap, so actually do the work for
- %% the previous block
- BSize = BlockEnd - BlockStart,
- {ok, BlockStart} =
- file:position(SourceHdl, {bof, BlockStart}),
- {ok, BSize} =
- file:copy(SourceHdl, DestinationHdl, BSize),
- {NextOffset, Offset, Offset + Size}
- end
- end, {DestinationValid, undefined, undefined}, SourceWorkList),
+ lists:foldl(
+ fun ({MsgId, RefCount, _Source, Offset, TotalSize},
+ {CurOffset, BlockStart, BlockEnd}) ->
+ %% CurOffset is in the DestinationFile.
+ %% Offset, BlockStart and BlockEnd are in the SourceFile
+ Size = TotalSize + ?FILE_PACKING_ADJUSTMENT,
+ %% update MsgLocationDets to reflect change of file and offset
+ ok = dets_ets_insert(State, {MsgId, RefCount, Destination,
+ CurOffset, TotalSize}),
+ NextOffset = CurOffset + Size,
+ if BlockStart =:= undefined ->
+ %% base case, called only for the first list
+ %% elem
+ {NextOffset, Offset, Offset + Size};
+ Offset =:= BlockEnd ->
+ %% extend the current block because the next
+ %% msg follows straight on
+ {NextOffset, BlockStart, BlockEnd + Size};
+ true ->
+ %% found a gap, so actually do the work for
+ %% the previous block
+ BSize = BlockEnd - BlockStart,
+ {ok, BlockStart} =
+ file:position(SourceHdl, {bof, BlockStart}),
+ {ok, BSize} =
+ file:copy(SourceHdl, DestinationHdl, BSize),
+ {NextOffset, Offset, Offset + Size}
+ end
+ end, {DestinationValid, undefined, undefined}, SourceWorkList),
%% do the last remaining block
BSize2 = BlockEnd2 - BlockStart2,
{ok, BlockStart2} = file:position(SourceHdl, {bof, BlockStart2}),
@@ -1090,38 +1090,38 @@ combine_files({Source, SourceValid, _SourceContiguousTop,
State.
close_file(File, State = #dqstate { read_file_handles =
- {ReadHdls, ReadHdlsAge} }) ->
+ {ReadHdls, ReadHdlsAge} }) ->
case dict:find(File, ReadHdls) of
- error ->
- State;
- {ok, {Hdl, Then}} ->
- ok = file:close(Hdl),
- State #dqstate { read_file_handles =
- { dict:erase(File, ReadHdls),
- gb_trees:delete(Then, ReadHdlsAge) } }
+ error ->
+ State;
+ {ok, {Hdl, Then}} ->
+ ok = file:close(Hdl),
+ State #dqstate { read_file_handles =
+ { dict:erase(File, ReadHdls),
+ gb_trees:delete(Then, ReadHdlsAge) } }
end.
delete_empty_files(File, Acc, #dqstate { file_summary = FileSummary }) ->
[{File, ValidData, _ContiguousTop, Left, Right}] =
- ets:lookup(FileSummary, File),
+ ets:lookup(FileSummary, File),
case ValidData of
- %% we should NEVER find the current file in here hence right
+ %% we should NEVER find the current file in here hence right
%% should always be a file, not undefined
- 0 -> case {Left, Right} of
- {undefined, _} when not(is_atom(Right)) ->
- %% the eldest file is empty. YAY!
- %% left is the 4th field
- true = ets:update_element(FileSummary, Right, {4, undefined});
- {_, _} when not(is_atom(Right)) ->
- %% left is the 4th field
- true = ets:update_element(FileSummary, Right, {4, Left}),
- %% right is the 5th field
- true = ets:update_element(FileSummary, Left, {5, Right})
- end,
- true = ets:delete(FileSummary, File),
- ok = file:delete(form_filename(File)),
- Acc;
- _ -> [File|Acc]
+ 0 -> case {Left, Right} of
+ {undefined, _} when not(is_atom(Right)) ->
+ %% the eldest file is empty. YAY!
+ %% left is the 4th field
+ true = ets:update_element(FileSummary, Right, {4, undefined});
+ {_, _} when not(is_atom(Right)) ->
+ %% left is the 4th field
+ true = ets:update_element(FileSummary, Right, {4, Left}),
+ %% right is the 5th field
+ true = ets:update_element(FileSummary, Left, {5, Right})
+ end,
+ true = ets:delete(FileSummary, File),
+ ok = file:delete(form_filename(File)),
+ Acc;
+ _ -> [File|Acc]
end.
%% ---- DISK RECOVERY ----
@@ -1130,10 +1130,10 @@ load_from_disk(State) ->
%% sorted so that smallest number is first. which also means
%% eldest file (left-most) first
ok = case mnesia:add_table_index(rabbit_disk_queue, msg_id) of
- {atomic, ok} -> ok;
- {aborted,{already_exists,rabbit_disk_queue,_}} -> ok;
- E -> E
- end,
+ {atomic, ok} -> ok;
+ {aborted,{already_exists,rabbit_disk_queue,_}} -> ok;
+ E -> E
+ end,
{Files, TmpFiles} = get_disk_queue_files(),
ok = recover_crashed_compactions(Files, TmpFiles),
%% There should be no more tmp files now, so go ahead and load the
@@ -1142,44 +1142,44 @@ load_from_disk(State) ->
%% Finally, check there is nothing in mnesia which we haven't
%% loaded
{atomic, true} = mnesia:transaction(
- fun() ->
- ok = mnesia:read_lock_table(rabbit_disk_queue),
- mnesia:foldl(fun (#dq_msg_loc { msg_id = MsgId }, true) ->
- true = 1 =:=
- length(dets_ets_lookup(State1, MsgId))
- end,
- true, rabbit_disk_queue)
- end),
+ fun() ->
+ ok = mnesia:read_lock_table(rabbit_disk_queue),
+ mnesia:foldl(fun (#dq_msg_loc { msg_id = MsgId }, true) ->
+ true = 1 =:=
+ length(dets_ets_lookup(State1, MsgId))
+ end,
+ true, rabbit_disk_queue)
+ end),
State2 = extract_sequence_numbers(State1),
ok = case mnesia:del_table_index(rabbit_disk_queue, msg_id) of
- {atomic, ok} -> ok;
- %% hmm, something weird must be going on, but it's
- %% probably not the end of the world
- {aborted,{no_exists,rabbit_disk_queue,_}} -> ok;
- E2 -> E2
- end,
+ {atomic, ok} -> ok;
+ %% hmm, something weird must be going on, but it's
+ %% probably not the end of the world
+ {aborted,{no_exists,rabbit_disk_queue,_}} -> ok;
+ E2 -> E2
+ end,
{ok, State2}.
extract_sequence_numbers(State = #dqstate { sequences = Sequences }) ->
{atomic, true} = mnesia:transaction(
fun() ->
- ok = mnesia:read_lock_table(rabbit_disk_queue),
- mnesia:foldl(
- fun (#dq_msg_loc { queue_and_seq_id = {Q, SeqId} }, true) ->
- NextWrite = SeqId + 1,
- case ets:lookup(Sequences, Q) of
- [] ->
- true = ets:insert_new(Sequences,
- {Q, SeqId, NextWrite});
- [Orig = {Q, Read, Write}] ->
- Repl = {Q, lists:min([Read, SeqId]),
- lists:max([Write, NextWrite])},
- if Orig /= Repl ->
- true = ets:insert(Sequences, Repl);
- true -> true
- end
- end
- end, true, rabbit_disk_queue)
+ ok = mnesia:read_lock_table(rabbit_disk_queue),
+ mnesia:foldl(
+ fun (#dq_msg_loc { queue_and_seq_id = {Q, SeqId} }, true) ->
+ NextWrite = SeqId + 1,
+ case ets:lookup(Sequences, Q) of
+ [] ->
+ true = ets:insert_new(Sequences,
+ {Q, SeqId, NextWrite});
+ [Orig = {Q, Read, Write}] ->
+ Repl = {Q, lists:min([Read, SeqId]),
+ lists:max([Write, NextWrite])},
+ if Orig /= Repl ->
+ true = ets:insert(Sequences, Repl);
+ true -> true
+ end
+ end
+ end, true, rabbit_disk_queue)
end),
remove_gaps_in_sequences(State),
State.
@@ -1195,79 +1195,79 @@ remove_gaps_in_sequences(#dqstate { sequences = Sequences }) ->
%% likelihood of gaps being at the bottom rather than the top of
%% the queue, so shuffling up should be the better bet.
{atomic, _} =
- mnesia:transaction(
- fun() ->
- ok = mnesia:write_lock_table(rabbit_disk_queue),
- lists:foreach(
- fun ({Q, ReadSeqId, WriteSeqId}) ->
- Gap = shuffle_up(Q, ReadSeqId - 1, WriteSeqId - 1, 0),
- true = ets:insert(Sequences, {Q, ReadSeqId + Gap, WriteSeqId})
- end, ets:match_object(Sequences, '_'))
- end).
+ mnesia:transaction(
+ fun() ->
+ ok = mnesia:write_lock_table(rabbit_disk_queue),
+ lists:foreach(
+ fun ({Q, ReadSeqId, WriteSeqId}) ->
+ Gap = shuffle_up(Q, ReadSeqId - 1, WriteSeqId - 1, 0),
+ true = ets:insert(Sequences, {Q, ReadSeqId + Gap, WriteSeqId})
+ end, ets:match_object(Sequences, '_'))
+ end).
shuffle_up(_Q, SeqId, SeqId, Gap) ->
Gap;
shuffle_up(Q, BaseSeqId, SeqId, Gap) ->
GapInc =
- case mnesia:read(rabbit_disk_queue, {Q, SeqId}, write) of
- [] -> 1;
- [Obj] ->
- if Gap =:= 0 -> ok;
- true -> mnesia:write(rabbit_disk_queue,
- Obj #dq_msg_loc { queue_and_seq_id = {Q, SeqId + Gap },
- next_seq_id = SeqId + Gap + 1
- },
- write),
- mnesia:delete(rabbit_disk_queue, {Q, SeqId}, write)
- end,
- 0
- end,
+ case mnesia:read(rabbit_disk_queue, {Q, SeqId}, write) of
+ [] -> 1;
+ [Obj] ->
+ if Gap =:= 0 -> ok;
+ true -> mnesia:write(rabbit_disk_queue,
+ Obj #dq_msg_loc { queue_and_seq_id = {Q, SeqId + Gap },
+ next_seq_id = SeqId + Gap + 1
+ },
+ write),
+ mnesia:delete(rabbit_disk_queue, {Q, SeqId}, write)
+ end,
+ 0
+ end,
shuffle_up(Q, BaseSeqId, SeqId - 1, Gap + GapInc).
load_messages(undefined, [], State = #dqstate { file_summary = FileSummary,
- current_file_name = CurName }) ->
+ current_file_name = CurName }) ->
true = ets:insert_new(FileSummary, {CurName, 0, 0, undefined, undefined}),
State;
load_messages(Left, [], State) ->
Num = list_to_integer(filename:rootname(Left)),
Offset = case dets_ets_match_object(State, {'_', '_', Left, '_', '_'}) of
- [] -> 0;
- L -> [{_MsgId, _RefCount, Left, MaxOffset, TotalSize}|_] =
- sort_msg_locations_by_offset(false, L),
- MaxOffset + TotalSize + ?FILE_PACKING_ADJUSTMENT
- end,
+ [] -> 0;
+ L -> [{_MsgId, _RefCount, Left, MaxOffset, TotalSize}|_] =
+ sort_msg_locations_by_offset(false, L),
+ MaxOffset + TotalSize + ?FILE_PACKING_ADJUSTMENT
+ end,
State #dqstate { current_file_num = Num, current_file_name = Left,
- current_offset = Offset };
+ current_offset = Offset };
load_messages(Left, [File|Files],
- State = #dqstate { file_summary = FileSummary }) ->
+ State = #dqstate { file_summary = FileSummary }) ->
%% [{MsgId, TotalSize, FileOffset}]
{ok, Messages} = scan_file_for_valid_messages(form_filename(File)),
{ValidMessagesRev, ValidTotalSize} = lists:foldl(
- fun ({MsgId, TotalSize, Offset}, {VMAcc, VTSAcc}) ->
- case length(mnesia:dirty_index_match_object
- (rabbit_disk_queue,
- #dq_msg_loc { msg_id = MsgId,
- queue_and_seq_id = '_',
- is_delivered = '_',
- next_seq_id = '_'
- },
- msg_id)) of
- 0 -> {VMAcc, VTSAcc};
- RefCount ->
- true = dets_ets_insert_new(State, {MsgId, RefCount, File,
- Offset, TotalSize}),
- {[{MsgId, TotalSize, Offset}|VMAcc],
- VTSAcc + TotalSize + ?FILE_PACKING_ADJUSTMENT
- }
- end
- end, {[], 0}, Messages),
+ fun ({MsgId, TotalSize, Offset}, {VMAcc, VTSAcc}) ->
+ case length(mnesia:dirty_index_match_object
+ (rabbit_disk_queue,
+ #dq_msg_loc { msg_id = MsgId,
+ queue_and_seq_id = '_',
+ is_delivered = '_',
+ next_seq_id = '_'
+ },
+ msg_id)) of
+ 0 -> {VMAcc, VTSAcc};
+ RefCount ->
+ true = dets_ets_insert_new(State, {MsgId, RefCount, File,
+ Offset, TotalSize}),
+ {[{MsgId, TotalSize, Offset}|VMAcc],
+ VTSAcc + TotalSize + ?FILE_PACKING_ADJUSTMENT
+ }
+ end
+ end, {[], 0}, Messages),
%% foldl reverses lists and find_contiguous_block_prefix needs
%% elems in the same order as from scan_file_for_valid_messages
{ContiguousTop, _} = find_contiguous_block_prefix(lists:reverse(ValidMessagesRev)),
Right = case Files of
- [] -> undefined;
- [F|_] -> F
- end,
+ [] -> undefined;
+ [F|_] -> F
+ end,
true = ets:insert_new(FileSummary, {File, ValidTotalSize, ContiguousTop, Left, Right}),
load_messages(File, Files, State).
@@ -1275,7 +1275,7 @@ load_messages(Left, [File|Files],
recover_crashed_compactions(Files, TmpFiles) ->
lists:foreach(fun (TmpFile) -> ok = recover_crashed_compactions1(Files, TmpFile) end,
- TmpFiles),
+ TmpFiles),
ok.
recover_crashed_compactions1(Files, TmpFile) ->
@@ -1284,22 +1284,22 @@ recover_crashed_compactions1(Files, TmpFile) ->
true = lists:member(NonTmpRelatedFile, Files),
%% [{MsgId, TotalSize, FileOffset}]
{ok, UncorruptedMessagesTmp} =
- scan_file_for_valid_messages(form_filename(TmpFile)),
+ scan_file_for_valid_messages(form_filename(TmpFile)),
MsgIdsTmp = lists:map(GrabMsgId, UncorruptedMessagesTmp),
%% all of these messages should appear in the mnesia table,
%% otherwise they wouldn't have been copied out
lists:foreach(fun (MsgId) ->
- true = 0 < length(mnesia:dirty_index_match_object
- (rabbit_disk_queue,
- #dq_msg_loc { msg_id = MsgId,
- queue_and_seq_id = '_',
- is_delivered = '_',
- next_seq_id = '_'
- },
- msg_id))
- end, MsgIdsTmp),
+ true = 0 < length(mnesia:dirty_index_match_object
+ (rabbit_disk_queue,
+ #dq_msg_loc { msg_id = MsgId,
+ queue_and_seq_id = '_',
+ is_delivered = '_',
+ next_seq_id = '_'
+ },
+ msg_id))
+ end, MsgIdsTmp),
{ok, UncorruptedMessages} =
- scan_file_for_valid_messages(form_filename(NonTmpRelatedFile)),
+ scan_file_for_valid_messages(form_filename(NonTmpRelatedFile)),
MsgIds = lists:map(GrabMsgId, UncorruptedMessages),
%% 1) It's possible that everything in the tmp file is also in the main file
%% such that the main file is (prefix ++ tmpfile). This means that compaction
@@ -1321,62 +1321,62 @@ recover_crashed_compactions1(Files, TmpFile) ->
%% Plan: Truncate the main file back to before any of the files in the tmp file and copy
%% them over again
case lists:all(fun (MsgId) -> lists:member(MsgId, MsgIds) end, MsgIdsTmp) of
- true -> %% we're in case 1, 2 or 3 above. Just delete the tmp file
- %% note this also catches the case when the tmp file
- %% is empty
- ok = file:delete(TmpFile);
- _False ->
- %% we're in case 4 above.
- %% check that everything in the main file is a valid message in mnesia
- lists:foreach(fun (MsgId) ->
- true = 0 < length(mnesia:dirty_index_match_object
- (rabbit_disk_queue,
- #dq_msg_loc { msg_id = MsgId,
- queue_and_seq_id = '_',
- is_delivered = '_',
- next_seq_id = '_'
- },
- msg_id))
- end, MsgIds),
- %% The main file should be contiguous
- {Top, MsgIds} = find_contiguous_block_prefix(UncorruptedMessages),
- %% we should have that none of the messages in the prefix
- %% are in the tmp file
- true = lists:all(fun (MsgId) -> not(lists:member(MsgId, MsgIdsTmp)) end,
- MsgIds),
-
- {ok, MainHdl} = file:open(form_filename(NonTmpRelatedFile),
- [write, raw, binary, delayed_write]),
- {ok, Top} = file:position(MainHdl, Top),
- ok = file:truncate(MainHdl), %% wipe out any rubbish at the end of the file
- %% there really could be rubbish at the end of the file -
- %% we could have failed after the extending truncate.
- %% Remember the head of the list will be the highest entry
- %% in the file
- [{_, TmpTopTotalSize, TmpTopOffset}|_] = UncorruptedMessagesTmp,
- TmpSize = TmpTopOffset + TmpTopTotalSize + ?FILE_PACKING_ADJUSTMENT,
- ExpectedAbsPos = Top + TmpSize,
- {ok, ExpectedAbsPos} = file:position(MainHdl, {cur, TmpSize}),
- %% and now extend the main file as big as necessary in a
- %% single move if we run out of disk space, this truncate
- %% could fail, but we still aren't risking losing data
- ok = file:truncate(MainHdl),
- {ok, TmpHdl} = file:open(form_filename(TmpFile),
- [read, raw, binary, read_ahead]),
- {ok, TmpSize} = file:copy(TmpHdl, MainHdl, TmpSize),
- ok = file:close(MainHdl),
- ok = file:close(TmpHdl),
- ok = file:delete(TmpFile),
-
- {ok, MainMessages} =
- scan_file_for_valid_messages(form_filename(NonTmpRelatedFile)),
- MsgIdsMain = lists:map(GrabMsgId, MainMessages),
- %% check that everything in MsgIds is in MsgIdsMain
- true = lists:all(fun (MsgId) -> lists:member(MsgId, MsgIdsMain) end,
- MsgIds),
- %% check that everything in MsgIdsTmp is in MsgIdsMain
- true = lists:all(fun (MsgId) -> lists:member(MsgId, MsgIdsMain) end,
- MsgIdsTmp)
+ true -> %% we're in case 1, 2 or 3 above. Just delete the tmp file
+ %% note this also catches the case when the tmp file
+ %% is empty
+ ok = file:delete(TmpFile);
+ _False ->
+ %% we're in case 4 above.
+ %% check that everything in the main file is a valid message in mnesia
+ lists:foreach(fun (MsgId) ->
+ true = 0 < length(mnesia:dirty_index_match_object
+ (rabbit_disk_queue,
+ #dq_msg_loc { msg_id = MsgId,
+ queue_and_seq_id = '_',
+ is_delivered = '_',
+ next_seq_id = '_'
+ },
+ msg_id))
+ end, MsgIds),
+ %% The main file should be contiguous
+ {Top, MsgIds} = find_contiguous_block_prefix(UncorruptedMessages),
+ %% we should have that none of the messages in the prefix
+ %% are in the tmp file
+ true = lists:all(fun (MsgId) -> not(lists:member(MsgId, MsgIdsTmp)) end,
+ MsgIds),
+
+ {ok, MainHdl} = file:open(form_filename(NonTmpRelatedFile),
+ [write, raw, binary, delayed_write]),
+ {ok, Top} = file:position(MainHdl, Top),
+ ok = file:truncate(MainHdl), %% wipe out any rubbish at the end of the file
+ %% there really could be rubbish at the end of the file -
+ %% we could have failed after the extending truncate.
+ %% Remember the head of the list will be the highest entry
+ %% in the file
+ [{_, TmpTopTotalSize, TmpTopOffset}|_] = UncorruptedMessagesTmp,
+ TmpSize = TmpTopOffset + TmpTopTotalSize + ?FILE_PACKING_ADJUSTMENT,
+ ExpectedAbsPos = Top + TmpSize,
+ {ok, ExpectedAbsPos} = file:position(MainHdl, {cur, TmpSize}),
+ %% and now extend the main file as big as necessary in a
+ %% single move if we run out of disk space, this truncate
+ %% could fail, but we still aren't risking losing data
+ ok = file:truncate(MainHdl),
+ {ok, TmpHdl} = file:open(form_filename(TmpFile),
+ [read, raw, binary, read_ahead]),
+ {ok, TmpSize} = file:copy(TmpHdl, MainHdl, TmpSize),
+ ok = file:close(MainHdl),
+ ok = file:close(TmpHdl),
+ ok = file:delete(TmpFile),
+
+ {ok, MainMessages} =
+ scan_file_for_valid_messages(form_filename(NonTmpRelatedFile)),
+ MsgIdsMain = lists:map(GrabMsgId, MainMessages),
+ %% check that everything in MsgIds is in MsgIdsMain
+ true = lists:all(fun (MsgId) -> lists:member(MsgId, MsgIdsMain) end,
+ MsgIds),
+ %% check that everything in MsgIdsTmp is in MsgIdsMain
+ true = lists:all(fun (MsgId) -> lists:member(MsgId, MsgIdsMain) end,
+ MsgIdsTmp)
end,
ok.
@@ -1386,16 +1386,16 @@ recover_crashed_compactions1(Files, TmpFile) ->
find_contiguous_block_prefix([]) -> {0, []};
find_contiguous_block_prefix([{MsgId, TotalSize, Offset}|Tail]) ->
case find_contiguous_block_prefix(Tail, Offset, [MsgId]) of
- {ok, Acc} -> {Offset + TotalSize + ?FILE_PACKING_ADJUSTMENT,
- lists:reverse(Acc)};
- Res -> Res
+ {ok, Acc} -> {Offset + TotalSize + ?FILE_PACKING_ADJUSTMENT,
+ lists:reverse(Acc)};
+ Res -> Res
end.
find_contiguous_block_prefix([], 0, Acc) ->
{ok, Acc};
find_contiguous_block_prefix([], _N, _Acc) ->
{0, []};
find_contiguous_block_prefix([{MsgId, TotalSize, Offset}|Tail],
- ExpectedOffset, Acc)
+ ExpectedOffset, Acc)
when ExpectedOffset =:= Offset + TotalSize + ?FILE_PACKING_ADJUSTMENT ->
find_contiguous_block_prefix(Tail, Offset, [MsgId|Acc]);
find_contiguous_block_prefix(List, _ExpectedOffset, _Acc) ->
@@ -1421,29 +1421,29 @@ append_message(FileHdl, MsgId, MsgBody) when is_binary(MsgBody) ->
MsgIdBinSize = size(MsgIdBin),
TotalSize = BodySize + MsgIdBinSize,
case file:write(FileHdl, <<TotalSize:?INTEGER_SIZE_BITS,
- MsgIdBinSize:?INTEGER_SIZE_BITS,
- MsgIdBin:MsgIdBinSize/binary,
- MsgBody:BodySize/binary,
- ?WRITE_OK:?WRITE_OK_SIZE_BITS>>) of
- ok -> {ok, TotalSize};
- KO -> KO
+ MsgIdBinSize:?INTEGER_SIZE_BITS,
+ MsgIdBin:MsgIdBinSize/binary,
+ MsgBody:BodySize/binary,
+ ?WRITE_OK:?WRITE_OK_SIZE_BITS>>) of
+ ok -> {ok, TotalSize};
+ KO -> KO
end.
read_message_at_offset(FileHdl, Offset, TotalSize) ->
TotalSizeWriteOkBytes = TotalSize + 1,
case file:position(FileHdl, {bof, Offset}) of
- {ok, Offset} ->
- case file:read(FileHdl, TotalSize + ?FILE_PACKING_ADJUSTMENT) of
- {ok, <<TotalSize:?INTEGER_SIZE_BITS,
- MsgIdBinSize:?INTEGER_SIZE_BITS,
- Rest:TotalSizeWriteOkBytes/binary>>} ->
- BodySize = TotalSize - MsgIdBinSize,
- <<_MsgId:MsgIdBinSize/binary, MsgBody:BodySize/binary,
- ?WRITE_OK:?WRITE_OK_SIZE_BITS>> = Rest,
- {ok, {MsgBody, BodySize}};
- KO -> KO
- end;
- KO -> KO
+ {ok, Offset} ->
+ case file:read(FileHdl, TotalSize + ?FILE_PACKING_ADJUSTMENT) of
+ {ok, <<TotalSize:?INTEGER_SIZE_BITS,
+ MsgIdBinSize:?INTEGER_SIZE_BITS,
+ Rest:TotalSizeWriteOkBytes/binary>>} ->
+ BodySize = TotalSize - MsgIdBinSize,
+ <<_MsgId:MsgIdBinSize/binary, MsgBody:BodySize/binary,
+ ?WRITE_OK:?WRITE_OK_SIZE_BITS>> = Rest,
+ {ok, {MsgBody, BodySize}};
+ KO -> KO
+ end;
+ KO -> KO
end.
scan_file_for_valid_messages(File) ->
@@ -1454,53 +1454,53 @@ scan_file_for_valid_messages(File) ->
scan_file_for_valid_messages(FileHdl, Offset, Acc) ->
case read_next_file_entry(FileHdl, Offset) of
- {ok, eof} -> {ok, Acc};
- {ok, {corrupted, NextOffset}} ->
- scan_file_for_valid_messages(FileHdl, NextOffset, Acc);
- {ok, {ok, MsgId, TotalSize, NextOffset}} ->
- scan_file_for_valid_messages(FileHdl, NextOffset,
- [{MsgId, TotalSize, Offset}|Acc]);
- _KO -> {ok, Acc} %% bad message, but we may still have recovered some valid messages
+ {ok, eof} -> {ok, Acc};
+ {ok, {corrupted, NextOffset}} ->
+ scan_file_for_valid_messages(FileHdl, NextOffset, Acc);
+ {ok, {ok, MsgId, TotalSize, NextOffset}} ->
+ scan_file_for_valid_messages(FileHdl, NextOffset,
+ [{MsgId, TotalSize, Offset}|Acc]);
+ _KO -> {ok, Acc} %% bad message, but we may still have recovered some valid messages
end.
-
+
read_next_file_entry(FileHdl, Offset) ->
TwoIntegers = 2 * ?INTEGER_SIZE_BYTES,
case file:read(FileHdl, TwoIntegers) of
- {ok, <<TotalSize:?INTEGER_SIZE_BITS, MsgIdBinSize:?INTEGER_SIZE_BITS>>} ->
- case {TotalSize =:= 0, MsgIdBinSize =:= 0} of
- {true, _} -> {ok, eof}; %% Nothing we can do other than stop
- {false, true} -> %% current message corrupted, try skipping past it
- ExpectedAbsPos = Offset + ?FILE_PACKING_ADJUSTMENT + TotalSize,
- case file:position(FileHdl, {cur, TotalSize + 1}) of
- {ok, ExpectedAbsPos} -> {ok, {corrupted, ExpectedAbsPos}};
- {ok, _SomeOtherPos} -> {ok, eof}; %% seek failed, so give up
- KO -> KO
- end;
- {false, false} -> %% all good, let's continue
- case file:read(FileHdl, MsgIdBinSize) of
- {ok, <<MsgId:MsgIdBinSize/binary>>} ->
- ExpectedAbsPos = Offset + TwoIntegers + TotalSize,
- case file:position(FileHdl,
- {cur, TotalSize - MsgIdBinSize}) of
- {ok, ExpectedAbsPos} ->
- NextOffset = Offset + TotalSize +
- ?FILE_PACKING_ADJUSTMENT,
- case file:read(FileHdl, 1) of
- {ok, <<?WRITE_OK:?WRITE_OK_SIZE_BITS>>} ->
- {ok, {ok, binary_to_term(MsgId),
- TotalSize, NextOffset}};
- {ok, _SomeOtherData} ->
- {ok, {corrupted, NextOffset}};
- KO -> KO
- end;
- {ok, _SomeOtherPos} -> {ok, eof}; %% seek failed, so give up
- KO -> KO
- end;
- eof -> {ok, eof};
- KO -> KO
- end
- end;
- eof -> {ok, eof};
- KO -> KO
+ {ok, <<TotalSize:?INTEGER_SIZE_BITS, MsgIdBinSize:?INTEGER_SIZE_BITS>>} ->
+ case {TotalSize =:= 0, MsgIdBinSize =:= 0} of
+ {true, _} -> {ok, eof}; %% Nothing we can do other than stop
+ {false, true} -> %% current message corrupted, try skipping past it
+ ExpectedAbsPos = Offset + ?FILE_PACKING_ADJUSTMENT + TotalSize,
+ case file:position(FileHdl, {cur, TotalSize + 1}) of
+ {ok, ExpectedAbsPos} -> {ok, {corrupted, ExpectedAbsPos}};
+ {ok, _SomeOtherPos} -> {ok, eof}; %% seek failed, so give up
+ KO -> KO
+ end;
+ {false, false} -> %% all good, let's continue
+ case file:read(FileHdl, MsgIdBinSize) of
+ {ok, <<MsgId:MsgIdBinSize/binary>>} ->
+ ExpectedAbsPos = Offset + TwoIntegers + TotalSize,
+ case file:position(FileHdl,
+ {cur, TotalSize - MsgIdBinSize}) of
+ {ok, ExpectedAbsPos} ->
+ NextOffset = Offset + TotalSize +
+ ?FILE_PACKING_ADJUSTMENT,
+ case file:read(FileHdl, 1) of
+ {ok, <<?WRITE_OK:?WRITE_OK_SIZE_BITS>>} ->
+ {ok, {ok, binary_to_term(MsgId),
+ TotalSize, NextOffset}};
+ {ok, _SomeOtherData} ->
+ {ok, {corrupted, NextOffset}};
+ KO -> KO
+ end;
+ {ok, _SomeOtherPos} -> {ok, eof}; %% seek failed, so give up
+ KO -> KO
+ end;
+ eof -> {ok, eof};
+ KO -> KO
+ end
+ end;
+ eof -> {ok, eof};
+ KO -> KO
end.
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl
index c7c76eb230..4749e1dac4 100644
--- a/src/rabbit_mixed_queue.erl
+++ b/src/rabbit_mixed_queue.erl
@@ -31,57 +31,66 @@
-module(rabbit_mixed_queue).
+-export([start_link/2]).
+
-export([publish/4, deliver/1, ack/2,
- tx_publish/4, tx_commit/3, tx_cancel/2,
- requeue/2, purge/1]).
+ tx_publish/4, tx_commit/3, tx_cancel/2,
+ requeue/2, purge/1]).
-record(mqstate, { mode,
- msg_buf,
- next_write_seq,
- queue
- }
+ msg_buf,
+ next_write_seq,
+ queue
+ }
).
+-define(FILE_SIZE_LIMIT, (100*1024*1024)).
+
+start_link(Queue, Mode) when Mode =:= disk orelse Mode =:= mixed ->
+ rabbit_disk_queue:start_link(?FILE_SIZE_LIMIT),
+ rabbit_disk_queue:to_ram_disk_mode(), %% TODO, CHANGE ME
+ {ok, #mqstate { mode = Mode, msg_buf = queue:new(), next_write_seq = 0, queue = Queue }}.
+
publish(MsgId, Msg, _IsPersistent, State = #mqstate { mode = disk, queue = Q }) ->
ok = rabbit_disk_queue:publish(Q, MsgId, Msg),
{ok, State};
publish(MsgId, Msg, IsPersistent,
- State = #mqstate { queue = Q, mode = mixed,
- next_write_seq = NextSeq, msg_buf = MsgBuf }) ->
+ State = #mqstate { queue = Q, mode = mixed,
+ next_write_seq = NextSeq, msg_buf = MsgBuf }) ->
if IsPersistent ->
- ok = rabbit_disk_queue:publish_with_seq(Q, MsgId, NextSeq, Msg);
+ ok = rabbit_disk_queue:publish_with_seq(Q, MsgId, NextSeq, Msg);
true -> ok
end,
{ok, State #mqstate { next_write_seq = NextSeq + 1,
- msg_buf = queue:in({NextSeq, {MsgId, Msg, IsPersistent}},
- MsgBuf)
- }}.
+ msg_buf = queue:in({NextSeq, {MsgId, Msg, IsPersistent}},
+ MsgBuf)
+ }}.
deliver(State = #mqstate { mode = disk, queue = Q }) ->
{rabbit_disk_queue:deliver(Q), State};
deliver(State = #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf }) ->
{Result, MsgBuf2} = queue:out(MsgBuf),
case Result of
- empty ->
- {empty, State};
- {value, {_Seq, {MsgId, Msg, IsPersistent}}} ->
- {IsDelivered, Ack} =
- if IsPersistent ->
- {MsgId, IsDelivered2, Ack2} = rabbit_disk_queue:phantom_deliver(Q),
- {IsDelivered2, Ack2};
- true -> {false, noack}
- end,
- {{MsgId, Msg, size(Msg), IsDelivered, Ack},
- State #mqstate { msg_buf = MsgBuf2 }}
+ empty ->
+ {empty, State};
+ {value, {_Seq, {MsgId, Msg, IsPersistent}}} ->
+ {IsDelivered, Ack} =
+ if IsPersistent ->
+ {MsgId, IsDelivered2, Ack2} = rabbit_disk_queue:phantom_deliver(Q),
+ {IsDelivered2, Ack2};
+ true -> {false, noack}
+ end,
+ {{MsgId, Msg, size(Msg), IsDelivered, Ack},
+ State #mqstate { msg_buf = MsgBuf2 }}
end.
remove_noacks(Acks) ->
lists:filter(fun (A) -> A /= noack end, Acks).
-ack(Acks, State = #mqstate { queue = Q }) ->
+ack(Acks, State = #mqstate { queue = Q }) ->
ok = rabbit_disk_queue:ack(Q, remove_noacks(Acks)),
{ok, State}.
-
+
tx_publish(MsgId, Msg, _IsPersistent, State = #mqstate { mode = disk }) ->
ok = rabbit_disk_queue:tx_publish(MsgId, Msg),
{ok, State};
@@ -98,32 +107,32 @@ tx_commit(Publishes, Acks, State = #mqstate { mode = disk, queue = Q }) ->
ok = rabbit_disk_queue:tx_commit(Q, only_msg_ids(Publishes), Acks),
{ok, State};
tx_commit(Publishes, Acks, State = #mqstate { mode = mixed, queue = Q,
- msg_buf = MsgBuf,
- next_write_seq = NextSeq
- }) ->
+ msg_buf = MsgBuf,
+ next_write_seq = NextSeq
+ }) ->
{PersistentPubs, MsgBuf2, NextSeq2} =
- lists:foldl(fun ({MsgId, Msg, IsPersistent}, {Acc, MsgBuf3, NextSeq3}) ->
- Acc2 =
- if IsPersistent ->
- [{MsgId, NextSeq3} | Acc];
- true -> Acc
- end,
- MsgBuf4 = queue:in({NextSeq3, {MsgId, Msg, IsPersistent}},
- MsgBuf3),
- {Acc2, MsgBuf4, NextSeq3 + 1}
- end, {[], MsgBuf, NextSeq}, Publishes),
+ lists:foldl(fun ({MsgId, Msg, IsPersistent}, {Acc, MsgBuf3, NextSeq3}) ->
+ Acc2 =
+ if IsPersistent ->
+ [{MsgId, NextSeq3} | Acc];
+ true -> Acc
+ end,
+ MsgBuf4 = queue:in({NextSeq3, {MsgId, Msg, IsPersistent}},
+ MsgBuf3),
+ {Acc2, MsgBuf4, NextSeq3 + 1}
+ end, {[], MsgBuf, NextSeq}, Publishes),
%% foldl reverses, so re-reverse PersistentPubs to match
%% requirements of rabbit_disk_queue (ascending SeqIds)
ok = rabbit_disk_queue:tx_commit_with_seqs(Q, lists:reverse(PersistentPubs),
- remove_noacks(Acks)),
+ remove_noacks(Acks)),
{ok, State #mqstate { msg_buf = MsgBuf2, next_write_seq = NextSeq2 }}.
only_persistent_msg_ids(Pubs) ->
lists:reverse(lists:foldl(fun ({MsgId, _, IsPersistent}, Acc) ->
- if IsPersistent -> [MsgId | Acc];
- true -> Acc
- end
- end, [], Pubs)).
+ if IsPersistent -> [MsgId | Acc];
+ true -> Acc
+ end
+ end, [], Pubs)).
tx_cancel(Publishes, State = #mqstate { mode = disk }) ->
ok = rabbit_disk_queue:tx_cancel(only_msg_ids(Publishes)),
@@ -139,21 +148,21 @@ requeue(MessagesWithAckTags, State = #mqstate { mode = disk, queue = Q }) ->
rabbit_disk_queue:requeue(Q, only_ack_tags(MessagesWithAckTags)),
{ok, State};
requeue(MessagesWithAckTags, State = #mqstate { mode = mixed, queue = Q,
- msg_buf = MsgBuf,
- next_write_seq = NextSeq
- }) ->
+ msg_buf = MsgBuf,
+ next_write_seq = NextSeq
+ }) ->
{PersistentPubs, MsgBuf2, NextSeq2} =
- lists:foldl(fun ({{MsgId, Msg, IsPersistent}, AckTag}, {Acc, MsgBuf3, NextSeq3}) ->
- Acc2 =
- if IsPersistent ->
- {MsgId, _OldSeqId} = AckTag,
- [{AckTag, NextSeq3} | Acc];
- true -> Acc
- end,
- MsgBuf4 = queue:in({NextSeq3, {MsgId, Msg, IsPersistent}},
- MsgBuf3),
- {Acc2, MsgBuf4, NextSeq3 + 1}
- end, {[], MsgBuf, NextSeq}, MessagesWithAckTags),
+ lists:foldl(fun ({{MsgId, Msg, IsPersistent}, AckTag}, {Acc, MsgBuf3, NextSeq3}) ->
+ Acc2 =
+ if IsPersistent ->
+ {MsgId, _OldSeqId} = AckTag,
+ [{AckTag, NextSeq3} | Acc];
+ true -> Acc
+ end,
+ MsgBuf4 = queue:in({NextSeq3, {MsgId, Msg, IsPersistent}},
+ MsgBuf3),
+ {Acc2, MsgBuf4, NextSeq3 + 1}
+ end, {[], MsgBuf, NextSeq}, MessagesWithAckTags),
ok = rabbit_disk_queue:requeue_with_seqs(Q, lists:reverse(PersistentPubs)),
{ok, State #mqstate { msg_buf = MsgBuf2, next_write_seq = NextSeq2 }}.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 14461abb13..552e4ed959 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -686,10 +686,10 @@ test_disk_queue() ->
% unicode chars are supported properly from r13 onwards
io:format("Msg Count\t| Msg Size\t| Queue Count\t| Startup mu s\t| Publish mu s\t| Pub mu s/msg\t| Pub mu s/byte\t| Deliver mu s\t| Del mu s/msg\t| Del mu s/byte~n", []),
[begin rdq_time_tx_publish_commit_deliver_ack(Qs, MsgCount, MsgSize),
- timer:sleep(1000) end || % 1000 milliseconds
- MsgSize <- [512, 8192, 32768, 131072],
- Qs <- [[1], lists:seq(1,10)], %, lists:seq(1,100), lists:seq(1,1000)],
- MsgCount <- [1024, 4096, 16384]
+ timer:sleep(1000) end || % 1000 milliseconds
+ MsgSize <- [512, 8192, 32768, 131072],
+ Qs <- [[1], lists:seq(1,10)], %, lists:seq(1,100), lists:seq(1,1000)],
+ MsgCount <- [1024, 4096, 16384]
],
rdq_virgin(),
passed = rdq_stress_gc(10000),
@@ -706,27 +706,27 @@ rdq_time_tx_publish_commit_deliver_ack(Qs, MsgCount, MsgSizeBytes) ->
Msg = <<0:(8*MsgSizeBytes)>>,
List = lists:seq(1, MsgCount),
{Publish, ok} =
- timer:tc(?MODULE, rdq_time_commands,
- [[fun() -> [rabbit_disk_queue:tx_publish(N, Msg)
- || N <- List, _ <- Qs] end,
- fun() -> [ok = rabbit_disk_queue:tx_commit(Q, List, [])
- || Q <- Qs] end
- ]]),
+ timer:tc(?MODULE, rdq_time_commands,
+ [[fun() -> [rabbit_disk_queue:tx_publish(N, Msg)
+ || N <- List, _ <- Qs] end,
+ fun() -> [ok = rabbit_disk_queue:tx_commit(Q, List, [])
+ || Q <- Qs] end
+ ]]),
{Deliver, ok} =
- timer:tc(?MODULE, rdq_time_commands,
- [[fun() -> [begin SeqIds =
- [begin {N, Msg, MsgSizeBytes, false, SeqId} =
- rabbit_disk_queue:deliver(Q), SeqId end
- || N <- List],
- ok = rabbit_disk_queue:tx_commit(Q, [], SeqIds)
- end || Q <- Qs]
- end]]),
+ timer:tc(?MODULE, rdq_time_commands,
+ [[fun() -> [begin SeqIds =
+ [begin {N, Msg, MsgSizeBytes, false, SeqId} =
+ rabbit_disk_queue:deliver(Q), SeqId end
+ || N <- List],
+ ok = rabbit_disk_queue:tx_commit(Q, [], SeqIds)
+ end || Q <- Qs]
+ end]]),
io:format(" ~15.10B| ~14.10B| ~14.10B| ~14.1f| ~14.1f| ~14.6f| ~14.10f| ~14.1f| ~14.6f| ~14.10f~n",
- [MsgCount, MsgSizeBytes, QCount, float(Startup),
- float(Publish), (Publish / (MsgCount * QCount)),
- (Publish / (MsgCount * QCount * MsgSizeBytes)),
- float(Deliver), (Deliver / (MsgCount * QCount)),
- (Deliver / (MsgCount * QCount * MsgSizeBytes))]),
+ [MsgCount, MsgSizeBytes, QCount, float(Startup),
+ float(Publish), (Publish / (MsgCount * QCount)),
+ (Publish / (MsgCount * QCount * MsgSizeBytes)),
+ float(Deliver), (Deliver / (MsgCount * QCount)),
+ (Deliver / (MsgCount * QCount * MsgSizeBytes))]),
rdq_stop().
% we know each file is going to be 1024*1024*10 bytes in size (10MB), so make sure we have
@@ -741,30 +741,30 @@ rdq_stress_gc(MsgCount) ->
rabbit_disk_queue:tx_commit(q, List, []),
StartChunk = round(MsgCount / 20), % 5%
AckList =
- lists:reverse(
- lists:foldl(
- fun (E, Acc) ->
- case Acc of
- [] -> [E];
- [F|_Fs] ->
- case E rem F of
- 0 -> Acc;
- _ -> [E|Acc]
- end
- end
- end, [], lists:flatten([lists:seq(N,MsgCount,N)
- || N <- lists:seq(StartChunk,MsgCount)]))) ++
- lists:seq(1, (StartChunk - 1)),
+ lists:reverse(
+ lists:foldl(
+ fun (E, Acc) ->
+ case Acc of
+ [] -> [E];
+ [F|_Fs] ->
+ case E rem F of
+ 0 -> Acc;
+ _ -> [E|Acc]
+ end
+ end
+ end, [], lists:flatten([lists:seq(N,MsgCount,N)
+ || N <- lists:seq(StartChunk,MsgCount)]))) ++
+ lists:seq(1, (StartChunk - 1)),
MsgIdToSeqDict =
- lists:foldl(
- fun (_, Acc) ->
- {MsgId, Msg, MsgSizeBytes, false, SeqId} =
- rabbit_disk_queue:deliver(q),
- dict:store(MsgId, SeqId, Acc)
- end, dict:new(), List),
+ lists:foldl(
+ fun (_, Acc) ->
+ {MsgId, Msg, MsgSizeBytes, false, SeqId} =
+ rabbit_disk_queue:deliver(q),
+ dict:store(MsgId, SeqId, Acc)
+ end, dict:new(), List),
%% we really do want to ack each of this individually
[begin {ok, SeqId} = dict:find(MsgId, MsgIdToSeqDict),
- rabbit_disk_queue:ack(q, [SeqId]) end
+ rabbit_disk_queue:ack(q, [SeqId]) end
|| MsgId <- AckList],
rabbit_disk_queue:tx_commit(q, [], []),
rdq_stop(),
@@ -800,15 +800,15 @@ rdq_test_startup_with_queue_gaps() ->
io:format("Publish done~n", []),
%% deliver first half
Seqs = [begin {N, Msg, 256, false, SeqId} = rabbit_disk_queue:deliver(q), SeqId end
- || N <- lists:seq(1,Half)],
+ || N <- lists:seq(1,Half)],
io:format("Deliver first half done~n", []),
%% ack every other message we have delivered (starting at the _first_)
lists:foldl(fun (SeqId2, true) ->
- rabbit_disk_queue:ack(q, [SeqId2]),
- false;
- (_SeqId2, false) ->
- true
- end, true, Seqs),
+ rabbit_disk_queue:ack(q, [SeqId2]),
+ false;
+ (_SeqId2, false) ->
+ true
+ end, true, Seqs),
rabbit_disk_queue:tx_commit(q, [], []),
io:format("Acked every other message delivered done~n", []),
rdq_stop(),
@@ -816,12 +816,12 @@ rdq_test_startup_with_queue_gaps() ->
io:format("Startup (with shuffle) done~n", []),
%% should have shuffled up. So we should now get lists:seq(2,500,2) already delivered
Seqs2 = [begin {N, Msg, 256, true, SeqId} = rabbit_disk_queue:deliver(q), SeqId end
- || N <- lists:seq(2,Half,2)],
+ || N <- lists:seq(2,Half,2)],
rabbit_disk_queue:tx_commit(q, [], Seqs2),
io:format("Reread non-acked messages done~n", []),
%% and now fetch the rest
Seqs3 = [begin {N, Msg, 256, false, SeqId} = rabbit_disk_queue:deliver(q), SeqId end
- || N <- lists:seq(1 + Half,Total)],
+ || N <- lists:seq(1 + Half,Total)],
rabbit_disk_queue:tx_commit(q, [], Seqs3),
io:format("Read second half done~n", []),
empty = rabbit_disk_queue:deliver(q),
@@ -840,25 +840,25 @@ rdq_test_redeliver() ->
io:format("Publish done~n", []),
%% deliver first half
Seqs = [begin {N, Msg, 256, false, SeqId} = rabbit_disk_queue:deliver(q), SeqId end
- || N <- lists:seq(1,Half)],
+ || N <- lists:seq(1,Half)],
io:format("Deliver first half done~n", []),
%% now requeue every other message (starting at the _first_)
%% and ack the other ones
lists:foldl(fun (SeqId2, true) ->
- rabbit_disk_queue:requeue(q, [SeqId2]),
- false;
- (SeqId2, false) ->
- rabbit_disk_queue:ack(q, [SeqId2]),
- true
- end, true, Seqs),
+ rabbit_disk_queue:requeue(q, [SeqId2]),
+ false;
+ (SeqId2, false) ->
+ rabbit_disk_queue:ack(q, [SeqId2]),
+ true
+ end, true, Seqs),
rabbit_disk_queue:tx_commit(q, [], []),
io:format("Redeliver and acking done~n", []),
%% we should now get the 2nd half in order, followed by every-other-from-the-first-half
Seqs2 = [begin {N, Msg, 256, false, SeqId} = rabbit_disk_queue:deliver(q), SeqId end
- || N <- lists:seq(1+Half, Total)],
+ || N <- lists:seq(1+Half, Total)],
rabbit_disk_queue:tx_commit(q, [], Seqs2),
Seqs3 = [begin {N, Msg, 256, true, SeqId} = rabbit_disk_queue:deliver(q), SeqId end
- || N <- lists:seq(1, Half, 2)],
+ || N <- lists:seq(1, Half, 2)],
rabbit_disk_queue:tx_commit(q, [], Seqs3),
empty = rabbit_disk_queue:deliver(q),
rdq_stop(),
@@ -876,7 +876,7 @@ rdq_test_purge() ->
io:format("Publish done~n", []),
%% deliver first half
Seqs = [begin {N, Msg, 256, false, SeqId} = rabbit_disk_queue:deliver(q), SeqId end
- || N <- lists:seq(1,Half)],
+ || N <- lists:seq(1,Half)],
io:format("Deliver first half done~n", []),
rabbit_disk_queue:purge(q),
io:format("Purge done~n", []),
@@ -891,7 +891,7 @@ rdq_time_commands(Funcs) ->
rdq_virgin() ->
{Micros, {ok, _}} =
- timer:tc(rabbit_disk_queue, start_link, [1024*1024]),
+ timer:tc(rabbit_disk_queue, start_link, [1024*1024]),
ok = rabbit_disk_queue:stop_and_obliterate(),
timer:sleep(1000),
Micros.