diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-05-19 13:21:30 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-05-19 13:21:30 +0100 |
| commit | b81f6ccf7329f5c3944bc8e63b4cfc3135db5112 (patch) | |
| tree | 5807542123359c5e51ef289a9b6cad71cf76a9cf | |
| parent | 8549f92b6928f7772714a9bde1b5569e418522cf (diff) | |
| download | rabbitmq-server-git-b81f6ccf7329f5c3944bc8e63b4cfc3135db5112.tar.gz | |
Added support for purging queues. This is done per the 0-9-1 spec so we purge everything that hasn't been delivered.
| -rw-r--r-- | src/rabbit_db_queue.erl | 30 | ||||
| -rw-r--r-- | src/rabbit_disk_queue.erl | 39 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 23 |
3 files changed, 86 insertions, 6 deletions
diff --git a/src/rabbit_db_queue.erl b/src/rabbit_db_queue.erl index 97f1b9869e..897a4a6fac 100644 --- a/src/rabbit_db_queue.erl +++ b/src/rabbit_db_queue.erl @@ -59,7 +59,8 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --export([publish/3, deliver/1, phantom_deliver/1, ack/2, tx_publish/2, tx_commit/3, tx_cancel/1, requeue/2]). +-export([publish/3, deliver/1, phantom_deliver/1, ack/2, tx_publish/2, + tx_commit/3, tx_cancel/1, requeue/2, purge/1]). -export([stop/0, stop_and_obliterate/0]). @@ -86,6 +87,7 @@ -spec(tx_commit/3 :: (queue_name(), [msg_id()], [seq_id()]) -> 'ok'). -spec(tx_cancel/1 :: ([msg_id()]) -> 'ok'). -spec(requeue/2 :: (queue_name(), [seq_id()]) -> 'ok'). +-spec(purge/1 :: (queue_name()) -> non_neg_integer()). -spec(stop/0 :: () -> 'ok'). -spec(stop_and_obliterate/0 :: () -> 'ok'). @@ -121,6 +123,9 @@ tx_cancel(MsgIds) when is_list(MsgIds) -> requeue(Q, MsgSeqIds) when is_list(MsgSeqIds) -> gen_server:cast(?SERVER, {requeue, Q, MsgSeqIds}). +purge(Q) -> + gen_server:call(?SERVER, {purge, Q}). + stop() -> gen_server:call(?SERVER, stop, infinity). @@ -148,6 +153,9 @@ handle_call({phantom_deliver, Q}, _From, State) -> handle_call({tx_commit, Q, PubMsgIds, AckSeqIds}, _From, State) -> {ok, State1} = internal_tx_commit(Q, PubMsgIds, AckSeqIds, State), {reply, ok, State1}; +handle_call({purge, Q}, _From, State) -> + {ok, Count, State1} = internal_purge(Q, State), + {reply, Count, State1}; handle_call(stop, _From, State) -> {stop, normal, ok, State}; %% gen_server now calls terminate handle_call(stop_vaporise, _From, State = #dbstate { db_conn = Conn }) -> @@ -424,3 +432,23 @@ shuffle_up(Conn, QStr, BaseSeqId, SeqId, Gap) -> 0 end, shuffle_up(Conn, QStr, BaseSeqId, SeqId - 1, Gap + GapInc). + +internal_purge(Q, State = #dbstate { db_conn = Conn }) -> + QStr = binary_to_escaped_string(term_to_binary(Q)), + case odbc:sql_query(Conn, "select next_read from sequence where queue = " ++ QStr) of + {selected, _, []} -> + odbc:commit(Conn, commit), + {ok, 0, State}; + {selected, _, [{ReadSeqId}]} -> + odbc:sql_query(Conn, "update sequence set next_read = next_write where queue = " ++ QStr), + {selected, _, MsgSeqIds} = + odbc:sql_query(Conn, "select msg_id, seq_id from ledger where queue = " ++ + QStr ++ " and seq_id >= " ++ ReadSeqId), + MsgSeqIds2 = lists:map( + fun ({MsgIdStr, SeqIdStr}) -> + { binary_to_term(hex_string_to_binary(MsgIdStr)), + list_to_integer(SeqIdStr) } + end, MsgSeqIds), + {ok, State2} = remove_messages(Q, MsgSeqIds2, true, State), + {ok, length(MsgSeqIds2), State2} + end. diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index 0c4c2e2a61..5aae2298e8 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -38,7 +38,8 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --export([publish/3, deliver/1, phantom_deliver/1, ack/2, tx_publish/2, tx_commit/3, tx_cancel/1, requeue/2]). +-export([publish/3, deliver/1, phantom_deliver/1, ack/2, tx_publish/2, + tx_commit/3, tx_cancel/1, requeue/2, purge/1]). -export([stop/0, stop_and_obliterate/0, to_disk_only_mode/0, to_ram_disk_mode/0]). @@ -233,6 +234,7 @@ -spec(tx_commit/3 :: (queue_name(), [msg_id()], [seq_id()]) -> 'ok'). -spec(tx_cancel/1 :: ([msg_id()]) -> 'ok'). -spec(requeue/2 :: (queue_name(), [seq_id()]) -> 'ok'). +-spec(purge/1 :: (queue_name()) -> non_neg_integer()). -spec(stop/0 :: () -> 'ok'). -spec(stop_and_obliterate/0 :: () -> 'ok'). -spec(to_ram_disk_mode/0 :: () -> 'ok'). @@ -270,6 +272,9 @@ tx_cancel(MsgIds) when is_list(MsgIds) -> requeue(Q, MsgSeqIds) when is_list(MsgSeqIds) -> gen_server:cast(?SERVER, {requeue, Q, MsgSeqIds}). +purge(Q) -> + gen_server:call(?SERVER, {purge, Q}). + stop() -> gen_server:call(?SERVER, stop, infinity). @@ -356,6 +361,9 @@ handle_call({phantom_deliver, Q}, _From, State) -> handle_call({tx_commit, Q, PubMsgIds, AckSeqIds}, _From, State) -> {ok, State1} = internal_tx_commit(Q, PubMsgIds, AckSeqIds, State), {reply, ok, State1}; +handle_call({purge, Q}, _From, State) -> + {ok, Count, State1} = internal_purge(Q, State), + {reply, Count, State1}; handle_call(stop, _From, State) -> {stop, normal, ok, State}; %% gen_server now calls terminate handle_call(stop_vaporise, _From, State) -> @@ -544,6 +552,7 @@ internal_ack(Q, MsgSeqIds, State) -> %% called from tx_cancel with MnesiaDelete = false %% called from internal_tx_cancel with MnesiaDelete = txn %% called from ack with MnesiaDelete = true +%% called from purge with MnesiaDelete = txn remove_messages(Q, MsgSeqIds, MnesiaDelete, State = #dqstate { file_summary = FileSummary, current_file_name = CurName @@ -625,7 +634,7 @@ internal_tx_commit(Q, PubMsgIds, AckSeqIds, [] -> {0,0}; [{Q, ReadSeqId2, WriteSeqId2}] -> {ReadSeqId2, WriteSeqId2} end, - {atomic, {Sync, WriteSeqId}} = + {atomic, {Sync, WriteSeqId, State2}} = mnesia:transaction( fun() -> ok = mnesia:write_lock_table(rabbit_disk_queue), %% must deal with publishes first, if we didn't @@ -646,14 +655,14 @@ internal_tx_commit(Q, PubMsgIds, AckSeqIds, write), {Acc or (CurName =:= File), NextWriteSeqId + 1} end, {false, InitWriteSeqId}, PubMsgIds), - remove_messages(Q, AckSeqIds, txn, State), - {Sync2, WriteSeqId3} + {ok, State3} = remove_messages(Q, AckSeqIds, txn, State), + {Sync2, WriteSeqId3, State3} end), true = ets:insert(Sequences, {Q, ReadSeqId, WriteSeqId}), if Sync -> ok = file:sync(CurHdl); true -> ok end, - {ok, State}. + {ok, State2}. internal_publish(Q, MsgId, MsgBody, State) -> {ok, State1 = #dqstate { sequences = Sequences }} = @@ -722,6 +731,26 @@ internal_requeue(Q, MsgSeqIds, State = #dqstate { sequences = Sequences }) -> true = ets:insert(Sequences, {Q, ReadSeqId, WriteSeqId2}), {ok, State}. +internal_purge(Q, State = #dqstate { sequences = Sequences }) -> + case ets:lookup(Sequences, Q) of + [] -> {ok, 0, State}; + [{Q, ReadSeqId, WriteSeqId}] -> + {atomic, {ok, State2}} = + mnesia:transaction( + fun() -> + ok = mnesia:write_lock_table(rabbit_disk_queue), + MsgSeqIds = lists:foldl( + fun (SeqId, Acc) -> + [#dq_msg_loc { is_delivered = false, msg_id = MsgId }] = + mnesia:read(rabbit_disk_queue, {Q, SeqId}, write), + [{MsgId, SeqId} | Acc] + end, [], lists:seq(ReadSeqId, WriteSeqId - 1)), + remove_messages(Q, MsgSeqIds, txn, State) + end), + true = ets:insert(Sequences, {Q, WriteSeqId, WriteSeqId}), + {ok, WriteSeqId - ReadSeqId, State2} + end. + %% ---- ROLLING OVER THE APPEND FILE ---- maybe_roll_to_new_file(Offset, diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 5924bb38fe..14461abb13 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -695,6 +695,7 @@ test_disk_queue() -> passed = rdq_stress_gc(10000), passed = rdq_test_startup_with_queue_gaps(), passed = rdq_test_redeliver(), + passed = rdq_test_purge(), passed. rdq_time_tx_publish_commit_deliver_ack(Qs, MsgCount, MsgSizeBytes) -> @@ -863,6 +864,28 @@ rdq_test_redeliver() -> rdq_stop(), passed. +rdq_test_purge() -> + rdq_virgin(), + rdq_start(), + Msg = <<0:(8*256)>>, + Total = 1000, + Half = round(Total/2), + All = lists:seq(1,Total), + [rabbit_disk_queue:tx_publish(N, Msg) || N <- All], + rabbit_disk_queue:tx_commit(q, All, []), + io:format("Publish done~n", []), + %% deliver first half + Seqs = [begin {N, Msg, 256, false, SeqId} = rabbit_disk_queue:deliver(q), SeqId end + || N <- lists:seq(1,Half)], + io:format("Deliver first half done~n", []), + rabbit_disk_queue:purge(q), + io:format("Purge done~n", []), + rabbit_disk_queue:tx_commit(q, [], Seqs), + io:format("Ack first half done~n", []), + empty = rabbit_disk_queue:deliver(q), + rdq_stop(), + passed. + rdq_time_commands(Funcs) -> lists:foreach(fun (F) -> F() end, Funcs). |
