diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-05-20 12:46:57 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-05-20 12:46:57 +0100 |
| commit | e720187f953f7b926bafd46dbae75f60c4251a9d (patch) | |
| tree | 71999952fe4c316e0b4e2bf59bf6fac9e8f4f085 /src | |
| parent | d383423a040402a6c43ea7f0b092a0fdf6340447 (diff) | |
| download | rabbitmq-server-git-e720187f953f7b926bafd46dbae75f60c4251a9d.tar.gz | |
So the mnesia table now contains next_seq_id. This means we can cope with gaps appearing.
There are now two forms of publish, tx_commit and requeue. The first forms behave as before, with contiguous seq_ids. The second form (_with_seq(s)) take either a
n extra seq arg or in the case of a list, a tuple in which the desired seq_id is mentioned. This can always just be the atom 'next'. The invariant on these lists is that the seq_ids are always ascending. Thus requeue now effectively takes a mapping: [{{msgId,oldSeqId} -> newSeqId}].
On startup, because the sequence Ids are still private at that point, shuffle_up is still called as before, although now gaps can appear anywhere, not just in the delivered section. We collapse everything up simply because we don't know where the previous NextReadSeqId marker was. If we did know that then we'd know that beyond that point we had a well formed linked list, and only had to patch up before that. This further requires that even though we have a linked list, the seq_ids must be ascending.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_disk_queue.erl | 175 |
1 files changed, 128 insertions, 47 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index 90713723b4..b2d086b27d 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -38,8 +38,9 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --export([publish/3, deliver/1, phantom_deliver/1, ack/2, tx_publish/2, - tx_commit/3, tx_cancel/1, requeue/2, purge/1]). +-export([publish/3, publish_with_seq/4, deliver/1, phantom_deliver/1, ack/2, + tx_publish/2, tx_commit/3, tx_commit_with_seqs/3, tx_cancel/1, + requeue/2, requeue_with_seqs/2, purge/1]). -export([stop/0, stop_and_obliterate/0, to_disk_only_mode/0, to_ram_disk_mode/0]). @@ -86,7 +87,8 @@ %% rabbit_disk_queue: this is an mnesia table which contains: %% #dq_msg_loc { queue_and_seq_id = {Q, SeqId}, %% is_delivered = IsDelivered, -%% msg_id = MsgId +%% msg_id = MsgId, +%% next_seq_id = SeqId %% } %% @@ -224,6 +226,7 @@ -spec(start_link/1 :: (non_neg_integer()) -> {'ok', pid()} | 'ignore' | {'error', any()}). -spec(publish/3 :: (queue_name(), msg_id(), binary()) -> 'ok'). +-spec(publish_with_seq/4 :: (queue_name(), msg_id(), seq_id(), binary()) -> 'ok'). -spec(deliver/1 :: (queue_name()) -> {'empty' | {msg_id(), binary(), non_neg_integer(), bool(), {msg_id(), seq_id()}}}). @@ -232,8 +235,10 @@ -spec(ack/2 :: (queue_name(), [{msg_id(), seq_id()}]) -> 'ok'). -spec(tx_publish/2 :: (msg_id(), binary()) -> 'ok'). -spec(tx_commit/3 :: (queue_name(), [msg_id()], [seq_id()]) -> 'ok'). +-spec(tx_commit_with_seqs/3 :: (queue_name(), [{msg_id(), seq_id()}], [seq_id()]) -> 'ok'). -spec(tx_cancel/1 :: ([msg_id()]) -> 'ok'). --spec(requeue/2 :: (queue_name(), [seq_id()]) -> 'ok'). +-spec(requeue/2 :: (queue_name(), [{msg_id(), seq_id()}]) -> 'ok'). +-spec(requeue_with_seqs/2 :: (queue_name(), [{{msg_id(), seq_id()}, seq_id()}]) -> 'ok'). -spec(purge/1 :: (queue_name()) -> non_neg_integer()). -spec(stop/0 :: () -> 'ok'). -spec(stop_and_obliterate/0 :: () -> 'ok'). @@ -251,6 +256,9 @@ start_link(FileSizeLimit) -> publish(Q, MsgId, Msg) when is_binary(Msg) -> gen_server:cast(?SERVER, {publish, Q, MsgId, Msg}). +publish_with_seq(Q, MsgId, SeqId, Msg) when is_binary(Msg) -> + gen_server:cast(?SERVER, {publish_with_seq, Q, MsgId, SeqId, Msg}). + deliver(Q) -> gen_server:call(?SERVER, {deliver, Q}, infinity). @@ -266,12 +274,19 @@ tx_publish(MsgId, Msg) when is_binary(Msg) -> tx_commit(Q, PubMsgIds, AckSeqIds) when is_list(PubMsgIds) andalso is_list(AckSeqIds) -> gen_server:call(?SERVER, {tx_commit, Q, PubMsgIds, AckSeqIds}, infinity). +tx_commit_with_seqs(Q, PubMsgSeqIds, AckSeqIds) + when is_list(PubMsgSeqIds) andalso is_list(AckSeqIds) -> + gen_server:call(?SERVER, {tx_commit_with_seqs, Q, PubMsgSeqIds, AckSeqIds}, infinity). + tx_cancel(MsgIds) when is_list(MsgIds) -> gen_server:cast(?SERVER, {tx_cancel, MsgIds}). requeue(Q, MsgSeqIds) when is_list(MsgSeqIds) -> gen_server:cast(?SERVER, {requeue, Q, MsgSeqIds}). +requeue_with_seqs(Q, MsgSeqSeqIds) when is_list(MsgSeqSeqIds) -> + gen_server:cast(?SERVER, {requeue_with_seqs, Q, MsgSeqSeqIds}). + purge(Q) -> gen_server:call(?SERVER, {purge, Q}). @@ -362,7 +377,11 @@ handle_call({phantom_deliver, Q}, _From, State) -> {ok, Result, State1} = internal_deliver(Q, false, State), {reply, Result, State1}; handle_call({tx_commit, Q, PubMsgIds, AckSeqIds}, _From, State) -> - {ok, State1} = internal_tx_commit(Q, PubMsgIds, AckSeqIds, State), + PubMsgSeqIds = lists:zip(PubMsgIds, lists:duplicate(length(PubMsgIds), next)), + {ok, State1} = internal_tx_commit(Q, PubMsgSeqIds, AckSeqIds, State), + {reply, ok, State1}; +handle_call({tx_commit_with_seqs, Q, PubSeqMsgIds, AckSeqIds}, _From, State) -> + {ok, State1} = internal_tx_commit(Q, PubSeqMsgIds, AckSeqIds, State), {reply, ok, State1}; handle_call({purge, Q}, _From, State) -> {ok, Count, State1} = internal_purge(Q, State), @@ -401,7 +420,10 @@ handle_call(to_ram_disk_mode, _From, State = #dqstate { operation_mode = disk_on {reply, ok, State #dqstate { operation_mode = ram_disk }}. handle_cast({publish, Q, MsgId, MsgBody}, State) -> - {ok, State1} = internal_publish(Q, MsgId, MsgBody, State), + {ok, State1} = internal_publish(Q, MsgId, next, MsgBody, State), + {noreply, State1}; +handle_cast({publish_with_seq, Q, MsgId, SeqId, MsgBody}, State) -> + {ok, State1} = internal_publish(Q, MsgId, SeqId, MsgBody, State), {noreply, State1}; handle_cast({ack, Q, MsgSeqIds}, State) -> {ok, State1} = internal_ack(Q, MsgSeqIds, State), @@ -413,7 +435,11 @@ handle_cast({tx_cancel, MsgIds}, State) -> {ok, State1} = internal_tx_cancel(MsgIds, State), {noreply, State1}; handle_cast({requeue, Q, MsgSeqIds}, State) -> - {ok, State1} = internal_requeue(Q, MsgSeqIds, State), + MsgSeqSeqIds = lists:zip(MsgSeqIds, lists:duplicate(length(MsgSeqIds), next)), + {ok, State1} = internal_requeue(Q, MsgSeqSeqIds, State), + {noreply, State1}; +handle_cast({requeue_with_seqs, Q, MsgSeqSeqIds}, State) -> + {ok, State1} = internal_requeue(Q, MsgSeqSeqIds, State), {noreply, State1}. handle_info(_Info, State) -> @@ -499,10 +525,11 @@ internal_deliver(Q, ReadMsg, State = #dqstate { sequences = Sequences }) -> case mnesia:dirty_read(rabbit_disk_queue, {Q, ReadSeqId}) of [] -> {ok, empty, State}; [Obj = - #dq_msg_loc {is_delivered = Delivered, msg_id = MsgId}] -> + #dq_msg_loc {is_delivered = Delivered, msg_id = MsgId, + next_seq_id = ReadSeqId2}] -> [{MsgId, _RefCount, File, Offset, TotalSize}] = dets_ets_lookup(State, MsgId), - true = ets:insert(Sequences, {Q, ReadSeqId + 1, WriteSeqId}), + true = ets:insert(Sequences, {Q, ReadSeqId2, WriteSeqId}), ok = if Delivered -> ok; true -> @@ -627,15 +654,35 @@ internal_tx_publish(MsgId, MsgBody, {ok, State} end. -internal_tx_commit(Q, PubMsgIds, AckSeqIds, +adjust_last_msg_seq_id(_Q, ExpectedSeqId, next) -> + ExpectedSeqId; +adjust_last_msg_seq_id(_Q, 0, SuppliedSeqId) -> + SuppliedSeqId; +adjust_last_msg_seq_id(_Q, ExpectedSeqId, ExpectedSeqId) -> + ExpectedSeqId; +adjust_last_msg_seq_id(Q, ExpectedSeqId, SuppliedSeqId) when SuppliedSeqId > ExpectedSeqId -> + [Obj] = mnesia:dirty_read(rabbit_disk_queue, {Q, ExpectedSeqId - 1}), + ok = mnesia:dirty_write(rabbit_disk_queue, + Obj #dq_msg_loc { next_seq_id = SuppliedSeqId }), + SuppliedSeqId. + +%% can call this with PubMsgSeqIds as zip(PubMsgIds, duplicate(N, next)) +internal_tx_commit(Q, PubMsgSeqIds, AckSeqIds, State = #dqstate { current_file_handle = CurHdl, current_file_name = CurName, sequences = Sequences }) -> - {ReadSeqId, InitWriteSeqId} = - case ets:lookup(Sequences, Q) of - [] -> {0,0}; - [{Q, ReadSeqId2, WriteSeqId2}] -> {ReadSeqId2, WriteSeqId2} + {PubList, PubAcc, ReadSeqId} = + case PubMsgSeqIds of + [] -> {[], undefined, undefined}; + [_|PubMsgSeqIdsTail] -> + {InitReadSeqId, InitWriteSeqId} = + case ets:lookup(Sequences, Q) of + [] -> {0,0}; + [{Q, ReadSeqId2, WriteSeqId2}] -> {ReadSeqId2, WriteSeqId2} + end, + { lists:zip(PubMsgSeqIds, (PubMsgSeqIdsTail ++ [{next, next}])), + InitWriteSeqId, InitReadSeqId} end, {atomic, {Sync, WriteSeqId, State2}} = mnesia:transaction( @@ -647,41 +694,55 @@ internal_tx_commit(Q, PubMsgIds, AckSeqIds, %% order which _could_not_ have happened. {Sync2, WriteSeqId3} = lists:foldl( - fun (MsgId, {Acc, NextWriteSeqId}) -> + fun ({{MsgId, SeqId}, {_NextMsgId, NextSeqId}}, + {Acc, ExpectedSeqId}) -> [{MsgId, _RefCount, File, _Offset, _TotalSize}] = dets_ets_lookup(State, MsgId), + SeqId2 = adjust_last_msg_seq_id(Q, ExpectedSeqId, SeqId), + NextSeqId2 = if NextSeqId =:= next -> SeqId2 + 1; + true -> NextSeqId + end, + true = NextSeqId2 > SeqId2, ok = mnesia:write(rabbit_disk_queue, #dq_msg_loc { queue_and_seq_id = - {Q, NextWriteSeqId}, + {Q, SeqId2}, msg_id = MsgId, - is_delivered = false}, + is_delivered = false, + next_seq_id = NextSeqId2 + }, write), - {Acc or (CurName =:= File), NextWriteSeqId + 1} - end, {false, InitWriteSeqId}, PubMsgIds), + {Acc or (CurName =:= File), NextSeqId2} + end, {false, PubAcc}, PubList), + {ok, State3} = remove_messages(Q, AckSeqIds, txn, State), {Sync2, WriteSeqId3, State3} end), - true = ets:insert(Sequences, {Q, ReadSeqId, WriteSeqId}), - if Sync -> ok = file:sync(CurHdl); - true -> ok - end, + true = if PubList =:= [] -> true; + true -> ets:insert(Sequences, {Q, ReadSeqId, WriteSeqId}) + end, + ok = if Sync -> file:sync(CurHdl); + true -> ok + end, {ok, State2}. -internal_publish(Q, MsgId, MsgBody, State) -> +%% SeqId can be 'next' +internal_publish(Q, MsgId, SeqId, MsgBody, State) -> {ok, State1 = #dqstate { sequences = Sequences }} = internal_tx_publish(MsgId, MsgBody, State), - WriteSeqId = case ets:lookup(Sequences, Q) of - [] -> %% previously unseen queue - true = ets:insert_new(Sequences, {Q, 0, 1}), - 0; - [{Q, ReadSeqId, WriteSeqId2}] -> - true = ets:insert(Sequences, {Q, ReadSeqId, - WriteSeqId2 +1}), - WriteSeqId2 - end, + {ReadSeqId, WriteSeqId} = + case ets:lookup(Sequences, Q) of + [] -> %% previously unseen queue + {0, 0}; + [{Q, ReadSeqId2, WriteSeqId2}] -> + {ReadSeqId2, WriteSeqId2} + end, + WriteSeqId3 = adjust_last_msg_seq_id(Q, WriteSeqId, SeqId), + WriteSeqId3Next = WriteSeqId3 + 1, + true = ets:insert(Sequences, {Q, ReadSeqId, WriteSeqId3Next}), ok = mnesia:dirty_write(rabbit_disk_queue, - #dq_msg_loc { queue_and_seq_id = {Q, WriteSeqId}, + #dq_msg_loc { queue_and_seq_id = {Q, WriteSeqId3}, msg_id = MsgId, + next_seq_id = WriteSeqId3Next, is_delivered = false}), {ok, State1}. @@ -691,7 +752,10 @@ internal_tx_cancel(MsgIds, State) -> MsgSeqIds = lists:zip(MsgIds, lists:duplicate(length(MsgIds), undefined)), remove_messages(undefined, MsgSeqIds, false, State). -internal_requeue(Q, MsgSeqIds, State = #dqstate { sequences = Sequences }) -> +internal_requeue(_Q, [], State) -> + {ok, State}; +internal_requeue(Q, MsgSeqIds = [_|MsgSeqIdsTail], + State = #dqstate { sequences = Sequences }) -> %% We know that every seq_id in here is less than the ReadSeqId %% you'll get if you look up this queue in Sequences (i.e. they've %% already been delivered). We also know that the rows for these @@ -716,20 +780,30 @@ internal_requeue(Q, MsgSeqIds, State = #dqstate { sequences = Sequences }) -> %% the Q _must_ already exist [{Q, ReadSeqId, WriteSeqId}] = ets:lookup(Sequences, Q), + MsgSeqIdsZipped = lists:zip(MsgSeqIds, MsgSeqIdsTail ++ [{next, next}]), {atomic, WriteSeqId2} = mnesia:transaction( fun() -> ok = mnesia:write_lock_table(rabbit_disk_queue), lists:foldl( - fun ({MsgId, SeqId}, NextWriteSeqId) -> + fun ({{{MsgId, SeqIdOrig}, SeqIdTo}, + {_NextMsgSeqId, NextSeqIdTo}}, + ExpectedSeqIdTo) -> + SeqIdTo2 = adjust_last_msg_seq_id(Q, ExpectedSeqIdTo, SeqIdTo), + NextSeqIdTo2 = if NextSeqIdTo =:= next -> SeqIdTo2 + 1; + true -> NextSeqIdTo + end, + true = NextSeqIdTo2 > SeqIdTo2, [Obj = #dq_msg_loc { is_delivered = true, msg_id = MsgId }] = - mnesia:read(rabbit_disk_queue, {Q, SeqId}, write), + mnesia:read(rabbit_disk_queue, {Q, SeqIdOrig}, write), mnesia:write(rabbit_disk_queue, - Obj #dq_msg_loc { queue_and_seq_id = {Q, NextWriteSeqId }}, + Obj #dq_msg_loc { queue_and_seq_id = {Q, SeqIdTo2}, + next_seq_id = NextSeqIdTo2 + }, write), - mnesia:delete(rabbit_disk_queue, {Q, SeqId}, write), - NextWriteSeqId + 1 - end, WriteSeqId, MsgSeqIds) + mnesia:delete(rabbit_disk_queue, {Q, SeqIdOrig}, write), + NextSeqIdTo2 + end, WriteSeqId, MsgSeqIdsZipped) end), true = ets:insert(Sequences, {Q, ReadSeqId, WriteSeqId2}), {ok, State}. @@ -1136,11 +1210,12 @@ shuffle_up(Q, BaseSeqId, SeqId, Gap) -> GapInc = case mnesia:read(rabbit_disk_queue, {Q, SeqId}, write) of [] -> 1; - [Obj = #dq_msg_loc { is_delivered = IsDelivered }] when IsDelivered - orelse (Gap =:= 0) -> + [Obj] -> if Gap =:= 0 -> ok; true -> mnesia:write(rabbit_disk_queue, - Obj #dq_msg_loc { queue_and_seq_id = {Q, SeqId + Gap }}, + Obj #dq_msg_loc { queue_and_seq_id = {Q, SeqId + Gap }, + next_seq_id = SeqId + Gap + 1 + }, write), mnesia:delete(rabbit_disk_queue, {Q, SeqId}, write) end, @@ -1172,7 +1247,9 @@ load_messages(Left, [File|Files], (rabbit_disk_queue, #dq_msg_loc { msg_id = MsgId, queue_and_seq_id = '_', - is_delivered = '_'}, + is_delivered = '_', + next_seq_id = '_' + }, msg_id)) of 0 -> {VMAcc, VTSAcc}; RefCount -> @@ -1215,7 +1292,9 @@ recover_crashed_compactions1(Files, TmpFile) -> (rabbit_disk_queue, #dq_msg_loc { msg_id = MsgId, queue_and_seq_id = '_', - is_delivered = '_'}, + is_delivered = '_', + next_seq_id = '_' + }, msg_id)) end, MsgIdsTmp), {ok, UncorruptedMessages} = @@ -1253,7 +1332,9 @@ recover_crashed_compactions1(Files, TmpFile) -> (rabbit_disk_queue, #dq_msg_loc { msg_id = MsgId, queue_and_seq_id = '_', - is_delivered = '_'}, + is_delivered = '_', + next_seq_id = '_' + }, msg_id)) end, MsgIds), %% The main file should be contiguous |
