diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-08-23 18:51:02 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-08-23 18:51:02 +0100 |
| commit | 2940ef9035d139458c1f6a184ebd11f8cbc437a5 (patch) | |
| tree | 9cbc1748a15bef6ddef34f899379e8ea4760fd6d /src | |
| parent | 2057297e947a5f9995308082a00b7fd79084b03d (diff) | |
| download | rabbitmq-server-git-2940ef9035d139458c1f6a184ebd11f8cbc437a5.tar.gz | |
preemptive refactoring and other assorted changes
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_disk_queue.erl | 114 | ||||
| -rw-r--r-- | src/rabbit_mnesia.erl | 5 |
2 files changed, 54 insertions, 65 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index c0ed7e62b1..04c8a82579 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -976,39 +976,11 @@ internal_ack(Q, MsgSeqIds, State) -> remove_messages(Q, MsgSeqIds, true, State). %% Q is only needed if MnesiaDelete /= false -remove_messages(Q, MsgSeqIds, MnesiaDelete, - State = #dqstate { file_summary = FileSummary, - current_file_name = CurName - }) -> +remove_messages(Q, MsgSeqIds, MnesiaDelete, State) -> Files = lists:foldl( fun ({MsgId, SeqId}, Files1) -> - [{MsgId, RefCount, File, Offset, TotalSize, IsPersistent}] = - dets_ets_lookup(State, MsgId), - Files2 = - case RefCount of - 1 -> - ok = dets_ets_delete(State, MsgId), - ok = remove_cache_entry(MsgId, State), - [{File, ValidTotalSize, ContiguousTop, - Left, Right}] = ets:lookup(FileSummary, File), - ContiguousTop1 = - lists:min([ContiguousTop, Offset]), - true = - ets:insert(FileSummary, - {File, (ValidTotalSize-TotalSize- - ?FILE_PACKING_ADJUSTMENT), - ContiguousTop1, Left, Right}), - if CurName =:= File -> Files1; - true -> sets:add_element(File, Files1) - end; - _ when 1 < RefCount -> - ok = decrement_cache(MsgId, State), - ok = dets_ets_insert( - State, {MsgId, RefCount - 1, File, Offset, - TotalSize, IsPersistent}), - Files1 - end, + Files2 = remove_message(MsgId, Files1, State), ok = case MnesiaDelete of true -> mnesia:dirty_delete(rabbit_disk_queue, {Q, SeqId}); @@ -1019,6 +991,34 @@ remove_messages(Q, MsgSeqIds, MnesiaDelete, State1 = compact(Files, State), {ok, State1}. +remove_message(MsgId, Files, + State = #dqstate { file_summary = FileSummary, + current_file_name = CurName + }) -> + [{MsgId, RefCount, File, Offset, TotalSize, IsPersistent}] = + dets_ets_lookup(State, MsgId), + case RefCount of + 1 -> + ok = dets_ets_delete(State, MsgId), + ok = remove_cache_entry(MsgId, State), + [{File, ValidTotalSize, ContiguousTop, Left, Right}] = + ets:lookup(FileSummary, File), + ContiguousTop1 = lists:min([ContiguousTop, Offset]), + true = + ets:insert(FileSummary, + {File, + (ValidTotalSize-TotalSize-?FILE_PACKING_ADJUSTMENT), + ContiguousTop1, Left, Right}), + if CurName =:= File -> Files; + true -> sets:add_element(File, Files) + end; + _ when 1 < RefCount -> + ok = decrement_cache(MsgId, State), + ok = dets_ets_insert(State, {MsgId, RefCount - 1, File, Offset, + TotalSize, IsPersistent}), + Files + end. + internal_tx_publish(Message = #basic_message { is_persistent = IsPersistent, guid = MsgId }, State = #dqstate { current_file_handle = CurHdl, @@ -1541,48 +1541,39 @@ load_from_disk(State) -> %% Finally, check there is nothing in mnesia which we haven't %% loaded Key = mnesia:dirty_first(rabbit_disk_queue), - {ok, State2} = prune_mnesia(State1, Key, [], [], 0), + {ok, AlteredFiles} = prune_mnesia(State1, Key, sets:new(), [], 0), + State2 = compact(AlteredFiles, State1), State3 = extract_sequence_numbers(State2), ok = del_index(), {ok, State3}. -prune_mnesia_flush_batch(State, DeleteAcc, RemoveAcc) -> - ok = lists:foldl(fun (Key, ok) -> - mnesia:dirty_delete(rabbit_disk_queue, Key) - end, ok, DeleteAcc), - {ok, _State1} = lists:foldl( - fun ({Q, MsgSeqIds}, {ok, State2}) -> - remove_messages(Q, MsgSeqIds, true, State2) - end, {ok, State}, RemoveAcc). +prune_mnesia_flush_batch(DeleteAcc) -> + lists:foldl(fun (Key, ok) -> + mnesia:dirty_delete(rabbit_disk_queue, Key) + end, ok, DeleteAcc). -prune_mnesia(State, '$end_of_table', _DeleteAcc, _RemoveAcc, 0) -> - {ok, State}; -prune_mnesia(State, '$end_of_table', DeleteAcc, RemoveAcc, _Len) -> - prune_mnesia_flush_batch(State, DeleteAcc, RemoveAcc); -prune_mnesia(State, Key, DeleteAcc, RemoveAcc, Len) -> +prune_mnesia(_State, '$end_of_table', Files, _DeleteAcc, 0) -> + {ok, Files}; +prune_mnesia(_State, '$end_of_table', Files, DeleteAcc, _Len) -> + ok = prune_mnesia_flush_batch(DeleteAcc), + {ok, Files}; +prune_mnesia(State, Key, Files, DeleteAcc, Len) -> [#dq_msg_loc { msg_id = MsgId, queue_and_seq_id = {Q, SeqId} }] = mnesia:dirty_read(rabbit_disk_queue, Key), - {AccHeadLst, RemoveAcc1} = - case RemoveAcc of - [] -> {[], []}; - [{Q, Lst} | Acc2] -> {Lst, Acc2}; - [{_OldQ, []} | Acc2] -> {[], Acc2}; - Acc2 -> {[], Acc2} - end, - {DeleteAcc1, AccHeadLst1, Len1} = + {DeleteAcc1, Files1, Len1} = case dets_ets_lookup(State, MsgId) of [] -> %% msg hasn't been found on disk, delete it - {[{Q, SeqId} | DeleteAcc], AccHeadLst, Len + 1}; + {[{Q, SeqId} | DeleteAcc], Files, Len + 1}; [{MsgId, _RefCount, _File, _Offset, _TotalSize, true}] -> %% msg is persistent, keep it - {DeleteAcc, AccHeadLst, Len}; + {DeleteAcc, Files, Len}; [{MsgId, _RefCount, _File, _Offset, _TotalSize, false}] -> %% msg is not persistent, delete it - {DeleteAcc, [{MsgId, SeqId} | AccHeadLst], Len + 1} + Files2 = remove_message(MsgId, Files, State), + {[{Q, SeqId} | DeleteAcc], Files2, Len + 1} end, - RemoveAcc2 = [{Q, AccHeadLst1} | RemoveAcc1], - {State1, Key1, DeleteAcc2, RemoveAcc3, Len2} = + {Key1, DeleteAcc2, Len2} = if Len1 >= ?BATCH_SIZE -> %% We have no way of knowing how flushing the batch @@ -1590,15 +1581,14 @@ prune_mnesia(State, Key, DeleteAcc, RemoveAcc, Len) -> %% so have no choice but to start again. Although this %% will make recovery slower for large queues, we %% guarantee we can start up in constant memory - {ok, State2} = - prune_mnesia_flush_batch(State, DeleteAcc1, RemoveAcc2), + ok = prune_mnesia_flush_batch(DeleteAcc1), Key2 = mnesia:dirty_first(rabbit_disk_queue), - {State2, Key2, [], [], 0}; + {Key2, [], 0}; true -> Key2 = mnesia:dirty_next(rabbit_disk_queue, Key), - {State, Key2, DeleteAcc1, RemoveAcc2, Len1} + {Key2, DeleteAcc1, Len1} end, - prune_mnesia(State1, Key1, DeleteAcc2, RemoveAcc3, Len2). + prune_mnesia(State, Key1, Files1, DeleteAcc2, Len2). extract_sequence_numbers(State = #dqstate { sequences = Sequences }) -> true = diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index dc3c9316e0..56d02f3305 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -147,10 +147,9 @@ table_definitions() -> {attributes, record_info(fields, amqqueue)}]}, {rabbit_disk_queue, [{record_name, dq_msg_loc}, - {type, set}, - {local_content, true}, {attributes, record_info(fields, dq_msg_loc)}, - {disc_copies, [node()]}]} + {disc_copies, [node()]}, + {local_content, true}]} ]. replicated_table_definitions() -> |
