diff options
-rw-r--r--  src/rabbit.erl             | 16
-rw-r--r--  src/rabbit_amqqueue.erl    | 58
-rw-r--r--  src/rabbit_disk_queue.erl  | 50
-rw-r--r--  src/rabbit_mixed_queue.erl |  9
-rw-r--r--  src/rabbit_tests.erl       |  6
5 files changed, 89 insertions, 50 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index e79c7f5936..ce73f6ce6d 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -145,17 +145,21 @@ start(normal, []) -> ok = start_child(rabbit_router), ok = start_child(rabbit_node_monitor) end}, - {"recovery", - fun () -> - ok = maybe_insert_default_data(), - ok = rabbit_exchange:recover(), - ok = rabbit_amqqueue:recover() - end}, {"disk queue", fun () -> ok = start_child(rabbit_disk_queue), ok = rabbit_disk_queue:to_ram_disk_mode() %% TODO, CHANGE ME end}, + {"recovery", + fun () -> + ok = maybe_insert_default_data(), + ok = rabbit_exchange:recover(), + {ok, DurableQueues} = rabbit_amqqueue:recover(), + DurableQueueNames = + sets:from_list(lists:map( + fun(Q) -> Q #amqqueue.name end, DurableQueues)), + ok = rabbit_disk_queue:delete_non_durable_queues(DurableQueueNames) + end}, {"guid generator", fun () -> ok = start_child(rabbit_guid) diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 0316788fe1..c56e51888b 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -119,37 +119,39 @@ start() -> ok. recover() -> - ok = recover_durable_queues(), - ok. + {ok, DurableQueues} = recover_durable_queues(), + {ok, DurableQueues}. recover_durable_queues() -> Node = node(), - lists:foreach( - fun (RecoveredQ) -> - Q = start_queue_process(RecoveredQ), - %% We need to catch the case where a client connected to - %% another node has deleted the queue (and possibly - %% re-created it). - case rabbit_misc:execute_mnesia_transaction( - fun () -> case mnesia:match_object( - rabbit_durable_queue, RecoveredQ, read) of - [_] -> ok = store_queue(Q), - true; - [] -> false - end - end) of - true -> ok; - false -> exit(Q#amqqueue.pid, shutdown) - end - end, - %% TODO: use dirty ops instead - rabbit_misc:execute_mnesia_transaction( - fun () -> - qlc:e(qlc:q([Q || Q = #amqqueue{pid = Pid} - <- mnesia:table(rabbit_durable_queue), - node(Pid) == Node])) - end)), - ok. 
+ DurableQueues = + lists:foldl( + fun (RecoveredQ, Acc) -> + Q = start_queue_process(RecoveredQ), + %% We need to catch the case where a client connected to + %% another node has deleted the queue (and possibly + %% re-created it). + case rabbit_misc:execute_mnesia_transaction( + fun () -> case mnesia:match_object( + rabbit_durable_queue, RecoveredQ, read) of + [_] -> ok = store_queue(Q), + true; + [] -> false + end + end) of + true -> [Q|Acc]; + false -> exit(Q#amqqueue.pid, shutdown), + Acc + end + end, [], + %% TODO: use dirty ops instead + rabbit_misc:execute_mnesia_transaction( + fun () -> + qlc:e(qlc:q([Q || Q = #amqqueue{pid = Pid} + <- mnesia:table(rabbit_durable_queue), + node(Pid) == Node])) + end)), + {ok, DurableQueues}. declare(QueueName, Durable, AutoDelete, Args) -> Q = start_queue_process(#amqqueue{name = QueueName, diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index 2ffb9a754d..b7eca499d6 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -41,7 +41,7 @@ -export([publish/3, publish_with_seq/4, deliver/1, phantom_deliver/1, ack/2, tx_publish/2, tx_commit/3, tx_commit_with_seqs/3, tx_cancel/1, requeue/2, requeue_with_seqs/2, purge/1, delete_queue/1, - dump_queue/1 + dump_queue/1, delete_non_durable_queues/1 ]). -export([length/1, is_empty/1]). @@ -248,7 +248,8 @@ -spec(requeue_with_seqs/2 :: (queue_name(), [{{msg_id(), seq_id()}, seq_id_or_next()}]) -> 'ok'). -spec(purge/1 :: (queue_name()) -> non_neg_integer()). -spec(dump_queue/1 :: (queue_name()) -> [{msg_id(), binary(), non_neg_integer(), - bool(), {msg_id(), seq_id()}}]). + bool(), seq_id()}]). +-spec(delete_non_durable_queues/1 :: (set()) -> 'ok'). -spec(stop/0 :: () -> 'ok'). -spec(stop_and_obliterate/0 :: () -> 'ok'). -spec(to_ram_disk_mode/0 :: () -> 'ok'). @@ -307,6 +308,9 @@ delete_queue(Q) -> dump_queue(Q) -> gen_server2:call(?SERVER, {dump_queue, Q}, infinity). 
+delete_non_durable_queues(DurableQueues) -> + gen_server2:call(?SERVER, {delete_non_durable_queues, DurableQueues}, infinity). + stop() -> gen_server2:call(?SERVER, stop, infinity). @@ -346,7 +350,8 @@ init([FileSizeLimit, ReadFileHandlesLimit]) -> E -> E end, ok = filelib:ensure_dir(form_filename("nothing")), - InitName = "0" ++ ?FILE_EXTENSION, + file:delete(form_filename(atom_to_list(?MSG_LOC_NAME) ++ + ?FILE_EXTENSION_DETS)), {ok, MsgLocationDets} = dets:open_file(?MSG_LOC_NAME, [{file, form_filename(atom_to_list(?MSG_LOC_NAME) ++ @@ -360,6 +365,8 @@ init([FileSizeLimit, ReadFileHandlesLimit]) -> %% it would be better to have this as private, but dets:from_ets/2 %% seems to blow up if it is set private MsgLocationEts = ets:new(?MSG_LOC_NAME, [set, protected]), + + InitName = "0" ++ ?FILE_EXTENSION, State = #dqstate { msg_location_dets = MsgLocationDets, msg_location_ets = MsgLocationEts, @@ -449,7 +456,10 @@ handle_call({length, Q}, _From, State = #dqstate { sequences = Sequences }) -> end; handle_call({dump_queue, Q}, _From, State) -> {Result, State1} = internal_dump_queue(Q, State), - {reply, Result, State1}. + {reply, Result, State1}; +handle_call({delete_non_durable_queues, DurableQueues}, _From, State) -> + {ok, State1} = internal_delete_non_durable_queues(DurableQueues, State), + {reply, ok, State1}. handle_cast({publish, Q, MsgId, MsgBody}, State) -> {ok, State1} = internal_publish(Q, MsgId, next, MsgBody, State), @@ -928,13 +938,27 @@ internal_dump_queue(Q, State = #dqstate { sequences = Sequences }) -> fun ({SeqId, _State1}) when SeqId == WriteSeq -> false; ({SeqId, State1}) -> - {ok, Result, NextReadSeqId, State2} = + {ok, {MsgId, Msg, Size, Delivered, {MsgId, SeqId}}, NextReadSeqId, State2} = internal_read_message(Q, SeqId, true, true, State1), - {true, Result, {NextReadSeqId, State2}} + {true, {MsgId, Msg, Size, Delivered, SeqId}, {NextReadSeqId, State2}} end, {ReadSeq, State}), {lists:reverse(QList), State3} end. 
+internal_delete_non_durable_queues(DurableQueues, State = #dqstate { sequences = Sequences }) -> + State3 = + ets:foldl( + fun ({Q, _Read, _Write, _Length}, State1) -> + case sets:is_element(Q, DurableQueues) of + true -> + State1; + false -> + {ok, State2} = internal_delete_queue(Q, State1), + State2 + end + end, State, Sequences), + {ok, State3}. + %% ---- ROLLING OVER THE APPEND FILE ---- maybe_roll_to_new_file(Offset, @@ -1064,10 +1088,10 @@ combine_files({Source, SourceValid, _SourceContiguousTop, State = close_file(Source, close_file(Destination, State1)), {ok, SourceHdl} = file:open(form_filename(Source), - [read, write, raw, binary, delayed_write, read_ahead]), + [read, write, raw, binary, read_ahead, delayed_write]), {ok, DestinationHdl} = file:open(form_filename(Destination), - [read, write, raw, binary, delayed_write, read_ahead]), + [read, write, raw, binary, read_ahead, delayed_write]), ExpectedSize = SourceValid + DestinationValid, %% if DestinationValid =:= DestinationContiguousTop then we don't need a tmp file %% if they're not equal, then we need to write out everything past the DestinationContiguousTop to a tmp file @@ -1080,7 +1104,7 @@ combine_files({Source, SourceValid, _SourceContiguousTop, Tmp = filename:rootname(Destination) ++ ?FILE_EXTENSION_TMP, {ok, TmpHdl} = file:open(form_filename(Tmp), - [read, write, raw, binary, delayed_write, read_ahead]), + [read, write, raw, binary, read_ahead, delayed_write]), Worklist = lists:dropwhile( fun ({_, _, _, Offset, _}) @@ -1262,9 +1286,11 @@ load_from_disk(State) -> {atomic, true} = mnesia:transaction( fun() -> ok = mnesia:read_lock_table(rabbit_disk_queue), - mnesia:foldl(fun (#dq_msg_loc { msg_id = MsgId }, true) -> - true = 1 =:= - erlang:length(dets_ets_lookup(State1, MsgId)) + mnesia:foldl(fun (#dq_msg_loc { msg_id = MsgId, queue_and_seq_id = {Q, SeqId} }, true) -> + case erlang:length(dets_ets_lookup(State1, MsgId)) of + 0 -> ok == mnesia:delete(rabbit_disk_queue, {Q, SeqId}, write); + 1 -> 
true + end end, true, rabbit_disk_queue) end), diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl index d1000c887e..8455bf1c9d 100644 --- a/src/rabbit_mixed_queue.erl +++ b/src/rabbit_mixed_queue.erl @@ -48,7 +48,14 @@ ). start_link(Queue, IsDurable, Mode) when Mode =:= disk orelse Mode =:= mixed -> - {ok, #mqstate { mode = Mode, msg_buf = queue:new(), next_write_seq = 1, + QList = rabbit_disk_queue:dump_queue(Queue), + {MsgBuf, NextSeq} = + lists:foldl( + fun ({MsgId, Msg, Size, Delivered, SeqId}, {Buf, NSeq}) + when SeqId >= NSeq -> + {queue:in({SeqId, Msg, Delivered}, Buf), SeqId + 1} + end, {queue:new(), 0}, QList), + {ok, #mqstate { mode = Mode, msg_buf = MsgBuf, next_write_seq = NextSeq, queue = Queue, is_durable = IsDurable }}. msg_to_bin(Msg = #basic_message { content = Content }) -> diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 75b36d6f3d..70fc45e099 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -908,7 +908,7 @@ rdq_test_dump_queue() -> [rabbit_disk_queue:tx_publish(N, Msg) || N <- All], rabbit_disk_queue:tx_commit(q, All, []), io:format("Publish done~n", []), - QList = [{N, Msg, 256, false, {N, (N-1)}} || N <- All], + QList = [{N, Msg, 256, false, (N-1)} || N <- All], QList = rabbit_disk_queue:dump_queue(q), rdq_stop(), io:format("dump ok undelivered~n", []), @@ -916,13 +916,13 @@ rdq_test_dump_queue() -> lists:foreach( fun (N) -> Remaining = Total - N, - {N, Msg, 256, false, SeqId, Remaining} = rabbit_disk_queue:deliver(q) + {N, Msg, 256, false, _SeqId, Remaining} = rabbit_disk_queue:deliver(q) end, All), [] = rabbit_disk_queue:dump_queue(q), rdq_stop(), io:format("dump ok post delivery~n", []), rdq_start(), - QList2 = [{N, Msg, 256, true, {N, (N-1)}} || N <- All], + QList2 = [{N, Msg, 256, true, (N-1)} || N <- All], QList2 = rabbit_disk_queue:dump_queue(q), io:format("dump ok post delivery + restart~n", []), passed. |
