summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl2
-rw-r--r--src/rabbit_disk_queue.erl171
-rw-r--r--src/rabbit_misc.erl1
-rw-r--r--src/rabbit_mixed_queue.erl11
4 files changed, 111 insertions, 74 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 80051149fe..3ca88aaad6 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -108,7 +108,7 @@ terminate(_Reason, State) ->
lists:foldl(fun (Txn, State1) ->
rollback_transaction(Txn, State1)
end, State, all_tx()),
- rabbit_mixed_queue:purge(NewState #q.mixed_state),
+ rabbit_mixed_queue:delete_queue(NewState #q.mixed_state),
ok = rabbit_amqqueue:internal_delete(QName).
code_change(_OldVsn, State, _Extra) ->
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index c7ef117717..b73e456c22 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -40,7 +40,7 @@
-export([publish/3, publish_with_seq/4, deliver/1, phantom_deliver/1, ack/2,
tx_publish/2, tx_commit/3, tx_commit_with_seqs/3, tx_cancel/1,
- requeue/2, requeue_with_seqs/2, purge/1]).
+ requeue/2, requeue_with_seqs/2, purge/1, delete_queue/1]).
-export([length/1, is_empty/1]).
@@ -224,11 +224,12 @@
-ifdef(use_specs).
-type(seq_id() :: non_neg_integer()).
+-type(seq_id_or_next() :: { seq_id() | 'next' }).
-spec(start_link/1 :: (non_neg_integer()) ->
{'ok', pid()} | 'ignore' | {'error', any()}).
-spec(publish/3 :: (queue_name(), msg_id(), binary()) -> 'ok').
--spec(publish_with_seq/4 :: (queue_name(), msg_id(), seq_id(), binary()) -> 'ok').
+-spec(publish_with_seq/4 :: (queue_name(), msg_id(), seq_id_or_next(), binary()) -> 'ok').
-spec(deliver/1 :: (queue_name()) ->
{'empty' | {msg_id(), binary(), non_neg_integer(),
bool(), {msg_id(), seq_id()}, non_neg_integer()}}).
@@ -237,11 +238,11 @@
-spec(ack/2 :: (queue_name(), [{msg_id(), seq_id()}]) -> 'ok').
-spec(tx_publish/2 :: (msg_id(), binary()) -> 'ok').
-spec(tx_commit/3 :: (queue_name(), [msg_id()], [{msg_id(), seq_id()}]) -> 'ok').
--spec(tx_commit_with_seqs/3 :: (queue_name(), [{msg_id(), seq_id()}],
+-spec(tx_commit_with_seqs/3 :: (queue_name(), [{msg_id(), seq_id_or_next()}],
[{msg_id(), seq_id()}]) -> 'ok').
-spec(tx_cancel/1 :: ([msg_id()]) -> 'ok').
-spec(requeue/2 :: (queue_name(), [{msg_id(), seq_id()}]) -> 'ok').
--spec(requeue_with_seqs/2 :: (queue_name(), [{{msg_id(), seq_id()}, seq_id()}]) -> 'ok').
+-spec(requeue_with_seqs/2 :: (queue_name(), [{{msg_id(), seq_id()}, seq_id_or_next()}]) -> 'ok').
-spec(purge/1 :: (queue_name()) -> non_neg_integer()).
-spec(stop/0 :: () -> 'ok').
-spec(stop_and_obliterate/0 :: () -> 'ok').
@@ -295,6 +296,9 @@ requeue_with_seqs(Q, MsgSeqSeqIds) when is_list(MsgSeqSeqIds) ->
purge(Q) ->
gen_server:call(?SERVER, {purge, Q}).
+delete_queue(Q) ->
+ gen_server:cast(?SERVER, {delete_queue, Q}).
+
stop() ->
gen_server:call(?SERVER, stop, infinity).
@@ -457,6 +461,9 @@ handle_cast({requeue, Q, MsgSeqIds}, State) ->
{noreply, State1};
handle_cast({requeue_with_seqs, Q, MsgSeqSeqIds}, State) ->
{ok, State1} = internal_requeue(Q, MsgSeqSeqIds, State),
+ {noreply, State1};
+handle_cast({delete_queue, Q}, State) ->
+ {ok, State1} = internal_delete_queue(Q, State),
{noreply, State1}.
handle_info(_Info, State) ->
@@ -533,6 +540,69 @@ dets_ets_match_object(#dqstate { msg_location_ets = MsgLocationEts, operation_mo
Obj) ->
ets:match_object(MsgLocationEts, Obj).
+find_next_seq_id(CurrentSeq, next) ->
+ CurrentSeq + 1;
+find_next_seq_id(CurrentSeq, NextSeqId)
+ when NextSeqId > CurrentSeq ->
+ NextSeqId.
+
+determine_next_read_id(CurrentReadWrite, CurrentReadWrite, CurrentReadWrite) ->
+ CurrentReadWrite;
+determine_next_read_id(CurrentRead, _CurrentWrite, next) ->
+ CurrentRead;
+determine_next_read_id(CurrentReadWrite, CurrentReadWrite, NextWrite)
+ when NextWrite > CurrentReadWrite ->
+ NextWrite;
+determine_next_read_id(CurrentRead, CurrentWrite, NextWrite)
+ when NextWrite >= CurrentWrite ->
+ CurrentRead.
+
+get_read_handle(File, State =
+ #dqstate { read_file_handles = {ReadHdls, ReadHdlsAge},
+ read_file_handles_limit = ReadFileHandlesLimit }) ->
+ Now = now(),
+ {FileHdl, ReadHdls1, ReadHdlsAge1} =
+ case dict:find(File, ReadHdls) of
+ error ->
+ {ok, Hdl} = file:open(form_filename(File),
+ [read, raw, binary,
+ read_ahead]),
+ case dict:size(ReadHdls) < ReadFileHandlesLimit of
+ true ->
+ {Hdl, ReadHdls, ReadHdlsAge};
+ _False ->
+ {Then, OldFile, ReadHdlsAge2} =
+ gb_trees:take_smallest(ReadHdlsAge),
+ {ok, {OldHdl, Then}} =
+ dict:find(OldFile, ReadHdls),
+ ok = file:close(OldHdl),
+ {Hdl, dict:erase(OldFile, ReadHdls), ReadHdlsAge2}
+ end;
+ {ok, {Hdl, Then}} ->
+ {Hdl, ReadHdls, gb_trees:delete(Then, ReadHdlsAge)}
+ end,
+ ReadHdls3 = dict:store(File, {FileHdl, Now}, ReadHdls1),
+ ReadHdlsAge3 = gb_trees:enter(Now, File, ReadHdlsAge1),
+ {FileHdl, State #dqstate {read_file_handles = {ReadHdls3, ReadHdlsAge3}}}.
+
+adjust_last_msg_seq_id(_Q, ExpectedSeqId, next, _Mode) ->
+ ExpectedSeqId;
+adjust_last_msg_seq_id(_Q, 0, SuppliedSeqId, _Mode) ->
+ SuppliedSeqId;
+adjust_last_msg_seq_id(_Q, ExpectedSeqId, ExpectedSeqId, _Mode) ->
+ ExpectedSeqId;
+adjust_last_msg_seq_id(Q, ExpectedSeqId, SuppliedSeqId, dirty) when SuppliedSeqId > ExpectedSeqId ->
+ [Obj] = mnesia:dirty_read(rabbit_disk_queue, {Q, ExpectedSeqId - 1}),
+ ok = mnesia:dirty_write(rabbit_disk_queue,
+ Obj #dq_msg_loc { next_seq_id = SuppliedSeqId }),
+ SuppliedSeqId;
+adjust_last_msg_seq_id(Q, ExpectedSeqId, SuppliedSeqId, Lock) when SuppliedSeqId > ExpectedSeqId ->
+ [Obj] = mnesia:read(rabbit_disk_queue, {Q, ExpectedSeqId - 1}, Lock),
+ ok = mnesia:write(rabbit_disk_queue,
+ Obj #dq_msg_loc { next_seq_id = SuppliedSeqId },
+ Lock),
+ SuppliedSeqId.
+
%% ---- INTERNAL RAW FUNCTIONS ----
internal_deliver(Q, ReadMsg, State = #dqstate { sequences = Sequences }) ->
@@ -564,42 +634,15 @@ internal_deliver(Q, ReadMsg, State = #dqstate { sequences = Sequences }) ->
end
end.
-get_read_handle(File, State =
- #dqstate { read_file_handles = {ReadHdls, ReadHdlsAge},
- read_file_handles_limit = ReadFileHandlesLimit }) ->
- Now = now(),
- {FileHdl, ReadHdls1, ReadHdlsAge1} =
- case dict:find(File, ReadHdls) of
- error ->
- {ok, Hdl} = file:open(form_filename(File),
- [read, raw, binary,
- read_ahead]),
- case dict:size(ReadHdls) < ReadFileHandlesLimit of
- true ->
- {Hdl, ReadHdls, ReadHdlsAge};
- _False ->
- {Then, OldFile, ReadHdlsAge2} =
- gb_trees:take_smallest(ReadHdlsAge),
- {ok, {OldHdl, Then}} =
- dict:find(OldFile, ReadHdls),
- ok = file:close(OldHdl),
- {Hdl, dict:erase(OldFile, ReadHdls), ReadHdlsAge2}
- end;
- {ok, {Hdl, Then}} ->
- {Hdl, ReadHdls, gb_trees:delete(Then, ReadHdlsAge)}
- end,
- ReadHdls3 = dict:store(File, {FileHdl, Now}, ReadHdls1),
- ReadHdlsAge3 = gb_trees:enter(Now, File, ReadHdlsAge1),
- {FileHdl, State #dqstate {read_file_handles = {ReadHdls3, ReadHdlsAge3}}}.
-
internal_ack(Q, MsgSeqIds, State) ->
remove_messages(Q, MsgSeqIds, true, State).
%% Q is only needed if MnesiaDelete /= false
-%% called from tx_cancel with MnesiaDelete = false
-%% called from internal_tx_cancel with MnesiaDelete = txn
%% called from ack with MnesiaDelete = true
+%% called from tx_commit with MnesiaDelete = txn
+%% called from tx_cancel with MnesiaDelete = false
%% called from purge with MnesiaDelete = txn
+%% called from delete_queue with MnesiaDelete = txn
remove_messages(Q, MsgSeqIds, MnesiaDelete,
State = #dqstate { file_summary = FileSummary,
current_file_name = CurName
@@ -671,24 +714,6 @@ internal_tx_publish(MsgId, MsgBody,
{ok, State}
end.
-adjust_last_msg_seq_id(_Q, ExpectedSeqId, next, _Mode) ->
- ExpectedSeqId;
-adjust_last_msg_seq_id(_Q, 0, SuppliedSeqId, _Mode) ->
- SuppliedSeqId;
-adjust_last_msg_seq_id(_Q, ExpectedSeqId, ExpectedSeqId, _Mode) ->
- ExpectedSeqId;
-adjust_last_msg_seq_id(Q, ExpectedSeqId, SuppliedSeqId, dirty) when SuppliedSeqId > ExpectedSeqId ->
- [Obj] = mnesia:dirty_read(rabbit_disk_queue, {Q, ExpectedSeqId - 1}),
- ok = mnesia:dirty_write(rabbit_disk_queue,
- Obj #dq_msg_loc { next_seq_id = SuppliedSeqId }),
- SuppliedSeqId;
-adjust_last_msg_seq_id(Q, ExpectedSeqId, SuppliedSeqId, Lock) when SuppliedSeqId > ExpectedSeqId ->
- [Obj] = mnesia:read(rabbit_disk_queue, {Q, ExpectedSeqId - 1}, Lock),
- ok = mnesia:write(rabbit_disk_queue,
- Obj #dq_msg_loc { next_seq_id = SuppliedSeqId },
- Lock),
- SuppliedSeqId.
-
%% can call this with PubMsgSeqIds as zip(PubMsgIds, duplicate(N, next))
internal_tx_commit(Q, PubMsgSeqIds, AckSeqIds,
State = #dqstate { current_file_handle = CurHdl,
@@ -724,10 +749,7 @@ internal_tx_commit(Q, PubMsgSeqIds, AckSeqIds,
[{MsgId, _RefCount, File, _Offset, _TotalSize}] =
dets_ets_lookup(State, MsgId),
SeqId2 = adjust_last_msg_seq_id(Q, ExpectedSeqId, SeqId, write),
- NextSeqId2 = if NextSeqId =:= next -> SeqId2 + 1;
- true -> NextSeqId
- end,
- true = NextSeqId2 > SeqId2,
+ NextSeqId2 = find_next_seq_id(SeqId2, NextSeqId),
ok = mnesia:write(rabbit_disk_queue,
#dq_msg_loc { queue_and_seq_id =
{Q, SeqId2},
@@ -778,17 +800,6 @@ internal_tx_cancel(MsgIds, State) ->
MsgSeqIds = lists:zip(MsgIds, lists:duplicate(erlang:length(MsgIds), undefined)),
remove_messages(undefined, MsgSeqIds, false, State).
-determine_next_read_id(CurrentReadWrite, CurrentReadWrite, CurrentReadWrite) ->
- CurrentReadWrite;
-determine_next_read_id(CurrentRead, _CurrentWrite, next) ->
- CurrentRead;
-determine_next_read_id(CurrentReadWrite, CurrentReadWrite, NextWrite)
- when NextWrite > CurrentReadWrite ->
- NextWrite;
-determine_next_read_id(CurrentRead, CurrentWrite, NextWrite)
- when NextWrite >= CurrentWrite ->
- CurrentRead.
-
internal_requeue(_Q, [], State) ->
{ok, State};
internal_requeue(Q, MsgSeqIds = [{_, FirstSeqIdTo}|MsgSeqIdsTail],
@@ -828,10 +839,7 @@ internal_requeue(Q, MsgSeqIds = [{_, FirstSeqIdTo}|MsgSeqIdsTail],
{_NextMsgSeqId, NextSeqIdTo}},
ExpectedSeqIdTo) ->
SeqIdTo2 = adjust_last_msg_seq_id(Q, ExpectedSeqIdTo, SeqIdTo, write),
- NextSeqIdTo2 = if NextSeqIdTo =:= next -> SeqIdTo2 + 1;
- true -> NextSeqIdTo
- end,
- true = NextSeqIdTo2 > SeqIdTo2,
+ NextSeqIdTo2 = find_next_seq_id(SeqIdTo2, NextSeqIdTo),
[Obj = #dq_msg_loc { is_delivered = true, msg_id = MsgId }] =
mnesia:read(rabbit_disk_queue, {Q, SeqIdOrig}, write),
mnesia:write(rabbit_disk_queue,
@@ -869,6 +877,27 @@ internal_purge(Q, State = #dqstate { sequences = Sequences }) ->
{ok, WriteSeqId - ReadSeqId, State2}
end.
+internal_delete_queue(Q, State = #dqstate { sequences = Sequences }) ->
+ true = ets:delete(Sequences, Q),
+ {atomic, {ok, State1}} =
+ mnesia:transaction(
+ fun() ->
+ ok = mnesia:write_lock_table(rabbit_disk_queue),
+ Objs =
+ mnesia:match_object(rabbit_disk_queue, #dq_msg_loc { queue_and_seq_id = {Q, '_'},
+ msg_id = '_',
+ is_delivered = '_',
+ next_seq_id = '_'
+ }, write),
+ MsgSeqIds =
+ lists:map(
+ fun (#dq_msg_loc { queue_and_seq_id = {_Q, SeqId}, msg_id = MsgId }) ->
+ {MsgId, SeqId}
+ end, Objs),
+ remove_messages(Q, MsgSeqIds, txn, State)
+ end),
+ {ok, State1}.
+
%% ---- ROLLING OVER THE APPEND FILE ----
maybe_roll_to_new_file(Offset,
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 153a8a7cb8..5b021b36cc 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -110,6 +110,7 @@
-spec(format_stderr/2 :: (string(), [any()]) -> 'true').
-spec(start_applications/1 :: ([atom()]) -> 'ok').
-spec(stop_applications/1 :: ([atom()]) -> 'ok').
+-spec(unfold/2 :: (fun ((A) -> ({'true', B, A} | 'false'))) -> A -> [B])
-endif.
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl
index 5cda8eca90..b807fce20f 100644
--- a/src/rabbit_mixed_queue.erl
+++ b/src/rabbit_mixed_queue.erl
@@ -36,8 +36,8 @@
-export([start_link/2]).
-export([publish/2, publish_delivered/2, deliver/1, ack/2,
- tx_publish/2, tx_commit/3, tx_cancel/2,
- requeue/2, purge/1, length/1, is_empty/1]).
+ tx_publish/2, tx_commit/3, tx_cancel/2, requeue/2, purge/1,
+ length/1, is_empty/1, delete_queue/1]).
-record(mqstate, { mode,
msg_buf,
@@ -206,6 +206,13 @@ purge(State = #mqstate { queue = Q, msg_buf = MsgBuf, mode = mixed }) ->
Count = queue:len(MsgBuf),
{Count, State #mqstate { msg_buf = queue:new() }}.
+delete_queue(State = #mqstate { queue = Q, mode = disk }) ->
+ rabbit_disk_queue:delete_queue(Q),
+ {ok, State};
+delete_queue(State = #mqstate { queue = Q, mode = mixed }) ->
+ rabbit_disk_queue:delete_queue(Q),
+ {ok, State #mqstate { msg_buf = queue:new() }}.
+
length(#mqstate { queue = Q, mode = disk }) ->
rabbit_disk_queue:length(Q);
length(#mqstate { mode = mixed, msg_buf = MsgBuf }) ->