summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_disk_queue.erl175
1 files changed, 128 insertions, 47 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 90713723b4..b2d086b27d 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -38,8 +38,9 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
--export([publish/3, deliver/1, phantom_deliver/1, ack/2, tx_publish/2,
- tx_commit/3, tx_cancel/1, requeue/2, purge/1]).
+-export([publish/3, publish_with_seq/4, deliver/1, phantom_deliver/1, ack/2,
+ tx_publish/2, tx_commit/3, tx_commit_with_seqs/3, tx_cancel/1,
+ requeue/2, requeue_with_seqs/2, purge/1]).
-export([stop/0, stop_and_obliterate/0, to_disk_only_mode/0, to_ram_disk_mode/0]).
@@ -86,7 +87,8 @@
%% rabbit_disk_queue: this is an mnesia table which contains:
%% #dq_msg_loc { queue_and_seq_id = {Q, SeqId},
%% is_delivered = IsDelivered,
-%% msg_id = MsgId
+%% msg_id = MsgId,
+%% next_seq_id = SeqId
%% }
%%
@@ -224,6 +226,7 @@
-spec(start_link/1 :: (non_neg_integer()) ->
{'ok', pid()} | 'ignore' | {'error', any()}).
-spec(publish/3 :: (queue_name(), msg_id(), binary()) -> 'ok').
+-spec(publish_with_seq/4 :: (queue_name(), msg_id(), seq_id(), binary()) -> 'ok').
-spec(deliver/1 :: (queue_name()) ->
{'empty' | {msg_id(), binary(), non_neg_integer(),
bool(), {msg_id(), seq_id()}}}).
@@ -232,8 +235,10 @@
-spec(ack/2 :: (queue_name(), [{msg_id(), seq_id()}]) -> 'ok').
-spec(tx_publish/2 :: (msg_id(), binary()) -> 'ok').
-spec(tx_commit/3 :: (queue_name(), [msg_id()], [seq_id()]) -> 'ok').
+-spec(tx_commit_with_seqs/3 :: (queue_name(), [{msg_id(), seq_id()}], [seq_id()]) -> 'ok').
-spec(tx_cancel/1 :: ([msg_id()]) -> 'ok').
--spec(requeue/2 :: (queue_name(), [seq_id()]) -> 'ok').
+-spec(requeue/2 :: (queue_name(), [{msg_id(), seq_id()}]) -> 'ok').
+-spec(requeue_with_seqs/2 :: (queue_name(), [{{msg_id(), seq_id()}, seq_id()}]) -> 'ok').
-spec(purge/1 :: (queue_name()) -> non_neg_integer()).
-spec(stop/0 :: () -> 'ok').
-spec(stop_and_obliterate/0 :: () -> 'ok').
@@ -251,6 +256,9 @@ start_link(FileSizeLimit) ->
publish(Q, MsgId, Msg) when is_binary(Msg) ->
gen_server:cast(?SERVER, {publish, Q, MsgId, Msg}).
+publish_with_seq(Q, MsgId, SeqId, Msg) when is_binary(Msg) ->
+ gen_server:cast(?SERVER, {publish_with_seq, Q, MsgId, SeqId, Msg}).
+
deliver(Q) ->
gen_server:call(?SERVER, {deliver, Q}, infinity).
@@ -266,12 +274,19 @@ tx_publish(MsgId, Msg) when is_binary(Msg) ->
tx_commit(Q, PubMsgIds, AckSeqIds) when is_list(PubMsgIds) andalso is_list(AckSeqIds) ->
gen_server:call(?SERVER, {tx_commit, Q, PubMsgIds, AckSeqIds}, infinity).
+tx_commit_with_seqs(Q, PubMsgSeqIds, AckSeqIds)
+ when is_list(PubMsgSeqIds) andalso is_list(AckSeqIds) ->
+ gen_server:call(?SERVER, {tx_commit_with_seqs, Q, PubMsgSeqIds, AckSeqIds}, infinity).
+
tx_cancel(MsgIds) when is_list(MsgIds) ->
gen_server:cast(?SERVER, {tx_cancel, MsgIds}).
requeue(Q, MsgSeqIds) when is_list(MsgSeqIds) ->
gen_server:cast(?SERVER, {requeue, Q, MsgSeqIds}).
+requeue_with_seqs(Q, MsgSeqSeqIds) when is_list(MsgSeqSeqIds) ->
+ gen_server:cast(?SERVER, {requeue_with_seqs, Q, MsgSeqSeqIds}).
+
purge(Q) ->
gen_server:call(?SERVER, {purge, Q}).
@@ -362,7 +377,11 @@ handle_call({phantom_deliver, Q}, _From, State) ->
{ok, Result, State1} = internal_deliver(Q, false, State),
{reply, Result, State1};
handle_call({tx_commit, Q, PubMsgIds, AckSeqIds}, _From, State) ->
- {ok, State1} = internal_tx_commit(Q, PubMsgIds, AckSeqIds, State),
+ PubMsgSeqIds = lists:zip(PubMsgIds, lists:duplicate(length(PubMsgIds), next)),
+ {ok, State1} = internal_tx_commit(Q, PubMsgSeqIds, AckSeqIds, State),
+ {reply, ok, State1};
+handle_call({tx_commit_with_seqs, Q, PubSeqMsgIds, AckSeqIds}, _From, State) ->
+ {ok, State1} = internal_tx_commit(Q, PubSeqMsgIds, AckSeqIds, State),
{reply, ok, State1};
handle_call({purge, Q}, _From, State) ->
{ok, Count, State1} = internal_purge(Q, State),
@@ -401,7 +420,10 @@ handle_call(to_ram_disk_mode, _From, State = #dqstate { operation_mode = disk_on
{reply, ok, State #dqstate { operation_mode = ram_disk }}.
handle_cast({publish, Q, MsgId, MsgBody}, State) ->
- {ok, State1} = internal_publish(Q, MsgId, MsgBody, State),
+ {ok, State1} = internal_publish(Q, MsgId, next, MsgBody, State),
+ {noreply, State1};
+handle_cast({publish_with_seq, Q, MsgId, SeqId, MsgBody}, State) ->
+ {ok, State1} = internal_publish(Q, MsgId, SeqId, MsgBody, State),
{noreply, State1};
handle_cast({ack, Q, MsgSeqIds}, State) ->
{ok, State1} = internal_ack(Q, MsgSeqIds, State),
@@ -413,7 +435,11 @@ handle_cast({tx_cancel, MsgIds}, State) ->
{ok, State1} = internal_tx_cancel(MsgIds, State),
{noreply, State1};
handle_cast({requeue, Q, MsgSeqIds}, State) ->
- {ok, State1} = internal_requeue(Q, MsgSeqIds, State),
+ MsgSeqSeqIds = lists:zip(MsgSeqIds, lists:duplicate(length(MsgSeqIds), next)),
+ {ok, State1} = internal_requeue(Q, MsgSeqSeqIds, State),
+ {noreply, State1};
+handle_cast({requeue_with_seqs, Q, MsgSeqSeqIds}, State) ->
+ {ok, State1} = internal_requeue(Q, MsgSeqSeqIds, State),
{noreply, State1}.
handle_info(_Info, State) ->
@@ -499,10 +525,11 @@ internal_deliver(Q, ReadMsg, State = #dqstate { sequences = Sequences }) ->
case mnesia:dirty_read(rabbit_disk_queue, {Q, ReadSeqId}) of
[] -> {ok, empty, State};
[Obj =
- #dq_msg_loc {is_delivered = Delivered, msg_id = MsgId}] ->
+ #dq_msg_loc {is_delivered = Delivered, msg_id = MsgId,
+ next_seq_id = ReadSeqId2}] ->
[{MsgId, _RefCount, File, Offset, TotalSize}] =
dets_ets_lookup(State, MsgId),
- true = ets:insert(Sequences, {Q, ReadSeqId + 1, WriteSeqId}),
+ true = ets:insert(Sequences, {Q, ReadSeqId2, WriteSeqId}),
ok =
if Delivered -> ok;
true ->
@@ -627,15 +654,35 @@ internal_tx_publish(MsgId, MsgBody,
{ok, State}
end.
-internal_tx_commit(Q, PubMsgIds, AckSeqIds,
+adjust_last_msg_seq_id(_Q, ExpectedSeqId, next) ->
+ ExpectedSeqId;
+adjust_last_msg_seq_id(_Q, 0, SuppliedSeqId) ->
+ SuppliedSeqId;
+adjust_last_msg_seq_id(_Q, ExpectedSeqId, ExpectedSeqId) ->
+ ExpectedSeqId;
+adjust_last_msg_seq_id(Q, ExpectedSeqId, SuppliedSeqId) when SuppliedSeqId > ExpectedSeqId ->
+ [Obj] = mnesia:dirty_read(rabbit_disk_queue, {Q, ExpectedSeqId - 1}),
+ ok = mnesia:dirty_write(rabbit_disk_queue,
+ Obj #dq_msg_loc { next_seq_id = SuppliedSeqId }),
+ SuppliedSeqId.
+
+%% can call this with PubMsgSeqIds as zip(PubMsgIds, duplicate(N, next))
+internal_tx_commit(Q, PubMsgSeqIds, AckSeqIds,
State = #dqstate { current_file_handle = CurHdl,
current_file_name = CurName,
sequences = Sequences
}) ->
- {ReadSeqId, InitWriteSeqId} =
- case ets:lookup(Sequences, Q) of
- [] -> {0,0};
- [{Q, ReadSeqId2, WriteSeqId2}] -> {ReadSeqId2, WriteSeqId2}
+ {PubList, PubAcc, ReadSeqId} =
+ case PubMsgSeqIds of
+ [] -> {[], undefined, undefined};
+ [_|PubMsgSeqIdsTail] ->
+ {InitReadSeqId, InitWriteSeqId} =
+ case ets:lookup(Sequences, Q) of
+ [] -> {0,0};
+ [{Q, ReadSeqId2, WriteSeqId2}] -> {ReadSeqId2, WriteSeqId2}
+ end,
+ { lists:zip(PubMsgSeqIds, (PubMsgSeqIdsTail ++ [{next, next}])),
+ InitWriteSeqId, InitReadSeqId}
end,
{atomic, {Sync, WriteSeqId, State2}} =
mnesia:transaction(
@@ -647,41 +694,55 @@ internal_tx_commit(Q, PubMsgIds, AckSeqIds,
%% order which _could_not_ have happened.
{Sync2, WriteSeqId3} =
lists:foldl(
- fun (MsgId, {Acc, NextWriteSeqId}) ->
+ fun ({{MsgId, SeqId}, {_NextMsgId, NextSeqId}},
+ {Acc, ExpectedSeqId}) ->
[{MsgId, _RefCount, File, _Offset, _TotalSize}] =
dets_ets_lookup(State, MsgId),
+ SeqId2 = adjust_last_msg_seq_id(Q, ExpectedSeqId, SeqId),
+ NextSeqId2 = if NextSeqId =:= next -> SeqId2 + 1;
+ true -> NextSeqId
+ end,
+ true = NextSeqId2 > SeqId2,
ok = mnesia:write(rabbit_disk_queue,
#dq_msg_loc { queue_and_seq_id =
- {Q, NextWriteSeqId},
+ {Q, SeqId2},
msg_id = MsgId,
- is_delivered = false},
+ is_delivered = false,
+ next_seq_id = NextSeqId2
+ },
write),
- {Acc or (CurName =:= File), NextWriteSeqId + 1}
- end, {false, InitWriteSeqId}, PubMsgIds),
+ {Acc or (CurName =:= File), NextSeqId2}
+ end, {false, PubAcc}, PubList),
+
{ok, State3} = remove_messages(Q, AckSeqIds, txn, State),
{Sync2, WriteSeqId3, State3}
end),
- true = ets:insert(Sequences, {Q, ReadSeqId, WriteSeqId}),
- if Sync -> ok = file:sync(CurHdl);
- true -> ok
- end,
+ true = if PubList =:= [] -> true;
+ true -> ets:insert(Sequences, {Q, ReadSeqId, WriteSeqId})
+ end,
+ ok = if Sync -> file:sync(CurHdl);
+ true -> ok
+ end,
{ok, State2}.
-internal_publish(Q, MsgId, MsgBody, State) ->
+%% SeqId can be 'next'
+internal_publish(Q, MsgId, SeqId, MsgBody, State) ->
{ok, State1 = #dqstate { sequences = Sequences }} =
internal_tx_publish(MsgId, MsgBody, State),
- WriteSeqId = case ets:lookup(Sequences, Q) of
- [] -> %% previously unseen queue
- true = ets:insert_new(Sequences, {Q, 0, 1}),
- 0;
- [{Q, ReadSeqId, WriteSeqId2}] ->
- true = ets:insert(Sequences, {Q, ReadSeqId,
- WriteSeqId2 +1}),
- WriteSeqId2
- end,
+ {ReadSeqId, WriteSeqId} =
+ case ets:lookup(Sequences, Q) of
+ [] -> %% previously unseen queue
+ {0, 0};
+ [{Q, ReadSeqId2, WriteSeqId2}] ->
+ {ReadSeqId2, WriteSeqId2}
+ end,
+ WriteSeqId3 = adjust_last_msg_seq_id(Q, WriteSeqId, SeqId),
+ WriteSeqId3Next = WriteSeqId3 + 1,
+ true = ets:insert(Sequences, {Q, ReadSeqId, WriteSeqId3Next}),
ok = mnesia:dirty_write(rabbit_disk_queue,
- #dq_msg_loc { queue_and_seq_id = {Q, WriteSeqId},
+ #dq_msg_loc { queue_and_seq_id = {Q, WriteSeqId3},
msg_id = MsgId,
+ next_seq_id = WriteSeqId3Next,
is_delivered = false}),
{ok, State1}.
@@ -691,7 +752,10 @@ internal_tx_cancel(MsgIds, State) ->
MsgSeqIds = lists:zip(MsgIds, lists:duplicate(length(MsgIds), undefined)),
remove_messages(undefined, MsgSeqIds, false, State).
-internal_requeue(Q, MsgSeqIds, State = #dqstate { sequences = Sequences }) ->
+internal_requeue(_Q, [], State) ->
+ {ok, State};
+internal_requeue(Q, MsgSeqIds = [_|MsgSeqIdsTail],
+ State = #dqstate { sequences = Sequences }) ->
%% We know that every seq_id in here is less than the ReadSeqId
%% you'll get if you look up this queue in Sequences (i.e. they've
%% already been delivered). We also know that the rows for these
@@ -716,20 +780,30 @@ internal_requeue(Q, MsgSeqIds, State = #dqstate { sequences = Sequences }) ->
%% the Q _must_ already exist
[{Q, ReadSeqId, WriteSeqId}] = ets:lookup(Sequences, Q),
+ MsgSeqIdsZipped = lists:zip(MsgSeqIds, MsgSeqIdsTail ++ [{next, next}]),
{atomic, WriteSeqId2} =
mnesia:transaction(
fun() ->
ok = mnesia:write_lock_table(rabbit_disk_queue),
lists:foldl(
- fun ({MsgId, SeqId}, NextWriteSeqId) ->
+ fun ({{{MsgId, SeqIdOrig}, SeqIdTo},
+ {_NextMsgSeqId, NextSeqIdTo}},
+ ExpectedSeqIdTo) ->
+ SeqIdTo2 = adjust_last_msg_seq_id(Q, ExpectedSeqIdTo, SeqIdTo),
+ NextSeqIdTo2 = if NextSeqIdTo =:= next -> SeqIdTo2 + 1;
+ true -> NextSeqIdTo
+ end,
+ true = NextSeqIdTo2 > SeqIdTo2,
[Obj = #dq_msg_loc { is_delivered = true, msg_id = MsgId }] =
- mnesia:read(rabbit_disk_queue, {Q, SeqId}, write),
+ mnesia:read(rabbit_disk_queue, {Q, SeqIdOrig}, write),
mnesia:write(rabbit_disk_queue,
- Obj #dq_msg_loc { queue_and_seq_id = {Q, NextWriteSeqId }},
+ Obj #dq_msg_loc { queue_and_seq_id = {Q, SeqIdTo2},
+ next_seq_id = NextSeqIdTo2
+ },
write),
- mnesia:delete(rabbit_disk_queue, {Q, SeqId}, write),
- NextWriteSeqId + 1
- end, WriteSeqId, MsgSeqIds)
+ mnesia:delete(rabbit_disk_queue, {Q, SeqIdOrig}, write),
+ NextSeqIdTo2
+ end, WriteSeqId, MsgSeqIdsZipped)
end),
true = ets:insert(Sequences, {Q, ReadSeqId, WriteSeqId2}),
{ok, State}.
@@ -1136,11 +1210,12 @@ shuffle_up(Q, BaseSeqId, SeqId, Gap) ->
GapInc =
case mnesia:read(rabbit_disk_queue, {Q, SeqId}, write) of
[] -> 1;
- [Obj = #dq_msg_loc { is_delivered = IsDelivered }] when IsDelivered
- orelse (Gap =:= 0) ->
+ [Obj] ->
if Gap =:= 0 -> ok;
true -> mnesia:write(rabbit_disk_queue,
- Obj #dq_msg_loc { queue_and_seq_id = {Q, SeqId + Gap }},
+ Obj #dq_msg_loc { queue_and_seq_id = {Q, SeqId + Gap },
+ next_seq_id = SeqId + Gap + 1
+ },
write),
mnesia:delete(rabbit_disk_queue, {Q, SeqId}, write)
end,
@@ -1172,7 +1247,9 @@ load_messages(Left, [File|Files],
(rabbit_disk_queue,
#dq_msg_loc { msg_id = MsgId,
queue_and_seq_id = '_',
- is_delivered = '_'},
+ is_delivered = '_',
+ next_seq_id = '_'
+ },
msg_id)) of
0 -> {VMAcc, VTSAcc};
RefCount ->
@@ -1215,7 +1292,9 @@ recover_crashed_compactions1(Files, TmpFile) ->
(rabbit_disk_queue,
#dq_msg_loc { msg_id = MsgId,
queue_and_seq_id = '_',
- is_delivered = '_'},
+ is_delivered = '_',
+ next_seq_id = '_'
+ },
msg_id))
end, MsgIdsTmp),
{ok, UncorruptedMessages} =
@@ -1253,7 +1332,9 @@ recover_crashed_compactions1(Files, TmpFile) ->
(rabbit_disk_queue,
#dq_msg_loc { msg_id = MsgId,
queue_and_seq_id = '_',
- is_delivered = '_'},
+ is_delivered = '_',
+ next_seq_id = '_'
+ },
msg_id))
end, MsgIds),
%% The main file should be contiguous