diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_disk_queue.erl | 33 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 8 |
2 files changed, 19 insertions, 22 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index 9b0849c35f..0623d77d24 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -82,8 +82,8 @@ publish(Q, MsgId, Msg) when is_binary(Msg) -> deliver(Q) -> gen_server:call(?SERVER, {deliver, Q}, infinity). -ack(Q, MsgIds) when is_list(MsgIds) -> - gen_server:cast(?SERVER, {ack, Q, MsgIds}). +ack(Q, MsgSeqIds) when is_list(MsgSeqIds) -> + gen_server:cast(?SERVER, {ack, Q, MsgSeqIds}). tx_publish(MsgId, Msg) when is_binary(Msg) -> gen_server:cast(?SERVER, {tx_publish, MsgId, Msg}). @@ -154,8 +154,8 @@ handle_call(clean_stop, _From, State) -> handle_cast({publish, Q, MsgId, MsgBody}, State) -> {ok, State1} = internal_publish(Q, MsgId, MsgBody, State), {noreply, State1}; -handle_cast({ack, Q, MsgIds}, State) -> - {ok, State1} = internal_ack(Q, MsgIds, State), +handle_cast({ack, Q, MsgSeqIds}, State) -> + {ok, State1} = internal_ack(Q, MsgSeqIds, State), {noreply, State1}; handle_cast({tx_publish, MsgId, MsgBody}, State) -> {ok, State1} = internal_tx_publish(MsgId, MsgBody, State), @@ -238,7 +238,7 @@ internal_deliver(Q, State = #dqstate { msg_location = MsgLocation, true -> ok = mnesia:dirty_write(rabbit_disk_queue, Obj #dq_msg_loc {is_delivered = true}) end, true = ets:insert(Sequences, {Q, ReadSeqId + 1, WriteSeqId}), - {ok, {MsgId, MsgBody, BodySize, Delivered}, + {ok, {MsgId, MsgBody, BodySize, Delivered, {MsgId, ReadSeqId}}, State # dqstate { read_file_handles = {ReadHdls1, ReadHdlsAge1} }} end end. @@ -249,13 +249,13 @@ internal_ack(Q, MsgIds, State) -> %% Q is only needed if MnesiaDelete = true %% called from tx_cancel with MnesiaDelete = false %% called from ack with MnesiaDelete = true -remove_messages(Q, MsgIds, MnesiaDelete, State = # dqstate { msg_location = MsgLocation, - sequences = Sequences, - file_summary = FileSummary, - current_file_name = CurName - }) -> +remove_messages(Q, MsgSeqIds, MnesiaDelete, State = # dqstate { msg_location = MsgLocation, + sequences = Sequences, + file_summary = FileSummary, + current_file_name = CurName + }) -> {Files, MaxSeqId} - = lists:foldl(fun (MsgId, {Files2, MaxSeqId2}) -> + = lists:foldl(fun ({MsgId, SeqId}, {Files2, MaxSeqId2}) -> [{MsgId, RefCount, File, Offset, TotalSize}] = dets:lookup(MsgLocation, MsgId), Files3 = @@ -275,19 +275,13 @@ remove_messages(Q, MsgIds, MnesiaDelete, State = # dqstate { msg_location = MsgL Files2 end, {if MnesiaDelete -> - [#dq_msg_loc { queue_and_seq_id = {Q, SeqId} }] - = mnesia:dirty_match_object(rabbit_disk_queue, - #dq_msg_loc { msg_id = MsgId, - queue_and_seq_id = {Q, '_'}, - is_delivered = '_' - }), ok = mnesia:dirty_delete(rabbit_disk_queue, {Q, SeqId}), lists:max([SeqId, MaxSeqId2]); true -> MaxSeqId2 end, Files3} - end, {sets:new(), 0}, MsgIds), + end, {sets:new(), 0}, MsgSeqIds), true = if MnesiaDelete -> [{Q, ReadSeqId, WriteSeqId}] = ets:lookup(Sequences, Q), if MaxSeqId > ReadSeqId -> @@ -371,6 +365,9 @@ internal_publish(Q, MsgId, MsgBody, State) -> {ok, State1}. internal_tx_cancel(MsgIds, State) -> + % we don't need seq ids because we're not touching mnesia, because seqids were + % never assigned + MsgSeqIds = lists:zip(MsgIds, lists:duplicate(length(MsgIds), undefined)), remove_messages(undefined, MsgIds, false, State). %% ---- ROLLING OVER THE APPEND FILE ---- diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 1e66fe9a81..3eab352d22 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -705,8 +705,8 @@ rdq_time_tx_publish_commit_deliver_ack(Qs, MsgCount, MsgSizeBytes) -> fun() -> [ok = rabbit_disk_queue:tx_commit(Q, List) || Q <- Qs] end ]]), {Deliver, ok} = timer:tc(?MODULE, rdq_time_commands, - [[fun() -> [begin [begin {N, Msg, MsgSizeBytes, false} = rabbit_disk_queue:deliver(Q), ok end || N <- List], - rabbit_disk_queue:ack(Q, List), + [[fun() -> [begin SeqIds = [begin {N, Msg, MsgSizeBytes, false, SeqId} = rabbit_disk_queue:deliver(Q), SeqId end || N <- List], + rabbit_disk_queue:ack(Q, SeqIds), ok = rabbit_disk_queue:tx_commit(Q, []) end || Q <- Qs] end]]), @@ -738,8 +738,8 @@ rdq_stress_gc(MsgCount) -> end end, [], lists:flatten([lists:seq(N,MsgCount,N) || N <- lists:seq(StartChunk,MsgCount)]))) ++ lists:seq(1, (StartChunk - 1)), - [begin {N, Msg, MsgSizeBytes, false} = rabbit_disk_queue:deliver(q), - rabbit_disk_queue:ack(q, [N]), + [begin {N, Msg, MsgSizeBytes, false, SeqId} = rabbit_disk_queue:deliver(q), + rabbit_disk_queue:ack(q, [SeqId]), rabbit_disk_queue:tx_commit(q, []) end || N <- AckList], rdq_stop(). |
