summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Matthew Sackman <matthew@lshift.net> 2009-10-13 16:15:29 +0100
committer: Matthew Sackman <matthew@lshift.net> 2009-10-13 16:15:29 +0100
commit: 4ef2ed434c2f3c2c8e83ff6670ad7347f390b562 (patch)
tree: b74b961735af2276650849fb3ba9e9f32e3f5b31
parent: 02e5493c4de8be92e447d904b7bc41cd5475da02 (diff)
download: rabbitmq-server-git-4ef2ed434c2f3c2c8e83ff6670ad7347f390b562.tar.gz
Most of the rewiring is done. We still need to sort out how to delete non-durable queues on startup, which is a bit circular: I'd like not to start the msg_store until we know which queues are durable and which aren't, but we also can't start the queues until the msg_store is running. Fun.
-rw-r--r-- src/rabbit.erl 1
-rw-r--r-- src/rabbit_amqqueue.erl 5
-rw-r--r-- src/rabbit_amqqueue_process.erl 188
-rw-r--r-- src/rabbit_disk_queue.erl 743
-rw-r--r-- src/rabbit_mixed_queue.erl 673
-rw-r--r-- src/rabbit_queue_index.erl 1
6 files changed, 73 insertions, 1538 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 4e027ca824..b859c4affa 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -150,6 +150,7 @@ start(normal, []) ->
ok = start_child(rabbit_router),
ok = start_child(rabbit_node_monitor),
ok = start_child(rabbit_guid),
+ %% TODO - this should probably use start_child somehow too
ok = rabbit_queue_index:start_msg_store()
end},
{"recovery",
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 3228655257..840c2c4d8a 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -42,7 +42,6 @@
-export([notify_sent/2, unblock/2, tx_commit_callback/3]).
-export([commit_all/2, rollback_all/2, notify_down_all/2, limit_all/3]).
-export([on_node_down/1]).
--export([set_storage_mode/2]).
-import(mnesia).
-import(gen_server2).
@@ -107,7 +106,6 @@
-spec(notify_sent/2 :: (pid(), pid()) -> 'ok').
-spec(unblock/2 :: (pid(), pid()) -> 'ok').
-spec(tx_commit_callback/3 :: (pid(), [message()], [acktag()]) -> 'ok').
--spec(set_storage_mode/2 :: (pid(), ('oppressed' | 'liberated')) -> 'ok').
-spec(internal_declare/2 :: (amqqueue(), boolean()) -> amqqueue()).
-spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()).
-spec(on_node_down/1 :: (erlang_node()) -> 'ok').
@@ -228,9 +226,6 @@ list(VHostPath) ->
map(VHostPath, F) -> rabbit_misc:filter_exit_map(F, list(VHostPath)).
-set_storage_mode(QPid, Mode) ->
- gen_server2:pcast(QPid, 10, {set_storage_mode, Mode}).
-
info(#amqqueue{ pid = QPid }) ->
gen_server2:pcall(QPid, 9, info, infinity).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 99fd6987de..152205edaf 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -38,7 +38,6 @@
-define(UNSENT_MESSAGE_LIMIT, 100).
-define(HIBERNATE_AFTER_MIN, 1000).
-define(DESIRED_HIBERNATE, 10000).
--define(MINIMUM_MEMORY_REPORT_TIME_INTERVAL, 10000). %% 10 seconds in milliseconds
-export([start_link/1]).
@@ -54,11 +53,10 @@
owner,
exclusive_consumer,
has_had_consumers,
- mixed_state,
+ variable_queue_state,
next_msg_id,
active_consumers,
- blocked_consumers,
- memory_report_timer
+ blocked_consumers
}).
-record(consumer, {tag, ack_required}).
@@ -88,8 +86,7 @@
acks_uncommitted,
consumers,
transactions,
- memory,
- storage_mode
+ memory
]).
%%----------------------------------------------------------------------------
@@ -99,43 +96,41 @@ start_link(Q) ->
%%----------------------------------------------------------------------------
-init(Q = #amqqueue { name = QName, durable = Durable }) ->
+init(Q = #amqqueue { name = QName }) ->
?LOGDEBUG("Queue starting - ~p~n", [Q]),
ok = rabbit_memory_manager:register
(self(), false, rabbit_amqqueue, set_storage_mode, [self()]),
- {ok, MS} = rabbit_mixed_queue:init(QName, Durable),
+ VQS = rabbit_variable_queue:init(QName),
State = #q{q = Q,
owner = none,
exclusive_consumer = none,
has_had_consumers = false,
- mixed_state = MS,
+ variable_queue_state = VQS,
next_msg_id = 1,
active_consumers = queue:new(),
- blocked_consumers = queue:new(),
- memory_report_timer = undefined
+ blocked_consumers = queue:new()
},
- %% first thing we must do is report_memory.
- {ok, start_memory_timer(State), hibernate,
+ {ok, State, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
-terminate(_Reason, State = #q{mixed_state = MS}) ->
+terminate(_Reason, State = #q{variable_queue_state = VQS}) ->
%% FIXME: How do we cancel active subscriptions?
- State1 = stop_memory_timer(State),
%% Ensure that any persisted tx messages are removed;
%% mixed_queue:delete_queue cannot do that for us since neither
%% mixed_queue nor disk_queue keep a record of uncommitted tx
%% messages.
- {ok, MS1} = rabbit_mixed_queue:tx_rollback(
- lists:concat([PM || #tx { pending_messages = PM } <-
- all_tx_record()]), MS),
- %% Delete from disk queue first. If we crash at this point, when a
+ %% TODO: wait for all in flight tx_commits to complete
+ VQS1 = rabbit_variable_queue:tx_rollback(
+ lists:concat([PM || #tx { pending_messages = PM } <-
+ all_tx_record()]), VQS),
+ %% Delete from disk first. If we crash at this point, when a
%% durable queue, we will be recreated at startup, possibly with
%% partial content. The alternative is much worse however - if we
%% called internal_delete first, we would then have a race between
- %% the disk_queue delete and a new queue with the same name being
+ %% the disk delete and a new queue with the same name being
%% created and published to.
- {ok, _MS} = rabbit_mixed_queue:delete_queue(MS1),
- ok = rabbit_amqqueue:internal_delete(qname(State1)).
+ _VQS = rabbit_variable_queue:delete(VQS1),
+ ok = rabbit_amqqueue:internal_delete(qname(State)).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
@@ -144,27 +139,14 @@ code_change(_OldVsn, State, _Extra) ->
reply(Reply, NewState) ->
assert_invariant(NewState),
- {reply, Reply, start_memory_timer(NewState), hibernate}.
+ {reply, Reply, NewState, hibernate}.
noreply(NewState) ->
assert_invariant(NewState),
- {noreply, start_memory_timer(NewState), hibernate}.
+ {noreply, NewState, hibernate}.
-assert_invariant(#q { active_consumers = AC, mixed_state = MS }) ->
- true = (queue:is_empty(AC) orelse rabbit_mixed_queue:is_empty(MS)).
-
-start_memory_timer(State = #q { memory_report_timer = undefined }) ->
- {ok, TRef} = timer:send_after(?MINIMUM_MEMORY_REPORT_TIME_INTERVAL,
- report_memory),
- report_memory(false, State #q { memory_report_timer = TRef });
-start_memory_timer(State) ->
- State.
-
-stop_memory_timer(State = #q { memory_report_timer = undefined }) ->
- State;
-stop_memory_timer(State = #q { memory_report_timer = TRef }) ->
- {ok, cancel} = timer:cancel(TRef),
- State #q { memory_report_timer = undefined }.
+assert_invariant(#q { active_consumers = AC, variable_queue_state = VQS }) ->
+ true = (queue:is_empty(AC) orelse rabbit_variable_queue:is_empty(VQS)).
lookup_ch(ChPid) ->
case get({ch, ChPid}) of
@@ -282,25 +264,24 @@ deliver_msgs_to_consumers(
deliver_from_queue_pred({IsEmpty, _AutoAcks}, _State) ->
not IsEmpty.
deliver_from_queue_deliver(AckRequired, {false, AutoAcks},
- State = #q { mixed_state = MS }) ->
- {{Msg, IsDelivered, AckTag, Remaining}, MS1} =
- rabbit_mixed_queue:fetch(MS),
+ State = #q { variable_queue_state = VQS }) ->
+ {{Msg, IsDelivered, AckTag, Remaining}, VQS1} =
+ rabbit_variable_queue:fetch(VQS),
AutoAcks1 = case AckRequired of
true -> AutoAcks;
false -> [AckTag | AutoAcks]
end,
{{Msg, IsDelivered, AckTag}, {0 == Remaining, AutoAcks1},
- State #q { mixed_state = MS1 }}.
+ State #q { variable_queue_state = VQS1 }}.
-run_message_queue(State = #q { mixed_state = MS }) ->
+run_message_queue(State = #q { variable_queue_state = VQS }) ->
Funs = { fun deliver_from_queue_pred/2,
fun deliver_from_queue_deliver/3 },
- IsEmpty = rabbit_mixed_queue:is_empty(MS),
+ IsEmpty = rabbit_variable_queue:is_empty(VQS),
{{_IsEmpty1, AutoAcks}, State1} =
deliver_msgs_to_consumers(Funs, {IsEmpty, []}, State),
- {ok, MS1} =
- rabbit_mixed_queue:ack(AutoAcks, State1 #q.mixed_state),
- State1 #q { mixed_state = MS1 }.
+ VQS1 = rabbit_variable_queue:ack(AutoAcks, State1 #q.variable_queue_state),
+ State1 #q { variable_queue_state = VQS1 }.
attempt_immediate_delivery(none, _ChPid, Msg, State) ->
PredFun = fun (IsEmpty, _State) -> not IsEmpty end,
@@ -309,10 +290,10 @@ attempt_immediate_delivery(none, _ChPid, Msg, State) ->
{AckTag, State2} =
case AckRequired of
true ->
- {ok, AckTag1, MS} =
- rabbit_mixed_queue:publish_delivered(
- Msg, State1 #q.mixed_state),
- {AckTag1, State1 #q { mixed_state = MS }};
+ {AckTag1, VQS} =
+ rabbit_variable_queue:publish_delivered(
+ Msg, State1 #q.variable_queue_state),
+ {AckTag1, State1 #q { variable_queue_state = VQS }};
false ->
{noack, State1}
end,
@@ -320,9 +301,9 @@ attempt_immediate_delivery(none, _ChPid, Msg, State) ->
end,
deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State);
attempt_immediate_delivery(Txn, ChPid, Msg, State) ->
- {ok, MS} = rabbit_mixed_queue:tx_publish(Msg, State #q.mixed_state),
+ VQS = rabbit_variable_queue:tx_publish(Msg, State #q.variable_queue_state),
record_pending_message(Txn, ChPid, Msg),
- {true, State #q { mixed_state = MS }}.
+ {true, State #q { variable_queue_state = VQS }}.
deliver_or_enqueue(Txn, ChPid, Msg, State) ->
case attempt_immediate_delivery(Txn, ChPid, Msg, State) of
@@ -330,8 +311,9 @@ deliver_or_enqueue(Txn, ChPid, Msg, State) ->
{true, NewState};
{false, NewState} ->
%% Txn is none and no unblocked channels with consumers
- {ok, MS} = rabbit_mixed_queue:publish(Msg, State #q.mixed_state),
- {false, NewState #q { mixed_state = MS }}
+ {_SeqId, VQS} = rabbit_variable_queue:publish(
+ Msg, State #q.variable_queue_state),
+ {false, NewState #q { variable_queue_state = VQS }}
end.
%% all these messages have already been delivered at least once and
@@ -344,11 +326,11 @@ deliver_or_requeue_n(MsgsWithAcks, State) ->
{{_RemainingLengthMinusOne, AutoAcks, OutstandingMsgs}, NewState} =
deliver_msgs_to_consumers(
Funs, {length(MsgsWithAcks), [], MsgsWithAcks}, State),
- {ok, MS} = rabbit_mixed_queue:ack(AutoAcks, NewState #q.mixed_state),
+ VQS = rabbit_variable_queue:ack(AutoAcks, NewState #q.variable_queue_state),
case OutstandingMsgs of
- [] -> NewState #q { mixed_state = MS };
- _ -> {ok, MS1} = rabbit_mixed_queue:requeue(OutstandingMsgs, MS),
- NewState #q { mixed_state = MS1 }
+ [] -> NewState #q { variable_queue_state = VQS };
+ _ -> VQS1 = rabbit_variable_queue:requeue(OutstandingMsgs, VQS),
+ NewState #q { variable_queue_state = VQS1 }
end.
deliver_or_requeue_msgs_pred({Len, _AcksAcc, _MsgsWithAcks}, _State) ->
@@ -504,17 +486,17 @@ commit_transaction(Txn, State) ->
store_ch_record(C#cr{unacked_messages = Remaining}),
MsgWithAcks
end,
- {ok, MS} = rabbit_mixed_queue:tx_commit(
- PendingMessagesOrdered, Acks, State #q.mixed_state),
- State #q { mixed_state = MS }.
+ VQS = rabbit_variable_queue:tx_commit(
+ PendingMessagesOrdered, Acks, State #q.variable_queue_state),
+ State #q { variable_queue_state = VQS }.
rollback_transaction(Txn, State) ->
#tx { pending_messages = PendingMessages
} = lookup_tx(Txn),
- {ok, MS} = rabbit_mixed_queue:tx_rollback(PendingMessages,
- State #q.mixed_state),
+ VQS = rabbit_variable_queue:tx_rollback(PendingMessages,
+ State #q.variable_queue_state),
erase_tx(Txn),
- State #q { mixed_state = MS }.
+ State #q { variable_queue_state = VQS }.
%% {A, B} = collect_messages(C, D) %% A = C `intersect` D; B = D \\ C
%% err, A = C `intersect` D , via projection through the dict that is C
@@ -529,12 +511,10 @@ i(name, #q{q = #amqqueue{name = Name}}) -> Name;
i(durable, #q{q = #amqqueue{durable = Durable}}) -> Durable;
i(auto_delete, #q{q = #amqqueue{auto_delete = AutoDelete}}) -> AutoDelete;
i(arguments, #q{q = #amqqueue{arguments = Arguments}}) -> Arguments;
-i(storage_mode, #q{ mixed_state = MS }) ->
- rabbit_mixed_queue:storage_mode(MS);
i(pid, _) ->
self();
-i(messages_ready, #q { mixed_state = MS }) ->
- rabbit_mixed_queue:len(MS);
+i(messages_ready, #q { variable_queue_state = VQS }) ->
+ rabbit_variable_queue:len(VQS);
i(messages_unacknowledged, _) ->
lists:sum([dict:size(UAM) ||
#cr{unacked_messages = UAM} <- all_ch_record()]);
@@ -558,11 +538,6 @@ i(memory, _) ->
i(Item, _) ->
throw({bad_argument, Item}).
-report_memory(Hib, State = #q { mixed_state = MS }) ->
- {MS1, MSize} = rabbit_mixed_queue:estimate_queue_memory(MS),
- rabbit_memory_manager:report_memory(self(), MSize, Hib),
- State #q { mixed_state = MS1 }.
-
%---------------------------------------------------------------------------
handle_call(info, _From, State) ->
@@ -612,25 +587,25 @@ handle_call({notify_down, ChPid}, From, State) ->
handle_call({basic_get, ChPid, NoAck}, _From,
State = #q{q = #amqqueue{name = QName},
next_msg_id = NextId,
- mixed_state = MS
+ variable_queue_state = VQS
}) ->
- case rabbit_mixed_queue:fetch(MS) of
- {empty, MS1} -> reply(empty, State #q { mixed_state = MS1 });
- {{Msg, IsDelivered, AckTag, Remaining}, MS1} ->
+ case rabbit_variable_queue:fetch(VQS) of
+ {empty, VQS1} -> reply(empty, State #q { variable_queue_state = VQS1 });
+ {{Msg, IsDelivered, AckTag, Remaining}, VQS1} ->
AckRequired = not(NoAck),
- {ok, MS2} =
+ {ok, VQS2} =
case AckRequired of
true ->
C = #cr{unacked_messages = UAM} = ch_record(ChPid),
NewUAM = dict:store(NextId, {Msg, AckTag}, UAM),
store_ch_record(C#cr{unacked_messages = NewUAM}),
- {ok, MS1};
+ {ok, VQS1};
false ->
- rabbit_mixed_queue:ack([AckTag], MS1)
+ rabbit_variable_queue:ack([AckTag], VQS1)
end,
Message = {QName, self(), NextId, IsDelivered, Msg},
reply({ok, Remaining, Message},
- State #q { next_msg_id = NextId + 1, mixed_state = MS2 })
+ State #q { next_msg_id = NextId + 1, variable_queue_state = VQS2 })
end;
handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid,
@@ -710,14 +685,14 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
end;
handle_call(stat, _From, State = #q{q = #amqqueue{name = Name},
- mixed_state = MS,
+ variable_queue_state = VQS,
active_consumers = ActiveConsumers}) ->
- Length = rabbit_mixed_queue:len(MS),
+ Length = rabbit_variable_queue:len(VQS),
reply({ok, Name, Length, queue:len(ActiveConsumers)}, State);
handle_call({delete, IfUnused, IfEmpty}, _From,
- State = #q { mixed_state = MS }) ->
- Length = rabbit_mixed_queue:len(MS),
+ State = #q { variable_queue_state = VQS }) ->
+ Length = rabbit_variable_queue:len(VQS),
IsEmpty = Length == 0,
IsUnused = is_unused(State),
if
@@ -730,8 +705,8 @@ handle_call({delete, IfUnused, IfEmpty}, _From,
end;
handle_call(purge, _From, State) ->
- {Count, MS} = rabbit_mixed_queue:purge(State #q.mixed_state),
- reply({ok, Count}, State #q { mixed_state = MS });
+ {Count, VQS} = rabbit_variable_queue:purge(State #q.variable_queue_state),
+ reply({ok, Count}, State #q { variable_queue_state = VQS });
handle_call({claim_queue, ReaderPid}, _From,
State = #q{owner = Owner, exclusive_consumer = Holder}) ->
@@ -770,11 +745,11 @@ handle_cast({ack, Txn, MsgIds, ChPid}, State) ->
case Txn of
none ->
{MsgWithAcks, Remaining} = collect_messages(MsgIds, UAM),
- {ok, MS} = rabbit_mixed_queue:ack(
- [AckTag || {_Msg, AckTag} <- MsgWithAcks],
- State #q.mixed_state),
+ VQS = rabbit_variable_queue:ack(
+ [AckTag || {_Msg, AckTag} <- MsgWithAcks],
+ State #q.variable_queue_state),
store_ch_record(C#cr{unacked_messages = Remaining}),
- noreply(State #q { mixed_state = MS });
+ noreply(State #q { variable_queue_state = VQS });
_ ->
record_pending_acks(Txn, ChPid, MsgIds),
noreply(State)
@@ -822,23 +797,7 @@ handle_cast({limit, ChPid, LimiterPid}, State) ->
end,
NewLimited = Limited andalso LimiterPid =/= undefined,
C#cr{limiter_pid = LimiterPid, is_limit_active = NewLimited}
- end));
-
-handle_cast({set_storage_mode, Mode}, State = #q { mixed_state = MS }) ->
- PendingMessages =
- lists:flatten([Pending || #tx { pending_messages = Pending}
- <- all_tx_record()]),
- Mode1 = case Mode of
- liberated -> mixed;
- oppressed -> disk
- end,
- {ok, MS1} = rabbit_mixed_queue:set_storage_mode(Mode1, PendingMessages, MS),
- noreply(State #q { mixed_state = MS1 }).
-
-handle_info(report_memory, State) ->
- %% deliberately don't call noreply/1 as we don't want to start the timer.
- %% By unsetting the timer, we force a report on the next normal message.
- {noreply, State #q { memory_report_timer = undefined }, hibernate};
+ end)).
handle_info({'DOWN', MonitorRef, process, DownPid, _Reason},
State = #q{owner = {DownPid, MonitorRef}}) ->
@@ -860,9 +819,6 @@ handle_info(Info, State) ->
?LOGDEBUG("Info in queue: ~p~n", [Info]),
{stop, {unhandled_info, Info}, State}.
-handle_pre_hibernate(State = #q { mixed_state = MS }) ->
- MS1 = rabbit_mixed_queue:maybe_prefetch(MS),
- State1 =
- stop_memory_timer(report_memory(true, State #q { mixed_state = MS1 })),
- %% don't call noreply/1 as that'll restart the memory_report_timer
- {hibernate, State1}.
+handle_pre_hibernate(State = #q { variable_queue_state = VQS }) ->
+ VQS1 = rabbit_variable_queue:maybe_start_prefetcher(VQS),
+ {hibernate, State #q { variable_queue_state = VQS1 }}.
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
deleted file mode 100644
index 7d44dd9d54..0000000000
--- a/src/rabbit_disk_queue.erl
+++ /dev/null
@@ -1,743 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License at
-%% http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
-%% License for the specific language governing rights and limitations
-%% under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developers of the Original Code are LShift Ltd,
-%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
-%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
-%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
-%% Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
-%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
-%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
-%%
-%% All Rights Reserved.
-%%
-%% Contributor(s): ______________________________________.
-%%
-
--module(rabbit_disk_queue).
-
--behaviour(gen_server2).
-
--export([start_link/0]).
-
--export([init/1, handle_call/3, handle_cast/2, handle_info/2,
- terminate/2, code_change/3]).
-
--export([publish/3, fetch/1, phantom_fetch/1, ack/2, tx_publish/1, tx_commit/3,
- tx_rollback/1, requeue/2, purge/1, delete_queue/1,
- delete_non_durable_queues/1, requeue_next_n/2, len/1, foldl/3,
- prefetch/1
- ]).
-
--export([stop/0, stop_and_obliterate/0]).
-
-%%----------------------------------------------------------------------------
-
--include("rabbit.hrl").
-
--define(SEQUENCE_ETS_NAME, rabbit_disk_queue_sequences).
--define(BATCH_SIZE, 10000).
-
--define(SHUTDOWN_MESSAGE_KEY, {internal_token, shutdown}).
--define(SHUTDOWN_MESSAGE,
- #dq_msg_loc { queue_and_seq_id = ?SHUTDOWN_MESSAGE_KEY,
- msg_id = infinity_and_beyond,
- is_delivered = never,
- is_persistent = true
- }).
-
--define(HIBERNATE_AFTER_MIN, 1000).
--define(DESIRED_HIBERNATE, 10000).
-
--define(SERVER, ?MODULE).
-
--record(dqstate, { sequences }). %% next read and write for each q
-
-%%----------------------------------------------------------------------------
-
--ifdef(use_specs).
-
--type(msg_id() :: guid()).
--type(seq_id() :: non_neg_integer()).
--type(ack_tag() :: {msg_id(), seq_id()}).
-
--spec(start_link/0 :: () ->
- ({'ok', pid()} | 'ignore' | {'error', any()})).
--spec(publish/3 :: (queue_name(), message(), boolean()) -> 'ok').
--spec(fetch/1 :: (queue_name()) ->
- ('empty' |
- {message(), boolean(), ack_tag(), non_neg_integer()})).
--spec(phantom_fetch/1 :: (queue_name()) ->
- ('empty' |
- {msg_id(), boolean(), ack_tag(), non_neg_integer()})).
--spec(prefetch/1 :: (queue_name()) -> 'ok').
--spec(ack/2 :: (queue_name(), [ack_tag()]) -> 'ok').
--spec(tx_publish/1 :: (message()) -> 'ok').
--spec(tx_commit/3 :: (queue_name(), [{msg_id(), boolean(), boolean()}],
- [ack_tag()]) ->
- 'ok').
--spec(tx_rollback/1 :: ([msg_id()]) -> 'ok').
--spec(requeue/2 :: (queue_name(), [{ack_tag(), boolean()}]) -> 'ok').
--spec(requeue_next_n/2 :: (queue_name(), non_neg_integer()) -> 'ok').
--spec(purge/1 :: (queue_name()) -> non_neg_integer()).
--spec(delete_queue/1 :: (queue_name()) -> 'ok').
--spec(delete_non_durable_queues/1 :: ([queue_name()]) -> 'ok').
--spec(len/1 :: (queue_name()) -> non_neg_integer()).
--spec(foldl/3 :: (fun ((message(), ack_tag(), boolean(), A) -> A),
- A, queue_name()) -> A).
--spec(stop/0 :: () -> 'ok').
--spec(stop_and_obliterate/0 :: () -> 'ok').
-
--endif.
-
-%%----------------------------------------------------------------------------
-%% public API
-%%----------------------------------------------------------------------------
-
-start_link() ->
- gen_server2:start_link({local, ?SERVER}, ?MODULE, [], []).
-
-publish(Q, Message = #basic_message {}, IsDelivered) ->
- gen_server2:cast(?SERVER, {publish, Q, Message, IsDelivered}).
-
-fetch(Q) ->
- gen_server2:call(?SERVER, {fetch, Q}, infinity).
-
-phantom_fetch(Q) ->
- gen_server2:call(?SERVER, {phantom_fetch, Q}, infinity).
-
-prefetch(Q) ->
- gen_server2:pcast(?SERVER, -1, {prefetch, Q, self()}).
-
-ack(Q, MsgSeqIds) when is_list(MsgSeqIds) ->
- gen_server2:cast(?SERVER, {ack, Q, MsgSeqIds}).
-
-tx_publish(Message = #basic_message {}) ->
- gen_server2:cast(?SERVER, {tx_publish, Message}).
-
-tx_commit(Q, PubMsgIds, AckSeqIds)
- when is_list(PubMsgIds) andalso is_list(AckSeqIds) ->
- gen_server2:call(?SERVER, {tx_commit, Q, PubMsgIds, AckSeqIds}, infinity).
-
-tx_rollback(MsgIds) when is_list(MsgIds) ->
- gen_server2:cast(?SERVER, {tx_rollback, MsgIds}).
-
-requeue(Q, MsgSeqIds) when is_list(MsgSeqIds) ->
- gen_server2:cast(?SERVER, {requeue, Q, MsgSeqIds}).
-
-requeue_next_n(Q, N) when is_integer(N) ->
- gen_server2:cast(?SERVER, {requeue_next_n, Q, N}).
-
-purge(Q) ->
- gen_server2:call(?SERVER, {purge, Q}, infinity).
-
-delete_queue(Q) ->
- gen_server2:call(?SERVER, {delete_queue, Q}, infinity).
-
-delete_non_durable_queues(DurableQueues) ->
- gen_server2:call(?SERVER, {delete_non_durable_queues, DurableQueues},
- infinity).
-
-len(Q) ->
- gen_server2:call(?SERVER, {len, Q}, infinity).
-
-foldl(Fun, Init, Acc) ->
- gen_server2:call(?SERVER, {foldl, Fun, Init, Acc}, infinity).
-
-stop() ->
- gen_server2:call(?SERVER, stop, infinity).
-
-stop_and_obliterate() ->
- gen_server2:call(?SERVER, stop_vaporise, infinity).
-
-%% private
-
-finalise_commit(TxDetails) ->
- gen_server2:cast(?SERVER, {finalise_commit, TxDetails}).
-
-%%----------------------------------------------------------------------------
-%% gen_server behaviour
-%%----------------------------------------------------------------------------
-
-init([]) ->
- %% If the gen_server is part of a supervision tree and is ordered
- %% by its supervisor to terminate, terminate will be called with
- %% Reason=shutdown if the following conditions apply:
- %% * the gen_server has been set to trap exit signals, and
- %% * the shutdown strategy as defined in the supervisor's
- %% child specification is an integer timeout value, not
- %% brutal_kill.
- %% Otherwise, the gen_server will be immediately terminated.
- process_flag(trap_exit, true),
-
- ok = filelib:ensure_dir(form_filename("nothing")),
-
- ok = detect_shutdown_state_and_adjust_delivered_flags(),
-
- {ok, _Pid} = rabbit_msg_store:start_link(base_directory(),
- fun msg_ref_gen/1,
- msg_ref_gen_init()),
- ok = prune(),
-
- Sequences = ets:new(?SEQUENCE_ETS_NAME, [set, private]),
- ok = extract_sequence_numbers(Sequences),
-
- State = #dqstate { sequences = Sequences },
- {ok, State, hibernate,
- {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
-
-handle_call({fetch, Q}, _From, State) ->
- {Result, State1} = internal_fetch_body(Q, pop_queue, State),
- reply(Result, State1);
-handle_call({phantom_fetch, Q}, _From, State) ->
- Result = internal_fetch_attributes(Q, record_delivery, State),
- reply(Result, State);
-handle_call({tx_commit, Q, PubMsgIds, AckSeqIds}, From, State) ->
- State1 =
- internal_tx_commit(Q, PubMsgIds, AckSeqIds, From, State),
- noreply(State1);
-handle_call({purge, Q}, _From, State) ->
- {ok, Count, State1} = internal_purge(Q, State),
- reply(Count, State1);
-handle_call({delete_queue, Q}, From, State) ->
- gen_server2:reply(From, ok),
- {ok, State1} = internal_delete_queue(Q, State),
- noreply(State1);
-handle_call({len, Q}, _From, State = #dqstate { sequences = Sequences }) ->
- {ReadSeqId, WriteSeqId} = sequence_lookup(Sequences, Q),
- reply(WriteSeqId - ReadSeqId, State);
-handle_call({foldl, Fun, Init, Q}, _From, State) ->
- {ok, Result, State1} = internal_foldl(Q, Fun, Init, State),
- reply(Result, State1);
-handle_call(stop, _From, State) ->
- {stop, normal, ok, State}; %% gen_server now calls terminate
-handle_call(stop_vaporise, _From, State) ->
- State1 = shutdown(State),
- {atomic, ok} = mnesia:clear_table(rabbit_disk_queue),
- lists:foreach(fun file:delete/1, filelib:wildcard(form_filename("*"))),
- {stop, normal, ok, State1}; %% gen_server now calls terminate
-handle_call({delete_non_durable_queues, DurableQueues}, _From, State) ->
- {ok, State1} = internal_delete_non_durable_queues(DurableQueues, State),
- reply(ok, State1).
-
-handle_cast({publish, Q, Message, IsDelivered}, State) ->
- {ok, _MsgSeqId, State1} = internal_publish(Q, Message, IsDelivered, State),
- noreply(State1);
-handle_cast({ack, Q, MsgSeqIds}, State) ->
- {ok, State1} = internal_ack(Q, MsgSeqIds, State),
- noreply(State1);
-handle_cast({tx_publish, Message}, State) ->
- {ok, State1} = internal_tx_publish(Message, State),
- noreply(State1);
-handle_cast({tx_rollback, MsgIds}, State) ->
- {ok, State1} = internal_tx_rollback(MsgIds, State),
- noreply(State1);
-handle_cast({requeue, Q, MsgSeqIds}, State) ->
- {ok, State1} = internal_requeue(Q, MsgSeqIds, State),
- noreply(State1);
-handle_cast({requeue_next_n, Q, N}, State) ->
- {ok, State1} = internal_requeue_next_n(Q, N, State),
- noreply(State1);
-handle_cast({prefetch, Q, From}, State) ->
- {Result, State1} = internal_fetch_body(Q, peek_queue, State),
- case rabbit_misc:with_exit_handler(
- fun () -> false end,
- fun () ->
- ok = rabbit_queue_prefetcher:publish(From, Result),
- true
- end) of
- true ->
- internal_fetch_attributes(Q, ignore_delivery, State1);
- false -> ok
- end,
- noreply(State1);
-handle_cast({finalise_commit, TxDetails}, State) ->
- noreply(finalise_commit(TxDetails, State)).
-
-handle_info({'EXIT', _Pid, Reason}, State) ->
- {stop, Reason, State}.
-
-terminate(_Reason, State) ->
- State1 = shutdown(State),
- store_safe_shutdown(),
- State1.
-
-shutdown(State = #dqstate { sequences = undefined }) ->
- State;
-shutdown(State = #dqstate { sequences = Sequences }) ->
- ok = rabbit_msg_store:stop(),
- ets:delete(Sequences),
- State #dqstate { sequences = undefined }.
-
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
-
-%%----------------------------------------------------------------------------
-%% general helper functions
-%%----------------------------------------------------------------------------
-
-noreply(State) ->
- {noreply, State, hibernate}.
-
-reply(Reply, State) ->
- {reply, Reply, State, hibernate}.
-
-form_filename(Name) ->
- filename:join(base_directory(), Name).
-
-base_directory() ->
- filename:join(rabbit_mnesia:dir(), "rabbit_disk_queue/").
-
-sequence_lookup(Sequences, Q) ->
- case ets:lookup(Sequences, Q) of
- [] -> {0, 0};
- [{_, ReadSeqId, WriteSeqId}] -> {ReadSeqId, WriteSeqId}
- end.
-
-%%----------------------------------------------------------------------------
-%% internal functions
-%%----------------------------------------------------------------------------
-
-internal_fetch_body(Q, Advance, State) ->
- case next(Q, record_delivery, Advance, State) of
- empty -> {empty, State};
- {MsgId, IsDelivered, AckTag, Remaining} ->
- {ok, Message} = rabbit_msg_store:read(MsgId),
- {{Message, IsDelivered, AckTag, Remaining}, State}
- end.
-
-internal_fetch_attributes(Q, MarkDelivered, State) ->
- next(Q, MarkDelivered, pop_queue, State).
-
-next(Q, MarkDelivered, Advance, #dqstate { sequences = Sequences }) ->
- case sequence_lookup(Sequences, Q) of
- {SeqId, SeqId} -> empty;
- {ReadSeqId, WriteSeqId} when WriteSeqId > ReadSeqId ->
- Remaining = WriteSeqId - ReadSeqId - 1,
- {MsgId, IsDelivered} =
- update_message_attributes(Q, ReadSeqId, MarkDelivered),
- ok = maybe_advance(Advance, Sequences, Q, ReadSeqId, WriteSeqId),
- AckTag = {MsgId, ReadSeqId},
- {MsgId, IsDelivered, AckTag, Remaining}
- end.
-
-update_message_attributes(Q, SeqId, MarkDelivered) ->
- [Obj =
- #dq_msg_loc {is_delivered = IsDelivered, msg_id = MsgId}] =
- mnesia:dirty_read(rabbit_disk_queue, {Q, SeqId}),
- ok = case {IsDelivered, MarkDelivered} of
- {true, _} -> ok;
- {false, ignore_delivery} -> ok;
- {false, record_delivery} ->
- mnesia:dirty_write(rabbit_disk_queue,
- Obj #dq_msg_loc {is_delivered = true})
- end,
- {MsgId, IsDelivered}.
-
-maybe_advance(peek_queue, _, _, _, _) ->
- ok;
-maybe_advance(pop_queue, Sequences, Q, ReadSeqId, WriteSeqId) ->
- true = ets:insert(Sequences, {Q, ReadSeqId + 1, WriteSeqId}),
- ok.
-
-internal_foldl(Q, Fun, Init, State = #dqstate { sequences = Sequences }) ->
- {ReadSeqId, WriteSeqId} = sequence_lookup(Sequences, Q),
- internal_foldl(Q, WriteSeqId, Fun, State, Init, ReadSeqId).
-
-internal_foldl(_Q, SeqId, _Fun, State, Acc, SeqId) ->
- {ok, Acc, State};
-internal_foldl(Q, WriteSeqId, Fun, State, Acc, ReadSeqId) ->
- [#dq_msg_loc {is_delivered = IsDelivered, msg_id = MsgId}] =
- mnesia:dirty_read(rabbit_disk_queue, {Q, ReadSeqId}),
- {ok, Message} = rabbit_msg_store:read(MsgId),
- Acc1 = Fun(Message, {MsgId, ReadSeqId}, IsDelivered, Acc),
- internal_foldl(Q, WriteSeqId, Fun, State, Acc1, ReadSeqId + 1).
-
-internal_ack(Q, MsgSeqIds, State) ->
- remove_messages(Q, MsgSeqIds, State).
-
-remove_messages(Q, MsgSeqIds, State) ->
- MsgIds = lists:foldl(
- fun ({MsgId, SeqId}, MsgIdAcc) ->
- ok = mnesia:dirty_delete(rabbit_disk_queue, {Q, SeqId}),
- [MsgId | MsgIdAcc]
- end, [], MsgSeqIds),
- ok = rabbit_msg_store:remove(MsgIds),
- {ok, State}.
-
-internal_tx_publish(Message = #basic_message { guid = MsgId,
- content = Content }, State) ->
- ClearedContent = rabbit_binary_parser:clear_decoded_content(Content),
- ok = rabbit_msg_store:write(
- MsgId, Message #basic_message { content = ClearedContent }),
- {ok, State}.
-
-internal_tx_commit(Q, PubMsgIds, AckSeqIds, From, State) ->
- TxDetails = {Q, PubMsgIds, AckSeqIds, From},
- ok = rabbit_msg_store:sync([MsgId || {MsgId, _, _} <- PubMsgIds],
- fun () -> finalise_commit(TxDetails) end),
- State.
-
-finalise_commit({Q, PubMsgIds, AckSeqIds, From},
- State = #dqstate { sequences = Sequences }) ->
- {InitReadSeqId, InitWriteSeqId} = sequence_lookup(Sequences, Q),
- WriteSeqId =
- rabbit_misc:execute_mnesia_transaction(
- fun() ->
- ok = mnesia:write_lock_table(rabbit_disk_queue),
- lists:foldl(
- fun ({MsgId, IsDelivered, IsPersistent}, SeqId) ->
- ok = mnesia:write(
- rabbit_disk_queue,
- #dq_msg_loc {
- queue_and_seq_id = {Q, SeqId},
- msg_id = MsgId,
- is_delivered = IsDelivered,
- is_persistent = IsPersistent
- }, write),
- SeqId + 1
- end, InitWriteSeqId, PubMsgIds)
- end),
- {ok, State1} = remove_messages(Q, AckSeqIds, State),
- true = case PubMsgIds of
- [] -> true;
- _ -> ets:insert(Sequences,
- {Q, InitReadSeqId, WriteSeqId})
- end,
- gen_server2:reply(From, ok),
- State1.
-
-internal_publish(Q, Message = #basic_message { guid = MsgId,
- is_persistent = IsPersistent },
- IsDelivered, State) ->
- {ok, State1 = #dqstate { sequences = Sequences }} =
- internal_tx_publish(Message, State),
- {ReadSeqId, WriteSeqId} = sequence_lookup(Sequences, Q),
- ok = mnesia:dirty_write(rabbit_disk_queue,
- #dq_msg_loc { queue_and_seq_id = {Q, WriteSeqId},
- msg_id = MsgId,
- is_delivered = IsDelivered,
- is_persistent = IsPersistent }),
- true = ets:insert(Sequences, {Q, ReadSeqId, WriteSeqId + 1}),
- {ok, {MsgId, WriteSeqId}, State1}.
-
-internal_tx_rollback(MsgIds, State) ->
- ok = rabbit_msg_store:remove(MsgIds),
- {ok, State}.
-
-internal_requeue(_Q, [], State) ->
- {ok, State};
-internal_requeue(Q, MsgSeqIds, State = #dqstate { sequences = Sequences }) ->
- %% We know that every seq_id in here is less than the ReadSeqId
- %% you'll get if you look up this queue in Sequences (i.e. they've
- %% already been delivered). We also know that the rows for these
- %% messages are still in rabbit_disk_queue (i.e. they've not been
- %% ack'd).
- %%
- %% Now, it would be nice if we could adjust the sequence ids in
- %% rabbit_disk_queue (mnesia) to create a contiguous block and
- %% then drop the ReadSeqId for the queue by the corresponding
- %% amount. However, this is not safe because there may be other
- %% sequence ids which have been sent out as part of deliveries
- %% which are not being requeued. As such, moving things about in
- %% rabbit_disk_queue _under_ the current ReadSeqId would result in
- %% such sequence ids referring to the wrong messages.
- %%
- %% Therefore, the only solution is to take these messages, and to
- %% reenqueue them at the top of the queue. Usefully, this only
- %% affects the Sequences and rabbit_disk_queue structures - there
- %% is no need to physically move the messages about on disk, so
- %% the message store remains unaffected, except we need to tell it
- %% about the ids of the requeued messages so it can remove them
- %% from its message cache if necessary.
-
- {ReadSeqId, WriteSeqId} = sequence_lookup(Sequences, Q),
- {WriteSeqId1, Q, MsgIds} =
- rabbit_misc:execute_mnesia_transaction(
- fun() ->
- ok = mnesia:write_lock_table(rabbit_disk_queue),
- lists:foldl(fun requeue_message/2, {WriteSeqId, Q, []},
- MsgSeqIds)
- end),
- true = ets:insert(Sequences, {Q, ReadSeqId, WriteSeqId1}),
- ok = rabbit_msg_store:release(MsgIds),
- {ok, State}.
-
-requeue_message({{MsgId, SeqId}, IsDelivered}, {WriteSeqId, Q, Acc}) ->
- [Obj = #dq_msg_loc { is_delivered = true, msg_id = MsgId }] =
- mnesia:read(rabbit_disk_queue, {Q, SeqId}, write),
- ok = mnesia:write(rabbit_disk_queue,
- Obj #dq_msg_loc {queue_and_seq_id = {Q, WriteSeqId},
- is_delivered = IsDelivered
- },
- write),
- ok = mnesia:delete(rabbit_disk_queue, {Q, SeqId}, write),
- {WriteSeqId + 1, Q, [MsgId | Acc]}.
-
-%% move the next N messages from the front of the queue to the back.
-internal_requeue_next_n(Q, N, State = #dqstate { sequences = Sequences }) ->
- {ReadSeqId, WriteSeqId} = sequence_lookup(Sequences, Q),
- if N >= (WriteSeqId - ReadSeqId) -> {ok, State};
- true ->
- {ReadSeqIdN, WriteSeqIdN, MsgIds} =
- rabbit_misc:execute_mnesia_transaction(
- fun() ->
- ok = mnesia:write_lock_table(rabbit_disk_queue),
- requeue_next_messages(Q, N, ReadSeqId, WriteSeqId, [])
- end
- ),
- true = ets:insert(Sequences, {Q, ReadSeqIdN, WriteSeqIdN}),
- ok = rabbit_msg_store:release(MsgIds),
- {ok, State}
- end.
-
-requeue_next_messages(_Q, 0, ReadSeq, WriteSeq, Acc) ->
- {ReadSeq, WriteSeq, Acc};
-requeue_next_messages(Q, N, ReadSeq, WriteSeq, Acc) ->
- [Obj = #dq_msg_loc { msg_id = MsgId }] =
- mnesia:read(rabbit_disk_queue, {Q, ReadSeq}, write),
- ok = mnesia:write(rabbit_disk_queue,
- Obj #dq_msg_loc {queue_and_seq_id = {Q, WriteSeq}},
- write),
- ok = mnesia:delete(rabbit_disk_queue, {Q, ReadSeq}, write),
- requeue_next_messages(Q, N - 1, ReadSeq + 1, WriteSeq + 1, [MsgId | Acc]).
-
-internal_purge(Q, State = #dqstate { sequences = Sequences }) ->
- case sequence_lookup(Sequences, Q) of
- {SeqId, SeqId} -> {ok, 0, State};
- {ReadSeqId, WriteSeqId} ->
- {MsgSeqIds, WriteSeqId} =
- rabbit_misc:unfold(
- fun (SeqId) when SeqId == WriteSeqId -> false;
- (SeqId) ->
- [#dq_msg_loc { msg_id = MsgId }] =
- mnesia:dirty_read(rabbit_disk_queue, {Q, SeqId}),
- {true, {MsgId, SeqId}, SeqId + 1}
- end, ReadSeqId),
- true = ets:insert(Sequences, {Q, WriteSeqId, WriteSeqId}),
- {ok, State1} = remove_messages(Q, MsgSeqIds, State),
- {ok, WriteSeqId - ReadSeqId, State1}
- end.
-
-internal_delete_queue(Q, State) ->
- %% remove everything undelivered
- {ok, _Count, State1 = #dqstate { sequences = Sequences }} =
- internal_purge(Q, State),
- true = ets:delete(Sequences, Q),
- %% remove everything already delivered
- remove_messages(
- Q, [{MsgId, SeqId} || #dq_msg_loc { queue_and_seq_id = {_Q, SeqId},
- msg_id = MsgId } <-
- mnesia:dirty_match_object(
- rabbit_disk_queue,
- #dq_msg_loc {
- queue_and_seq_id = {Q, '_'},
- _ = '_' })], State1).
-
-internal_delete_non_durable_queues(
- DurableQueues, State = #dqstate { sequences = Sequences }) ->
- DurableQueueSet = sets:from_list(DurableQueues),
- ets:foldl(
- fun ({Q, _Read, _Write}, {ok, State1}) ->
- case sets:is_element(Q, DurableQueueSet) of
- true -> {ok, State1};
- false -> internal_delete_queue(Q, State1)
- end
- end, {ok, State}, Sequences).
-
-%%----------------------------------------------------------------------------
-%% recovery
-%%----------------------------------------------------------------------------
-
-store_safe_shutdown() ->
- ok = rabbit_misc:execute_mnesia_transaction(
- fun() ->
- mnesia:write(rabbit_disk_queue,
- ?SHUTDOWN_MESSAGE, write)
- end).
-
-detect_shutdown_state_and_adjust_delivered_flags() ->
- MarkDelivered =
- rabbit_misc:execute_mnesia_transaction(
- fun() ->
- case mnesia:read(rabbit_disk_queue,
- ?SHUTDOWN_MESSAGE_KEY, read) of
- [?SHUTDOWN_MESSAGE] ->
- mnesia:delete(rabbit_disk_queue,
- ?SHUTDOWN_MESSAGE_KEY, write),
- false;
- [] ->
- true
- end
- end),
- %% if we crash here, then on startup we'll not find the
- %% SHUTDOWN_MESSAGE so will mark everything delivered, which is
- %% the safe thing to do.
- case MarkDelivered of
- true -> mark_messages_delivered();
- false -> ok
- end.
-
-mark_messages_delivered() ->
- mark_message_delivered('$start_of_table').
-
-%% A single huge transaction is a bad idea because of memory
-%% use. Equally, using dirty operations is a bad idea because you
-%% shouldn't do writes when doing mnesia:dirty_next, because the
-%% ordering can change. So we use transactions of bounded
-%% size. However, even this does necessitate restarting between
-%% transactions.
-mark_message_delivered('$end_of_table') ->
- ok;
-mark_message_delivered(_Key) ->
- mark_message_delivered(
- rabbit_misc:execute_mnesia_transaction(
- fun () ->
- ok = mnesia:write_lock_table(rabbit_disk_queue),
- mark_message_delivered(mnesia:first(rabbit_disk_queue),
- ?BATCH_SIZE)
- end)).
-
-mark_message_delivered(Key, 0) ->
- Key;
-mark_message_delivered(Key = '$end_of_table', _N) ->
- Key;
-mark_message_delivered(Key, N) ->
- [Obj] = mnesia:read(rabbit_disk_queue, Key, write),
- M = case Obj #dq_msg_loc.is_delivered of
- true -> N;
- false ->
- ok = mnesia:write(rabbit_disk_queue,
- Obj #dq_msg_loc { is_delivered = true },
- write),
- N - 1
- end,
- mark_message_delivered(mnesia:next(rabbit_disk_queue, Key), M).
-
-msg_ref_gen_init() -> mnesia:dirty_first(rabbit_disk_queue).
-
-msg_ref_gen('$end_of_table') -> finished;
-msg_ref_gen(Key) ->
- [#dq_msg_loc { msg_id = MsgId, is_persistent = IsPersistent }] =
- mnesia:dirty_read(rabbit_disk_queue, Key),
- NextKey = mnesia:dirty_next(rabbit_disk_queue, Key),
- {MsgId, case IsPersistent of true -> 1; false -> 0 end, NextKey}.
-
-prune_flush_batch(DeleteAcc) ->
- lists:foldl(fun (Key, ok) ->
- mnesia:dirty_delete(rabbit_disk_queue, Key)
- end, ok, DeleteAcc).
-
-prune() ->
- prune(mnesia:dirty_first(rabbit_disk_queue), [], 0).
-
-prune('$end_of_table', DeleteAcc, _Len) ->
- prune_flush_batch(DeleteAcc);
-prune(Key, DeleteAcc, Len) ->
- [#dq_msg_loc { msg_id = MsgId, queue_and_seq_id = {Q, SeqId} }] =
- mnesia:dirty_read(rabbit_disk_queue, Key),
- {DeleteAcc1, Len1} =
- case rabbit_msg_store:contains(MsgId) of
- true -> {DeleteAcc, Len};
- false -> {[{Q, SeqId} | DeleteAcc], Len + 1}
- end,
- if Len1 >= ?BATCH_SIZE ->
- %% We have no way of knowing how flushing the batch will
- %% affect ordering of records within the table, so have no
- %% choice but to start again. Although this will make
- %% recovery slower for large queues, we guarantee we can
- %% start up in constant memory
- ok = prune_flush_batch(DeleteAcc1),
- NextKey = mnesia:dirty_first(rabbit_disk_queue),
- prune(NextKey, [], 0);
- true ->
- NextKey = mnesia:dirty_next(rabbit_disk_queue, Key),
- prune(NextKey, DeleteAcc1, Len1)
- end.
-
-extract_sequence_numbers(Sequences) ->
- true =
- rabbit_misc:execute_mnesia_transaction(
- %% the ets manipulation within this transaction is
- %% idempotent, in particular we're only reading from mnesia,
- %% and combining what we read with what we find in
- %% ets. Should the transaction restart, the non-rolledback
- %% data in ets can still be successfully combined with what
- %% we find in mnesia
- fun() ->
- ok = mnesia:read_lock_table(rabbit_disk_queue),
- mnesia:foldl(
- fun (#dq_msg_loc { queue_and_seq_id = {Q, SeqId} }, true) ->
- NextWrite = SeqId + 1,
- case ets:lookup(Sequences, Q) of
- [] -> ets:insert_new(Sequences,
- {Q, SeqId, NextWrite});
- [Orig = {_, Read, Write}] ->
- Repl = {Q, lists:min([Read, SeqId]),
- lists:max([Write, NextWrite])},
- case Orig == Repl of
- true -> true;
- false -> ets:insert(Sequences, Repl)
- end
- end
- end, true, rabbit_disk_queue)
- end),
- ok = remove_gaps_in_sequences(Sequences).
-
-remove_gaps_in_sequences(Sequences) ->
- %% read the comments at internal_requeue.
-
- %% Because we are at startup, we know that no sequence ids have
- %% been issued (or at least, they were, but have been
- %% forgotten). Therefore, we can nicely shuffle up and not
- %% worry. Note that I'm choosing to shuffle up, but alternatively
- %% we could shuffle downwards. However, I think there's greater
- %% likelihood of gaps being at the bottom rather than the top of
- %% the queue, so shuffling up should be the better bet.
- QueueBoundaries =
- rabbit_misc:execute_mnesia_transaction(
- fun() ->
- ok = mnesia:write_lock_table(rabbit_disk_queue),
- lists:foldl(
- fun ({Q, ReadSeqId, WriteSeqId}, Acc) ->
- Gap = shuffle_up(Q, ReadSeqId-1, WriteSeqId-1, 0),
- [{Q, ReadSeqId + Gap, WriteSeqId} | Acc]
- end, [], ets:match_object(Sequences, '_'))
- end),
- true = lists:foldl(fun (Obj, true) -> ets:insert(Sequences, Obj) end,
- true, QueueBoundaries),
- ok.
-
-shuffle_up(_Q, SeqId, SeqId, Gap) ->
- Gap;
-shuffle_up(Q, BaseSeqId, SeqId, Gap) ->
- GapInc =
- case mnesia:read(rabbit_disk_queue, {Q, SeqId}, write) of
- [] -> 1;
- [Obj] ->
- case Gap of
- 0 -> ok;
- _ -> mnesia:write(rabbit_disk_queue,
- Obj #dq_msg_loc {
- queue_and_seq_id = {Q, SeqId + Gap }},
- write),
- mnesia:delete(rabbit_disk_queue, {Q, SeqId}, write)
- end,
- 0
- end,
- shuffle_up(Q, BaseSeqId, SeqId - 1, Gap + GapInc).
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl
deleted file mode 100644
index c278bac86d..0000000000
--- a/src/rabbit_mixed_queue.erl
+++ /dev/null
@@ -1,673 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License at
-%% http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
-%% License for the specific language governing rights and limitations
-%% under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developers of the Original Code are LShift Ltd,
-%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
-%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
-%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
-%% Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
-%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
-%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
-%%
-%% All Rights Reserved.
-%%
-%% Contributor(s): ______________________________________.
-%%
-
--module(rabbit_mixed_queue).
-
--include("rabbit.hrl").
-
--export([init/2]).
-
--export([publish/2, publish_delivered/2, fetch/1, ack/2,
- tx_publish/2, tx_commit/3, tx_rollback/2, requeue/2, purge/1,
- len/1, is_empty/1, delete_queue/1, maybe_prefetch/1]).
-
--export([set_storage_mode/3, storage_mode/1,
- estimate_queue_memory/1]).
-
--record(mqstate, { mode,
- msg_buf,
- queue,
- is_durable,
- length,
- memory_size,
- prefetcher
- }
- ).
-
--define(TO_DISK_MAX_FLUSH_SIZE, 100000).
--define(MAGIC_MARKER, <<"$magic_marker">>).
-
-%%----------------------------------------------------------------------------
-
--ifdef(use_specs).
-
--type(mode() :: ( 'disk' | 'mixed' )).
--type(mqstate() :: #mqstate { mode :: mode(),
- msg_buf :: queue(),
- queue :: queue_name(),
- is_durable :: boolean(),
- length :: non_neg_integer(),
- memory_size :: (non_neg_integer() | 'undefined'),
- prefetcher :: (pid() | 'undefined')
- }).
--type(msg_id() :: guid()).
--type(seq_id() :: non_neg_integer()).
--type(ack_tag() :: ( 'not_on_disk' | {msg_id(), seq_id()} )).
--type(okmqs() :: {'ok', mqstate()}).
-
--spec(init/2 :: (queue_name(), boolean()) -> okmqs()).
--spec(publish/2 :: (message(), mqstate()) -> okmqs()).
--spec(publish_delivered/2 :: (message(), mqstate()) ->
- {'ok', ack_tag(), mqstate()}).
--spec(fetch/1 :: (mqstate()) ->
- {('empty' | {message(), boolean(), ack_tag(), non_neg_integer()}),
- mqstate()}).
--spec(ack/2 :: ([{message(), ack_tag()}], mqstate()) -> okmqs()).
--spec(tx_publish/2 :: (message(), mqstate()) -> okmqs()).
--spec(tx_commit/3 :: ([message()], [ack_tag()], mqstate()) -> okmqs()).
--spec(tx_rollback/2 :: ([message()], mqstate()) -> okmqs()).
--spec(requeue/2 :: ([{message(), ack_tag()}], mqstate()) -> okmqs()).
--spec(purge/1 :: (mqstate()) -> okmqs()).
--spec(delete_queue/1 :: (mqstate()) -> {'ok', mqstate()}).
--spec(len/1 :: (mqstate()) -> non_neg_integer()).
--spec(is_empty/1 :: (mqstate()) -> boolean()).
-
--spec(set_storage_mode/3 :: (mode(), [message()], mqstate()) -> okmqs()).
--spec(estimate_queue_memory/1 :: (mqstate()) ->
- {mqstate(), non_neg_integer()}).
--spec(storage_mode/1 :: (mqstate()) -> mode()).
-
--endif.
-
-%%----------------------------------------------------------------------------
-
-init(Queue, IsDurable) ->
- Len = rabbit_disk_queue:len(Queue),
- {Size, MarkerFound, MarkerPreludeCount} =
- rabbit_disk_queue:foldl(
- fun (Msg = #basic_message { is_persistent = true },
- _AckTag, _IsDelivered, {SizeAcc, MFound, MPCount}) ->
- SizeAcc1 = SizeAcc + size_of_message(Msg),
- case {MFound, is_magic_marker_message(Msg)} of
- {false, false} -> {SizeAcc1, false, MPCount + 1};
- {false, true} -> {SizeAcc, true, MPCount};
- {true, false} -> {SizeAcc1, true, MPCount}
- end
- end, {0, false, 0}, Queue),
- Len1 = case MarkerFound of
- false -> Len;
- true ->
- ok = rabbit_disk_queue:requeue_next_n(Queue,
- MarkerPreludeCount),
- Len2 = Len - 1,
- {ok, Len2} = fetch_ack_magic_marker_message(Queue),
- Len2
- end,
- MsgBuf = inc_queue_length(queue:new(), Len1),
- {ok, #mqstate { mode = disk, msg_buf = MsgBuf, queue = Queue,
- is_durable = IsDurable, length = Len1,
- memory_size = Size, prefetcher = undefined }}.
-
-publish(Msg = #basic_message { is_persistent = IsPersistent }, State =
- #mqstate { queue = Q, mode = Mode, is_durable = IsDurable,
- msg_buf = MsgBuf, length = Length }) ->
- Msg1 = ensure_binary_properties(Msg),
- ok = case on_disk(Mode, IsDurable, IsPersistent) of
- true -> rabbit_disk_queue:publish(Q, Msg1, false);
- false -> ok
- end,
- MsgBuf1 = case Mode of
- disk -> inc_queue_length(MsgBuf, 1);
- mixed -> queue:in({Msg1, false}, MsgBuf)
- end,
- {ok, gain_memory(size_of_message(Msg1),
- State #mqstate { msg_buf = MsgBuf1,
- length = Length + 1 })}.
-
-%% Assumption here is that the queue is empty already (only called via
-%% attempt_immediate_delivery).
-publish_delivered(Msg = #basic_message { guid = MsgId,
- is_persistent = IsPersistent},
- State = #mqstate { is_durable = IsDurable, queue = Q,
- length = 0 })
- when IsDurable andalso IsPersistent ->
- Msg1 = ensure_binary_properties(Msg),
- ok = rabbit_disk_queue:publish(Q, Msg1, true),
- State1 = gain_memory(size_of_message(Msg1), State),
- %% must call phantom_fetch otherwise the msg remains at the head
- %% of the queue. This is synchronous, but unavoidable as we need
- %% the AckTag
- {MsgId, true, AckTag, 0} = rabbit_disk_queue:phantom_fetch(Q),
- {ok, AckTag, State1};
-publish_delivered(Msg, State = #mqstate { length = 0 }) ->
- Msg1 = ensure_binary_properties(Msg),
- {ok, not_on_disk, gain_memory(size_of_message(Msg1), State)}.
-
-fetch(State = #mqstate { length = 0 }) ->
- {empty, State};
-fetch(State = #mqstate { msg_buf = MsgBuf, queue = Q,
- is_durable = IsDurable, length = Length,
- prefetcher = Prefetcher }) ->
- {{value, Value}, MsgBuf1} = queue:out(MsgBuf),
- Rem = Length - 1,
- State1 = State #mqstate { length = Rem },
- case Value of
- {Msg = #basic_message { guid = MsgId, is_persistent = IsPersistent },
- IsDelivered} ->
- AckTag =
- case IsDurable andalso IsPersistent of
- true ->
- {MsgId, IsDelivered, AckTag1, _PRem}
- = rabbit_disk_queue:phantom_fetch(Q),
- AckTag1;
- false ->
- not_on_disk
- end,
- {{Msg, IsDelivered, AckTag, Rem},
- State1 #mqstate { msg_buf = MsgBuf1 }};
- {Msg = #basic_message { is_persistent = IsPersistent },
- IsDelivered, AckTag} ->
- %% message has come via the prefetcher, thus it's been
- %% marked delivered. If it's not persistent+durable, we
- %% should ack it now
- AckTag1 = maybe_ack(Q, IsDurable, IsPersistent, AckTag),
- {{Msg, IsDelivered, AckTag1, Rem},
- State1 #mqstate { msg_buf = MsgBuf1 }};
- _ when Prefetcher == undefined ->
- MsgBuf2 = dec_queue_length(MsgBuf, 1),
- {Msg = #basic_message { is_persistent = IsPersistent },
- IsDelivered, AckTag, _PersistRem}
- = rabbit_disk_queue:fetch(Q),
- AckTag1 = maybe_ack(Q, IsDurable, IsPersistent, AckTag),
- {{Msg, IsDelivered, AckTag1, Rem},
- State1 #mqstate { msg_buf = MsgBuf2 }};
- _ ->
- %% use State, not State1 as we've not dec'd length
- fetch(case rabbit_queue_prefetcher:drain(Prefetcher) of
- empty -> State #mqstate { prefetcher = undefined };
- {Fetched, Status} ->
- MsgBuf2 = dec_queue_length(MsgBuf, queue:len(Fetched)),
- State #mqstate
- { msg_buf = queue:join(Fetched, MsgBuf2),
- prefetcher = case Status of
- finished -> undefined;
- continuing -> Prefetcher
- end }
- end)
- end.
-
-ack(MsgsWithAcks, State = #mqstate { queue = Q }) ->
- {AckTags, ASize} = remove_diskless(MsgsWithAcks),
- ok = case AckTags of
- [] -> ok;
- _ -> rabbit_disk_queue:ack(Q, AckTags)
- end,
- {ok, lose_memory(ASize, State)}.
-
-tx_publish(Msg = #basic_message { is_persistent = IsPersistent },
- State = #mqstate { mode = Mode, is_durable = IsDurable }) ->
- Msg1 = ensure_binary_properties(Msg),
- ok = case on_disk(Mode, IsDurable, IsPersistent) of
- true -> rabbit_disk_queue:tx_publish(Msg1);
- false -> ok
- end,
- {ok, gain_memory(size_of_message(Msg1), State)}.
-
-tx_commit(Publishes, MsgsWithAcks,
- State = #mqstate { mode = Mode, queue = Q, msg_buf = MsgBuf,
- is_durable = IsDurable, length = Length }) ->
- PersistentPubs =
- [{MsgId, false, IsPersistent} ||
- #basic_message { guid = MsgId,
- is_persistent = IsPersistent } <- Publishes,
- on_disk(Mode, IsDurable, IsPersistent)],
- {RealAcks, ASize} = remove_diskless(MsgsWithAcks),
- ok = case {PersistentPubs, RealAcks} of
- {[], []} -> ok;
- _ -> rabbit_disk_queue:tx_commit(
- Q, PersistentPubs, RealAcks)
- end,
- Len = length(Publishes),
- MsgBuf1 = case Mode of
- disk -> inc_queue_length(MsgBuf, Len);
- mixed -> ToAdd = [{Msg, false} || Msg <- Publishes],
- queue:join(MsgBuf, queue:from_list(ToAdd))
- end,
- {ok, lose_memory(ASize, State #mqstate { msg_buf = MsgBuf1,
- length = Length + Len })}.
-
-tx_rollback(Publishes,
- State = #mqstate { mode = Mode, is_durable = IsDurable }) ->
- {PersistentPubs, CSize} =
- lists:foldl(
- fun (Msg = #basic_message { is_persistent = IsPersistent,
- guid = MsgId }, {Acc, CSizeAcc}) ->
- Msg1 = ensure_binary_properties(Msg),
- CSizeAcc1 = CSizeAcc + size_of_message(Msg1),
- {case on_disk(Mode, IsDurable, IsPersistent) of
- true -> [MsgId | Acc];
- _ -> Acc
- end, CSizeAcc1}
- end, {[], 0}, Publishes),
- ok = case PersistentPubs of
- [] -> ok;
- _ -> rabbit_disk_queue:tx_rollback(PersistentPubs)
- end,
- {ok, lose_memory(CSize, State)}.
-
-%% [{Msg, AckTag}]
-requeue(MsgsWithAckTags,
- State = #mqstate { mode = Mode, queue = Q, msg_buf = MsgBuf,
- is_durable = IsDurable, length = Length }) ->
- RQ = lists:foldl(
- fun ({Msg = #basic_message { is_persistent = IsPersistent }, AckTag},
- RQAcc) ->
- case IsDurable andalso IsPersistent of
- true ->
- [{AckTag, true} | RQAcc];
- false ->
- case Mode of
- mixed ->
- RQAcc;
- disk when not_on_disk =:= AckTag ->
- ok = case RQAcc of
- [] -> ok;
- _ -> rabbit_disk_queue:requeue
- (Q, lists:reverse(RQAcc))
- end,
- ok = rabbit_disk_queue:publish(Q, Msg, true),
- []
- end
- end
- end, [], MsgsWithAckTags),
- ok = case RQ of
- [] -> ok;
- _ -> rabbit_disk_queue:requeue(Q, lists:reverse(RQ))
- end,
- Len = length(MsgsWithAckTags),
- MsgBuf1 = case Mode of
- mixed -> ToAdd = [{Msg, true} || {Msg, _} <- MsgsWithAckTags],
- queue:join(MsgBuf, queue:from_list(ToAdd));
- disk -> inc_queue_length(MsgBuf, Len)
- end,
- {ok, State #mqstate { msg_buf = MsgBuf1, length = Length + Len }}.
-
-purge(State = #mqstate { queue = Q, mode = Mode, length = Count,
- prefetcher = Prefetcher, memory_size = QSize }) ->
- PurgedFromDisk = rabbit_disk_queue:purge(Q),
- Count = case Mode of
- disk ->
- PurgedFromDisk;
- mixed ->
- ok = case Prefetcher of
- undefined -> ok;
- _ -> rabbit_queue_prefetcher:stop(Prefetcher)
- end,
- Count
- end,
- {Count, lose_memory(QSize, State #mqstate { msg_buf = queue:new(),
- length = 0,
- prefetcher = undefined })}.
-
-delete_queue(State = #mqstate { queue = Q, memory_size = QSize,
- prefetcher = Prefetcher
- }) ->
- ok = case Prefetcher of
- undefined -> ok;
- _ -> rabbit_queue_prefetcher:stop(Prefetcher)
- end,
- ok = rabbit_disk_queue:delete_queue(Q),
- {ok, lose_memory(QSize, State #mqstate { length = 0, msg_buf = queue:new(),
- prefetcher = undefined })}.
-
-len(#mqstate { length = Length }) ->
- Length.
-
-is_empty(#mqstate { length = Length }) ->
- 0 == Length.
-
-%%----------------------------------------------------------------------------
-%% storage mode management
-%%----------------------------------------------------------------------------
-
-set_storage_mode(Mode, _TxnMessages, State = #mqstate { mode = Mode }) ->
- {ok, State};
-set_storage_mode(disk, TxnMessages, State =
- #mqstate { mode = mixed, queue = Q, msg_buf = MsgBuf, length = Length,
- is_durable = IsDurable, prefetcher = Prefetcher }) ->
- State1 = State #mqstate { mode = disk },
- MsgBuf1 =
- case Prefetcher of
- undefined -> MsgBuf;
- _ ->
- case rabbit_queue_prefetcher:drain_and_stop(Prefetcher) of
- empty -> MsgBuf;
- Fetched ->
- MsgBuf2 = dec_queue_length(MsgBuf, queue:len(Fetched)),
- queue:join(Fetched, MsgBuf2)
- end
- end,
- {ok, MsgBuf3} =
- send_messages_to_disk(IsDurable, Q, MsgBuf1, Length),
- %% tx_publish txn messages. Some of these will have been already
- %% published if they really are durable and persistent which is
- %% why we can't just use our own tx_publish/2 function (would end
- %% up publishing twice, so refcount would go wrong in disk_queue).
- %% The order of msgs within a txn is determined only at tx_commit
- %% time, so it doesn't matter if we're publishing msgs to the disk
- %% queue in a different order from that which we received them in.
- lists:foreach(
- fun (Msg = #basic_message { is_persistent = IsPersistent }) ->
- ok = case IsDurable andalso IsPersistent of
- true -> ok;
- _ -> rabbit_disk_queue:tx_publish(Msg)
- end
- end, TxnMessages),
- garbage_collect(),
- {ok, State1 #mqstate { msg_buf = MsgBuf3, prefetcher = undefined }};
-set_storage_mode(mixed, TxnMessages, State =
- #mqstate { mode = disk, is_durable = IsDurable }) ->
- %% The queue has a token just saying how many msgs are on disk
- %% (this is already built for us when in disk mode).
- %% Don't actually do anything to the disk
- %% Don't start prefetcher just yet because the queue maybe busy -
- %% wait for hibernate timeout in the amqqueue_process.
-
- %% Remove txn messages from disk which are not (persistent and
- %% durable). This is necessary to avoid leaks. This is also pretty
- %% much the inverse behaviour of our own tx_rollback/2 which is
- %% why we're not using that.
- Cancel = [ MsgId || #basic_message { is_persistent = IsPersistent,
- guid = MsgId } <- TxnMessages,
- not (IsDurable andalso IsPersistent) ],
- ok = case Cancel of
- [] -> ok;
- _ -> rabbit_disk_queue:tx_rollback(Cancel)
- end,
- garbage_collect(),
- {ok, State #mqstate { mode = mixed }}.
-
-send_messages_to_disk(_IsDurable, _Q, MsgBuf, 0) ->
- {ok, MsgBuf};
-send_messages_to_disk(IsDurable, Q, MsgBuf, Length) ->
- case scan_for_disk_after_ram(IsDurable, MsgBuf) of
- disk_only ->
- %% Everything on disk already, we don't need to do
- %% anything
- {ok, inc_queue_length(queue:new(), Length)};
- {not_found, PrefixLen, MsgBufRAMSuffix} ->
- %% No disk msgs follow RAM msgs and the queue has a RAM
- %% suffix, so we can just publish those. If we crash at
- %% this point, we may lose some messages, but everything
- %% will remain in the right order, so no need for the
- %% marker messages.
- MsgBuf1 = inc_queue_length(queue:new(), PrefixLen),
- send_messages_to_disk(IsDurable, Q, MsgBufRAMSuffix, 0, 0, [], [],
- MsgBuf1);
- found ->
- %% There are disk msgs *after* ram msgs in the queue. We
- %% need to reenqueue everything. Note that due to batching
- %% going on (see comments above send_messages_to_disk/8),
- %% if we crash during this transition, we could have
- %% messages in the wrong order on disk. Thus we publish a
- %% magic_marker_message which, when this transition is
- %% complete, will be back at the head of the queue. Should
- %% we die, on startup, during the foldl over the queue, we
- %% detect the marker message and requeue all the messages
- %% in front of it, to the back of the queue, thus
- %% correcting the order. The result is that everything
- %% ends up back in the same order, but will have new
- %% sequence IDs.
- ok = publish_magic_marker_message(Q),
- {ok, MsgBuf1} =
- send_messages_to_disk(IsDurable, Q, MsgBuf, 0, 0, [], [],
- queue:new()),
- {ok, Length} = fetch_ack_magic_marker_message(Q),
- {ok, MsgBuf1}
- end.
-
-scan_for_disk_after_ram(IsDurable, MsgBuf) ->
- scan_for_disk_after_ram(IsDurable, MsgBuf, {disk, 0}).
-
-%% We return 'disk_only' if everything is alread on disk; 'found' if
-%% we find a disk message after finding RAM messages; and
-%% {'not_found', Count, MsgBuf} otherwise, where Count is the length
-%% of the disk prefix, and MsgBuf is the RAM suffix of the MsgBuf
-%% argument. Note msgs via the prefetcher are counted as RAM msgs on
-%% the grounds that they have to be republished.
-scan_for_disk_after_ram(IsDurable, MsgBuf, Mode) ->
- case queue:out(MsgBuf) of
- {empty, _MsgBuf} ->
- case Mode of
- {ram, N, MsgBuf1} -> {not_found, N, MsgBuf1};
- {disk, _N} -> disk_only
- end;
- {{value, {on_disk, Count}}, MsgBuf1} ->
- case Mode of
- {ram, _, _} -> found; %% found disk after RAM, bad
- {disk, N} -> scan_for_disk_after_ram(IsDurable, MsgBuf1,
- {disk, N + Count})
- end;
- {{value, {_Msg, _IsDelivered, _AckTag}}, MsgBuf1} ->
- %% found a msg from the prefetcher. Ensure RAM mode
- scan_for_disk_after_ram(IsDurable, MsgBuf1,
- ensure_ram(Mode, MsgBuf));
- {{value,
- {#basic_message { is_persistent = IsPersistent }, _IsDelivered}},
- MsgBuf1} ->
- %% normal message
- case IsDurable andalso IsPersistent of
- true ->
- case Mode of
- {ram, _, _} -> found; %% found disk after RAM, bad
- {disk, N} -> scan_for_disk_after_ram(IsDurable, MsgBuf1,
- {disk, N + 1})
- end;
- false -> scan_for_disk_after_ram(IsDurable, MsgBuf1,
- ensure_ram(Mode, MsgBuf))
- end
- end.
-
-ensure_ram(Obj = {ram, _N, _MsgBuf}, _MsgBuf1) -> Obj;
-ensure_ram({disk, N}, MsgBuf) -> {ram, N, MsgBuf}.
-
-%% (Re)enqueue _everything_ here. Messages which are not on disk will
-%% be tx_published, messages that are on disk will be requeued to the
-%% end of the queue. This is done in batches, where a batch consists
-%% of a number a tx_publishes, a tx_commit and then a call to
-%% requeue_next_n. We do not want to fetch messages off disk only to
-%% republish them later. Note in the tx_commit, we ack messages which
-%% are being _re_published. These are messages that have been fetched
-%% by the prefetcher.
-%% Batches are limited in size to make sure that the resultant mnesia
-%% transaction on tx_commit does not get too big, memory wise.
-send_messages_to_disk(IsDurable, Q, Queue, PublishCount, RequeueCount,
- Commit, Ack, MsgBuf) ->
- case queue:out(Queue) of
- {empty, _Queue} ->
- ok = flush_messages_to_disk_queue(Q, Commit, Ack),
- {[], []} = flush_requeue_to_disk_queue(Q, RequeueCount, [], []),
- {ok, MsgBuf};
- {{value, {Msg = #basic_message { is_persistent = IsPersistent },
- IsDelivered}}, Queue1} ->
- case IsDurable andalso IsPersistent of
- true -> %% it's already in the Q
- send_messages_to_disk(
- IsDurable, Q, Queue1, PublishCount, RequeueCount + 1,
- Commit, Ack, inc_queue_length(MsgBuf, 1));
- false ->
- republish_message_to_disk_queue(
- IsDurable, Q, Queue1, PublishCount, RequeueCount, Commit,
- Ack, MsgBuf, Msg, IsDelivered)
- end;
- {{value, {Msg, IsDelivered, AckTag}}, Queue1} ->
- %% These have come via the prefetcher, so are no longer in
- %% the disk queue (yes, they've not been ack'd yet, but
- %% the head of the queue has passed these messages). We
- %% need to requeue them, which we sneakily achieve by
- %% tx_publishing them, and then in the tx_commit, ack the
- %% old copy.
- republish_message_to_disk_queue(
- IsDurable, Q, Queue1, PublishCount, RequeueCount, Commit,
- [AckTag | Ack], MsgBuf, Msg, IsDelivered);
- {{value, {on_disk, Count}}, Queue1} ->
- send_messages_to_disk(
- IsDurable, Q, Queue1, PublishCount, RequeueCount + Count,
- Commit, Ack, inc_queue_length(MsgBuf, Count))
- end.
-
-republish_message_to_disk_queue(
- IsDurable, Q, Queue, PublishCount, RequeueCount, Commit, Ack, MsgBuf,
- Msg = #basic_message { guid = MsgId, is_persistent = IsPersistent },
- IsDelivered) ->
- {Commit1, Ack1} = flush_requeue_to_disk_queue(Q, RequeueCount, Commit, Ack),
- ok = rabbit_disk_queue:tx_publish(Msg),
- Commit2 = [{MsgId, IsDelivered, IsPersistent} | Commit1],
- {PublishCount1, Commit3, Ack2} =
- case PublishCount == ?TO_DISK_MAX_FLUSH_SIZE of
- true -> ok = flush_messages_to_disk_queue(Q, Commit2, Ack1),
- {0, [], []};
- false -> {PublishCount + 1, Commit2, Ack1}
- end,
- send_messages_to_disk(IsDurable, Q, Queue, PublishCount1, 0,
- Commit3, Ack2, inc_queue_length(MsgBuf, 1)).
-
-flush_messages_to_disk_queue(_Q, [], []) ->
- ok;
-flush_messages_to_disk_queue(Q, Commit, Ack) ->
- rabbit_disk_queue:tx_commit(Q, lists:reverse(Commit), Ack).
-
-flush_requeue_to_disk_queue(_Q, 0, Commit, Ack) ->
- {Commit, Ack};
-flush_requeue_to_disk_queue(Q, RequeueCount, Commit, Ack) ->
- ok = flush_messages_to_disk_queue(Q, Commit, Ack),
- ok = rabbit_disk_queue:requeue_next_n(Q, RequeueCount),
- {[], []}.
-
-%% Scaling this by 4 is a magic number. Found by trial and error to
-%% work ok. We are deliberately over reporting so that we run out of
-%% memory sooner rather than later, because the transition to disk
-%% only modes transiently can take quite a lot of memory.
-estimate_queue_memory(State = #mqstate { memory_size = Size }) ->
- {State, 4 * Size}.
-
-storage_mode(#mqstate { mode = Mode }) ->
- Mode.
-
-%%----------------------------------------------------------------------------
-%% helpers
-%%----------------------------------------------------------------------------
-
-size_of_message(
- #basic_message { content = #content { payload_fragments_rev = Payload,
- properties_bin = PropsBin }})
- when is_binary(PropsBin) ->
- size(PropsBin) + lists:foldl(fun (Frag, SumAcc) ->
- SumAcc + size(Frag)
- end, 0, Payload).
-
-ensure_binary_properties(Msg = #basic_message { content = Content }) ->
- Msg #basic_message {
- content = rabbit_binary_generator:ensure_content_encoded(Content) }.
-
-gain_memory(Inc, State = #mqstate { memory_size = QSize }) ->
- State #mqstate { memory_size = QSize + Inc }.
-
-lose_memory(Dec, State = #mqstate { memory_size = QSize }) ->
- State #mqstate { memory_size = QSize - Dec }.
-
-inc_queue_length(MsgBuf, 0) ->
- MsgBuf;
-inc_queue_length(MsgBuf, Count) ->
- {NewCount, MsgBufTail} =
- case queue:out_r(MsgBuf) of
- {empty, MsgBuf1} -> {Count, MsgBuf1};
- {{value, {on_disk, Len}}, MsgBuf1} -> {Len + Count, MsgBuf1};
- {{value, _}, _MsgBuf1} -> {Count, MsgBuf}
- end,
- queue:in({on_disk, NewCount}, MsgBufTail).
-
-dec_queue_length(MsgBuf, Count) ->
- case queue:out(MsgBuf) of
- {{value, {on_disk, Len}}, MsgBuf1} ->
- case Len of
- Count ->
- MsgBuf1;
- _ when Len > Count ->
- queue:in_r({on_disk, Len-Count}, MsgBuf1)
- end;
- _ -> MsgBuf
- end.
-
-maybe_prefetch(State = #mqstate { prefetcher = undefined,
- mode = mixed,
- msg_buf = MsgBuf,
- queue = Q }) ->
- case queue:peek(MsgBuf) of
- {value, {on_disk, Count}} ->
- %% only prefetch for the next contiguous block on
- %% disk. Beyond there, we either hit the end of the queue,
- %% or the next msg is already in RAM, held by us, the
- %% mixed queue
- {ok, Prefetcher} = rabbit_queue_prefetcher:start_link(Q, Count),
- State #mqstate { prefetcher = Prefetcher };
- _ -> State
- end;
-maybe_prefetch(State) ->
- State.
-
-maybe_ack(_Q, true, true, AckTag) ->
- AckTag;
-maybe_ack(Q, _, _, AckTag) ->
- ok = rabbit_disk_queue:ack(Q, [AckTag]),
- not_on_disk.
-
-remove_diskless(MsgsWithAcks) ->
- lists:foldl(
- fun ({Msg, AckTag}, {AccAckTags, AccSize}) ->
- Msg1 = ensure_binary_properties(Msg),
- {case AckTag of
- not_on_disk -> AccAckTags;
- _ -> [AckTag | AccAckTags]
- end, size_of_message(Msg1) + AccSize}
- end, {[], 0}, MsgsWithAcks).
-
-on_disk(disk, _IsDurable, _IsPersistent) -> true;
-on_disk(mixed, true, true) -> true;
-on_disk(mixed, _IsDurable, _IsPersistent) -> false.
-
-publish_magic_marker_message(Q) ->
- Msg = rabbit_basic:message(
- rabbit_misc:r(<<"/">>, exchange, <<>>), ?MAGIC_MARKER,
- [], <<>>, <<>>, true),
- ok = rabbit_disk_queue:publish(Q, ensure_binary_properties(Msg), false).
-
-fetch_ack_magic_marker_message(Q) ->
- {Msg, false, AckTag, Length} = rabbit_disk_queue:fetch(Q),
- true = is_magic_marker_message(Msg),
- ok = rabbit_disk_queue:ack(Q, [AckTag]),
- {ok, Length}.
-
-is_magic_marker_message(#basic_message { routing_key = ?MAGIC_MARKER,
- is_persistent = true, guid = <<>> }) ->
- true;
-is_magic_marker_message(_) ->
- false.
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index c0a559e9ec..9dae268f19 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -348,7 +348,6 @@ seg_num_to_path(Dir, SegNum) ->
SegName = integer_to_list(SegNum),
filename:join(Dir, SegName ++ ?SEGMENT_EXTENSION).
-
%%----------------------------------------------------------------------------
%% Msg Store Startup Delta Function
%%----------------------------------------------------------------------------