summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-08-23 18:51:02 +0100
committerMatthew Sackman <matthew@lshift.net>2009-08-23 18:51:02 +0100
commit2940ef9035d139458c1f6a184ebd11f8cbc437a5 (patch)
tree9cbc1748a15bef6ddef34f899379e8ea4760fd6d /src
parent2057297e947a5f9995308082a00b7fd79084b03d (diff)
downloadrabbitmq-server-git-2940ef9035d139458c1f6a184ebd11f8cbc437a5.tar.gz
preemptive refactoring and other assorted changes
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_disk_queue.erl114
-rw-r--r--src/rabbit_mnesia.erl5
2 files changed, 54 insertions, 65 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index c0ed7e62b1..04c8a82579 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -976,39 +976,11 @@ internal_ack(Q, MsgSeqIds, State) ->
remove_messages(Q, MsgSeqIds, true, State).
%% Q is only needed if MnesiaDelete /= false
-remove_messages(Q, MsgSeqIds, MnesiaDelete,
- State = #dqstate { file_summary = FileSummary,
- current_file_name = CurName
- }) ->
+remove_messages(Q, MsgSeqIds, MnesiaDelete, State) ->
Files =
lists:foldl(
fun ({MsgId, SeqId}, Files1) ->
- [{MsgId, RefCount, File, Offset, TotalSize, IsPersistent}] =
- dets_ets_lookup(State, MsgId),
- Files2 =
- case RefCount of
- 1 ->
- ok = dets_ets_delete(State, MsgId),
- ok = remove_cache_entry(MsgId, State),
- [{File, ValidTotalSize, ContiguousTop,
- Left, Right}] = ets:lookup(FileSummary, File),
- ContiguousTop1 =
- lists:min([ContiguousTop, Offset]),
- true =
- ets:insert(FileSummary,
- {File, (ValidTotalSize-TotalSize-
- ?FILE_PACKING_ADJUSTMENT),
- ContiguousTop1, Left, Right}),
- if CurName =:= File -> Files1;
- true -> sets:add_element(File, Files1)
- end;
- _ when 1 < RefCount ->
- ok = decrement_cache(MsgId, State),
- ok = dets_ets_insert(
- State, {MsgId, RefCount - 1, File, Offset,
- TotalSize, IsPersistent}),
- Files1
- end,
+ Files2 = remove_message(MsgId, Files1, State),
ok = case MnesiaDelete of
true -> mnesia:dirty_delete(rabbit_disk_queue,
{Q, SeqId});
@@ -1019,6 +991,34 @@ remove_messages(Q, MsgSeqIds, MnesiaDelete,
State1 = compact(Files, State),
{ok, State1}.
+remove_message(MsgId, Files,
+ State = #dqstate { file_summary = FileSummary,
+ current_file_name = CurName
+ }) ->
+ [{MsgId, RefCount, File, Offset, TotalSize, IsPersistent}] =
+ dets_ets_lookup(State, MsgId),
+ case RefCount of
+ 1 ->
+ ok = dets_ets_delete(State, MsgId),
+ ok = remove_cache_entry(MsgId, State),
+ [{File, ValidTotalSize, ContiguousTop, Left, Right}] =
+ ets:lookup(FileSummary, File),
+ ContiguousTop1 = lists:min([ContiguousTop, Offset]),
+ true =
+ ets:insert(FileSummary,
+ {File,
+ (ValidTotalSize-TotalSize-?FILE_PACKING_ADJUSTMENT),
+ ContiguousTop1, Left, Right}),
+ if CurName =:= File -> Files;
+ true -> sets:add_element(File, Files)
+ end;
+ _ when 1 < RefCount ->
+ ok = decrement_cache(MsgId, State),
+ ok = dets_ets_insert(State, {MsgId, RefCount - 1, File, Offset,
+ TotalSize, IsPersistent}),
+ Files
+ end.
+
internal_tx_publish(Message = #basic_message { is_persistent = IsPersistent,
guid = MsgId },
State = #dqstate { current_file_handle = CurHdl,
@@ -1541,48 +1541,39 @@ load_from_disk(State) ->
%% Finally, check there is nothing in mnesia which we haven't
%% loaded
Key = mnesia:dirty_first(rabbit_disk_queue),
- {ok, State2} = prune_mnesia(State1, Key, [], [], 0),
+ {ok, AlteredFiles} = prune_mnesia(State1, Key, sets:new(), [], 0),
+ State2 = compact(AlteredFiles, State1),
State3 = extract_sequence_numbers(State2),
ok = del_index(),
{ok, State3}.
-prune_mnesia_flush_batch(State, DeleteAcc, RemoveAcc) ->
- ok = lists:foldl(fun (Key, ok) ->
- mnesia:dirty_delete(rabbit_disk_queue, Key)
- end, ok, DeleteAcc),
- {ok, _State1} = lists:foldl(
- fun ({Q, MsgSeqIds}, {ok, State2}) ->
- remove_messages(Q, MsgSeqIds, true, State2)
- end, {ok, State}, RemoveAcc).
+prune_mnesia_flush_batch(DeleteAcc) ->
+ lists:foldl(fun (Key, ok) ->
+ mnesia:dirty_delete(rabbit_disk_queue, Key)
+ end, ok, DeleteAcc).
-prune_mnesia(State, '$end_of_table', _DeleteAcc, _RemoveAcc, 0) ->
- {ok, State};
-prune_mnesia(State, '$end_of_table', DeleteAcc, RemoveAcc, _Len) ->
- prune_mnesia_flush_batch(State, DeleteAcc, RemoveAcc);
-prune_mnesia(State, Key, DeleteAcc, RemoveAcc, Len) ->
+prune_mnesia(_State, '$end_of_table', Files, _DeleteAcc, 0) ->
+ {ok, Files};
+prune_mnesia(_State, '$end_of_table', Files, DeleteAcc, _Len) ->
+ ok = prune_mnesia_flush_batch(DeleteAcc),
+ {ok, Files};
+prune_mnesia(State, Key, Files, DeleteAcc, Len) ->
[#dq_msg_loc { msg_id = MsgId, queue_and_seq_id = {Q, SeqId} }] =
mnesia:dirty_read(rabbit_disk_queue, Key),
- {AccHeadLst, RemoveAcc1} =
- case RemoveAcc of
- [] -> {[], []};
- [{Q, Lst} | Acc2] -> {Lst, Acc2};
- [{_OldQ, []} | Acc2] -> {[], Acc2};
- Acc2 -> {[], Acc2}
- end,
- {DeleteAcc1, AccHeadLst1, Len1} =
+ {DeleteAcc1, Files1, Len1} =
case dets_ets_lookup(State, MsgId) of
[] ->
%% msg hasn't been found on disk, delete it
- {[{Q, SeqId} | DeleteAcc], AccHeadLst, Len + 1};
+ {[{Q, SeqId} | DeleteAcc], Files, Len + 1};
[{MsgId, _RefCount, _File, _Offset, _TotalSize, true}] ->
%% msg is persistent, keep it
- {DeleteAcc, AccHeadLst, Len};
+ {DeleteAcc, Files, Len};
[{MsgId, _RefCount, _File, _Offset, _TotalSize, false}] ->
%% msg is not persistent, delete it
- {DeleteAcc, [{MsgId, SeqId} | AccHeadLst], Len + 1}
+ Files2 = remove_message(MsgId, Files, State),
+ {[{Q, SeqId} | DeleteAcc], Files2, Len + 1}
end,
- RemoveAcc2 = [{Q, AccHeadLst1} | RemoveAcc1],
- {State1, Key1, DeleteAcc2, RemoveAcc3, Len2} =
+ {Key1, DeleteAcc2, Len2} =
if
Len1 >= ?BATCH_SIZE ->
%% We have no way of knowing how flushing the batch
@@ -1590,15 +1581,14 @@ prune_mnesia(State, Key, DeleteAcc, RemoveAcc, Len) ->
%% so have no choice but to start again. Although this
%% will make recovery slower for large queues, we
%% guarantee we can start up in constant memory
- {ok, State2} =
- prune_mnesia_flush_batch(State, DeleteAcc1, RemoveAcc2),
+ ok = prune_mnesia_flush_batch(DeleteAcc1),
Key2 = mnesia:dirty_first(rabbit_disk_queue),
- {State2, Key2, [], [], 0};
+ {Key2, [], 0};
true ->
Key2 = mnesia:dirty_next(rabbit_disk_queue, Key),
- {State, Key2, DeleteAcc1, RemoveAcc2, Len1}
+ {Key2, DeleteAcc1, Len1}
end,
- prune_mnesia(State1, Key1, DeleteAcc2, RemoveAcc3, Len2).
+ prune_mnesia(State, Key1, Files1, DeleteAcc2, Len2).
extract_sequence_numbers(State = #dqstate { sequences = Sequences }) ->
true =
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index dc3c9316e0..56d02f3305 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -147,10 +147,9 @@ table_definitions() ->
{attributes, record_info(fields, amqqueue)}]},
{rabbit_disk_queue,
[{record_name, dq_msg_loc},
- {type, set},
- {local_content, true},
{attributes, record_info(fields, dq_msg_loc)},
- {disc_copies, [node()]}]}
+ {disc_copies, [node()]},
+ {local_content, true}]}
].
replicated_table_definitions() ->