summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_db_queue.erl30
-rw-r--r--src/rabbit_disk_queue.erl39
-rw-r--r--src/rabbit_tests.erl23
3 files changed, 86 insertions, 6 deletions
diff --git a/src/rabbit_db_queue.erl b/src/rabbit_db_queue.erl
index 97f1b9869e..897a4a6fac 100644
--- a/src/rabbit_db_queue.erl
+++ b/src/rabbit_db_queue.erl
@@ -59,7 +59,8 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
--export([publish/3, deliver/1, phantom_deliver/1, ack/2, tx_publish/2, tx_commit/3, tx_cancel/1, requeue/2]).
+-export([publish/3, deliver/1, phantom_deliver/1, ack/2, tx_publish/2,
+ tx_commit/3, tx_cancel/1, requeue/2, purge/1]).
-export([stop/0, stop_and_obliterate/0]).
@@ -86,6 +87,7 @@
-spec(tx_commit/3 :: (queue_name(), [msg_id()], [seq_id()]) -> 'ok').
-spec(tx_cancel/1 :: ([msg_id()]) -> 'ok').
-spec(requeue/2 :: (queue_name(), [seq_id()]) -> 'ok').
+-spec(purge/1 :: (queue_name()) -> non_neg_integer()).
-spec(stop/0 :: () -> 'ok').
-spec(stop_and_obliterate/0 :: () -> 'ok').
@@ -121,6 +123,9 @@ tx_cancel(MsgIds) when is_list(MsgIds) ->
requeue(Q, MsgSeqIds) when is_list(MsgSeqIds) ->
gen_server:cast(?SERVER, {requeue, Q, MsgSeqIds}).
+purge(Q) ->
+ gen_server:call(?SERVER, {purge, Q}).
+
stop() ->
gen_server:call(?SERVER, stop, infinity).
@@ -148,6 +153,9 @@ handle_call({phantom_deliver, Q}, _From, State) ->
handle_call({tx_commit, Q, PubMsgIds, AckSeqIds}, _From, State) ->
{ok, State1} = internal_tx_commit(Q, PubMsgIds, AckSeqIds, State),
{reply, ok, State1};
+handle_call({purge, Q}, _From, State) ->
+ {ok, Count, State1} = internal_purge(Q, State),
+ {reply, Count, State1};
handle_call(stop, _From, State) ->
{stop, normal, ok, State}; %% gen_server now calls terminate
handle_call(stop_vaporise, _From, State = #dbstate { db_conn = Conn }) ->
@@ -424,3 +432,23 @@ shuffle_up(Conn, QStr, BaseSeqId, SeqId, Gap) ->
0
end,
shuffle_up(Conn, QStr, BaseSeqId, SeqId - 1, Gap + GapInc).
+
+internal_purge(Q, State = #dbstate { db_conn = Conn }) ->
+ QStr = binary_to_escaped_string(term_to_binary(Q)),
+ case odbc:sql_query(Conn, "select next_read from sequence where queue = " ++ QStr) of
+ {selected, _, []} ->
+ odbc:commit(Conn, commit),
+ {ok, 0, State};
+ {selected, _, [{ReadSeqId}]} ->
+ odbc:sql_query(Conn, "update sequence set next_read = next_write where queue = " ++ QStr),
+ {selected, _, MsgSeqIds} =
+ odbc:sql_query(Conn, "select msg_id, seq_id from ledger where queue = " ++
+ QStr ++ " and seq_id >= " ++ ReadSeqId),
+ MsgSeqIds2 = lists:map(
+ fun ({MsgIdStr, SeqIdStr}) ->
+ { binary_to_term(hex_string_to_binary(MsgIdStr)),
+ list_to_integer(SeqIdStr) }
+ end, MsgSeqIds),
+ {ok, State2} = remove_messages(Q, MsgSeqIds2, true, State),
+ {ok, length(MsgSeqIds2), State2}
+ end.
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 0c4c2e2a61..5aae2298e8 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -38,7 +38,8 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
--export([publish/3, deliver/1, phantom_deliver/1, ack/2, tx_publish/2, tx_commit/3, tx_cancel/1, requeue/2]).
+-export([publish/3, deliver/1, phantom_deliver/1, ack/2, tx_publish/2,
+ tx_commit/3, tx_cancel/1, requeue/2, purge/1]).
-export([stop/0, stop_and_obliterate/0, to_disk_only_mode/0, to_ram_disk_mode/0]).
@@ -233,6 +234,7 @@
-spec(tx_commit/3 :: (queue_name(), [msg_id()], [seq_id()]) -> 'ok').
-spec(tx_cancel/1 :: ([msg_id()]) -> 'ok').
-spec(requeue/2 :: (queue_name(), [seq_id()]) -> 'ok').
+-spec(purge/1 :: (queue_name()) -> non_neg_integer()).
-spec(stop/0 :: () -> 'ok').
-spec(stop_and_obliterate/0 :: () -> 'ok').
-spec(to_ram_disk_mode/0 :: () -> 'ok').
@@ -270,6 +272,9 @@ tx_cancel(MsgIds) when is_list(MsgIds) ->
requeue(Q, MsgSeqIds) when is_list(MsgSeqIds) ->
gen_server:cast(?SERVER, {requeue, Q, MsgSeqIds}).
+purge(Q) ->
+ gen_server:call(?SERVER, {purge, Q}).
+
stop() ->
gen_server:call(?SERVER, stop, infinity).
@@ -356,6 +361,9 @@ handle_call({phantom_deliver, Q}, _From, State) ->
handle_call({tx_commit, Q, PubMsgIds, AckSeqIds}, _From, State) ->
{ok, State1} = internal_tx_commit(Q, PubMsgIds, AckSeqIds, State),
{reply, ok, State1};
+handle_call({purge, Q}, _From, State) ->
+ {ok, Count, State1} = internal_purge(Q, State),
+ {reply, Count, State1};
handle_call(stop, _From, State) ->
{stop, normal, ok, State}; %% gen_server now calls terminate
handle_call(stop_vaporise, _From, State) ->
@@ -544,6 +552,7 @@ internal_ack(Q, MsgSeqIds, State) ->
%% called from tx_cancel with MnesiaDelete = false
%% called from internal_tx_cancel with MnesiaDelete = txn
%% called from ack with MnesiaDelete = true
+%% called from purge with MnesiaDelete = txn
remove_messages(Q, MsgSeqIds, MnesiaDelete,
State = #dqstate { file_summary = FileSummary,
current_file_name = CurName
@@ -625,7 +634,7 @@ internal_tx_commit(Q, PubMsgIds, AckSeqIds,
[] -> {0,0};
[{Q, ReadSeqId2, WriteSeqId2}] -> {ReadSeqId2, WriteSeqId2}
end,
- {atomic, {Sync, WriteSeqId}} =
+ {atomic, {Sync, WriteSeqId, State2}} =
mnesia:transaction(
fun() -> ok = mnesia:write_lock_table(rabbit_disk_queue),
%% must deal with publishes first, if we didn't
@@ -646,14 +655,14 @@ internal_tx_commit(Q, PubMsgIds, AckSeqIds,
write),
{Acc or (CurName =:= File), NextWriteSeqId + 1}
end, {false, InitWriteSeqId}, PubMsgIds),
- remove_messages(Q, AckSeqIds, txn, State),
- {Sync2, WriteSeqId3}
+ {ok, State3} = remove_messages(Q, AckSeqIds, txn, State),
+ {Sync2, WriteSeqId3, State3}
end),
true = ets:insert(Sequences, {Q, ReadSeqId, WriteSeqId}),
if Sync -> ok = file:sync(CurHdl);
true -> ok
end,
- {ok, State}.
+ {ok, State2}.
internal_publish(Q, MsgId, MsgBody, State) ->
{ok, State1 = #dqstate { sequences = Sequences }} =
@@ -722,6 +731,26 @@ internal_requeue(Q, MsgSeqIds, State = #dqstate { sequences = Sequences }) ->
true = ets:insert(Sequences, {Q, ReadSeqId, WriteSeqId2}),
{ok, State}.
+internal_purge(Q, State = #dqstate { sequences = Sequences }) ->
+ case ets:lookup(Sequences, Q) of
+ [] -> {ok, 0, State};
+ [{Q, ReadSeqId, WriteSeqId}] ->
+ {atomic, {ok, State2}} =
+ mnesia:transaction(
+ fun() ->
+ ok = mnesia:write_lock_table(rabbit_disk_queue),
+ MsgSeqIds = lists:foldl(
+ fun (SeqId, Acc) ->
+ [#dq_msg_loc { is_delivered = false, msg_id = MsgId }] =
+ mnesia:read(rabbit_disk_queue, {Q, SeqId}, write),
+ [{MsgId, SeqId} | Acc]
+ end, [], lists:seq(ReadSeqId, WriteSeqId - 1)),
+ remove_messages(Q, MsgSeqIds, txn, State)
+ end),
+ true = ets:insert(Sequences, {Q, WriteSeqId, WriteSeqId}),
+ {ok, WriteSeqId - ReadSeqId, State2}
+ end.
+
%% ---- ROLLING OVER THE APPEND FILE ----
maybe_roll_to_new_file(Offset,
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 5924bb38fe..14461abb13 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -695,6 +695,7 @@ test_disk_queue() ->
passed = rdq_stress_gc(10000),
passed = rdq_test_startup_with_queue_gaps(),
passed = rdq_test_redeliver(),
+ passed = rdq_test_purge(),
passed.
rdq_time_tx_publish_commit_deliver_ack(Qs, MsgCount, MsgSizeBytes) ->
@@ -863,6 +864,28 @@ rdq_test_redeliver() ->
rdq_stop(),
passed.
+rdq_test_purge() ->
+ rdq_virgin(),
+ rdq_start(),
+ Msg = <<0:(8*256)>>,
+ Total = 1000,
+ Half = round(Total/2),
+ All = lists:seq(1,Total),
+ [rabbit_disk_queue:tx_publish(N, Msg) || N <- All],
+ rabbit_disk_queue:tx_commit(q, All, []),
+ io:format("Publish done~n", []),
+ %% deliver first half
+ Seqs = [begin {N, Msg, 256, false, SeqId} = rabbit_disk_queue:deliver(q), SeqId end
+ || N <- lists:seq(1,Half)],
+ io:format("Deliver first half done~n", []),
+ rabbit_disk_queue:purge(q),
+ io:format("Purge done~n", []),
+ rabbit_disk_queue:tx_commit(q, [], Seqs),
+ io:format("Ack first half done~n", []),
+ empty = rabbit_disk_queue:deliver(q),
+ rdq_stop(),
+ passed.
+
rdq_time_commands(Funcs) ->
lists:foreach(fun (F) -> F() end, Funcs).