summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_disk_queue.erl33
-rw-r--r--src/rabbit_tests.erl8
2 files changed, 19 insertions, 22 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 9b0849c35f..0623d77d24 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -82,8 +82,8 @@ publish(Q, MsgId, Msg) when is_binary(Msg) ->
deliver(Q) ->
gen_server:call(?SERVER, {deliver, Q}, infinity).
-ack(Q, MsgIds) when is_list(MsgIds) ->
- gen_server:cast(?SERVER, {ack, Q, MsgIds}).
+ack(Q, MsgSeqIds) when is_list(MsgSeqIds) ->
+ gen_server:cast(?SERVER, {ack, Q, MsgSeqIds}).
tx_publish(MsgId, Msg) when is_binary(Msg) ->
gen_server:cast(?SERVER, {tx_publish, MsgId, Msg}).
@@ -154,8 +154,8 @@ handle_call(clean_stop, _From, State) ->
handle_cast({publish, Q, MsgId, MsgBody}, State) ->
{ok, State1} = internal_publish(Q, MsgId, MsgBody, State),
{noreply, State1};
-handle_cast({ack, Q, MsgIds}, State) ->
- {ok, State1} = internal_ack(Q, MsgIds, State),
+handle_cast({ack, Q, MsgSeqIds}, State) ->
+ {ok, State1} = internal_ack(Q, MsgSeqIds, State),
{noreply, State1};
handle_cast({tx_publish, MsgId, MsgBody}, State) ->
{ok, State1} = internal_tx_publish(MsgId, MsgBody, State),
@@ -238,7 +238,7 @@ internal_deliver(Q, State = #dqstate { msg_location = MsgLocation,
true -> ok = mnesia:dirty_write(rabbit_disk_queue, Obj #dq_msg_loc {is_delivered = true})
end,
true = ets:insert(Sequences, {Q, ReadSeqId + 1, WriteSeqId}),
- {ok, {MsgId, MsgBody, BodySize, Delivered},
+ {ok, {MsgId, MsgBody, BodySize, Delivered, {MsgId, ReadSeqId}},
State # dqstate { read_file_handles = {ReadHdls1, ReadHdlsAge1} }}
end
end.
@@ -249,13 +249,13 @@ internal_ack(Q, MsgIds, State) ->
%% Q is only needed if MnesiaDelete = true
%% called from tx_cancel with MnesiaDelete = false
%% called from ack with MnesiaDelete = true
-remove_messages(Q, MsgIds, MnesiaDelete, State = # dqstate { msg_location = MsgLocation,
- sequences = Sequences,
- file_summary = FileSummary,
- current_file_name = CurName
- }) ->
+remove_messages(Q, MsgSeqIds, MnesiaDelete, State = # dqstate { msg_location = MsgLocation,
+ sequences = Sequences,
+ file_summary = FileSummary,
+ current_file_name = CurName
+ }) ->
{Files, MaxSeqId}
- = lists:foldl(fun (MsgId, {Files2, MaxSeqId2}) ->
+ = lists:foldl(fun ({MsgId, SeqId}, {Files2, MaxSeqId2}) ->
[{MsgId, RefCount, File, Offset, TotalSize}]
= dets:lookup(MsgLocation, MsgId),
Files3 =
@@ -275,19 +275,13 @@ remove_messages(Q, MsgIds, MnesiaDelete, State = # dqstate { msg_location = MsgL
Files2
end,
{if MnesiaDelete ->
- [#dq_msg_loc { queue_and_seq_id = {Q, SeqId} }]
- = mnesia:dirty_match_object(rabbit_disk_queue,
- #dq_msg_loc { msg_id = MsgId,
- queue_and_seq_id = {Q, '_'},
- is_delivered = '_'
- }),
ok = mnesia:dirty_delete(rabbit_disk_queue, {Q, SeqId}),
lists:max([SeqId, MaxSeqId2]);
true ->
MaxSeqId2
end,
Files3}
- end, {sets:new(), 0}, MsgIds),
+ end, {sets:new(), 0}, MsgSeqIds),
true = if MnesiaDelete ->
[{Q, ReadSeqId, WriteSeqId}] = ets:lookup(Sequences, Q),
if MaxSeqId > ReadSeqId ->
@@ -371,6 +365,9 @@ internal_publish(Q, MsgId, MsgBody, State) ->
{ok, State1}.
internal_tx_cancel(MsgIds, State) ->
+ % we don't need seq ids because we're not touching mnesia, because seqids were
+ % never assigned
+ MsgSeqIds = lists:zip(MsgIds, lists:duplicate(length(MsgIds), undefined)),
remove_messages(undefined, MsgIds, false, State).
%% ---- ROLLING OVER THE APPEND FILE ----
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 1e66fe9a81..3eab352d22 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -705,8 +705,8 @@ rdq_time_tx_publish_commit_deliver_ack(Qs, MsgCount, MsgSizeBytes) ->
fun() -> [ok = rabbit_disk_queue:tx_commit(Q, List) || Q <- Qs] end
]]),
{Deliver, ok} = timer:tc(?MODULE, rdq_time_commands,
- [[fun() -> [begin [begin {N, Msg, MsgSizeBytes, false} = rabbit_disk_queue:deliver(Q), ok end || N <- List],
- rabbit_disk_queue:ack(Q, List),
+ [[fun() -> [begin SeqIds = [begin {N, Msg, MsgSizeBytes, false, SeqId} = rabbit_disk_queue:deliver(Q), SeqId end || N <- List],
+ rabbit_disk_queue:ack(Q, SeqIds),
ok = rabbit_disk_queue:tx_commit(Q, [])
end || Q <- Qs]
end]]),
@@ -738,8 +738,8 @@ rdq_stress_gc(MsgCount) ->
end
end, [], lists:flatten([lists:seq(N,MsgCount,N) || N <- lists:seq(StartChunk,MsgCount)])))
++ lists:seq(1, (StartChunk - 1)),
- [begin {N, Msg, MsgSizeBytes, false} = rabbit_disk_queue:deliver(q),
- rabbit_disk_queue:ack(q, [N]),
+ [begin {N, Msg, MsgSizeBytes, false, SeqId} = rabbit_disk_queue:deliver(q),
+ rabbit_disk_queue:ack(q, [SeqId]),
rabbit_disk_queue:tx_commit(q, [])
end || N <- AckList],
rdq_stop().