diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-10-13 16:15:29 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-10-13 16:15:29 +0100 |
| commit | 4ef2ed434c2f3c2c8e83ff6670ad7347f390b562 (patch) | |
| tree | b74b961735af2276650849fb3ba9e9f32e3f5b31 | |
| parent | 02e5493c4de8be92e447d904b7bc41cd5475da02 (diff) | |
| download | rabbitmq-server-git-4ef2ed434c2f3c2c8e83ff6670ad7347f390b562.tar.gz | |
most of the rewiring is done. Need to sort out how to delete non durable queues on start up, which is a bit cyclical, as I'd like to not start the msg_store until we know which queues are durable and which aren't, but we also can't start the queues until the msg_store is running. Fun.
| -rw-r--r-- | src/rabbit.erl | 1 | ||||
| -rw-r--r-- | src/rabbit_amqqueue.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 188 | ||||
| -rw-r--r-- | src/rabbit_disk_queue.erl | 743 | ||||
| -rw-r--r-- | src/rabbit_mixed_queue.erl | 673 | ||||
| -rw-r--r-- | src/rabbit_queue_index.erl | 1 |
6 files changed, 73 insertions, 1538 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index 4e027ca824..b859c4affa 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -150,6 +150,7 @@ start(normal, []) -> ok = start_child(rabbit_router), ok = start_child(rabbit_node_monitor), ok = start_child(rabbit_guid), + %% TODO - this should probably use start_child somehow too ok = rabbit_queue_index:start_msg_store() end}, {"recovery", diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 3228655257..840c2c4d8a 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -42,7 +42,6 @@ -export([notify_sent/2, unblock/2, tx_commit_callback/3]). -export([commit_all/2, rollback_all/2, notify_down_all/2, limit_all/3]). -export([on_node_down/1]). --export([set_storage_mode/2]). -import(mnesia). -import(gen_server2). @@ -107,7 +106,6 @@ -spec(notify_sent/2 :: (pid(), pid()) -> 'ok'). -spec(unblock/2 :: (pid(), pid()) -> 'ok'). -spec(tx_commit_callback/3 :: (pid(), [message()], [acktag()]) -> 'ok'). --spec(set_storage_mode/2 :: (pid(), ('oppressed' | 'liberated')) -> 'ok'). -spec(internal_declare/2 :: (amqqueue(), boolean()) -> amqqueue()). -spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()). -spec(on_node_down/1 :: (erlang_node()) -> 'ok'). @@ -228,9 +226,6 @@ list(VHostPath) -> map(VHostPath, F) -> rabbit_misc:filter_exit_map(F, list(VHostPath)). -set_storage_mode(QPid, Mode) -> - gen_server2:pcast(QPid, 10, {set_storage_mode, Mode}). - info(#amqqueue{ pid = QPid }) -> gen_server2:pcall(QPid, 9, info, infinity). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 99fd6987de..152205edaf 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -38,7 +38,6 @@ -define(UNSENT_MESSAGE_LIMIT, 100). -define(HIBERNATE_AFTER_MIN, 1000). -define(DESIRED_HIBERNATE, 10000). --define(MINIMUM_MEMORY_REPORT_TIME_INTERVAL, 10000). %% 10 seconds in milliseconds -export([start_link/1]). @@ -54,11 +53,10 @@ owner, exclusive_consumer, has_had_consumers, - mixed_state, + variable_queue_state, next_msg_id, active_consumers, - blocked_consumers, - memory_report_timer + blocked_consumers }). -record(consumer, {tag, ack_required}). @@ -88,8 +86,7 @@ acks_uncommitted, consumers, transactions, - memory, - storage_mode + memory ]). %%---------------------------------------------------------------------------- @@ -99,43 +96,41 @@ start_link(Q) -> %%---------------------------------------------------------------------------- -init(Q = #amqqueue { name = QName, durable = Durable }) -> +init(Q = #amqqueue { name = QName }) -> ?LOGDEBUG("Queue starting - ~p~n", [Q]), ok = rabbit_memory_manager:register (self(), false, rabbit_amqqueue, set_storage_mode, [self()]), - {ok, MS} = rabbit_mixed_queue:init(QName, Durable), + VQS = rabbit_variable_queue:init(QName), State = #q{q = Q, owner = none, exclusive_consumer = none, has_had_consumers = false, - mixed_state = MS, + variable_queue_state = VQS, next_msg_id = 1, active_consumers = queue:new(), - blocked_consumers = queue:new(), - memory_report_timer = undefined + blocked_consumers = queue:new() }, - %% first thing we must do is report_memory. - {ok, start_memory_timer(State), hibernate, + {ok, State, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. -terminate(_Reason, State = #q{mixed_state = MS}) -> +terminate(_Reason, State = #q{variable_queue_state = VQS}) -> %% FIXME: How do we cancel active subscriptions? - State1 = stop_memory_timer(State), %% Ensure that any persisted tx messages are removed; %% mixed_queue:delete_queue cannot do that for us since neither %% mixed_queue nor disk_queue keep a record of uncommitted tx %% messages. - {ok, MS1} = rabbit_mixed_queue:tx_rollback( - lists:concat([PM || #tx { pending_messages = PM } <- - all_tx_record()]), MS), - %% Delete from disk queue first. If we crash at this point, when a + %% TODO: wait for all in flight tx_commits to complete + VQS1 = rabbit_variable_queue:tx_rollback( + lists:concat([PM || #tx { pending_messages = PM } <- + all_tx_record()]), VQS), + %% Delete from disk first. If we crash at this point, when a %% durable queue, we will be recreated at startup, possibly with %% partial content. The alternative is much worse however - if we %% called internal_delete first, we would then have a race between - %% the disk_queue delete and a new queue with the same name being + %% the disk delete and a new queue with the same name being %% created and published to. - {ok, _MS} = rabbit_mixed_queue:delete_queue(MS1), - ok = rabbit_amqqueue:internal_delete(qname(State1)). + _VQS = rabbit_variable_queue:delete(VQS1), + ok = rabbit_amqqueue:internal_delete(qname(State)). code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -144,27 +139,14 @@ code_change(_OldVsn, State, _Extra) -> reply(Reply, NewState) -> assert_invariant(NewState), - {reply, Reply, start_memory_timer(NewState), hibernate}. + {reply, Reply, NewState, hibernate}. noreply(NewState) -> assert_invariant(NewState), - {noreply, start_memory_timer(NewState), hibernate}. + {noreply, NewState, hibernate}. -assert_invariant(#q { active_consumers = AC, mixed_state = MS }) -> - true = (queue:is_empty(AC) orelse rabbit_mixed_queue:is_empty(MS)). - -start_memory_timer(State = #q { memory_report_timer = undefined }) -> - {ok, TRef} = timer:send_after(?MINIMUM_MEMORY_REPORT_TIME_INTERVAL, - report_memory), - report_memory(false, State #q { memory_report_timer = TRef }); -start_memory_timer(State) -> - State. - -stop_memory_timer(State = #q { memory_report_timer = undefined }) -> - State; -stop_memory_timer(State = #q { memory_report_timer = TRef }) -> - {ok, cancel} = timer:cancel(TRef), - State #q { memory_report_timer = undefined }. +assert_invariant(#q { active_consumers = AC, variable_queue_state = VQS }) -> + true = (queue:is_empty(AC) orelse rabbit_variable_queue:is_empty(VQS)). lookup_ch(ChPid) -> case get({ch, ChPid}) of @@ -282,25 +264,24 @@ deliver_msgs_to_consumers( deliver_from_queue_pred({IsEmpty, _AutoAcks}, _State) -> not IsEmpty. deliver_from_queue_deliver(AckRequired, {false, AutoAcks}, - State = #q { mixed_state = MS }) -> - {{Msg, IsDelivered, AckTag, Remaining}, MS1} = - rabbit_mixed_queue:fetch(MS), + State = #q { variable_queue_state = VQS }) -> + {{Msg, IsDelivered, AckTag, Remaining}, VQS1} = + rabbit_variable_queue:fetch(VQS), AutoAcks1 = case AckRequired of true -> AutoAcks; false -> [AckTag | AutoAcks] end, {{Msg, IsDelivered, AckTag}, {0 == Remaining, AutoAcks1}, - State #q { mixed_state = MS1 }}. + State #q { variable_queue_state = VQS1 }}. -run_message_queue(State = #q { mixed_state = MS }) -> +run_message_queue(State = #q { variable_queue_state = VQS }) -> Funs = { fun deliver_from_queue_pred/2, fun deliver_from_queue_deliver/3 }, - IsEmpty = rabbit_mixed_queue:is_empty(MS), + IsEmpty = rabbit_variable_queue:is_empty(VQS), {{_IsEmpty1, AutoAcks}, State1} = deliver_msgs_to_consumers(Funs, {IsEmpty, []}, State), - {ok, MS1} = - rabbit_mixed_queue:ack(AutoAcks, State1 #q.mixed_state), - State1 #q { mixed_state = MS1 }. + VQS1 = rabbit_variable_queue:ack(AutoAcks, State1 #q.variable_queue_state), + State1 #q { variable_queue_state = VQS1 }. attempt_immediate_delivery(none, _ChPid, Msg, State) -> PredFun = fun (IsEmpty, _State) -> not IsEmpty end, @@ -309,10 +290,10 @@ attempt_immediate_delivery(none, _ChPid, Msg, State) -> {AckTag, State2} = case AckRequired of true -> - {ok, AckTag1, MS} = - rabbit_mixed_queue:publish_delivered( - Msg, State1 #q.mixed_state), - {AckTag1, State1 #q { mixed_state = MS }}; + {AckTag1, VQS} = + rabbit_variable_queue:publish_delivered( + Msg, State1 #q.variable_queue_state), + {AckTag1, State1 #q { variable_queue_state = VQS }}; false -> {noack, State1} end, @@ -320,9 +301,9 @@ attempt_immediate_delivery(none, _ChPid, Msg, State) -> end, deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State); attempt_immediate_delivery(Txn, ChPid, Msg, State) -> - {ok, MS} = rabbit_mixed_queue:tx_publish(Msg, State #q.mixed_state), + VQS = rabbit_variable_queue:tx_publish(Msg, State #q.variable_queue_state), record_pending_message(Txn, ChPid, Msg), - {true, State #q { mixed_state = MS }}. + {true, State #q { variable_queue_state = VQS }}. deliver_or_enqueue(Txn, ChPid, Msg, State) -> case attempt_immediate_delivery(Txn, ChPid, Msg, State) of @@ -330,8 +311,9 @@ deliver_or_enqueue(Txn, ChPid, Msg, State) -> {true, NewState}; {false, NewState} -> %% Txn is none and no unblocked channels with consumers - {ok, MS} = rabbit_mixed_queue:publish(Msg, State #q.mixed_state), - {false, NewState #q { mixed_state = MS }} + {_SeqId, VQS} = rabbit_variable_queue:publish( + Msg, State #q.variable_queue_state), + {false, NewState #q { variable_queue_state = VQS }} end. %% all these messages have already been delivered at least once and @@ -344,11 +326,11 @@ deliver_or_requeue_n(MsgsWithAcks, State) -> {{_RemainingLengthMinusOne, AutoAcks, OutstandingMsgs}, NewState} = deliver_msgs_to_consumers( Funs, {length(MsgsWithAcks), [], MsgsWithAcks}, State), - {ok, MS} = rabbit_mixed_queue:ack(AutoAcks, NewState #q.mixed_state), + VQS = rabbit_variable_queue:ack(AutoAcks, NewState #q.variable_queue_state), case OutstandingMsgs of - [] -> NewState #q { mixed_state = MS }; - _ -> {ok, MS1} = rabbit_mixed_queue:requeue(OutstandingMsgs, MS), - NewState #q { mixed_state = MS1 } + [] -> NewState #q { variable_queue_state = VQS }; + _ -> VQS1 = rabbit_variable_queue:requeue(OutstandingMsgs, VQS), + NewState #q { variable_queue_state = VQS1 } end. deliver_or_requeue_msgs_pred({Len, _AcksAcc, _MsgsWithAcks}, _State) -> @@ -504,17 +486,17 @@ commit_transaction(Txn, State) -> store_ch_record(C#cr{unacked_messages = Remaining}), MsgWithAcks end, - {ok, MS} = rabbit_mixed_queue:tx_commit( - PendingMessagesOrdered, Acks, State #q.mixed_state), - State #q { mixed_state = MS }. + VQS = rabbit_variable_queue:tx_commit( + PendingMessagesOrdered, Acks, State #q.variable_queue_state), + State #q { variable_queue_state = VQS }. rollback_transaction(Txn, State) -> #tx { pending_messages = PendingMessages } = lookup_tx(Txn), - {ok, MS} = rabbit_mixed_queue:tx_rollback(PendingMessages, - State #q.mixed_state), + VQS = rabbit_variable_queue:tx_rollback(PendingMessages, + State #q.variable_queue_state), erase_tx(Txn), - State #q { mixed_state = MS }. + State #q { variable_queue_state = VQS }. %% {A, B} = collect_messages(C, D) %% A = C `intersect` D; B = D \\ C %% err, A = C `intersect` D , via projection through the dict that is C @@ -529,12 +511,10 @@ i(name, #q{q = #amqqueue{name = Name}}) -> Name; i(durable, #q{q = #amqqueue{durable = Durable}}) -> Durable; i(auto_delete, #q{q = #amqqueue{auto_delete = AutoDelete}}) -> AutoDelete; i(arguments, #q{q = #amqqueue{arguments = Arguments}}) -> Arguments; -i(storage_mode, #q{ mixed_state = MS }) -> - rabbit_mixed_queue:storage_mode(MS); i(pid, _) -> self(); -i(messages_ready, #q { mixed_state = MS }) -> - rabbit_mixed_queue:len(MS); +i(messages_ready, #q { variable_queue_state = VQS }) -> + rabbit_variable_queue:len(VQS); i(messages_unacknowledged, _) -> lists:sum([dict:size(UAM) || #cr{unacked_messages = UAM} <- all_ch_record()]); @@ -558,11 +538,6 @@ i(memory, _) -> i(Item, _) -> throw({bad_argument, Item}). -report_memory(Hib, State = #q { mixed_state = MS }) -> - {MS1, MSize} = rabbit_mixed_queue:estimate_queue_memory(MS), - rabbit_memory_manager:report_memory(self(), MSize, Hib), - State #q { mixed_state = MS1 }. - %--------------------------------------------------------------------------- handle_call(info, _From, State) -> @@ -612,25 +587,25 @@ handle_call({notify_down, ChPid}, From, State) -> handle_call({basic_get, ChPid, NoAck}, _From, State = #q{q = #amqqueue{name = QName}, next_msg_id = NextId, - mixed_state = MS + variable_queue_state = VQS }) -> - case rabbit_mixed_queue:fetch(MS) of - {empty, MS1} -> reply(empty, State #q { mixed_state = MS1 }); - {{Msg, IsDelivered, AckTag, Remaining}, MS1} -> + case rabbit_variable_queue:fetch(VQS) of + {empty, VQS1} -> reply(empty, State #q { variable_queue_state = VQS1 }); + {{Msg, IsDelivered, AckTag, Remaining}, VQS1} -> AckRequired = not(NoAck), - {ok, MS2} = + {ok, VQS2} = case AckRequired of true -> C = #cr{unacked_messages = UAM} = ch_record(ChPid), NewUAM = dict:store(NextId, {Msg, AckTag}, UAM), store_ch_record(C#cr{unacked_messages = NewUAM}), - {ok, MS1}; + {ok, VQS1}; false -> - rabbit_mixed_queue:ack([AckTag], MS1) + rabbit_variable_queue:ack([AckTag], VQS1) end, Message = {QName, self(), NextId, IsDelivered, Msg}, reply({ok, Remaining, Message}, - State #q { next_msg_id = NextId + 1, mixed_state = MS2 }) + State #q { next_msg_id = NextId + 1, variable_queue_state = VQS2 }) end; handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid, @@ -710,14 +685,14 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, end; handle_call(stat, _From, State = #q{q = #amqqueue{name = Name}, - mixed_state = MS, + variable_queue_state = VQS, active_consumers = ActiveConsumers}) -> - Length = rabbit_mixed_queue:len(MS), + Length = rabbit_variable_queue:len(VQS), reply({ok, Name, Length, queue:len(ActiveConsumers)}, State); handle_call({delete, IfUnused, IfEmpty}, _From, - State = #q { mixed_state = MS }) -> - Length = rabbit_mixed_queue:len(MS), + State = #q { variable_queue_state = VQS }) -> + Length = rabbit_variable_queue:len(VQS), IsEmpty = Length == 0, IsUnused = is_unused(State), if @@ -730,8 +705,8 @@ handle_call({delete, IfUnused, IfEmpty}, _From, end; handle_call(purge, _From, State) -> - {Count, MS} = rabbit_mixed_queue:purge(State #q.mixed_state), - reply({ok, Count}, State #q { mixed_state = MS }); + {Count, VQS} = rabbit_variable_queue:purge(State #q.variable_queue_state), + reply({ok, Count}, State #q { variable_queue_state = VQS }); handle_call({claim_queue, ReaderPid}, _From, State = #q{owner = Owner, exclusive_consumer = Holder}) -> @@ -770,11 +745,11 @@ handle_cast({ack, Txn, MsgIds, ChPid}, State) -> case Txn of none -> {MsgWithAcks, Remaining} = collect_messages(MsgIds, UAM), - {ok, MS} = rabbit_mixed_queue:ack( - [AckTag || {_Msg, AckTag} <- MsgWithAcks], - State #q.mixed_state), + VQS = rabbit_variable_queue:ack( + [AckTag || {_Msg, AckTag} <- MsgWithAcks], + State #q.variable_queue_state), store_ch_record(C#cr{unacked_messages = Remaining}), - noreply(State #q { mixed_state = MS }); + noreply(State #q { variable_queue_state = VQS }); _ -> record_pending_acks(Txn, ChPid, MsgIds), noreply(State) @@ -822,23 +797,7 @@ handle_cast({limit, ChPid, LimiterPid}, State) -> end, NewLimited = Limited andalso LimiterPid =/= undefined, C#cr{limiter_pid = LimiterPid, is_limit_active = NewLimited} - end)); - -handle_cast({set_storage_mode, Mode}, State = #q { mixed_state = MS }) -> - PendingMessages = - lists:flatten([Pending || #tx { pending_messages = Pending} - <- all_tx_record()]), - Mode1 = case Mode of - liberated -> mixed; - oppressed -> disk - end, - {ok, MS1} = rabbit_mixed_queue:set_storage_mode(Mode1, PendingMessages, MS), - noreply(State #q { mixed_state = MS1 }). - -handle_info(report_memory, State) -> - %% deliberately don't call noreply/1 as we don't want to start the timer. - %% By unsetting the timer, we force a report on the next normal message. - {noreply, State #q { memory_report_timer = undefined }, hibernate}; + end)). handle_info({'DOWN', MonitorRef, process, DownPid, _Reason}, State = #q{owner = {DownPid, MonitorRef}}) -> @@ -860,9 +819,6 @@ handle_info(Info, State) -> ?LOGDEBUG("Info in queue: ~p~n", [Info]), {stop, {unhandled_info, Info}, State}. -handle_pre_hibernate(State = #q { mixed_state = MS }) -> - MS1 = rabbit_mixed_queue:maybe_prefetch(MS), - State1 = - stop_memory_timer(report_memory(true, State #q { mixed_state = MS1 })), - %% don't call noreply/1 as that'll restart the memory_report_timer - {hibernate, State1}. +handle_pre_hibernate(State = #q { variable_queue_state = VQS }) -> + VQS1 = rabbit_variable_queue:maybe_start_prefetcher(VQS), + {hibernate, State #q { variable_queue_state = VQS1 }}. diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl deleted file mode 100644 index 7d44dd9d54..0000000000 --- a/src/rabbit_disk_queue.erl +++ /dev/null @@ -1,743 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License at -%% http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the -%% License for the specific language governing rights and limitations -%% under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developers of the Original Code are LShift Ltd, -%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, -%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd -%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial -%% Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift -%% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies -%% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. -%% -%% All Rights Reserved. -%% -%% Contributor(s): ______________________________________. -%% - --module(rabbit_disk_queue). - --behaviour(gen_server2). - --export([start_link/0]). - --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). - --export([publish/3, fetch/1, phantom_fetch/1, ack/2, tx_publish/1, tx_commit/3, - tx_rollback/1, requeue/2, purge/1, delete_queue/1, - delete_non_durable_queues/1, requeue_next_n/2, len/1, foldl/3, - prefetch/1 - ]). - --export([stop/0, stop_and_obliterate/0]). - -%%---------------------------------------------------------------------------- - --include("rabbit.hrl"). - --define(SEQUENCE_ETS_NAME, rabbit_disk_queue_sequences). --define(BATCH_SIZE, 10000). - --define(SHUTDOWN_MESSAGE_KEY, {internal_token, shutdown}). --define(SHUTDOWN_MESSAGE, - #dq_msg_loc { queue_and_seq_id = ?SHUTDOWN_MESSAGE_KEY, - msg_id = infinity_and_beyond, - is_delivered = never, - is_persistent = true - }). - --define(HIBERNATE_AFTER_MIN, 1000). --define(DESIRED_HIBERNATE, 10000). - --define(SERVER, ?MODULE). - --record(dqstate, { sequences }). %% next read and write for each q - -%%---------------------------------------------------------------------------- - --ifdef(use_specs). - --type(msg_id() :: guid()). --type(seq_id() :: non_neg_integer()). --type(ack_tag() :: {msg_id(), seq_id()}). - --spec(start_link/0 :: () -> - ({'ok', pid()} | 'ignore' | {'error', any()})). --spec(publish/3 :: (queue_name(), message(), boolean()) -> 'ok'). --spec(fetch/1 :: (queue_name()) -> - ('empty' | - {message(), boolean(), ack_tag(), non_neg_integer()})). --spec(phantom_fetch/1 :: (queue_name()) -> - ('empty' | - {msg_id(), boolean(), ack_tag(), non_neg_integer()})). --spec(prefetch/1 :: (queue_name()) -> 'ok'). --spec(ack/2 :: (queue_name(), [ack_tag()]) -> 'ok'). --spec(tx_publish/1 :: (message()) -> 'ok'). --spec(tx_commit/3 :: (queue_name(), [{msg_id(), boolean(), boolean()}], - [ack_tag()]) -> - 'ok'). --spec(tx_rollback/1 :: ([msg_id()]) -> 'ok'). --spec(requeue/2 :: (queue_name(), [{ack_tag(), boolean()}]) -> 'ok'). --spec(requeue_next_n/2 :: (queue_name(), non_neg_integer()) -> 'ok'). --spec(purge/1 :: (queue_name()) -> non_neg_integer()). --spec(delete_queue/1 :: (queue_name()) -> 'ok'). --spec(delete_non_durable_queues/1 :: ([queue_name()]) -> 'ok'). --spec(len/1 :: (queue_name()) -> non_neg_integer()). --spec(foldl/3 :: (fun ((message(), ack_tag(), boolean(), A) -> A), - A, queue_name()) -> A). --spec(stop/0 :: () -> 'ok'). --spec(stop_and_obliterate/0 :: () -> 'ok'). - --endif. - -%%---------------------------------------------------------------------------- -%% public API -%%---------------------------------------------------------------------------- - -start_link() -> - gen_server2:start_link({local, ?SERVER}, ?MODULE, [], []). - -publish(Q, Message = #basic_message {}, IsDelivered) -> - gen_server2:cast(?SERVER, {publish, Q, Message, IsDelivered}). - -fetch(Q) -> - gen_server2:call(?SERVER, {fetch, Q}, infinity). - -phantom_fetch(Q) -> - gen_server2:call(?SERVER, {phantom_fetch, Q}, infinity). - -prefetch(Q) -> - gen_server2:pcast(?SERVER, -1, {prefetch, Q, self()}). - -ack(Q, MsgSeqIds) when is_list(MsgSeqIds) -> - gen_server2:cast(?SERVER, {ack, Q, MsgSeqIds}). - -tx_publish(Message = #basic_message {}) -> - gen_server2:cast(?SERVER, {tx_publish, Message}). - -tx_commit(Q, PubMsgIds, AckSeqIds) - when is_list(PubMsgIds) andalso is_list(AckSeqIds) -> - gen_server2:call(?SERVER, {tx_commit, Q, PubMsgIds, AckSeqIds}, infinity). - -tx_rollback(MsgIds) when is_list(MsgIds) -> - gen_server2:cast(?SERVER, {tx_rollback, MsgIds}). - -requeue(Q, MsgSeqIds) when is_list(MsgSeqIds) -> - gen_server2:cast(?SERVER, {requeue, Q, MsgSeqIds}). - -requeue_next_n(Q, N) when is_integer(N) -> - gen_server2:cast(?SERVER, {requeue_next_n, Q, N}). - -purge(Q) -> - gen_server2:call(?SERVER, {purge, Q}, infinity). - -delete_queue(Q) -> - gen_server2:call(?SERVER, {delete_queue, Q}, infinity). - -delete_non_durable_queues(DurableQueues) -> - gen_server2:call(?SERVER, {delete_non_durable_queues, DurableQueues}, - infinity). - -len(Q) -> - gen_server2:call(?SERVER, {len, Q}, infinity). - -foldl(Fun, Init, Acc) -> - gen_server2:call(?SERVER, {foldl, Fun, Init, Acc}, infinity). - -stop() -> - gen_server2:call(?SERVER, stop, infinity). - -stop_and_obliterate() -> - gen_server2:call(?SERVER, stop_vaporise, infinity). - -%% private - -finalise_commit(TxDetails) -> - gen_server2:cast(?SERVER, {finalise_commit, TxDetails}). - -%%---------------------------------------------------------------------------- -%% gen_server behaviour -%%---------------------------------------------------------------------------- - -init([]) -> - %% If the gen_server is part of a supervision tree and is ordered - %% by its supervisor to terminate, terminate will be called with - %% Reason=shutdown if the following conditions apply: - %% * the gen_server has been set to trap exit signals, and - %% * the shutdown strategy as defined in the supervisor's - %% child specification is an integer timeout value, not - %% brutal_kill. - %% Otherwise, the gen_server will be immediately terminated. - process_flag(trap_exit, true), - - ok = filelib:ensure_dir(form_filename("nothing")), - - ok = detect_shutdown_state_and_adjust_delivered_flags(), - - {ok, _Pid} = rabbit_msg_store:start_link(base_directory(), - fun msg_ref_gen/1, - msg_ref_gen_init()), - ok = prune(), - - Sequences = ets:new(?SEQUENCE_ETS_NAME, [set, private]), - ok = extract_sequence_numbers(Sequences), - - State = #dqstate { sequences = Sequences }, - {ok, State, hibernate, - {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. - -handle_call({fetch, Q}, _From, State) -> - {Result, State1} = internal_fetch_body(Q, pop_queue, State), - reply(Result, State1); -handle_call({phantom_fetch, Q}, _From, State) -> - Result = internal_fetch_attributes(Q, record_delivery, State), - reply(Result, State); -handle_call({tx_commit, Q, PubMsgIds, AckSeqIds}, From, State) -> - State1 = - internal_tx_commit(Q, PubMsgIds, AckSeqIds, From, State), - noreply(State1); -handle_call({purge, Q}, _From, State) -> - {ok, Count, State1} = internal_purge(Q, State), - reply(Count, State1); -handle_call({delete_queue, Q}, From, State) -> - gen_server2:reply(From, ok), - {ok, State1} = internal_delete_queue(Q, State), - noreply(State1); -handle_call({len, Q}, _From, State = #dqstate { sequences = Sequences }) -> - {ReadSeqId, WriteSeqId} = sequence_lookup(Sequences, Q), - reply(WriteSeqId - ReadSeqId, State); -handle_call({foldl, Fun, Init, Q}, _From, State) -> - {ok, Result, State1} = internal_foldl(Q, Fun, Init, State), - reply(Result, State1); -handle_call(stop, _From, State) -> - {stop, normal, ok, State}; %% gen_server now calls terminate -handle_call(stop_vaporise, _From, State) -> - State1 = shutdown(State), - {atomic, ok} = mnesia:clear_table(rabbit_disk_queue), - lists:foreach(fun file:delete/1, filelib:wildcard(form_filename("*"))), - {stop, normal, ok, State1}; %% gen_server now calls terminate -handle_call({delete_non_durable_queues, DurableQueues}, _From, State) -> - {ok, State1} = internal_delete_non_durable_queues(DurableQueues, State), - reply(ok, State1). - -handle_cast({publish, Q, Message, IsDelivered}, State) -> - {ok, _MsgSeqId, State1} = internal_publish(Q, Message, IsDelivered, State), - noreply(State1); -handle_cast({ack, Q, MsgSeqIds}, State) -> - {ok, State1} = internal_ack(Q, MsgSeqIds, State), - noreply(State1); -handle_cast({tx_publish, Message}, State) -> - {ok, State1} = internal_tx_publish(Message, State), - noreply(State1); -handle_cast({tx_rollback, MsgIds}, State) -> - {ok, State1} = internal_tx_rollback(MsgIds, State), - noreply(State1); -handle_cast({requeue, Q, MsgSeqIds}, State) -> - {ok, State1} = internal_requeue(Q, MsgSeqIds, State), - noreply(State1); -handle_cast({requeue_next_n, Q, N}, State) -> - {ok, State1} = internal_requeue_next_n(Q, N, State), - noreply(State1); -handle_cast({prefetch, Q, From}, State) -> - {Result, State1} = internal_fetch_body(Q, peek_queue, State), - case rabbit_misc:with_exit_handler( - fun () -> false end, - fun () -> - ok = rabbit_queue_prefetcher:publish(From, Result), - true - end) of - true -> - internal_fetch_attributes(Q, ignore_delivery, State1); - false -> ok - end, - noreply(State1); -handle_cast({finalise_commit, TxDetails}, State) -> - noreply(finalise_commit(TxDetails, State)). - -handle_info({'EXIT', _Pid, Reason}, State) -> - {stop, Reason, State}. - -terminate(_Reason, State) -> - State1 = shutdown(State), - store_safe_shutdown(), - State1. - -shutdown(State = #dqstate { sequences = undefined }) -> - State; -shutdown(State = #dqstate { sequences = Sequences }) -> - ok = rabbit_msg_store:stop(), - ets:delete(Sequences), - State #dqstate { sequences = undefined }. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -%%---------------------------------------------------------------------------- -%% general helper functions -%%---------------------------------------------------------------------------- - -noreply(State) -> - {noreply, State, hibernate}. - -reply(Reply, State) -> - {reply, Reply, State, hibernate}. - -form_filename(Name) -> - filename:join(base_directory(), Name). - -base_directory() -> - filename:join(rabbit_mnesia:dir(), "rabbit_disk_queue/"). - -sequence_lookup(Sequences, Q) -> - case ets:lookup(Sequences, Q) of - [] -> {0, 0}; - [{_, ReadSeqId, WriteSeqId}] -> {ReadSeqId, WriteSeqId} - end. - -%%---------------------------------------------------------------------------- -%% internal functions -%%---------------------------------------------------------------------------- - -internal_fetch_body(Q, Advance, State) -> - case next(Q, record_delivery, Advance, State) of - empty -> {empty, State}; - {MsgId, IsDelivered, AckTag, Remaining} -> - {ok, Message} = rabbit_msg_store:read(MsgId), - {{Message, IsDelivered, AckTag, Remaining}, State} - end. - -internal_fetch_attributes(Q, MarkDelivered, State) -> - next(Q, MarkDelivered, pop_queue, State). - -next(Q, MarkDelivered, Advance, #dqstate { sequences = Sequences }) -> - case sequence_lookup(Sequences, Q) of - {SeqId, SeqId} -> empty; - {ReadSeqId, WriteSeqId} when WriteSeqId > ReadSeqId -> - Remaining = WriteSeqId - ReadSeqId - 1, - {MsgId, IsDelivered} = - update_message_attributes(Q, ReadSeqId, MarkDelivered), - ok = maybe_advance(Advance, Sequences, Q, ReadSeqId, WriteSeqId), - AckTag = {MsgId, ReadSeqId}, - {MsgId, IsDelivered, AckTag, Remaining} - end. - -update_message_attributes(Q, SeqId, MarkDelivered) -> - [Obj = - #dq_msg_loc {is_delivered = IsDelivered, msg_id = MsgId}] = - mnesia:dirty_read(rabbit_disk_queue, {Q, SeqId}), - ok = case {IsDelivered, MarkDelivered} of - {true, _} -> ok; - {false, ignore_delivery} -> ok; - {false, record_delivery} -> - mnesia:dirty_write(rabbit_disk_queue, - Obj #dq_msg_loc {is_delivered = true}) - end, - {MsgId, IsDelivered}. - -maybe_advance(peek_queue, _, _, _, _) -> - ok; -maybe_advance(pop_queue, Sequences, Q, ReadSeqId, WriteSeqId) -> - true = ets:insert(Sequences, {Q, ReadSeqId + 1, WriteSeqId}), - ok. - -internal_foldl(Q, Fun, Init, State = #dqstate { sequences = Sequences }) -> - {ReadSeqId, WriteSeqId} = sequence_lookup(Sequences, Q), - internal_foldl(Q, WriteSeqId, Fun, State, Init, ReadSeqId). - -internal_foldl(_Q, SeqId, _Fun, State, Acc, SeqId) -> - {ok, Acc, State}; -internal_foldl(Q, WriteSeqId, Fun, State, Acc, ReadSeqId) -> - [#dq_msg_loc {is_delivered = IsDelivered, msg_id = MsgId}] = - mnesia:dirty_read(rabbit_disk_queue, {Q, ReadSeqId}), - {ok, Message} = rabbit_msg_store:read(MsgId), - Acc1 = Fun(Message, {MsgId, ReadSeqId}, IsDelivered, Acc), - internal_foldl(Q, WriteSeqId, Fun, State, Acc1, ReadSeqId + 1). - -internal_ack(Q, MsgSeqIds, State) -> - remove_messages(Q, MsgSeqIds, State). - -remove_messages(Q, MsgSeqIds, State) -> - MsgIds = lists:foldl( - fun ({MsgId, SeqId}, MsgIdAcc) -> - ok = mnesia:dirty_delete(rabbit_disk_queue, {Q, SeqId}), - [MsgId | MsgIdAcc] - end, [], MsgSeqIds), - ok = rabbit_msg_store:remove(MsgIds), - {ok, State}. - -internal_tx_publish(Message = #basic_message { guid = MsgId, - content = Content }, State) -> - ClearedContent = rabbit_binary_parser:clear_decoded_content(Content), - ok = rabbit_msg_store:write( - MsgId, Message #basic_message { content = ClearedContent }), - {ok, State}. - -internal_tx_commit(Q, PubMsgIds, AckSeqIds, From, State) -> - TxDetails = {Q, PubMsgIds, AckSeqIds, From}, - ok = rabbit_msg_store:sync([MsgId || {MsgId, _, _} <- PubMsgIds], - fun () -> finalise_commit(TxDetails) end), - State. - -finalise_commit({Q, PubMsgIds, AckSeqIds, From}, - State = #dqstate { sequences = Sequences }) -> - {InitReadSeqId, InitWriteSeqId} = sequence_lookup(Sequences, Q), - WriteSeqId = - rabbit_misc:execute_mnesia_transaction( - fun() -> - ok = mnesia:write_lock_table(rabbit_disk_queue), - lists:foldl( - fun ({MsgId, IsDelivered, IsPersistent}, SeqId) -> - ok = mnesia:write( - rabbit_disk_queue, - #dq_msg_loc { - queue_and_seq_id = {Q, SeqId}, - msg_id = MsgId, - is_delivered = IsDelivered, - is_persistent = IsPersistent - }, write), - SeqId + 1 - end, InitWriteSeqId, PubMsgIds) - end), - {ok, State1} = remove_messages(Q, AckSeqIds, State), - true = case PubMsgIds of - [] -> true; - _ -> ets:insert(Sequences, - {Q, InitReadSeqId, WriteSeqId}) - end, - gen_server2:reply(From, ok), - State1. - -internal_publish(Q, Message = #basic_message { guid = MsgId, - is_persistent = IsPersistent }, - IsDelivered, State) -> - {ok, State1 = #dqstate { sequences = Sequences }} = - internal_tx_publish(Message, State), - {ReadSeqId, WriteSeqId} = sequence_lookup(Sequences, Q), - ok = mnesia:dirty_write(rabbit_disk_queue, - #dq_msg_loc { queue_and_seq_id = {Q, WriteSeqId}, - msg_id = MsgId, - is_delivered = IsDelivered, - is_persistent = IsPersistent }), - true = ets:insert(Sequences, {Q, ReadSeqId, WriteSeqId + 1}), - {ok, {MsgId, WriteSeqId}, State1}. - -internal_tx_rollback(MsgIds, State) -> - ok = rabbit_msg_store:remove(MsgIds), - {ok, State}. - -internal_requeue(_Q, [], State) -> - {ok, State}; -internal_requeue(Q, MsgSeqIds, State = #dqstate { sequences = Sequences }) -> - %% We know that every seq_id in here is less than the ReadSeqId - %% you'll get if you look up this queue in Sequences (i.e. they've - %% already been delivered). We also know that the rows for these - %% messages are still in rabbit_disk_queue (i.e. they've not been - %% ack'd). - %% - %% Now, it would be nice if we could adjust the sequence ids in - %% rabbit_disk_queue (mnesia) to create a contiguous block and - %% then drop the ReadSeqId for the queue by the corresponding - %% amount. However, this is not safe because there may be other - %% sequence ids which have been sent out as part of deliveries - %% which are not being requeued. As such, moving things about in - %% rabbit_disk_queue _under_ the current ReadSeqId would result in - %% such sequence ids referring to the wrong messages. - %% - %% Therefore, the only solution is to take these messages, and to - %% reenqueue them at the top of the queue. Usefully, this only - %% affects the Sequences and rabbit_disk_queue structures - there - %% is no need to physically move the messages about on disk, so - %% the message store remains unaffected, except we need to tell it - %% about the ids of the requeued messages so it can remove them - %% from its message cache if necessary. - - {ReadSeqId, WriteSeqId} = sequence_lookup(Sequences, Q), - {WriteSeqId1, Q, MsgIds} = - rabbit_misc:execute_mnesia_transaction( - fun() -> - ok = mnesia:write_lock_table(rabbit_disk_queue), - lists:foldl(fun requeue_message/2, {WriteSeqId, Q, []}, - MsgSeqIds) - end), - true = ets:insert(Sequences, {Q, ReadSeqId, WriteSeqId1}), - ok = rabbit_msg_store:release(MsgIds), - {ok, State}. - -requeue_message({{MsgId, SeqId}, IsDelivered}, {WriteSeqId, Q, Acc}) -> - [Obj = #dq_msg_loc { is_delivered = true, msg_id = MsgId }] = - mnesia:read(rabbit_disk_queue, {Q, SeqId}, write), - ok = mnesia:write(rabbit_disk_queue, - Obj #dq_msg_loc {queue_and_seq_id = {Q, WriteSeqId}, - is_delivered = IsDelivered - }, - write), - ok = mnesia:delete(rabbit_disk_queue, {Q, SeqId}, write), - {WriteSeqId + 1, Q, [MsgId | Acc]}. - -%% move the next N messages from the front of the queue to the back. -internal_requeue_next_n(Q, N, State = #dqstate { sequences = Sequences }) -> - {ReadSeqId, WriteSeqId} = sequence_lookup(Sequences, Q), - if N >= (WriteSeqId - ReadSeqId) -> {ok, State}; - true -> - {ReadSeqIdN, WriteSeqIdN, MsgIds} = - rabbit_misc:execute_mnesia_transaction( - fun() -> - ok = mnesia:write_lock_table(rabbit_disk_queue), - requeue_next_messages(Q, N, ReadSeqId, WriteSeqId, []) - end - ), - true = ets:insert(Sequences, {Q, ReadSeqIdN, WriteSeqIdN}), - ok = rabbit_msg_store:release(MsgIds), - {ok, State} - end. - -requeue_next_messages(_Q, 0, ReadSeq, WriteSeq, Acc) -> - {ReadSeq, WriteSeq, Acc}; -requeue_next_messages(Q, N, ReadSeq, WriteSeq, Acc) -> - [Obj = #dq_msg_loc { msg_id = MsgId }] = - mnesia:read(rabbit_disk_queue, {Q, ReadSeq}, write), - ok = mnesia:write(rabbit_disk_queue, - Obj #dq_msg_loc {queue_and_seq_id = {Q, WriteSeq}}, - write), - ok = mnesia:delete(rabbit_disk_queue, {Q, ReadSeq}, write), - requeue_next_messages(Q, N - 1, ReadSeq + 1, WriteSeq + 1, [MsgId | Acc]). - -internal_purge(Q, State = #dqstate { sequences = Sequences }) -> - case sequence_lookup(Sequences, Q) of - {SeqId, SeqId} -> {ok, 0, State}; - {ReadSeqId, WriteSeqId} -> - {MsgSeqIds, WriteSeqId} = - rabbit_misc:unfold( - fun (SeqId) when SeqId == WriteSeqId -> false; - (SeqId) -> - [#dq_msg_loc { msg_id = MsgId }] = - mnesia:dirty_read(rabbit_disk_queue, {Q, SeqId}), - {true, {MsgId, SeqId}, SeqId + 1} - end, ReadSeqId), - true = ets:insert(Sequences, {Q, WriteSeqId, WriteSeqId}), - {ok, State1} = remove_messages(Q, MsgSeqIds, State), - {ok, WriteSeqId - ReadSeqId, State1} - end. - -internal_delete_queue(Q, State) -> - %% remove everything undelivered - {ok, _Count, State1 = #dqstate { sequences = Sequences }} = - internal_purge(Q, State), - true = ets:delete(Sequences, Q), - %% remove everything already delivered - remove_messages( - Q, [{MsgId, SeqId} || #dq_msg_loc { queue_and_seq_id = {_Q, SeqId}, - msg_id = MsgId } <- - mnesia:dirty_match_object( - rabbit_disk_queue, - #dq_msg_loc { - queue_and_seq_id = {Q, '_'}, - _ = '_' })], State1). - -internal_delete_non_durable_queues( - DurableQueues, State = #dqstate { sequences = Sequences }) -> - DurableQueueSet = sets:from_list(DurableQueues), - ets:foldl( - fun ({Q, _Read, _Write}, {ok, State1}) -> - case sets:is_element(Q, DurableQueueSet) of - true -> {ok, State1}; - false -> internal_delete_queue(Q, State1) - end - end, {ok, State}, Sequences). - -%%---------------------------------------------------------------------------- -%% recovery -%%---------------------------------------------------------------------------- - -store_safe_shutdown() -> - ok = rabbit_misc:execute_mnesia_transaction( - fun() -> - mnesia:write(rabbit_disk_queue, - ?SHUTDOWN_MESSAGE, write) - end). - -detect_shutdown_state_and_adjust_delivered_flags() -> - MarkDelivered = - rabbit_misc:execute_mnesia_transaction( - fun() -> - case mnesia:read(rabbit_disk_queue, - ?SHUTDOWN_MESSAGE_KEY, read) of - [?SHUTDOWN_MESSAGE] -> - mnesia:delete(rabbit_disk_queue, - ?SHUTDOWN_MESSAGE_KEY, write), - false; - [] -> - true - end - end), - %% if we crash here, then on startup we'll not find the - %% SHUTDOWN_MESSAGE so will mark everything delivered, which is - %% the safe thing to do. - case MarkDelivered of - true -> mark_messages_delivered(); - false -> ok - end. - -mark_messages_delivered() -> - mark_message_delivered('$start_of_table'). - -%% A single huge transaction is a bad idea because of memory -%% use. Equally, using dirty operations is a bad idea because you -%% shouldn't do writes when doing mnesia:dirty_next, because the -%% ordering can change. So we use transactions of bounded -%% size. However, even this does necessitate restarting between -%% transactions. -mark_message_delivered('$end_of_table') -> - ok; -mark_message_delivered(_Key) -> - mark_message_delivered( - rabbit_misc:execute_mnesia_transaction( - fun () -> - ok = mnesia:write_lock_table(rabbit_disk_queue), - mark_message_delivered(mnesia:first(rabbit_disk_queue), - ?BATCH_SIZE) - end)). - -mark_message_delivered(Key, 0) -> - Key; -mark_message_delivered(Key = '$end_of_table', _N) -> - Key; -mark_message_delivered(Key, N) -> - [Obj] = mnesia:read(rabbit_disk_queue, Key, write), - M = case Obj #dq_msg_loc.is_delivered of - true -> N; - false -> - ok = mnesia:write(rabbit_disk_queue, - Obj #dq_msg_loc { is_delivered = true }, - write), - N - 1 - end, - mark_message_delivered(mnesia:next(rabbit_disk_queue, Key), M). - -msg_ref_gen_init() -> mnesia:dirty_first(rabbit_disk_queue). - -msg_ref_gen('$end_of_table') -> finished; -msg_ref_gen(Key) -> - [#dq_msg_loc { msg_id = MsgId, is_persistent = IsPersistent }] = - mnesia:dirty_read(rabbit_disk_queue, Key), - NextKey = mnesia:dirty_next(rabbit_disk_queue, Key), - {MsgId, case IsPersistent of true -> 1; false -> 0 end, NextKey}. - -prune_flush_batch(DeleteAcc) -> - lists:foldl(fun (Key, ok) -> - mnesia:dirty_delete(rabbit_disk_queue, Key) - end, ok, DeleteAcc). - -prune() -> - prune(mnesia:dirty_first(rabbit_disk_queue), [], 0). - -prune('$end_of_table', DeleteAcc, _Len) -> - prune_flush_batch(DeleteAcc); -prune(Key, DeleteAcc, Len) -> - [#dq_msg_loc { msg_id = MsgId, queue_and_seq_id = {Q, SeqId} }] = - mnesia:dirty_read(rabbit_disk_queue, Key), - {DeleteAcc1, Len1} = - case rabbit_msg_store:contains(MsgId) of - true -> {DeleteAcc, Len}; - false -> {[{Q, SeqId} | DeleteAcc], Len + 1} - end, - if Len1 >= ?BATCH_SIZE -> - %% We have no way of knowing how flushing the batch will - %% affect ordering of records within the table, so have no - %% choice but to start again. Although this will make - %% recovery slower for large queues, we guarantee we can - %% start up in constant memory - ok = prune_flush_batch(DeleteAcc1), - NextKey = mnesia:dirty_first(rabbit_disk_queue), - prune(NextKey, [], 0); - true -> - NextKey = mnesia:dirty_next(rabbit_disk_queue, Key), - prune(NextKey, DeleteAcc1, Len1) - end. - -extract_sequence_numbers(Sequences) -> - true = - rabbit_misc:execute_mnesia_transaction( - %% the ets manipulation within this transaction is - %% idempotent, in particular we're only reading from mnesia, - %% and combining what we read with what we find in - %% ets. Should the transaction restart, the non-rolledback - %% data in ets can still be successfully combined with what - %% we find in mnesia - fun() -> - ok = mnesia:read_lock_table(rabbit_disk_queue), - mnesia:foldl( - fun (#dq_msg_loc { queue_and_seq_id = {Q, SeqId} }, true) -> - NextWrite = SeqId + 1, - case ets:lookup(Sequences, Q) of - [] -> ets:insert_new(Sequences, - {Q, SeqId, NextWrite}); - [Orig = {_, Read, Write}] -> - Repl = {Q, lists:min([Read, SeqId]), - lists:max([Write, NextWrite])}, - case Orig == Repl of - true -> true; - false -> ets:insert(Sequences, Repl) - end - end - end, true, rabbit_disk_queue) - end), - ok = remove_gaps_in_sequences(Sequences). - -remove_gaps_in_sequences(Sequences) -> - %% read the comments at internal_requeue. - - %% Because we are at startup, we know that no sequence ids have - %% been issued (or at least, they were, but have been - %% forgotten). Therefore, we can nicely shuffle up and not - %% worry. Note that I'm choosing to shuffle up, but alternatively - %% we could shuffle downwards. However, I think there's greater - %% likelihood of gaps being at the bottom rather than the top of - %% the queue, so shuffling up should be the better bet. - QueueBoundaries = - rabbit_misc:execute_mnesia_transaction( - fun() -> - ok = mnesia:write_lock_table(rabbit_disk_queue), - lists:foldl( - fun ({Q, ReadSeqId, WriteSeqId}, Acc) -> - Gap = shuffle_up(Q, ReadSeqId-1, WriteSeqId-1, 0), - [{Q, ReadSeqId + Gap, WriteSeqId} | Acc] - end, [], ets:match_object(Sequences, '_')) - end), - true = lists:foldl(fun (Obj, true) -> ets:insert(Sequences, Obj) end, - true, QueueBoundaries), - ok. - -shuffle_up(_Q, SeqId, SeqId, Gap) -> - Gap; -shuffle_up(Q, BaseSeqId, SeqId, Gap) -> - GapInc = - case mnesia:read(rabbit_disk_queue, {Q, SeqId}, write) of - [] -> 1; - [Obj] -> - case Gap of - 0 -> ok; - _ -> mnesia:write(rabbit_disk_queue, - Obj #dq_msg_loc { - queue_and_seq_id = {Q, SeqId + Gap }}, - write), - mnesia:delete(rabbit_disk_queue, {Q, SeqId}, write) - end, - 0 - end, - shuffle_up(Q, BaseSeqId, SeqId - 1, Gap + GapInc). diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl deleted file mode 100644 index c278bac86d..0000000000 --- a/src/rabbit_mixed_queue.erl +++ /dev/null @@ -1,673 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License at -%% http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the -%% License for the specific language governing rights and limitations -%% under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developers of the Original Code are LShift Ltd, -%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, -%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd -%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial -%% Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift -%% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies -%% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. -%% -%% All Rights Reserved. -%% -%% Contributor(s): ______________________________________. -%% - --module(rabbit_mixed_queue). - --include("rabbit.hrl"). - --export([init/2]). - --export([publish/2, publish_delivered/2, fetch/1, ack/2, - tx_publish/2, tx_commit/3, tx_rollback/2, requeue/2, purge/1, - len/1, is_empty/1, delete_queue/1, maybe_prefetch/1]). - --export([set_storage_mode/3, storage_mode/1, - estimate_queue_memory/1]). - --record(mqstate, { mode, - msg_buf, - queue, - is_durable, - length, - memory_size, - prefetcher - } - ). - --define(TO_DISK_MAX_FLUSH_SIZE, 100000). --define(MAGIC_MARKER, <<"$magic_marker">>). - -%%---------------------------------------------------------------------------- - --ifdef(use_specs). - --type(mode() :: ( 'disk' | 'mixed' )). --type(mqstate() :: #mqstate { mode :: mode(), - msg_buf :: queue(), - queue :: queue_name(), - is_durable :: boolean(), - length :: non_neg_integer(), - memory_size :: (non_neg_integer() | 'undefined'), - prefetcher :: (pid() | 'undefined') - }). --type(msg_id() :: guid()). --type(seq_id() :: non_neg_integer()). --type(ack_tag() :: ( 'not_on_disk' | {msg_id(), seq_id()} )). --type(okmqs() :: {'ok', mqstate()}). - --spec(init/2 :: (queue_name(), boolean()) -> okmqs()). --spec(publish/2 :: (message(), mqstate()) -> okmqs()). --spec(publish_delivered/2 :: (message(), mqstate()) -> - {'ok', ack_tag(), mqstate()}). --spec(fetch/1 :: (mqstate()) -> - {('empty' | {message(), boolean(), ack_tag(), non_neg_integer()}), - mqstate()}). --spec(ack/2 :: ([{message(), ack_tag()}], mqstate()) -> okmqs()). --spec(tx_publish/2 :: (message(), mqstate()) -> okmqs()). --spec(tx_commit/3 :: ([message()], [ack_tag()], mqstate()) -> okmqs()). --spec(tx_rollback/2 :: ([message()], mqstate()) -> okmqs()). --spec(requeue/2 :: ([{message(), ack_tag()}], mqstate()) -> okmqs()). --spec(purge/1 :: (mqstate()) -> okmqs()). --spec(delete_queue/1 :: (mqstate()) -> {'ok', mqstate()}). --spec(len/1 :: (mqstate()) -> non_neg_integer()). --spec(is_empty/1 :: (mqstate()) -> boolean()). - --spec(set_storage_mode/3 :: (mode(), [message()], mqstate()) -> okmqs()). --spec(estimate_queue_memory/1 :: (mqstate()) -> - {mqstate(), non_neg_integer()}). --spec(storage_mode/1 :: (mqstate()) -> mode()). - --endif. - -%%---------------------------------------------------------------------------- - -init(Queue, IsDurable) -> - Len = rabbit_disk_queue:len(Queue), - {Size, MarkerFound, MarkerPreludeCount} = - rabbit_disk_queue:foldl( - fun (Msg = #basic_message { is_persistent = true }, - _AckTag, _IsDelivered, {SizeAcc, MFound, MPCount}) -> - SizeAcc1 = SizeAcc + size_of_message(Msg), - case {MFound, is_magic_marker_message(Msg)} of - {false, false} -> {SizeAcc1, false, MPCount + 1}; - {false, true} -> {SizeAcc, true, MPCount}; - {true, false} -> {SizeAcc1, true, MPCount} - end - end, {0, false, 0}, Queue), - Len1 = case MarkerFound of - false -> Len; - true -> - ok = rabbit_disk_queue:requeue_next_n(Queue, - MarkerPreludeCount), - Len2 = Len - 1, - {ok, Len2} = fetch_ack_magic_marker_message(Queue), - Len2 - end, - MsgBuf = inc_queue_length(queue:new(), Len1), - {ok, #mqstate { mode = disk, msg_buf = MsgBuf, queue = Queue, - is_durable = IsDurable, length = Len1, - memory_size = Size, prefetcher = undefined }}. - -publish(Msg = #basic_message { is_persistent = IsPersistent }, State = - #mqstate { queue = Q, mode = Mode, is_durable = IsDurable, - msg_buf = MsgBuf, length = Length }) -> - Msg1 = ensure_binary_properties(Msg), - ok = case on_disk(Mode, IsDurable, IsPersistent) of - true -> rabbit_disk_queue:publish(Q, Msg1, false); - false -> ok - end, - MsgBuf1 = case Mode of - disk -> inc_queue_length(MsgBuf, 1); - mixed -> queue:in({Msg1, false}, MsgBuf) - end, - {ok, gain_memory(size_of_message(Msg1), - State #mqstate { msg_buf = MsgBuf1, - length = Length + 1 })}. - -%% Assumption here is that the queue is empty already (only called via -%% attempt_immediate_delivery). -publish_delivered(Msg = #basic_message { guid = MsgId, - is_persistent = IsPersistent}, - State = #mqstate { is_durable = IsDurable, queue = Q, - length = 0 }) - when IsDurable andalso IsPersistent -> - Msg1 = ensure_binary_properties(Msg), - ok = rabbit_disk_queue:publish(Q, Msg1, true), - State1 = gain_memory(size_of_message(Msg1), State), - %% must call phantom_fetch otherwise the msg remains at the head - %% of the queue. This is synchronous, but unavoidable as we need - %% the AckTag - {MsgId, true, AckTag, 0} = rabbit_disk_queue:phantom_fetch(Q), - {ok, AckTag, State1}; -publish_delivered(Msg, State = #mqstate { length = 0 }) -> - Msg1 = ensure_binary_properties(Msg), - {ok, not_on_disk, gain_memory(size_of_message(Msg1), State)}. - -fetch(State = #mqstate { length = 0 }) -> - {empty, State}; -fetch(State = #mqstate { msg_buf = MsgBuf, queue = Q, - is_durable = IsDurable, length = Length, - prefetcher = Prefetcher }) -> - {{value, Value}, MsgBuf1} = queue:out(MsgBuf), - Rem = Length - 1, - State1 = State #mqstate { length = Rem }, - case Value of - {Msg = #basic_message { guid = MsgId, is_persistent = IsPersistent }, - IsDelivered} -> - AckTag = - case IsDurable andalso IsPersistent of - true -> - {MsgId, IsDelivered, AckTag1, _PRem} - = rabbit_disk_queue:phantom_fetch(Q), - AckTag1; - false -> - not_on_disk - end, - {{Msg, IsDelivered, AckTag, Rem}, - State1 #mqstate { msg_buf = MsgBuf1 }}; - {Msg = #basic_message { is_persistent = IsPersistent }, - IsDelivered, AckTag} -> - %% message has come via the prefetcher, thus it's been - %% marked delivered. If it's not persistent+durable, we - %% should ack it now - AckTag1 = maybe_ack(Q, IsDurable, IsPersistent, AckTag), - {{Msg, IsDelivered, AckTag1, Rem}, - State1 #mqstate { msg_buf = MsgBuf1 }}; - _ when Prefetcher == undefined -> - MsgBuf2 = dec_queue_length(MsgBuf, 1), - {Msg = #basic_message { is_persistent = IsPersistent }, - IsDelivered, AckTag, _PersistRem} - = rabbit_disk_queue:fetch(Q), - AckTag1 = maybe_ack(Q, IsDurable, IsPersistent, AckTag), - {{Msg, IsDelivered, AckTag1, Rem}, - State1 #mqstate { msg_buf = MsgBuf2 }}; - _ -> - %% use State, not State1 as we've not dec'd length - fetch(case rabbit_queue_prefetcher:drain(Prefetcher) of - empty -> State #mqstate { prefetcher = undefined }; - {Fetched, Status} -> - MsgBuf2 = dec_queue_length(MsgBuf, queue:len(Fetched)), - State #mqstate - { msg_buf = queue:join(Fetched, MsgBuf2), - prefetcher = case Status of - finished -> undefined; - continuing -> Prefetcher - end } - end) - end. - -ack(MsgsWithAcks, State = #mqstate { queue = Q }) -> - {AckTags, ASize} = remove_diskless(MsgsWithAcks), - ok = case AckTags of - [] -> ok; - _ -> rabbit_disk_queue:ack(Q, AckTags) - end, - {ok, lose_memory(ASize, State)}. - -tx_publish(Msg = #basic_message { is_persistent = IsPersistent }, - State = #mqstate { mode = Mode, is_durable = IsDurable }) -> - Msg1 = ensure_binary_properties(Msg), - ok = case on_disk(Mode, IsDurable, IsPersistent) of - true -> rabbit_disk_queue:tx_publish(Msg1); - false -> ok - end, - {ok, gain_memory(size_of_message(Msg1), State)}. - -tx_commit(Publishes, MsgsWithAcks, - State = #mqstate { mode = Mode, queue = Q, msg_buf = MsgBuf, - is_durable = IsDurable, length = Length }) -> - PersistentPubs = - [{MsgId, false, IsPersistent} || - #basic_message { guid = MsgId, - is_persistent = IsPersistent } <- Publishes, - on_disk(Mode, IsDurable, IsPersistent)], - {RealAcks, ASize} = remove_diskless(MsgsWithAcks), - ok = case {PersistentPubs, RealAcks} of - {[], []} -> ok; - _ -> rabbit_disk_queue:tx_commit( - Q, PersistentPubs, RealAcks) - end, - Len = length(Publishes), - MsgBuf1 = case Mode of - disk -> inc_queue_length(MsgBuf, Len); - mixed -> ToAdd = [{Msg, false} || Msg <- Publishes], - queue:join(MsgBuf, queue:from_list(ToAdd)) - end, - {ok, lose_memory(ASize, State #mqstate { msg_buf = MsgBuf1, - length = Length + Len })}. - -tx_rollback(Publishes, - State = #mqstate { mode = Mode, is_durable = IsDurable }) -> - {PersistentPubs, CSize} = - lists:foldl( - fun (Msg = #basic_message { is_persistent = IsPersistent, - guid = MsgId }, {Acc, CSizeAcc}) -> - Msg1 = ensure_binary_properties(Msg), - CSizeAcc1 = CSizeAcc + size_of_message(Msg1), - {case on_disk(Mode, IsDurable, IsPersistent) of - true -> [MsgId | Acc]; - _ -> Acc - end, CSizeAcc1} - end, {[], 0}, Publishes), - ok = case PersistentPubs of - [] -> ok; - _ -> rabbit_disk_queue:tx_rollback(PersistentPubs) - end, - {ok, lose_memory(CSize, State)}. - -%% [{Msg, AckTag}] -requeue(MsgsWithAckTags, - State = #mqstate { mode = Mode, queue = Q, msg_buf = MsgBuf, - is_durable = IsDurable, length = Length }) -> - RQ = lists:foldl( - fun ({Msg = #basic_message { is_persistent = IsPersistent }, AckTag}, - RQAcc) -> - case IsDurable andalso IsPersistent of - true -> - [{AckTag, true} | RQAcc]; - false -> - case Mode of - mixed -> - RQAcc; - disk when not_on_disk =:= AckTag -> - ok = case RQAcc of - [] -> ok; - _ -> rabbit_disk_queue:requeue - (Q, lists:reverse(RQAcc)) - end, - ok = rabbit_disk_queue:publish(Q, Msg, true), - [] - end - end - end, [], MsgsWithAckTags), - ok = case RQ of - [] -> ok; - _ -> rabbit_disk_queue:requeue(Q, lists:reverse(RQ)) - end, - Len = length(MsgsWithAckTags), - MsgBuf1 = case Mode of - mixed -> ToAdd = [{Msg, true} || {Msg, _} <- MsgsWithAckTags], - queue:join(MsgBuf, queue:from_list(ToAdd)); - disk -> inc_queue_length(MsgBuf, Len) - end, - {ok, State #mqstate { msg_buf = MsgBuf1, length = Length + Len }}. - -purge(State = #mqstate { queue = Q, mode = Mode, length = Count, - prefetcher = Prefetcher, memory_size = QSize }) -> - PurgedFromDisk = rabbit_disk_queue:purge(Q), - Count = case Mode of - disk -> - PurgedFromDisk; - mixed -> - ok = case Prefetcher of - undefined -> ok; - _ -> rabbit_queue_prefetcher:stop(Prefetcher) - end, - Count - end, - {Count, lose_memory(QSize, State #mqstate { msg_buf = queue:new(), - length = 0, - prefetcher = undefined })}. - -delete_queue(State = #mqstate { queue = Q, memory_size = QSize, - prefetcher = Prefetcher - }) -> - ok = case Prefetcher of - undefined -> ok; - _ -> rabbit_queue_prefetcher:stop(Prefetcher) - end, - ok = rabbit_disk_queue:delete_queue(Q), - {ok, lose_memory(QSize, State #mqstate { length = 0, msg_buf = queue:new(), - prefetcher = undefined })}. - -len(#mqstate { length = Length }) -> - Length. - -is_empty(#mqstate { length = Length }) -> - 0 == Length. - -%%---------------------------------------------------------------------------- -%% storage mode management -%%---------------------------------------------------------------------------- - -set_storage_mode(Mode, _TxnMessages, State = #mqstate { mode = Mode }) -> - {ok, State}; -set_storage_mode(disk, TxnMessages, State = - #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf, length = Length, - is_durable = IsDurable, prefetcher = Prefetcher }) -> - State1 = State #mqstate { mode = disk }, - MsgBuf1 = - case Prefetcher of - undefined -> MsgBuf; - _ -> - case rabbit_queue_prefetcher:drain_and_stop(Prefetcher) of - empty -> MsgBuf; - Fetched -> - MsgBuf2 = dec_queue_length(MsgBuf, queue:len(Fetched)), - queue:join(Fetched, MsgBuf2) - end - end, - {ok, MsgBuf3} = - send_messages_to_disk(IsDurable, Q, MsgBuf1, Length), - %% tx_publish txn messages. Some of these will have been already - %% published if they really are durable and persistent which is - %% why we can't just use our own tx_publish/2 function (would end - %% up publishing twice, so refcount would go wrong in disk_queue). - %% The order of msgs within a txn is determined only at tx_commit - %% time, so it doesn't matter if we're publishing msgs to the disk - %% queue in a different order from that which we received them in. - lists:foreach( - fun (Msg = #basic_message { is_persistent = IsPersistent }) -> - ok = case IsDurable andalso IsPersistent of - true -> ok; - _ -> rabbit_disk_queue:tx_publish(Msg) - end - end, TxnMessages), - garbage_collect(), - {ok, State1 #mqstate { msg_buf = MsgBuf3, prefetcher = undefined }}; -set_storage_mode(mixed, TxnMessages, State = - #mqstate { mode = disk, is_durable = IsDurable }) -> - %% The queue has a token just saying how many msgs are on disk - %% (this is already built for us when in disk mode). - %% Don't actually do anything to the disk - %% Don't start prefetcher just yet because the queue maybe busy - - %% wait for hibernate timeout in the amqqueue_process. - - %% Remove txn messages from disk which are not (persistent and - %% durable). This is necessary to avoid leaks. This is also pretty - %% much the inverse behaviour of our own tx_rollback/2 which is - %% why we're not using that. - Cancel = [ MsgId || #basic_message { is_persistent = IsPersistent, - guid = MsgId } <- TxnMessages, - not (IsDurable andalso IsPersistent) ], - ok = case Cancel of - [] -> ok; - _ -> rabbit_disk_queue:tx_rollback(Cancel) - end, - garbage_collect(), - {ok, State #mqstate { mode = mixed }}. - -send_messages_to_disk(_IsDurable, _Q, MsgBuf, 0) -> - {ok, MsgBuf}; -send_messages_to_disk(IsDurable, Q, MsgBuf, Length) -> - case scan_for_disk_after_ram(IsDurable, MsgBuf) of - disk_only -> - %% Everything on disk already, we don't need to do - %% anything - {ok, inc_queue_length(queue:new(), Length)}; - {not_found, PrefixLen, MsgBufRAMSuffix} -> - %% No disk msgs follow RAM msgs and the queue has a RAM - %% suffix, so we can just publish those. If we crash at - %% this point, we may lose some messages, but everything - %% will remain in the right order, so no need for the - %% marker messages. - MsgBuf1 = inc_queue_length(queue:new(), PrefixLen), - send_messages_to_disk(IsDurable, Q, MsgBufRAMSuffix, 0, 0, [], [], - MsgBuf1); - found -> - %% There are disk msgs *after* ram msgs in the queue. We - %% need to reenqueue everything. Note that due to batching - %% going on (see comments above send_messages_to_disk/8), - %% if we crash during this transition, we could have - %% messages in the wrong order on disk. Thus we publish a - %% magic_marker_message which, when this transition is - %% complete, will be back at the head of the queue. Should - %% we die, on startup, during the foldl over the queue, we - %% detect the marker message and requeue all the messages - %% in front of it, to the back of the queue, thus - %% correcting the order. The result is that everything - %% ends up back in the same order, but will have new - %% sequence IDs. - ok = publish_magic_marker_message(Q), - {ok, MsgBuf1} = - send_messages_to_disk(IsDurable, Q, MsgBuf, 0, 0, [], [], - queue:new()), - {ok, Length} = fetch_ack_magic_marker_message(Q), - {ok, MsgBuf1} - end. - -scan_for_disk_after_ram(IsDurable, MsgBuf) -> - scan_for_disk_after_ram(IsDurable, MsgBuf, {disk, 0}). - -%% We return 'disk_only' if everything is alread on disk; 'found' if -%% we find a disk message after finding RAM messages; and -%% {'not_found', Count, MsgBuf} otherwise, where Count is the length -%% of the disk prefix, and MsgBuf is the RAM suffix of the MsgBuf -%% argument. Note msgs via the prefetcher are counted as RAM msgs on -%% the grounds that they have to be republished. -scan_for_disk_after_ram(IsDurable, MsgBuf, Mode) -> - case queue:out(MsgBuf) of - {empty, _MsgBuf} -> - case Mode of - {ram, N, MsgBuf1} -> {not_found, N, MsgBuf1}; - {disk, _N} -> disk_only - end; - {{value, {on_disk, Count}}, MsgBuf1} -> - case Mode of - {ram, _, _} -> found; %% found disk after RAM, bad - {disk, N} -> scan_for_disk_after_ram(IsDurable, MsgBuf1, - {disk, N + Count}) - end; - {{value, {_Msg, _IsDelivered, _AckTag}}, MsgBuf1} -> - %% found a msg from the prefetcher. Ensure RAM mode - scan_for_disk_after_ram(IsDurable, MsgBuf1, - ensure_ram(Mode, MsgBuf)); - {{value, - {#basic_message { is_persistent = IsPersistent }, _IsDelivered}}, - MsgBuf1} -> - %% normal message - case IsDurable andalso IsPersistent of - true -> - case Mode of - {ram, _, _} -> found; %% found disk after RAM, bad - {disk, N} -> scan_for_disk_after_ram(IsDurable, MsgBuf1, - {disk, N + 1}) - end; - false -> scan_for_disk_after_ram(IsDurable, MsgBuf1, - ensure_ram(Mode, MsgBuf)) - end - end. - -ensure_ram(Obj = {ram, _N, _MsgBuf}, _MsgBuf1) -> Obj; -ensure_ram({disk, N}, MsgBuf) -> {ram, N, MsgBuf}. - -%% (Re)enqueue _everything_ here. Messages which are not on disk will -%% be tx_published, messages that are on disk will be requeued to the -%% end of the queue. This is done in batches, where a batch consists -%% of a number a tx_publishes, a tx_commit and then a call to -%% requeue_next_n. We do not want to fetch messages off disk only to -%% republish them later. Note in the tx_commit, we ack messages which -%% are being _re_published. These are messages that have been fetched -%% by the prefetcher. -%% Batches are limited in size to make sure that the resultant mnesia -%% transaction on tx_commit does not get too big, memory wise. -send_messages_to_disk(IsDurable, Q, Queue, PublishCount, RequeueCount, - Commit, Ack, MsgBuf) -> - case queue:out(Queue) of - {empty, _Queue} -> - ok = flush_messages_to_disk_queue(Q, Commit, Ack), - {[], []} = flush_requeue_to_disk_queue(Q, RequeueCount, [], []), - {ok, MsgBuf}; - {{value, {Msg = #basic_message { is_persistent = IsPersistent }, - IsDelivered}}, Queue1} -> - case IsDurable andalso IsPersistent of - true -> %% it's already in the Q - send_messages_to_disk( - IsDurable, Q, Queue1, PublishCount, RequeueCount + 1, - Commit, Ack, inc_queue_length(MsgBuf, 1)); - false -> - republish_message_to_disk_queue( - IsDurable, Q, Queue1, PublishCount, RequeueCount, Commit, - Ack, MsgBuf, Msg, IsDelivered) - end; - {{value, {Msg, IsDelivered, AckTag}}, Queue1} -> - %% These have come via the prefetcher, so are no longer in - %% the disk queue (yes, they've not been ack'd yet, but - %% the head of the queue has passed these messages). We - %% need to requeue them, which we sneakily achieve by - %% tx_publishing them, and then in the tx_commit, ack the - %% old copy. - republish_message_to_disk_queue( - IsDurable, Q, Queue1, PublishCount, RequeueCount, Commit, - [AckTag | Ack], MsgBuf, Msg, IsDelivered); - {{value, {on_disk, Count}}, Queue1} -> - send_messages_to_disk( - IsDurable, Q, Queue1, PublishCount, RequeueCount + Count, - Commit, Ack, inc_queue_length(MsgBuf, Count)) - end. - -republish_message_to_disk_queue( - IsDurable, Q, Queue, PublishCount, RequeueCount, Commit, Ack, MsgBuf, - Msg = #basic_message { guid = MsgId, is_persistent = IsPersistent }, - IsDelivered) -> - {Commit1, Ack1} = flush_requeue_to_disk_queue(Q, RequeueCount, Commit, Ack), - ok = rabbit_disk_queue:tx_publish(Msg), - Commit2 = [{MsgId, IsDelivered, IsPersistent} | Commit1], - {PublishCount1, Commit3, Ack2} = - case PublishCount == ?TO_DISK_MAX_FLUSH_SIZE of - true -> ok = flush_messages_to_disk_queue(Q, Commit2, Ack1), - {0, [], []}; - false -> {PublishCount + 1, Commit2, Ack1} - end, - send_messages_to_disk(IsDurable, Q, Queue, PublishCount1, 0, - Commit3, Ack2, inc_queue_length(MsgBuf, 1)). - -flush_messages_to_disk_queue(_Q, [], []) -> - ok; -flush_messages_to_disk_queue(Q, Commit, Ack) -> - rabbit_disk_queue:tx_commit(Q, lists:reverse(Commit), Ack). - -flush_requeue_to_disk_queue(_Q, 0, Commit, Ack) -> - {Commit, Ack}; -flush_requeue_to_disk_queue(Q, RequeueCount, Commit, Ack) -> - ok = flush_messages_to_disk_queue(Q, Commit, Ack), - ok = rabbit_disk_queue:requeue_next_n(Q, RequeueCount), - {[], []}. - -%% Scaling this by 4 is a magic number. Found by trial and error to -%% work ok. We are deliberately over reporting so that we run out of -%% memory sooner rather than later, because the transition to disk -%% only modes transiently can take quite a lot of memory. -estimate_queue_memory(State = #mqstate { memory_size = Size }) -> - {State, 4 * Size}. - -storage_mode(#mqstate { mode = Mode }) -> - Mode. - -%%---------------------------------------------------------------------------- -%% helpers -%%---------------------------------------------------------------------------- - -size_of_message( - #basic_message { content = #content { payload_fragments_rev = Payload, - properties_bin = PropsBin }}) - when is_binary(PropsBin) -> - size(PropsBin) + lists:foldl(fun (Frag, SumAcc) -> - SumAcc + size(Frag) - end, 0, Payload). - -ensure_binary_properties(Msg = #basic_message { content = Content }) -> - Msg #basic_message { - content = rabbit_binary_generator:ensure_content_encoded(Content) }. - -gain_memory(Inc, State = #mqstate { memory_size = QSize }) -> - State #mqstate { memory_size = QSize + Inc }. - -lose_memory(Dec, State = #mqstate { memory_size = QSize }) -> - State #mqstate { memory_size = QSize - Dec }. - -inc_queue_length(MsgBuf, 0) -> - MsgBuf; -inc_queue_length(MsgBuf, Count) -> - {NewCount, MsgBufTail} = - case queue:out_r(MsgBuf) of - {empty, MsgBuf1} -> {Count, MsgBuf1}; - {{value, {on_disk, Len}}, MsgBuf1} -> {Len + Count, MsgBuf1}; - {{value, _}, _MsgBuf1} -> {Count, MsgBuf} - end, - queue:in({on_disk, NewCount}, MsgBufTail). - -dec_queue_length(MsgBuf, Count) -> - case queue:out(MsgBuf) of - {{value, {on_disk, Len}}, MsgBuf1} -> - case Len of - Count -> - MsgBuf1; - _ when Len > Count -> - queue:in_r({on_disk, Len-Count}, MsgBuf1) - end; - _ -> MsgBuf - end. - -maybe_prefetch(State = #mqstate { prefetcher = undefined, - mode = mixed, - msg_buf = MsgBuf, - queue = Q }) -> - case queue:peek(MsgBuf) of - {value, {on_disk, Count}} -> - %% only prefetch for the next contiguous block on - %% disk. Beyond there, we either hit the end of the queue, - %% or the next msg is already in RAM, held by us, the - %% mixed queue - {ok, Prefetcher} = rabbit_queue_prefetcher:start_link(Q, Count), - State #mqstate { prefetcher = Prefetcher }; - _ -> State - end; -maybe_prefetch(State) -> - State. - -maybe_ack(_Q, true, true, AckTag) -> - AckTag; -maybe_ack(Q, _, _, AckTag) -> - ok = rabbit_disk_queue:ack(Q, [AckTag]), - not_on_disk. - -remove_diskless(MsgsWithAcks) -> - lists:foldl( - fun ({Msg, AckTag}, {AccAckTags, AccSize}) -> - Msg1 = ensure_binary_properties(Msg), - {case AckTag of - not_on_disk -> AccAckTags; - _ -> [AckTag | AccAckTags] - end, size_of_message(Msg1) + AccSize} - end, {[], 0}, MsgsWithAcks). - -on_disk(disk, _IsDurable, _IsPersistent) -> true; -on_disk(mixed, true, true) -> true; -on_disk(mixed, _IsDurable, _IsPersistent) -> false. - -publish_magic_marker_message(Q) -> - Msg = rabbit_basic:message( - rabbit_misc:r(<<"/">>, exchange, <<>>), ?MAGIC_MARKER, - [], <<>>, <<>>, true), - ok = rabbit_disk_queue:publish(Q, ensure_binary_properties(Msg), false). - -fetch_ack_magic_marker_message(Q) -> - {Msg, false, AckTag, Length} = rabbit_disk_queue:fetch(Q), - true = is_magic_marker_message(Msg), - ok = rabbit_disk_queue:ack(Q, [AckTag]), - {ok, Length}. - -is_magic_marker_message(#basic_message { routing_key = ?MAGIC_MARKER, - is_persistent = true, guid = <<>> }) -> - true; -is_magic_marker_message(_) -> - false. diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index c0a559e9ec..9dae268f19 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -348,7 +348,6 @@ seg_num_to_path(Dir, SegNum) -> SegName = integer_to_list(SegNum), filename:join(Dir, SegName ++ ?SEGMENT_EXTENSION). - %%---------------------------------------------------------------------------- %% Msg Store Startup Delta Function %%---------------------------------------------------------------------------- |
