diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_disk_queue.erl | 171 | ||||
| -rw-r--r-- | src/rabbit_misc.erl | 1 | ||||
| -rw-r--r-- | src/rabbit_mixed_queue.erl | 11 |
4 files changed, 111 insertions, 74 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 80051149fe..3ca88aaad6 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -108,7 +108,7 @@ terminate(_Reason, State) -> lists:foldl(fun (Txn, State1) -> rollback_transaction(Txn, State1) end, State, all_tx()), - rabbit_mixed_queue:purge(NewState #q.mixed_state), + rabbit_mixed_queue:delete_queue(NewState #q.mixed_state), ok = rabbit_amqqueue:internal_delete(QName). code_change(_OldVsn, State, _Extra) -> diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index c7ef117717..b73e456c22 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -40,7 +40,7 @@ -export([publish/3, publish_with_seq/4, deliver/1, phantom_deliver/1, ack/2, tx_publish/2, tx_commit/3, tx_commit_with_seqs/3, tx_cancel/1, - requeue/2, requeue_with_seqs/2, purge/1]). + requeue/2, requeue_with_seqs/2, purge/1, delete_queue/1]). -export([length/1, is_empty/1]). @@ -224,11 +224,12 @@ -ifdef(use_specs). -type(seq_id() :: non_neg_integer()). +-type(seq_id_or_next() :: { seq_id() | 'next' }). -spec(start_link/1 :: (non_neg_integer()) -> {'ok', pid()} | 'ignore' | {'error', any()}). -spec(publish/3 :: (queue_name(), msg_id(), binary()) -> 'ok'). --spec(publish_with_seq/4 :: (queue_name(), msg_id(), seq_id(), binary()) -> 'ok'). +-spec(publish_with_seq/4 :: (queue_name(), msg_id(), seq_id_or_next(), binary()) -> 'ok'). -spec(deliver/1 :: (queue_name()) -> {'empty' | {msg_id(), binary(), non_neg_integer(), bool(), {msg_id(), seq_id()}, non_neg_integer()}}). @@ -237,11 +238,11 @@ -spec(ack/2 :: (queue_name(), [{msg_id(), seq_id()}]) -> 'ok'). -spec(tx_publish/2 :: (msg_id(), binary()) -> 'ok'). -spec(tx_commit/3 :: (queue_name(), [msg_id()], [{msg_id(), seq_id()}]) -> 'ok'). --spec(tx_commit_with_seqs/3 :: (queue_name(), [{msg_id(), seq_id()}], +-spec(tx_commit_with_seqs/3 :: (queue_name(), [{msg_id(), seq_id_or_next()}], [{msg_id(), seq_id()}]) -> 'ok'). -spec(tx_cancel/1 :: ([msg_id()]) -> 'ok'). -spec(requeue/2 :: (queue_name(), [{msg_id(), seq_id()}]) -> 'ok'). --spec(requeue_with_seqs/2 :: (queue_name(), [{{msg_id(), seq_id()}, seq_id()}]) -> 'ok'). +-spec(requeue_with_seqs/2 :: (queue_name(), [{{msg_id(), seq_id()}, seq_id_or_next()}]) -> 'ok'). -spec(purge/1 :: (queue_name()) -> non_neg_integer()). -spec(stop/0 :: () -> 'ok'). -spec(stop_and_obliterate/0 :: () -> 'ok'). @@ -295,6 +296,9 @@ requeue_with_seqs(Q, MsgSeqSeqIds) when is_list(MsgSeqSeqIds) -> purge(Q) -> gen_server:call(?SERVER, {purge, Q}). +delete_queue(Q) -> + gen_server:cast(?SERVER, {delete_queue, Q}). + stop() -> gen_server:call(?SERVER, stop, infinity). @@ -457,6 +461,9 @@ handle_cast({requeue, Q, MsgSeqIds}, State) -> {noreply, State1}; handle_cast({requeue_with_seqs, Q, MsgSeqSeqIds}, State) -> {ok, State1} = internal_requeue(Q, MsgSeqSeqIds, State), + {noreply, State1}; +handle_cast({delete_queue, Q}, State) -> + {ok, State1} = internal_delete_queue(Q, State), {noreply, State1}. handle_info(_Info, State) -> @@ -533,6 +540,69 @@ dets_ets_match_object(#dqstate { msg_location_ets = MsgLocationEts, operation_mo Obj) -> ets:match_object(MsgLocationEts, Obj). +find_next_seq_id(CurrentSeq, next) -> + CurrentSeq + 1; +find_next_seq_id(CurrentSeq, NextSeqId) + when NextSeqId > CurrentSeq -> + NextSeqId. + +determine_next_read_id(CurrentReadWrite, CurrentReadWrite, CurrentReadWrite) -> + CurrentReadWrite; +determine_next_read_id(CurrentRead, _CurrentWrite, next) -> + CurrentRead; +determine_next_read_id(CurrentReadWrite, CurrentReadWrite, NextWrite) + when NextWrite > CurrentReadWrite -> + NextWrite; +determine_next_read_id(CurrentRead, CurrentWrite, NextWrite) + when NextWrite >= CurrentWrite -> + CurrentRead. + +get_read_handle(File, State = + #dqstate { read_file_handles = {ReadHdls, ReadHdlsAge}, + read_file_handles_limit = ReadFileHandlesLimit }) -> + Now = now(), + {FileHdl, ReadHdls1, ReadHdlsAge1} = + case dict:find(File, ReadHdls) of + error -> + {ok, Hdl} = file:open(form_filename(File), + [read, raw, binary, + read_ahead]), + case dict:size(ReadHdls) < ReadFileHandlesLimit of + true -> + {Hdl, ReadHdls, ReadHdlsAge}; + _False -> + {Then, OldFile, ReadHdlsAge2} = + gb_trees:take_smallest(ReadHdlsAge), + {ok, {OldHdl, Then}} = + dict:find(OldFile, ReadHdls), + ok = file:close(OldHdl), + {Hdl, dict:erase(OldFile, ReadHdls), ReadHdlsAge2} + end; + {ok, {Hdl, Then}} -> + {Hdl, ReadHdls, gb_trees:delete(Then, ReadHdlsAge)} + end, + ReadHdls3 = dict:store(File, {FileHdl, Now}, ReadHdls1), + ReadHdlsAge3 = gb_trees:enter(Now, File, ReadHdlsAge1), + {FileHdl, State #dqstate {read_file_handles = {ReadHdls3, ReadHdlsAge3}}}. + +adjust_last_msg_seq_id(_Q, ExpectedSeqId, next, _Mode) -> + ExpectedSeqId; +adjust_last_msg_seq_id(_Q, 0, SuppliedSeqId, _Mode) -> + SuppliedSeqId; +adjust_last_msg_seq_id(_Q, ExpectedSeqId, ExpectedSeqId, _Mode) -> + ExpectedSeqId; +adjust_last_msg_seq_id(Q, ExpectedSeqId, SuppliedSeqId, dirty) when SuppliedSeqId > ExpectedSeqId -> + [Obj] = mnesia:dirty_read(rabbit_disk_queue, {Q, ExpectedSeqId - 1}), + ok = mnesia:dirty_write(rabbit_disk_queue, + Obj #dq_msg_loc { next_seq_id = SuppliedSeqId }), + SuppliedSeqId; +adjust_last_msg_seq_id(Q, ExpectedSeqId, SuppliedSeqId, Lock) when SuppliedSeqId > ExpectedSeqId -> + [Obj] = mnesia:read(rabbit_disk_queue, {Q, ExpectedSeqId - 1}, Lock), + ok = mnesia:write(rabbit_disk_queue, + Obj #dq_msg_loc { next_seq_id = SuppliedSeqId }, + Lock), + SuppliedSeqId. + %% ---- INTERNAL RAW FUNCTIONS ---- internal_deliver(Q, ReadMsg, State = #dqstate { sequences = Sequences }) -> @@ -564,42 +634,15 @@ internal_deliver(Q, ReadMsg, State = #dqstate { sequences = Sequences }) -> end end. -get_read_handle(File, State = - #dqstate { read_file_handles = {ReadHdls, ReadHdlsAge}, - read_file_handles_limit = ReadFileHandlesLimit }) -> - Now = now(), - {FileHdl, ReadHdls1, ReadHdlsAge1} = - case dict:find(File, ReadHdls) of - error -> - {ok, Hdl} = file:open(form_filename(File), - [read, raw, binary, - read_ahead]), - case dict:size(ReadHdls) < ReadFileHandlesLimit of - true -> - {Hdl, ReadHdls, ReadHdlsAge}; - _False -> - {Then, OldFile, ReadHdlsAge2} = - gb_trees:take_smallest(ReadHdlsAge), - {ok, {OldHdl, Then}} = - dict:find(OldFile, ReadHdls), - ok = file:close(OldHdl), - {Hdl, dict:erase(OldFile, ReadHdls), ReadHdlsAge2} - end; - {ok, {Hdl, Then}} -> - {Hdl, ReadHdls, gb_trees:delete(Then, ReadHdlsAge)} - end, - ReadHdls3 = dict:store(File, {FileHdl, Now}, ReadHdls1), - ReadHdlsAge3 = gb_trees:enter(Now, File, ReadHdlsAge1), - {FileHdl, State #dqstate {read_file_handles = {ReadHdls3, ReadHdlsAge3}}}. - internal_ack(Q, MsgSeqIds, State) -> remove_messages(Q, MsgSeqIds, true, State). %% Q is only needed if MnesiaDelete /= false -%% called from tx_cancel with MnesiaDelete = false -%% called from internal_tx_cancel with MnesiaDelete = txn %% called from ack with MnesiaDelete = true +%% called from tx_commit with MnesiaDelete = txn +%% called from tx_cancel with MnesiaDelete = false %% called from purge with MnesiaDelete = txn +%% called from delete_queue with MnesiaDelete = txn remove_messages(Q, MsgSeqIds, MnesiaDelete, State = #dqstate { file_summary = FileSummary, current_file_name = CurName @@ -671,24 +714,6 @@ internal_tx_publish(MsgId, MsgBody, {ok, State} end. -adjust_last_msg_seq_id(_Q, ExpectedSeqId, next, _Mode) -> - ExpectedSeqId; -adjust_last_msg_seq_id(_Q, 0, SuppliedSeqId, _Mode) -> - SuppliedSeqId; -adjust_last_msg_seq_id(_Q, ExpectedSeqId, ExpectedSeqId, _Mode) -> - ExpectedSeqId; -adjust_last_msg_seq_id(Q, ExpectedSeqId, SuppliedSeqId, dirty) when SuppliedSeqId > ExpectedSeqId -> - [Obj] = mnesia:dirty_read(rabbit_disk_queue, {Q, ExpectedSeqId - 1}), - ok = mnesia:dirty_write(rabbit_disk_queue, - Obj #dq_msg_loc { next_seq_id = SuppliedSeqId }), - SuppliedSeqId; -adjust_last_msg_seq_id(Q, ExpectedSeqId, SuppliedSeqId, Lock) when SuppliedSeqId > ExpectedSeqId -> - [Obj] = mnesia:read(rabbit_disk_queue, {Q, ExpectedSeqId - 1}, Lock), - ok = mnesia:write(rabbit_disk_queue, - Obj #dq_msg_loc { next_seq_id = SuppliedSeqId }, - Lock), - SuppliedSeqId. - %% can call this with PubMsgSeqIds as zip(PubMsgIds, duplicate(N, next)) internal_tx_commit(Q, PubMsgSeqIds, AckSeqIds, State = #dqstate { current_file_handle = CurHdl, @@ -724,10 +749,7 @@ internal_tx_commit(Q, PubMsgSeqIds, AckSeqIds, [{MsgId, _RefCount, File, _Offset, _TotalSize}] = dets_ets_lookup(State, MsgId), SeqId2 = adjust_last_msg_seq_id(Q, ExpectedSeqId, SeqId, write), - NextSeqId2 = if NextSeqId =:= next -> SeqId2 + 1; - true -> NextSeqId - end, - true = NextSeqId2 > SeqId2, + NextSeqId2 = find_next_seq_id(SeqId2, NextSeqId), ok = mnesia:write(rabbit_disk_queue, #dq_msg_loc { queue_and_seq_id = {Q, SeqId2}, @@ -778,17 +800,6 @@ internal_tx_cancel(MsgIds, State) -> MsgSeqIds = lists:zip(MsgIds, lists:duplicate(erlang:length(MsgIds), undefined)), remove_messages(undefined, MsgSeqIds, false, State). -determine_next_read_id(CurrentReadWrite, CurrentReadWrite, CurrentReadWrite) -> - CurrentReadWrite; -determine_next_read_id(CurrentRead, _CurrentWrite, next) -> - CurrentRead; -determine_next_read_id(CurrentReadWrite, CurrentReadWrite, NextWrite) - when NextWrite > CurrentReadWrite -> - NextWrite; -determine_next_read_id(CurrentRead, CurrentWrite, NextWrite) - when NextWrite >= CurrentWrite -> - CurrentRead. - internal_requeue(_Q, [], State) -> {ok, State}; internal_requeue(Q, MsgSeqIds = [{_, FirstSeqIdTo}|MsgSeqIdsTail], @@ -828,10 +839,7 @@ internal_requeue(Q, MsgSeqIds = [{_, FirstSeqIdTo}|MsgSeqIdsTail], {_NextMsgSeqId, NextSeqIdTo}}, ExpectedSeqIdTo) -> SeqIdTo2 = adjust_last_msg_seq_id(Q, ExpectedSeqIdTo, SeqIdTo, write), - NextSeqIdTo2 = if NextSeqIdTo =:= next -> SeqIdTo2 + 1; - true -> NextSeqIdTo - end, - true = NextSeqIdTo2 > SeqIdTo2, + NextSeqIdTo2 = find_next_seq_id(SeqIdTo2, NextSeqIdTo), [Obj = #dq_msg_loc { is_delivered = true, msg_id = MsgId }] = mnesia:read(rabbit_disk_queue, {Q, SeqIdOrig}, write), mnesia:write(rabbit_disk_queue, @@ -869,6 +877,27 @@ internal_purge(Q, State = #dqstate { sequences = Sequences }) -> {ok, WriteSeqId - ReadSeqId, State2} end. +internal_delete_queue(Q, State = #dqstate { sequences = Sequences }) -> + true = ets:delete(Sequences, Q), + {atomic, {ok, State1}} = + mnesia:transaction( + fun() -> + ok = mnesia:write_lock_table(rabbit_disk_queue), + Objs = + mnesia:match_object(rabbit_disk_queue, #dq_msg_loc { queue_and_seq_id = {Q, '_'}, + msg_id = '_', + is_delivered = '_', + next_seq_id = '_' + }, write), + MsgSeqIds = + lists:map( + fun (#dq_msg_loc { queue_and_seq_id = {_Q, SeqId}, msg_id = MsgId }) -> + {MsgId, SeqId} + end, Objs), + remove_messages(Q, MsgSeqIds, txn, State) + end), + {ok, State1}. + %% ---- ROLLING OVER THE APPEND FILE ---- maybe_roll_to_new_file(Offset, diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 153a8a7cb8..5b021b36cc 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -110,6 +110,7 @@ -spec(format_stderr/2 :: (string(), [any()]) -> 'true'). -spec(start_applications/1 :: ([atom()]) -> 'ok'). -spec(stop_applications/1 :: ([atom()]) -> 'ok'). +-spec(unfold/2 :: (fun ((A) -> ({'true', B, A} | 'false'))) -> A -> [B]) -endif. diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl index 5cda8eca90..b807fce20f 100644 --- a/src/rabbit_mixed_queue.erl +++ b/src/rabbit_mixed_queue.erl @@ -36,8 +36,8 @@ -export([start_link/2]). -export([publish/2, publish_delivered/2, deliver/1, ack/2, - tx_publish/2, tx_commit/3, tx_cancel/2, - requeue/2, purge/1, length/1, is_empty/1]). + tx_publish/2, tx_commit/3, tx_cancel/2, requeue/2, purge/1, + length/1, is_empty/1, delete_queue/1]). -record(mqstate, { mode, msg_buf, @@ -206,6 +206,13 @@ purge(State = #mqstate { queue = Q, msg_buf = MsgBuf, mode = mixed }) -> Count = queue:len(MsgBuf), {Count, State #mqstate { msg_buf = queue:new() }}. +delete_queue(State = #mqstate { queue = Q, mode = disk }) -> + rabbit_disk_queue:delete_queue(Q), + {ok, State}; +delete_queue(State = #mqstate { queue = Q, mode = mixed }) -> + rabbit_disk_queue:delete_queue(Q), + {ok, State #mqstate { msg_buf = queue:new() }}. + length(#mqstate { queue = Q, mode = disk }) -> rabbit_disk_queue:length(Q); length(#mqstate { mode = mixed, msg_buf = MsgBuf }) -> |
