diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-08-23 17:00:22 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-08-23 17:00:22 +0100 |
| commit | 9989de598228781c3a979d2932bb5fa3cbf5a919 (patch) | |
| tree | 8f71281f17076ddee674fb1c232d07f0e2755b7f /src | |
| parent | cb4f27280ecc3c578b5098e4e8fb6e5c92e842a0 (diff) | |
| download | rabbitmq-server-git-9989de598228781c3a979d2932bb5fa3cbf5a919.tar.gz | |
Sorted out transactions within the disk_queue, ensuring that if they do restart, other data structures cannot be left partially updated, and can continue successfully — in particular, manipulation of ets tables within mnesia transactions.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_disk_queue.erl | 166 |
1 file changed, 95 insertions, 71 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index 344aff9181..835043c3c1 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -1012,8 +1012,6 @@ remove_messages(Q, MsgSeqIds, MnesiaDelete, ok = case MnesiaDelete of true -> mnesia:dirty_delete(rabbit_disk_queue, {Q, SeqId}); - txn -> mnesia:delete(rabbit_disk_queue, - {Q, SeqId}, write); _ -> ok end, Files2 @@ -1542,67 +1540,92 @@ load_from_disk(State) -> State1 = load_messages(undefined, Files, State), %% Finally, check there is nothing in mnesia which we haven't %% loaded - State2 = - rabbit_misc:execute_mnesia_transaction( - fun() -> - ok = mnesia:write_lock_table(rabbit_disk_queue), - {State6, FinalQ, MsgSeqIds2, _Len} = - mnesia:foldl( - fun (#dq_msg_loc { msg_id = MsgId, - queue_and_seq_id = {Q, SeqId} }, - {State3, OldQ, MsgSeqIds, Len}) -> - {State4, MsgSeqIds1, Len1} = - case {OldQ == Q, MsgSeqIds} of - {true, _} when Len < ?BATCH_SIZE -> - {State3, MsgSeqIds, Len}; - {false, []} -> {State3, MsgSeqIds, Len}; - {_, _} -> - {ok, State5} = - remove_messages(Q, MsgSeqIds, - txn, State3), - {State5, [], 0} - end, - case dets_ets_lookup(State4, MsgId) of - [] -> ok = mnesia:delete(rabbit_disk_queue, - {Q, SeqId}, write), - {State4, Q, MsgSeqIds1, Len1}; - [{MsgId, _RefCount, _File, _Offset, - _TotalSize, true}] -> - {State4, Q, MsgSeqIds1, Len1}; - [{MsgId, _RefCount, _File, _Offset, - _TotalSize, false}] -> - {State4, Q, - [{MsgId, SeqId} | MsgSeqIds1], Len1+1} - end - end, {State1, undefined, [], 0}, rabbit_disk_queue), - {ok, State7} = - remove_messages(FinalQ, MsgSeqIds2, txn, State6), - State7 - end), - State8 = extract_sequence_numbers(State2), + Key = mnesia:dirty_first(rabbit_disk_queue), + {ok, State2} = prune_mnesia(State1, Key, [], [], 0), + State3 = extract_sequence_numbers(State2), ok = del_index(), - {ok, State8}. + {ok, State3}. 
+ +prune_mnesia(State, DeleteAcc, RemoveAcc) -> + ok = lists:foldl(fun (Key, ok) -> + mnesia:dirty_delete(rabbit_disk_queue, Key) + end, ok, DeleteAcc), + {ok, _State1} = lists:foldl( + fun ({Q, MsgSeqIds}, {ok, State2}) -> + remove_messages(Q, MsgSeqIds, true, State2) + end, {ok, State}, RemoveAcc). + +prune_mnesia(State, '$end_of_table', _DeleteAcc, _RemoveAcc, 0) -> + {ok, State}; +prune_mnesia(State, '$end_of_table', DeleteAcc, RemoveAcc, _Len) -> + prune_mnesia(State, DeleteAcc, RemoveAcc); +prune_mnesia(State, Key, DeleteAcc, RemoveAcc, Len) -> + [#dq_msg_loc { msg_id = MsgId, queue_and_seq_id = {Q, SeqId} }] = + mnesia:dirty_read(rabbit_disk_queue, Key), + {AccHeadLst, RemoveAcc1} = + case RemoveAcc of + [] -> {[], []}; + [{Q, Lst} | Acc2] -> {Lst, Acc2}; + [{_OldQ, []} | Acc2] -> {[], Acc2}; + Acc2 -> {[], Acc2} + end, + {DeleteAcc1, AccHeadLst1, Len1} = + case dets_ets_lookup(State, MsgId) of + [] -> + %% msg hasn't been found on disk, delete it + {[{Q, SeqId} | DeleteAcc], AccHeadLst, Len + 1}; + [{MsgId, _RefCount, _File, _Offset, _TotalSize, true}] -> + %% msg is persistent, keep it + {DeleteAcc, AccHeadLst, Len}; + [{MsgId, _RefCount, _File, _Offset, _TotalSize, false}] -> + %% msg is not persistent, delete it + {DeleteAcc, [{MsgId, SeqId} | AccHeadLst], Len + 1} + end, + RemoveAcc2 = [{Q, AccHeadLst1} | RemoveAcc1], + {State1, Key1, DeleteAcc2, RemoveAcc3, Len2} = + if + Len1 >= ?BATCH_SIZE -> + %% We have no way of knowing how flushing the batch + %% will affect ordering of records within the table, + %% so have no choice but to start again. 
Although this + %% will make recovery slower for large queues, we + %% guarantee we can start up in constant memory + {ok, State2} = prune_mnesia(State, DeleteAcc1, RemoveAcc2), + Key2 = mnesia:dirty_first(rabbit_disk_queue), + {State2, Key2, [], [], 0}; + true -> + Key2 = mnesia:dirty_next(rabbit_disk_queue, Key), + {State, Key2, DeleteAcc1, RemoveAcc2, Len1} + end, + prune_mnesia(State1, Key1, DeleteAcc2, RemoveAcc3, Len2). extract_sequence_numbers(State = #dqstate { sequences = Sequences }) -> - true = rabbit_misc:execute_mnesia_transaction( - fun() -> - ok = mnesia:read_lock_table(rabbit_disk_queue), - mnesia:foldl( - fun (#dq_msg_loc { queue_and_seq_id = {Q, SeqId} }, true) -> - NextWrite = SeqId + 1, - case ets:lookup(Sequences, Q) of - [] -> ets:insert_new(Sequences, - {Q, SeqId, NextWrite}); - [Orig = {Q, Read, Write}] -> - Repl = {Q, lists:min([Read, SeqId]), - lists:max([Write, NextWrite])}, - case Orig == Repl of - true -> true; - false -> ets:insert(Sequences, Repl) - end - end - end, true, rabbit_disk_queue) - end), + true = + rabbit_misc:execute_mnesia_transaction( + %% the ets manipulation within this transaction is + %% idempotent, in particular we're only reading from mnesia, + %% and combining what we read with what we find in + %% ets. Should the transaction restart, the non-rolledback + %% data in ets can still be successfully combined with what + %% we find in mnesia + fun() -> + ok = mnesia:read_lock_table(rabbit_disk_queue), + mnesia:foldl( + fun (#dq_msg_loc { queue_and_seq_id = {Q, SeqId} }, true) -> + NextWrite = SeqId + 1, + case ets:lookup(Sequences, Q) of + [] -> ets:insert_new(Sequences, + {Q, SeqId, NextWrite}); + [Orig = {Q, Read, Write}] -> + Repl = {Q, lists:min([Read, SeqId]), + lists:max([Write, NextWrite])}, + case Orig == Repl of + true -> true; + false -> ets:insert(Sequences, Repl) + end + end + end, true, rabbit_disk_queue) + end), ok = remove_gaps_in_sequences(State), State. 
@@ -1616,17 +1639,18 @@ remove_gaps_in_sequences(#dqstate { sequences = Sequences }) -> %% we could shuffle downwards. However, I think there's greater %% likelihood of gaps being at the bottom rather than the top of %% the queue, so shuffling up should be the better bet. - rabbit_misc:execute_mnesia_transaction( - fun() -> - ok = mnesia:write_lock_table(rabbit_disk_queue), - lists:foreach( - fun ({Q, ReadSeqId, WriteSeqId}) -> - Gap = shuffle_up(Q, ReadSeqId-1, WriteSeqId-1, 0), - ReadSeqId1 = ReadSeqId + Gap, - true = ets:insert(Sequences, - {Q, ReadSeqId1, WriteSeqId}) - end, ets:match_object(Sequences, '_')) - end), + QueueBoundaries = + rabbit_misc:execute_mnesia_transaction( + fun() -> + ok = mnesia:write_lock_table(rabbit_disk_queue), + lists:foldl( + fun ({Q, ReadSeqId, WriteSeqId}, Acc) -> + Gap = shuffle_up(Q, ReadSeqId-1, WriteSeqId-1, 0), + [{Q, ReadSeqId + Gap, WriteSeqId} | Acc] + end, [], ets:match_object(Sequences, '_')) + end), + true = lists:foldl(fun (Obj, true) -> ets:insert(Sequences, Obj) end, + true, QueueBoundaries), ok. shuffle_up(_Q, SeqId, SeqId, Gap) -> |
