diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_disk_queue.erl | 175 |
1 files changed, 128 insertions, 47 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index 90713723b4..b2d086b27d 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -38,8 +38,9 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --export([publish/3, deliver/1, phantom_deliver/1, ack/2, tx_publish/2, - tx_commit/3, tx_cancel/1, requeue/2, purge/1]). +-export([publish/3, publish_with_seq/4, deliver/1, phantom_deliver/1, ack/2, + tx_publish/2, tx_commit/3, tx_commit_with_seqs/3, tx_cancel/1, + requeue/2, requeue_with_seqs/2, purge/1]). -export([stop/0, stop_and_obliterate/0, to_disk_only_mode/0, to_ram_disk_mode/0]). @@ -86,7 +87,8 @@ %% rabbit_disk_queue: this is an mnesia table which contains: %% #dq_msg_loc { queue_and_seq_id = {Q, SeqId}, %% is_delivered = IsDelivered, -%% msg_id = MsgId +%% msg_id = MsgId, +%% next_seq_id = SeqId %% } %% @@ -224,6 +226,7 @@ -spec(start_link/1 :: (non_neg_integer()) -> {'ok', pid()} | 'ignore' | {'error', any()}). -spec(publish/3 :: (queue_name(), msg_id(), binary()) -> 'ok'). +-spec(publish_with_seq/4 :: (queue_name(), msg_id(), seq_id(), binary()) -> 'ok'). -spec(deliver/1 :: (queue_name()) -> {'empty' | {msg_id(), binary(), non_neg_integer(), bool(), {msg_id(), seq_id()}}}). @@ -232,8 +235,10 @@ -spec(ack/2 :: (queue_name(), [{msg_id(), seq_id()}]) -> 'ok'). -spec(tx_publish/2 :: (msg_id(), binary()) -> 'ok'). -spec(tx_commit/3 :: (queue_name(), [msg_id()], [seq_id()]) -> 'ok'). +-spec(tx_commit_with_seqs/3 :: (queue_name(), [{msg_id(), seq_id()}], [seq_id()]) -> 'ok'). -spec(tx_cancel/1 :: ([msg_id()]) -> 'ok'). --spec(requeue/2 :: (queue_name(), [seq_id()]) -> 'ok'). +-spec(requeue/2 :: (queue_name(), [{msg_id(), seq_id()}]) -> 'ok'). +-spec(requeue_with_seqs/2 :: (queue_name(), [{{msg_id(), seq_id()}, seq_id()}]) -> 'ok'). -spec(purge/1 :: (queue_name()) -> non_neg_integer()). -spec(stop/0 :: () -> 'ok'). -spec(stop_and_obliterate/0 :: () -> 'ok'). @@ -251,6 +256,9 @@ start_link(FileSizeLimit) -> publish(Q, MsgId, Msg) when is_binary(Msg) -> gen_server:cast(?SERVER, {publish, Q, MsgId, Msg}). +publish_with_seq(Q, MsgId, SeqId, Msg) when is_binary(Msg) -> + gen_server:cast(?SERVER, {publish_with_seq, Q, MsgId, SeqId, Msg}). + deliver(Q) -> gen_server:call(?SERVER, {deliver, Q}, infinity). @@ -266,12 +274,19 @@ tx_publish(MsgId, Msg) when is_binary(Msg) -> tx_commit(Q, PubMsgIds, AckSeqIds) when is_list(PubMsgIds) andalso is_list(AckSeqIds) -> gen_server:call(?SERVER, {tx_commit, Q, PubMsgIds, AckSeqIds}, infinity). +tx_commit_with_seqs(Q, PubMsgSeqIds, AckSeqIds) + when is_list(PubMsgSeqIds) andalso is_list(AckSeqIds) -> + gen_server:call(?SERVER, {tx_commit_with_seqs, Q, PubMsgSeqIds, AckSeqIds}, infinity). + tx_cancel(MsgIds) when is_list(MsgIds) -> gen_server:cast(?SERVER, {tx_cancel, MsgIds}). requeue(Q, MsgSeqIds) when is_list(MsgSeqIds) -> gen_server:cast(?SERVER, {requeue, Q, MsgSeqIds}). +requeue_with_seqs(Q, MsgSeqSeqIds) when is_list(MsgSeqSeqIds) -> + gen_server:cast(?SERVER, {requeue_with_seqs, Q, MsgSeqSeqIds}). + purge(Q) -> gen_server:call(?SERVER, {purge, Q}). @@ -362,7 +377,11 @@ handle_call({phantom_deliver, Q}, _From, State) -> {ok, Result, State1} = internal_deliver(Q, false, State), {reply, Result, State1}; handle_call({tx_commit, Q, PubMsgIds, AckSeqIds}, _From, State) -> - {ok, State1} = internal_tx_commit(Q, PubMsgIds, AckSeqIds, State), + PubMsgSeqIds = lists:zip(PubMsgIds, lists:duplicate(length(PubMsgIds), next)), + {ok, State1} = internal_tx_commit(Q, PubMsgSeqIds, AckSeqIds, State), + {reply, ok, State1}; +handle_call({tx_commit_with_seqs, Q, PubSeqMsgIds, AckSeqIds}, _From, State) -> + {ok, State1} = internal_tx_commit(Q, PubSeqMsgIds, AckSeqIds, State), {reply, ok, State1}; handle_call({purge, Q}, _From, State) -> {ok, Count, State1} = internal_purge(Q, State), @@ -401,7 +420,10 @@ handle_call(to_ram_disk_mode, _From, State = #dqstate { operation_mode = disk_on {reply, ok, State #dqstate { operation_mode = ram_disk }}. handle_cast({publish, Q, MsgId, MsgBody}, State) -> - {ok, State1} = internal_publish(Q, MsgId, MsgBody, State), + {ok, State1} = internal_publish(Q, MsgId, next, MsgBody, State), + {noreply, State1}; +handle_cast({publish_with_seq, Q, MsgId, SeqId, MsgBody}, State) -> + {ok, State1} = internal_publish(Q, MsgId, SeqId, MsgBody, State), {noreply, State1}; handle_cast({ack, Q, MsgSeqIds}, State) -> {ok, State1} = internal_ack(Q, MsgSeqIds, State), @@ -413,7 +435,11 @@ handle_cast({tx_cancel, MsgIds}, State) -> {ok, State1} = internal_tx_cancel(MsgIds, State), {noreply, State1}; handle_cast({requeue, Q, MsgSeqIds}, State) -> - {ok, State1} = internal_requeue(Q, MsgSeqIds, State), + MsgSeqSeqIds = lists:zip(MsgSeqIds, lists:duplicate(length(MsgSeqIds), next)), + {ok, State1} = internal_requeue(Q, MsgSeqSeqIds, State), + {noreply, State1}; +handle_cast({requeue_with_seqs, Q, MsgSeqSeqIds}, State) -> + {ok, State1} = internal_requeue(Q, MsgSeqSeqIds, State), {noreply, State1}. handle_info(_Info, State) -> @@ -499,10 +525,11 @@ internal_deliver(Q, ReadMsg, State = #dqstate { sequences = Sequences }) -> case mnesia:dirty_read(rabbit_disk_queue, {Q, ReadSeqId}) of [] -> {ok, empty, State}; [Obj = - #dq_msg_loc {is_delivered = Delivered, msg_id = MsgId}] -> + #dq_msg_loc {is_delivered = Delivered, msg_id = MsgId, + next_seq_id = ReadSeqId2}] -> [{MsgId, _RefCount, File, Offset, TotalSize}] = dets_ets_lookup(State, MsgId), - true = ets:insert(Sequences, {Q, ReadSeqId + 1, WriteSeqId}), + true = ets:insert(Sequences, {Q, ReadSeqId2, WriteSeqId}), ok = if Delivered -> ok; true -> @@ -627,15 +654,35 @@ internal_tx_publish(MsgId, MsgBody, {ok, State} end. -internal_tx_commit(Q, PubMsgIds, AckSeqIds, +adjust_last_msg_seq_id(_Q, ExpectedSeqId, next) -> + ExpectedSeqId; +adjust_last_msg_seq_id(_Q, 0, SuppliedSeqId) -> + SuppliedSeqId; +adjust_last_msg_seq_id(_Q, ExpectedSeqId, ExpectedSeqId) -> + ExpectedSeqId; +adjust_last_msg_seq_id(Q, ExpectedSeqId, SuppliedSeqId) when SuppliedSeqId > ExpectedSeqId -> + [Obj] = mnesia:dirty_read(rabbit_disk_queue, {Q, ExpectedSeqId - 1}), + ok = mnesia:dirty_write(rabbit_disk_queue, + Obj #dq_msg_loc { next_seq_id = SuppliedSeqId }), + SuppliedSeqId. + +%% can call this with PubMsgSeqIds as zip(PubMsgIds, duplicate(N, next)) +internal_tx_commit(Q, PubMsgSeqIds, AckSeqIds, State = #dqstate { current_file_handle = CurHdl, current_file_name = CurName, sequences = Sequences }) -> - {ReadSeqId, InitWriteSeqId} = - case ets:lookup(Sequences, Q) of - [] -> {0,0}; - [{Q, ReadSeqId2, WriteSeqId2}] -> {ReadSeqId2, WriteSeqId2} + {PubList, PubAcc, ReadSeqId} = + case PubMsgSeqIds of + [] -> {[], undefined, undefined}; + [_|PubMsgSeqIdsTail] -> + {InitReadSeqId, InitWriteSeqId} = + case ets:lookup(Sequences, Q) of + [] -> {0,0}; + [{Q, ReadSeqId2, WriteSeqId2}] -> {ReadSeqId2, WriteSeqId2} + end, + { lists:zip(PubMsgSeqIds, (PubMsgSeqIdsTail ++ [{next, next}])), + InitWriteSeqId, InitReadSeqId} end, {atomic, {Sync, WriteSeqId, State2}} = mnesia:transaction( @@ -647,41 +694,55 @@ internal_tx_commit(Q, PubMsgIds, AckSeqIds, %% order which _could_not_ have happened. {Sync2, WriteSeqId3} = lists:foldl( - fun (MsgId, {Acc, NextWriteSeqId}) -> + fun ({{MsgId, SeqId}, {_NextMsgId, NextSeqId}}, + {Acc, ExpectedSeqId}) -> [{MsgId, _RefCount, File, _Offset, _TotalSize}] = dets_ets_lookup(State, MsgId), + SeqId2 = adjust_last_msg_seq_id(Q, ExpectedSeqId, SeqId), + NextSeqId2 = if NextSeqId =:= next -> SeqId2 + 1; + true -> NextSeqId + end, + true = NextSeqId2 > SeqId2, ok = mnesia:write(rabbit_disk_queue, #dq_msg_loc { queue_and_seq_id = - {Q, NextWriteSeqId}, + {Q, SeqId2}, msg_id = MsgId, - is_delivered = false}, + is_delivered = false, + next_seq_id = NextSeqId2 + }, write), - {Acc or (CurName =:= File), NextWriteSeqId + 1} - end, {false, InitWriteSeqId}, PubMsgIds), + {Acc or (CurName =:= File), NextSeqId2} + end, {false, PubAcc}, PubList), + {ok, State3} = remove_messages(Q, AckSeqIds, txn, State), {Sync2, WriteSeqId3, State3} end), - true = ets:insert(Sequences, {Q, ReadSeqId, WriteSeqId}), - if Sync -> ok = file:sync(CurHdl); - true -> ok - end, + true = if PubList =:= [] -> true; + true -> ets:insert(Sequences, {Q, ReadSeqId, WriteSeqId}) + end, + ok = if Sync -> file:sync(CurHdl); + true -> ok + end, {ok, State2}. -internal_publish(Q, MsgId, MsgBody, State) -> +%% SeqId can be 'next' +internal_publish(Q, MsgId, SeqId, MsgBody, State) -> {ok, State1 = #dqstate { sequences = Sequences }} = internal_tx_publish(MsgId, MsgBody, State), - WriteSeqId = case ets:lookup(Sequences, Q) of - [] -> %% previously unseen queue - true = ets:insert_new(Sequences, {Q, 0, 1}), - 0; - [{Q, ReadSeqId, WriteSeqId2}] -> - true = ets:insert(Sequences, {Q, ReadSeqId, - WriteSeqId2 +1}), - WriteSeqId2 - end, + {ReadSeqId, WriteSeqId} = + case ets:lookup(Sequences, Q) of + [] -> %% previously unseen queue + {0, 0}; + [{Q, ReadSeqId2, WriteSeqId2}] -> + {ReadSeqId2, WriteSeqId2} + end, + WriteSeqId3 = adjust_last_msg_seq_id(Q, WriteSeqId, SeqId), + WriteSeqId3Next = WriteSeqId3 + 1, + true = ets:insert(Sequences, {Q, ReadSeqId, WriteSeqId3Next}), ok = mnesia:dirty_write(rabbit_disk_queue, - #dq_msg_loc { queue_and_seq_id = {Q, WriteSeqId}, + #dq_msg_loc { queue_and_seq_id = {Q, WriteSeqId3}, msg_id = MsgId, + next_seq_id = WriteSeqId3Next, is_delivered = false}), {ok, State1}. @@ -691,7 +752,10 @@ internal_tx_cancel(MsgIds, State) -> MsgSeqIds = lists:zip(MsgIds, lists:duplicate(length(MsgIds), undefined)), remove_messages(undefined, MsgSeqIds, false, State). -internal_requeue(Q, MsgSeqIds, State = #dqstate { sequences = Sequences }) -> +internal_requeue(_Q, [], State) -> + {ok, State}; +internal_requeue(Q, MsgSeqIds = [_|MsgSeqIdsTail], + State = #dqstate { sequences = Sequences }) -> %% We know that every seq_id in here is less than the ReadSeqId %% you'll get if you look up this queue in Sequences (i.e. they've %% already been delivered). We also know that the rows for these @@ -716,20 +780,30 @@ internal_requeue(Q, MsgSeqIds, State = #dqstate { sequences = Sequences }) -> %% the Q _must_ already exist [{Q, ReadSeqId, WriteSeqId}] = ets:lookup(Sequences, Q), + MsgSeqIdsZipped = lists:zip(MsgSeqIds, MsgSeqIdsTail ++ [{next, next}]), {atomic, WriteSeqId2} = mnesia:transaction( fun() -> ok = mnesia:write_lock_table(rabbit_disk_queue), lists:foldl( - fun ({MsgId, SeqId}, NextWriteSeqId) -> + fun ({{{MsgId, SeqIdOrig}, SeqIdTo}, + {_NextMsgSeqId, NextSeqIdTo}}, + ExpectedSeqIdTo) -> + SeqIdTo2 = adjust_last_msg_seq_id(Q, ExpectedSeqIdTo, SeqIdTo), + NextSeqIdTo2 = if NextSeqIdTo =:= next -> SeqIdTo2 + 1; + true -> NextSeqIdTo + end, + true = NextSeqIdTo2 > SeqIdTo2, [Obj = #dq_msg_loc { is_delivered = true, msg_id = MsgId }] = - mnesia:read(rabbit_disk_queue, {Q, SeqId}, write), + mnesia:read(rabbit_disk_queue, {Q, SeqIdOrig}, write), mnesia:write(rabbit_disk_queue, - Obj #dq_msg_loc { queue_and_seq_id = {Q, NextWriteSeqId }}, + Obj #dq_msg_loc { queue_and_seq_id = {Q, SeqIdTo2}, + next_seq_id = NextSeqIdTo2 + }, write), - mnesia:delete(rabbit_disk_queue, {Q, SeqId}, write), - NextWriteSeqId + 1 - end, WriteSeqId, MsgSeqIds) + mnesia:delete(rabbit_disk_queue, {Q, SeqIdOrig}, write), + NextSeqIdTo2 + end, WriteSeqId, MsgSeqIdsZipped) end), true = ets:insert(Sequences, {Q, ReadSeqId, WriteSeqId2}), {ok, State}. @@ -1136,11 +1210,12 @@ shuffle_up(Q, BaseSeqId, SeqId, Gap) -> GapInc = case mnesia:read(rabbit_disk_queue, {Q, SeqId}, write) of [] -> 1; - [Obj = #dq_msg_loc { is_delivered = IsDelivered }] when IsDelivered - orelse (Gap =:= 0) -> + [Obj] -> if Gap =:= 0 -> ok; true -> mnesia:write(rabbit_disk_queue, - Obj #dq_msg_loc { queue_and_seq_id = {Q, SeqId + Gap }}, + Obj #dq_msg_loc { queue_and_seq_id = {Q, SeqId + Gap }, + next_seq_id = SeqId + Gap + 1 + }, write), mnesia:delete(rabbit_disk_queue, {Q, SeqId}, write) end, @@ -1172,7 +1247,9 @@ load_messages(Left, [File|Files], (rabbit_disk_queue, #dq_msg_loc { msg_id = MsgId, queue_and_seq_id = '_', - is_delivered = '_'}, + is_delivered = '_', + next_seq_id = '_' + }, msg_id)) of 0 -> {VMAcc, VTSAcc}; RefCount -> @@ -1215,7 +1292,9 @@ recover_crashed_compactions1(Files, TmpFile) -> (rabbit_disk_queue, #dq_msg_loc { msg_id = MsgId, queue_and_seq_id = '_', - is_delivered = '_'}, + is_delivered = '_', + next_seq_id = '_' + }, msg_id)) end, MsgIdsTmp), {ok, UncorruptedMessages} = @@ -1253,7 +1332,9 @@ recover_crashed_compactions1(Files, TmpFile) -> (rabbit_disk_queue, #dq_msg_loc { msg_id = MsgId, queue_and_seq_id = '_', - is_delivered = '_'}, + is_delivered = '_', + next_seq_id = '_' + }, msg_id)) end, MsgIds), %% The main file should be contiguous |
