summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl5
-rw-r--r--src/rabbit_channel.erl89
-rw-r--r--src/rabbit_invariable_queue.erl314
-rw-r--r--src/rabbit_msg_store.erl4
-rw-r--r--src/rabbit_persister.erl496
-rw-r--r--src/rabbit_tests.erl2
-rw-r--r--src/rabbit_variable_queue.erl239
-rw-r--r--src/rabbit_writer.erl41
8 files changed, 187 insertions, 1003 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 3dbd2b2299..25859c22f9 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -491,9 +491,8 @@ attempt_delivery(#delivery{txn = none,
{AckTag, BQS1} =
BQ:publish_delivered(
AckRequired, Message,
- ?BASE_MESSAGE_PROPERTIES
- #message_properties{
- needs_confirming = NeedsConfirming},
+ (?BASE_MESSAGE_PROPERTIES)#message_properties{
+ needs_confirming = NeedsConfirming},
BQS),
{{Message, false, AckTag}, true,
State1#q{backing_queue_state = BQS1}}
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index b2e6658b29..0c8ad00ae3 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -49,7 +49,7 @@
uncommitted_ack_q, unacked_message_q,
username, virtual_host, most_recently_declared_queue,
consumer_mapping, blocking, queue_collector_pid, stats_timer,
- confirm_enabled, published_count, confirm_multiple, confirm_tref,
+ confirm_enabled, publish_seqno, confirm_multiple, confirm_tref,
held_confirms, unconfirmed, queues_for_msg}).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
@@ -191,7 +191,7 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid,
queue_collector_pid = CollectorPid,
stats_timer = StatsTimer,
confirm_enabled = false,
- published_count = 0,
+ publish_seqno = 0,
confirm_multiple = false,
held_confirms = gb_sets:new(),
unconfirmed = gb_sets:new(),
@@ -460,20 +460,19 @@ send_or_enqueue_ack(undefined, _QPid, State) ->
send_or_enqueue_ack(_MsgSeqNo, _QPid, State = #ch{confirm_enabled = false}) ->
State;
send_or_enqueue_ack(MsgSeqNo, QPid, State = #ch{confirm_multiple = false}) ->
- do_if_unconfirmed(
- MsgSeqNo, QPid,
- fun(MSN, State1 = #ch{writer_pid = WriterPid}) ->
- ok = rabbit_writer:send_command(
- WriterPid, #'basic.ack'{delivery_tag = MSN}),
- State1
- end, State);
+ do_if_unconfirmed(MsgSeqNo, QPid,
+ fun(MSN, State1 = #ch{writer_pid = WriterPid}) ->
+ ok = rabbit_writer:send_command(
+ WriterPid, #'basic.ack'{
+ delivery_tag = MSN}),
+ State1
+ end, State);
send_or_enqueue_ack(MsgSeqNo, QPid, State = #ch{confirm_multiple = true}) ->
- do_if_unconfirmed(
- MsgSeqNo, QPid,
- fun(MSN, State1 = #ch{held_confirms = As}) ->
- start_confirm_timer(
- State1#ch{held_confirms = gb_sets:add(MSN, As)})
- end, State).
+ do_if_unconfirmed(MsgSeqNo, QPid,
+ fun(MSN, State1 = #ch{held_confirms = As}) ->
+ start_confirm_timer(
+ State1#ch{held_confirms = gb_sets:add(MSN, As)})
+ end, State).
do_if_unconfirmed(MsgSeqNo, QPid, ConfirmFun,
State = #ch{unconfirmed = UC,
@@ -484,9 +483,8 @@ do_if_unconfirmed(MsgSeqNo, QPid, ConfirmFun,
Unconfirmed1 = gb_sets:delete(MsgSeqNo, UC),
case QPid of
undefined ->
- ConfirmFun(MsgSeqNo,
- State#ch{unconfirmed = Unconfirmed1});
- _ ->
+ ConfirmFun(MsgSeqNo, State#ch{unconfirmed = Unconfirmed1});
+ _ ->
{ok, Qs} = dict:find(MsgSeqNo, QFM),
Qs1 = sets:del_element(QPid, Qs),
case sets:size(Qs1) of
@@ -499,7 +497,8 @@ do_if_unconfirmed(MsgSeqNo, QPid, ConfirmFun,
dict:store(MsgSeqNo, Qs1, QFM)}
end
end;
- false -> State
+ false ->
+ State
end.
handle_method(#'channel.open'{}, _, State = #ch{state = starting}) ->
@@ -537,11 +536,11 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
{MsgSeqNo, State1}
= case ConfirmEnabled of
false -> {undefined, State};
- true -> Count = State#ch.published_count,
- {Count,
- State#ch{published_count = Count + 1,
+ true -> SeqNo = State#ch.publish_seqno,
+ {SeqNo,
+ State#ch{publish_seqno = SeqNo + 1,
unconfirmed =
- gb_sets:add(Count, State#ch.unconfirmed)}}
+ gb_sets:add(SeqNo, State#ch.unconfirmed)}}
end,
Message = #basic_message{exchange_name = ExchangeName,
routing_key = RoutingKey,
@@ -1359,35 +1358,27 @@ stop_confirm_timer(State = #ch{confirm_tref = TRef}) ->
State#ch{confirm_tref = undefined}.
flush_multiple(State = #ch{writer_pid = WriterPid,
- held_confirms = Cs,
- unconfirmed = UC}) ->
+ held_confirms = Cs}) ->
case gb_sets:is_empty(Cs) of
- true -> State;
+ true -> State#ch{confirm_tref = undefined};
false -> [First | Rest] = gb_sets:to_list(Cs),
- [rabbit_writer:send_command(WriterPid,
- #'basic.ack'{delivery_tag = T}) ||
- T <- case Rest of
- [] -> [First];
- _ -> flush_multiple(
- First, Rest, WriterPid,
- case gb_sets:is_empty(UC) of
- false -> gb_sets:smallest(UC);
- true -> gb_sets:largest(Cs) + 1
- end)
- end],
+ {Mult, Inds} = find_consecutive_sequence(First, Rest),
+ ok = rabbit_writer:send_command(
+ WriterPid,
+ #'basic.ack'{delivery_tag = Mult, multiple = true}),
+ ok = lists:foldl(
+ fun(T, ok) -> rabbit_writer:send_command(
+ WriterPid,
+ #'basic.ack'{delivery_tag = T})
+ end, ok, Inds),
State#ch{held_confirms = gb_sets:new(),
confirm_tref = undefined}
end.
-flush_multiple(Prev, [Cur | Rest], WriterPid, SNA) ->
- ExpNext = Prev + 1,
- case {SNA >= Cur, Cur} of
- {true, ExpNext} -> flush_multiple(Cur, Rest, WriterPid, SNA);
- _ -> flush_multiple(Prev, [], WriterPid, SNA),
- [Cur | Rest]
- end;
-flush_multiple(Prev, [], WriterPid, _) ->
- ok = rabbit_writer:send_command(WriterPid,
- #'basic.ack'{delivery_tag = Prev,
- multiple = true}),
- [].
+%% Find longest sequence of consecutive numbers at the beginning.
+find_consecutive_sequence(Last, []) ->
+ {Last, []};
+find_consecutive_sequence(Last, [N | Ns]) when N == (Last + 1) ->
+ find_consecutive_sequence(N, Ns);
+find_consecutive_sequence(Last, Ns) ->
+ {Last, Ns}.
diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl
deleted file mode 100644
index 5a0532eac1..0000000000
--- a/src/rabbit_invariable_queue.erl
+++ /dev/null
@@ -1,314 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License at
-%% http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
-%% License for the specific language governing rights and limitations
-%% under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developers of the Original Code are LShift Ltd,
-%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
-%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
-%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
-%% Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
-%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2010 Cohesive Financial Technologies
-%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2010 Rabbit Technologies Ltd.
-%%
-%% All Rights Reserved.
-%%
-%% Contributor(s): ______________________________________.
-%%
-
--module(rabbit_invariable_queue).
-
--export([init/3, terminate/1, delete_and_terminate/1, purge/1, publish/3,
- publish_delivered/4, fetch/2, ack/2, tx_publish/4, tx_ack/3,
- dropwhile/2, tx_rollback/2, tx_commit/4, requeue/3, len/1, is_empty/1,
- set_ram_duration_target/2, ram_duration/1, needs_idle_timeout/1,
- idle_timeout/1, handle_pre_hibernate/1, status/1]).
-
--export([start/1, stop/0]).
-
--behaviour(rabbit_backing_queue).
-
--include("rabbit.hrl").
-
--record(iv_state, { queue, qname, durable, len, pending_ack }).
--record(tx, { pending_messages, pending_acks, is_persistent }).
-
--ifdef(use_specs).
-
--type(ack() :: rabbit_guid:guid() | 'blank_ack').
--type(state() :: #iv_state { queue :: queue(),
- qname :: rabbit_amqqueue:name(),
- len :: non_neg_integer(),
- pending_ack :: dict()
- }).
--include("rabbit_backing_queue_spec.hrl").
-
--endif.
-
-start(DurableQueues) ->
- ok = rabbit_sup:start_child(rabbit_persister, [DurableQueues]).
-
-stop() ->
- ok = rabbit_sup:stop_child(rabbit_persister).
-
-init(QName, IsDurable, Recover) ->
- Q = queue:from_list(case IsDurable andalso Recover of
- true -> rabbit_persister:queue_content(QName);
- false -> []
- end),
- #iv_state { queue = Q,
- qname = QName,
- durable = IsDurable,
- len = queue:len(Q),
- pending_ack = dict:new() }.
-
-terminate(State) ->
- State #iv_state { queue = queue:new(), len = 0, pending_ack = dict:new() }.
-
-delete_and_terminate(State = #iv_state { qname = QName, durable = IsDurable,
- pending_ack = PA }) ->
- ok = persist_acks(QName, IsDurable, none, dict:fetch_keys(PA), PA),
- {_PLen, State1} = purge(State),
- terminate(State1).
-
-purge(State = #iv_state { queue = Q, qname = QName, durable = IsDurable,
- len = Len }) ->
- %% We do not purge messages pending acks.
- {AckTags, PA} =
- rabbit_misc:queue_fold(
- fun ({#basic_message { is_persistent = false },
- _MsgProps, _IsDelivered}, Acc) ->
- Acc;
- ({Msg = #basic_message { guid = Guid }, MsgProps, IsDelivered},
- {AckTagsN, PAN}) ->
- ok = persist_delivery(QName, IsDurable, IsDelivered, Msg),
- {[Guid | AckTagsN], store_ack(Msg, MsgProps, PAN)}
- end, {[], dict:new()}, Q),
- ok = persist_acks(QName, IsDurable, none, AckTags, PA),
- {Len, State #iv_state { len = 0, queue = queue:new() }}.
-
-publish(Msg, MsgProps, State = #iv_state { queue = Q,
- qname = QName,
- durable = IsDurable,
- len = Len }) ->
- ok = persist_message(QName, IsDurable, none, Msg, MsgProps),
- State #iv_state { queue = enqueue(Msg, MsgProps, false, Q), len = Len + 1 }.
-
-publish_delivered(false, _Msg, _MsgProps, State) ->
- {blank_ack, State};
-publish_delivered(true, Msg = #basic_message { guid = Guid },
- MsgProps,
- State = #iv_state { qname = QName, durable = IsDurable,
- len = 0, pending_ack = PA }) ->
- ok = persist_message(QName, IsDurable, none, Msg, MsgProps),
- ok = persist_delivery(QName, IsDurable, false, Msg),
- {Guid, State #iv_state { pending_ack = store_ack(Msg, MsgProps, PA) }}.
-
-dropwhile(_Pred, State = #iv_state { len = 0 }) ->
- State;
-dropwhile(Pred, State = #iv_state { queue = Q }) ->
- {{value, {Msg, MsgProps, IsDelivered}}, Q1} = queue:out(Q),
- case Pred(MsgProps) of
- true -> {_, State1} = fetch_internal(false, Q1, Msg, MsgProps,
- IsDelivered, State),
- dropwhile(Pred, State1);
- false -> State
- end.
-
-fetch(_AckRequired, State = #iv_state { len = 0 }) ->
- {empty, State};
-fetch(AckRequired, State = #iv_state { queue = Q }) ->
- {{value, {Msg, MsgProps, IsDelivered}}, Q1} = queue:out(Q),
- fetch_internal(AckRequired, Q1, Msg, MsgProps, IsDelivered, State).
-
-fetch_internal(AckRequired, Q1,
- Msg = #basic_message { guid = Guid },
- MsgProps, IsDelivered,
- State = #iv_state { len = Len,
- qname = QName,
- durable = IsDurable,
- pending_ack = PA }) ->
- Len1 = Len - 1,
- ok = persist_delivery(QName, IsDurable, IsDelivered, Msg),
- PA1 = store_ack(Msg, MsgProps, PA),
- {AckTag, PA2} = case AckRequired of
- true -> {Guid, PA1};
- false -> ok = persist_acks(QName, IsDurable, none,
- [Guid], PA1),
- {blank_ack, PA}
- end,
- {{Msg, IsDelivered, AckTag, Len1},
- State #iv_state { queue = Q1, len = Len1, pending_ack = PA2 }}.
-
-ack(AckTags, State = #iv_state { qname = QName, durable = IsDurable,
- pending_ack = PA }) ->
- ok = persist_acks(QName, IsDurable, none, AckTags, PA),
- PA1 = remove_acks(AckTags, PA),
- State #iv_state { pending_ack = PA1 }.
-
-tx_publish(Txn, Msg, MsgProps, State = #iv_state { qname = QName,
- durable = IsDurable }) ->
- Tx = #tx { pending_messages = Pubs } = lookup_tx(Txn),
- store_tx(Txn, Tx #tx { pending_messages = [{Msg, MsgProps} | Pubs] }),
- ok = persist_message(QName, IsDurable, Txn, Msg, MsgProps),
- State.
-
-tx_ack(Txn, AckTags, State = #iv_state { qname = QName, durable = IsDurable,
- pending_ack = PA }) ->
- Tx = #tx { pending_acks = Acks } = lookup_tx(Txn),
- store_tx(Txn, Tx #tx { pending_acks = [AckTags | Acks] }),
- ok = persist_acks(QName, IsDurable, Txn, AckTags, PA),
- State.
-
-tx_rollback(Txn, State = #iv_state { qname = QName }) ->
- #tx { pending_acks = AckTags } = lookup_tx(Txn),
- ok = do_if_persistent(fun rabbit_persister:rollback_transaction/1,
- Txn, QName),
- erase_tx(Txn),
- {lists:flatten(AckTags), State}.
-
-tx_commit(Txn, Fun, MsgPropsFun, State = #iv_state { qname = QName,
- pending_ack = PA,
- queue = Q,
- len = Len }) ->
- #tx { pending_acks = AckTags, pending_messages = PubsRev } = lookup_tx(Txn),
- ok = do_if_persistent(fun rabbit_persister:commit_transaction/1,
- Txn, QName),
- erase_tx(Txn),
- Fun(),
- AckTags1 = lists:flatten(AckTags),
- PA1 = remove_acks(AckTags1, PA),
- {Q1, Len1} = lists:foldr(fun ({Msg, MsgProps}, {QN, LenN}) ->
- {enqueue(Msg, MsgPropsFun(MsgProps),
- false, QN),
- LenN + 1}
- end, {Q, Len}, PubsRev),
- {AckTags1, State #iv_state { pending_ack = PA1, queue = Q1, len = Len1 }}.
-
-requeue(AckTags, MsgPropsFun, State = #iv_state { pending_ack = PA,
- queue = Q,
- len = Len }) ->
- %% We don't need to touch the persister here - the persister will
- %% already have these messages published and delivered as
- %% necessary. The complication is that the persister's seq_id will
- %% now be wrong, given the position of these messages in our queue
- %% here. However, the persister's seq_id is only used for sorting
- %% on startup, and requeue is silent as to where the requeued
- %% messages should appear, thus the persister is permitted to sort
- %% based on seq_id, even though it'll likely give a different
- %% order to the last known state of our queue, prior to shutdown.
- {Q1, Len1} = lists:foldl(
- fun (Guid, {QN, LenN}) ->
- {Msg = #basic_message {}, MsgProps}
- = dict:fetch(Guid, PA),
- {enqueue(Msg, MsgPropsFun(MsgProps), true, QN),
- LenN + 1}
- end, {Q, Len}, AckTags),
- PA1 = remove_acks(AckTags, PA),
- State #iv_state { pending_ack = PA1, queue = Q1, len = Len1 }.
-
-enqueue(Msg, MsgProps, IsDelivered, Q) ->
- queue:in({Msg, MsgProps, IsDelivered}, Q).
-
-len(#iv_state { len = Len }) -> Len.
-
-is_empty(State) -> 0 == len(State).
-
-set_ram_duration_target(_DurationTarget, State) -> State.
-
-ram_duration(State) -> {0, State}.
-
-needs_idle_timeout(_State) -> false.
-
-idle_timeout(State) -> State.
-
-handle_pre_hibernate(State) -> State.
-
-status(_State) -> [].
-
-%%----------------------------------------------------------------------------
-
-remove_acks(AckTags, PA) -> lists:foldl(fun dict:erase/2, PA, AckTags).
-
-store_ack(Msg = #basic_message { guid = Guid }, MsgProps, PA) ->
- dict:store(Guid, {Msg, MsgProps}, PA).
-
-%%----------------------------------------------------------------------------
-
-lookup_tx(Txn) ->
- case get({txn, Txn}) of
- undefined -> #tx { pending_messages = [],
- pending_acks = [],
- is_persistent = false };
- V -> V
- end.
-
-store_tx(Txn, Tx) ->
- put({txn, Txn}, Tx).
-
-erase_tx(Txn) ->
- erase({txn, Txn}).
-
-mark_tx_persistent(Txn) ->
- store_tx(Txn, (lookup_tx(Txn)) #tx { is_persistent = true }).
-
-is_tx_persistent(Txn) ->
- (lookup_tx(Txn)) #tx.is_persistent.
-
-do_if_persistent(F, Txn, QName) ->
- ok = case is_tx_persistent(Txn) of
- false -> ok;
- true -> F({Txn, QName})
- end.
-
-%%----------------------------------------------------------------------------
-
-persist_message(QName, true, Txn, Msg = #basic_message {
- is_persistent = true }, MsgProps) ->
- Msg1 = Msg #basic_message {
- %% don't persist any recoverable decoded properties
- content = rabbit_binary_parser:clear_decoded_content(
- Msg #basic_message.content)},
- persist_work(Txn, QName,
- [{publish, Msg1, MsgProps,
- {QName, Msg1 #basic_message.guid}}]);
-persist_message(_QName, _IsDurable, _Txn, _Msg, _MsgProps) ->
- ok.
-
-persist_delivery(QName, true, false, #basic_message { is_persistent = true,
- guid = Guid }) ->
- persist_work(none, QName, [{deliver, {QName, Guid}}]);
-persist_delivery(_QName, _IsDurable, _IsDelivered, _Msg) ->
- ok.
-
-persist_acks(QName, true, Txn, AckTags, PA) ->
- persist_work(Txn, QName,
- [{ack, {QName, Guid}} || Guid <- AckTags,
- begin
- {Msg, _MsgProps}
- = dict:fetch(Guid, PA),
- Msg #basic_message.is_persistent
- end]);
-persist_acks(_QName, _IsDurable, _Txn, _AckTags, _PA) ->
- ok.
-
-persist_work(_Txn,_QName, []) ->
- ok;
-persist_work(none, _QName, WorkList) ->
- rabbit_persister:dirty_work(WorkList);
-persist_work(Txn, QName, WorkList) ->
- mark_tx_persistent(Txn),
- rabbit_persister:extend_transaction({Txn, QName}, WorkList).
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index fea7d4a86b..e8b4e8e270 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -144,13 +144,13 @@
-type(startup_fun_state() ::
{(fun ((A) -> 'finished' | {rabbit_guid:guid(), non_neg_integer(), A})),
A}).
--type(guid_fun() :: fun ((gb_set()) -> any())).
+-type(maybe_guid_fun() :: 'undefined' | fun ((gb_set()) -> any())).
-spec(start_link/4 ::
(atom(), file:filename(), [binary()] | 'undefined',
startup_fun_state()) -> rabbit_types:ok_pid_or_error()).
-spec(successfully_recovered_state/1 :: (server()) -> boolean()).
--spec(client_init/3 :: (server(), client_ref(), guid_fun()) ->
+-spec(client_init/3 :: (server(), client_ref(), maybe_guid_fun()) ->
client_msstate()).
-spec(client_terminate/1 :: (client_msstate()) -> 'ok').
-spec(client_delete_and_terminate/1 :: (client_msstate()) -> 'ok').
diff --git a/src/rabbit_persister.erl b/src/rabbit_persister.erl
deleted file mode 100644
index 11056c8e12..0000000000
--- a/src/rabbit_persister.erl
+++ /dev/null
@@ -1,496 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License at
-%% http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
-%% License for the specific language governing rights and limitations
-%% under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developers of the Original Code are LShift Ltd,
-%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
-%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
-%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
-%% Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
-%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2010 Cohesive Financial Technologies
-%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2010 Rabbit Technologies Ltd.
-%%
-%% All Rights Reserved.
-%%
-%% Contributor(s): ______________________________________.
-%%
-
--module(rabbit_persister).
-
--behaviour(gen_server).
-
--export([start_link/1]).
-
--export([init/1, handle_call/3, handle_cast/2, handle_info/2,
- terminate/2, code_change/3]).
-
--export([transaction/1, extend_transaction/2, dirty_work/1,
- commit_transaction/1, rollback_transaction/1,
- force_snapshot/0, queue_content/1]).
-
--include("rabbit.hrl").
-
--define(SERVER, ?MODULE).
-
--define(LOG_BUNDLE_DELAY, 5).
--define(COMPLETE_BUNDLE_DELAY, 2).
-
--define(PERSISTER_LOG_FORMAT_VERSION, {2, 6}).
-
--record(pstate, {log_handle, entry_count, deadline,
- pending_logs, pending_replies, snapshot}).
-
-%% two tables for efficient persistency
-%% one maps a key to a message
-%% the other maps a key to one or more queues.
-%% The aim is to reduce the overload of storing a message multiple times
-%% when it appears in several queues.
--record(psnapshot, {transactions, messages, queues, next_seq_id}).
-
-%%----------------------------------------------------------------------------
-
--ifdef(use_specs).
-
--type(pkey() :: rabbit_guid:guid()).
--type(pmsg() :: {rabbit_amqqueue:name(), pkey()}).
-
--type(work_item() ::
- {publish,
- rabbit_types:message(), rabbit_types:message_properties(), pmsg()} |
- {deliver, pmsg()} |
- {ack, pmsg()}).
-
--spec(start_link/1 :: ([rabbit_amqqueue:name()]) ->
- rabbit_types:ok_pid_or_error()).
--spec(transaction/1 :: ([work_item()]) -> 'ok').
--spec(extend_transaction/2 ::
- ({rabbit_types:txn(), rabbit_amqqueue:name()}, [work_item()])
- -> 'ok').
--spec(dirty_work/1 :: ([work_item()]) -> 'ok').
--spec(commit_transaction/1 ::
- ({rabbit_types:txn(), rabbit_amqqueue:name()}) -> 'ok').
--spec(rollback_transaction/1 ::
- ({rabbit_types:txn(), rabbit_amqqueue:name()}) -> 'ok').
--spec(force_snapshot/0 :: () -> 'ok').
--spec(queue_content/1 ::
- (rabbit_amqqueue:name()) -> [{rabbit_types:message(), boolean()}]).
-
--endif.
-
-%%----------------------------------------------------------------------------
-
-start_link(DurableQueues) ->
- gen_server:start_link({local, ?SERVER}, ?MODULE, [DurableQueues], []).
-
-transaction(MessageList) ->
- ?LOGDEBUG("transaction ~p~n", [MessageList]),
- TxnKey = rabbit_guid:guid(),
- gen_server:call(?SERVER, {transaction, TxnKey, MessageList}, infinity).
-
-extend_transaction(TxnKey, MessageList) ->
- ?LOGDEBUG("extend_transaction ~p ~p~n", [TxnKey, MessageList]),
- gen_server:cast(?SERVER, {extend_transaction, TxnKey, MessageList}).
-
-dirty_work(MessageList) ->
- ?LOGDEBUG("dirty_work ~p~n", [MessageList]),
- gen_server:cast(?SERVER, {dirty_work, MessageList}).
-
-commit_transaction(TxnKey) ->
- ?LOGDEBUG("commit_transaction ~p~n", [TxnKey]),
- gen_server:call(?SERVER, {commit_transaction, TxnKey}, infinity).
-
-rollback_transaction(TxnKey) ->
- ?LOGDEBUG("rollback_transaction ~p~n", [TxnKey]),
- gen_server:cast(?SERVER, {rollback_transaction, TxnKey}).
-
-force_snapshot() ->
- gen_server:call(?SERVER, force_snapshot, infinity).
-
-queue_content(QName) ->
- gen_server:call(?SERVER, {queue_content, QName}, infinity).
-
-%%--------------------------------------------------------------------
-
-init([DurableQueues]) ->
- process_flag(trap_exit, true),
- FileName = base_filename(),
- ok = filelib:ensure_dir(FileName),
- Snapshot = #psnapshot{transactions = dict:new(),
- messages = ets:new(messages, []),
- queues = ets:new(queues, [ordered_set]),
- next_seq_id = 0},
- LogHandle =
- case disk_log:open([{name, rabbit_persister},
- {head, current_snapshot(Snapshot)},
- {file, FileName}]) of
- {ok, LH} -> LH;
- {repaired, LH, {recovered, Recovered}, {badbytes, Bad}} ->
- WarningFun = if
- Bad > 0 -> fun rabbit_log:warning/2;
- true -> fun rabbit_log:info/2
- end,
- WarningFun("Repaired persister log - ~p recovered, ~p bad~n",
- [Recovered, Bad]),
- LH
- end,
- {Res, NewSnapshot} =
- internal_load_snapshot(LogHandle, DurableQueues, Snapshot),
- case Res of
- ok ->
- ok = take_snapshot(LogHandle, NewSnapshot);
- {error, Reason} ->
- rabbit_log:error("Failed to load persister log: ~p~n", [Reason]),
- ok = take_snapshot_and_save_old(LogHandle, NewSnapshot)
- end,
- State = #pstate{log_handle = LogHandle,
- entry_count = 0,
- deadline = infinity,
- pending_logs = [],
- pending_replies = [],
- snapshot = NewSnapshot},
- {ok, State}.
-
-handle_call({transaction, Key, MessageList}, From, State) ->
- NewState = internal_extend(Key, MessageList, State),
- do_noreply(internal_commit(From, Key, NewState));
-handle_call({commit_transaction, TxnKey}, From, State) ->
- do_noreply(internal_commit(From, TxnKey, State));
-handle_call(force_snapshot, _From, State) ->
- do_reply(ok, flush(true, State));
-handle_call({queue_content, QName}, _From,
- State = #pstate{snapshot = #psnapshot{messages = Messages,
- queues = Queues}}) ->
- MatchSpec= [{{{QName,'$1'}, '$2', '$3', '$4'}, [],
- [{{'$4', '$1', '$2', '$3'}}]}],
- do_reply([{ets:lookup_element(Messages, K, 2), MP, D} ||
- {_, K, D, MP} <- lists:sort(ets:select(Queues, MatchSpec))],
- State);
-handle_call(_Request, _From, State) ->
- {noreply, State}.
-
-handle_cast({rollback_transaction, TxnKey}, State) ->
- do_noreply(internal_rollback(TxnKey, State));
-handle_cast({dirty_work, MessageList}, State) ->
- do_noreply(internal_dirty_work(MessageList, State));
-handle_cast({extend_transaction, TxnKey, MessageList}, State) ->
- do_noreply(internal_extend(TxnKey, MessageList, State));
-handle_cast(_Msg, State) ->
- {noreply, State}.
-
-handle_info(timeout, State = #pstate{deadline = infinity}) ->
- State1 = flush(true, State),
- {noreply, State1, hibernate};
-handle_info(timeout, State) ->
- do_noreply(flush(State));
-handle_info(_Info, State) ->
- {noreply, State}.
-
-terminate(_Reason, State = #pstate{log_handle = LogHandle}) ->
- flush(State),
- disk_log:close(LogHandle),
- ok.
-
-code_change(_OldVsn, State, _Extra) ->
- {ok, flush(State)}.
-
-%%--------------------------------------------------------------------
-
-internal_extend(Key, MessageList, State) ->
- log_work(fun (ML) -> {extend_transaction, Key, ML} end,
- MessageList, State).
-
-internal_dirty_work(MessageList, State) ->
- log_work(fun (ML) -> {dirty_work, ML} end,
- MessageList, State).
-
-internal_commit(From, Key, State = #pstate{snapshot = Snapshot}) ->
- Unit = {commit_transaction, Key},
- NewSnapshot = internal_integrate1(Unit, Snapshot),
- complete(From, Unit, State#pstate{snapshot = NewSnapshot}).
-
-internal_rollback(Key, State = #pstate{snapshot = Snapshot}) ->
- Unit = {rollback_transaction, Key},
- NewSnapshot = internal_integrate1(Unit, Snapshot),
- log(State#pstate{snapshot = NewSnapshot}, Unit).
-
-complete(From, Item, State = #pstate{deadline = ExistingDeadline,
- pending_logs = Logs,
- pending_replies = Waiting}) ->
- State#pstate{deadline = compute_deadline(
- ?COMPLETE_BUNDLE_DELAY, ExistingDeadline),
- pending_logs = [Item | Logs],
- pending_replies = [From | Waiting]}.
-
-%% This is made to limit disk usage by writing messages only once onto
-%% disk. We keep a table associating pkeys to messages, and provided
-%% the list of messages to output is left to right, we can guarantee
-%% that pkeys will be a backreference to a message in memory when a
-%% "tied" is met.
-log_work(CreateWorkUnit, MessageList,
- State = #pstate{
- snapshot = Snapshot = #psnapshot{messages = Messages}}) ->
- Unit = CreateWorkUnit(
- rabbit_misc:map_in_order(
- fun (M = {publish, Message, MsgProps, QK = {_QName, PKey}}) ->
- case ets:lookup(Messages, PKey) of
- [_] -> {tied, MsgProps, QK};
- [] -> ets:insert(Messages, {PKey, Message}),
- M
- end;
- (M) -> M
- end,
- MessageList)),
- NewSnapshot = internal_integrate1(Unit, Snapshot),
- log(State#pstate{snapshot = NewSnapshot}, Unit).
-
-log(State = #pstate{deadline = ExistingDeadline, pending_logs = Logs},
- Message) ->
- State#pstate{deadline = compute_deadline(?LOG_BUNDLE_DELAY,
- ExistingDeadline),
- pending_logs = [Message | Logs]}.
-
-base_filename() ->
- rabbit_mnesia:dir() ++ "/rabbit_persister.LOG".
-
-take_snapshot(LogHandle, OldFileName, Snapshot) ->
- ok = disk_log:sync(LogHandle),
- %% current_snapshot is the Head (ie. first thing logged)
- ok = disk_log:reopen(LogHandle, OldFileName, current_snapshot(Snapshot)).
-
-take_snapshot(LogHandle, Snapshot) ->
- OldFileName = lists:flatten(base_filename() ++ ".previous"),
- file:delete(OldFileName),
- rabbit_log:info("Rolling persister log to ~p~n", [OldFileName]),
- ok = take_snapshot(LogHandle, OldFileName, Snapshot).
-
-take_snapshot_and_save_old(LogHandle, Snapshot) ->
- {MegaSecs, Secs, MicroSecs} = erlang:now(),
- Timestamp = MegaSecs * 1000000 + Secs * 1000 + MicroSecs,
- OldFileName = lists:flatten(io_lib:format("~s.saved.~p",
- [base_filename(), Timestamp])),
- rabbit_log:info("Saving persister log in ~p~n", [OldFileName]),
- ok = take_snapshot(LogHandle, OldFileName, Snapshot).
-
-maybe_take_snapshot(Force, State = #pstate{entry_count = EntryCount,
- log_handle = LH,
- snapshot = Snapshot}) ->
- {ok, MaxWrapEntries} = application:get_env(persister_max_wrap_entries),
- if
- Force orelse EntryCount >= MaxWrapEntries ->
- ok = take_snapshot(LH, Snapshot),
- State#pstate{entry_count = 0};
- true ->
- State
- end.
-
-later_ms(DeltaMilliSec) ->
- {MegaSec, Sec, MicroSec} = now(),
- %% Note: not normalised. Unimportant for this application.
- {MegaSec, Sec, MicroSec + (DeltaMilliSec * 1000)}.
-
-%% Result = B - A, more or less
-time_diff({B1, B2, B3}, {A1, A2, A3}) ->
- (B1 - A1) * 1000000 + (B2 - A2) + (B3 - A3) / 1000000.0 .
-
-compute_deadline(TimerDelay, infinity) ->
- later_ms(TimerDelay);
-compute_deadline(_TimerDelay, ExistingDeadline) ->
- ExistingDeadline.
-
-compute_timeout(infinity) ->
- {ok, HibernateAfter} = application:get_env(persister_hibernate_after),
- HibernateAfter;
-compute_timeout(Deadline) ->
- DeltaMilliSec = time_diff(Deadline, now()) * 1000.0,
- if
- DeltaMilliSec =< 1 ->
- 0;
- true ->
- round(DeltaMilliSec)
- end.
-
-do_noreply(State = #pstate{deadline = Deadline}) ->
- {noreply, State, compute_timeout(Deadline)}.
-
-do_reply(Reply, State = #pstate{deadline = Deadline}) ->
- {reply, Reply, State, compute_timeout(Deadline)}.
-
-flush(State) -> flush(false, State).
-
-flush(ForceSnapshot, State = #pstate{pending_logs = PendingLogs,
- pending_replies = Waiting,
- log_handle = LogHandle}) ->
- State1 = if PendingLogs /= [] ->
- disk_log:alog(LogHandle, lists:reverse(PendingLogs)),
- State#pstate{entry_count = State#pstate.entry_count + 1};
- true ->
- State
- end,
- State2 = maybe_take_snapshot(ForceSnapshot, State1),
- if Waiting /= [] ->
- ok = disk_log:sync(LogHandle),
- lists:foreach(fun (From) -> gen_server:reply(From, ok) end,
- Waiting);
- true ->
- ok
- end,
- State2#pstate{deadline = infinity,
- pending_logs = [],
- pending_replies = []}.
-
-current_snapshot(_Snapshot = #psnapshot{transactions = Ts,
- messages = Messages,
- queues = Queues,
- next_seq_id = NextSeqId}) ->
- %% Avoid infinite growth of the table by removing messages not
- %% bound to a queue anymore
- PKeys = ets:foldl(fun ({{_QName, PKey}, _Delivered,
- _MsgProps, _SeqId}, S) ->
- sets:add_element(PKey, S)
- end, sets:new(), Queues),
- prune_table(Messages, fun (Key) -> sets:is_element(Key, PKeys) end),
- InnerSnapshot = {{txns, Ts},
- {messages, ets:tab2list(Messages)},
- {queues, ets:tab2list(Queues)},
- {next_seq_id, NextSeqId}},
- ?LOGDEBUG("Inner snapshot: ~p~n", [InnerSnapshot]),
- {persist_snapshot, {vsn, ?PERSISTER_LOG_FORMAT_VERSION},
- term_to_binary(InnerSnapshot)}.
-
-prune_table(Tab, Pred) ->
- true = ets:safe_fixtable(Tab, true),
- ok = prune_table(Tab, Pred, ets:first(Tab)),
- true = ets:safe_fixtable(Tab, false).
-
-prune_table(_Tab, _Pred, '$end_of_table') -> ok;
-prune_table(Tab, Pred, Key) ->
- case Pred(Key) of
- true -> ok;
- false -> ets:delete(Tab, Key)
- end,
- prune_table(Tab, Pred, ets:next(Tab, Key)).
-
-internal_load_snapshot(LogHandle,
- DurableQueues,
- Snapshot = #psnapshot{messages = Messages,
- queues = Queues}) ->
- {K, [Loaded_Snapshot | Items]} = disk_log:chunk(LogHandle, start),
- case check_version(Loaded_Snapshot) of
- {ok, StateBin} ->
- {{txns, Ts}, {messages, Ms}, {queues, Qs},
- {next_seq_id, NextSeqId}} = binary_to_term(StateBin),
- true = ets:insert(Messages, Ms),
- true = ets:insert(Queues, Qs),
- Snapshot1 = replay(Items, LogHandle, K,
- Snapshot#psnapshot{
- transactions = Ts,
- next_seq_id = NextSeqId}),
- %% Remove all entries for queues that no longer exist.
- %% Note that the 'messages' table is pruned when the next
- %% snapshot is taken.
- DurableQueuesSet = sets:from_list(DurableQueues),
- prune_table(Snapshot1#psnapshot.queues,
- fun ({QName, _PKey}) ->
- sets:is_element(QName, DurableQueuesSet)
- end),
- %% uncompleted transactions are discarded - this is TRTTD
- %% since we only get into this code on node restart, so
- %% any uncompleted transactions will have been aborted.
- {ok, Snapshot1#psnapshot{transactions = dict:new()}};
- {error, Reason} -> {{error, Reason}, Snapshot}
- end.
-
-check_version({persist_snapshot, {vsn, ?PERSISTER_LOG_FORMAT_VERSION},
- StateBin}) ->
- {ok, StateBin};
-check_version({persist_snapshot, {vsn, Vsn}, _StateBin}) ->
- {error, {unsupported_persister_log_format, Vsn}};
-check_version(_Other) ->
- {error, unrecognised_persister_log_format}.
-
-replay([], LogHandle, K, Snapshot) ->
- case disk_log:chunk(LogHandle, K) of
- {K1, Items} ->
- replay(Items, LogHandle, K1, Snapshot);
- {K1, Items, Badbytes} ->
- rabbit_log:warning("~p bad bytes recovering persister log~n",
- [Badbytes]),
- replay(Items, LogHandle, K1, Snapshot);
- eof -> Snapshot
- end;
-replay([Item | Items], LogHandle, K, Snapshot) ->
- NewSnapshot = internal_integrate_messages(Item, Snapshot),
- replay(Items, LogHandle, K, NewSnapshot).
-
-internal_integrate_messages(Items, Snapshot) ->
- lists:foldl(fun (Item, Snap) -> internal_integrate1(Item, Snap) end,
- Snapshot, Items).
-
-internal_integrate1({extend_transaction, Key, MessageList},
- Snapshot = #psnapshot {transactions = Transactions}) ->
- Snapshot#psnapshot{transactions = rabbit_misc:dict_cons(Key, MessageList,
- Transactions)};
-internal_integrate1({rollback_transaction, Key},
- Snapshot = #psnapshot{transactions = Transactions}) ->
- Snapshot#psnapshot{transactions = dict:erase(Key, Transactions)};
-internal_integrate1({commit_transaction, Key},
- Snapshot = #psnapshot{transactions = Transactions,
- messages = Messages,
- queues = Queues,
- next_seq_id = SeqId}) ->
- case dict:find(Key, Transactions) of
- {ok, MessageLists} ->
- ?LOGDEBUG("persist committing txn ~p~n", [Key]),
- NextSeqId =
- lists:foldr(
- fun (ML, SeqIdN) ->
- perform_work(ML, Messages, Queues, SeqIdN) end,
- SeqId, MessageLists),
- Snapshot#psnapshot{transactions = dict:erase(Key, Transactions),
- next_seq_id = NextSeqId};
- error ->
- Snapshot
- end;
-internal_integrate1({dirty_work, MessageList},
- Snapshot = #psnapshot{messages = Messages,
- queues = Queues,
- next_seq_id = SeqId}) ->
- Snapshot#psnapshot{next_seq_id = perform_work(MessageList, Messages,
- Queues, SeqId)}.
-
-perform_work(MessageList, Messages, Queues, SeqId) ->
- lists:foldl(fun (Item, NextSeqId) ->
- perform_work_item(Item, Messages, Queues, NextSeqId)
- end, SeqId, MessageList).
-
-perform_work_item({publish, Message, MsgProps, QK = {_QName, PKey}},
- Messages, Queues, NextSeqId) ->
- true = ets:insert(Messages, {PKey, Message}),
- true = ets:insert(Queues, {QK, false, MsgProps, NextSeqId}),
- NextSeqId + 1;
-
-perform_work_item({tied, MsgProps, QK}, _Messages, Queues, NextSeqId) ->
- true = ets:insert(Queues, {QK, false, MsgProps, NextSeqId}),
- NextSeqId + 1;
-
-perform_work_item({deliver, QK}, _Messages, Queues, NextSeqId) ->
- true = ets:update_element(Queues, QK, {2, true}),
- NextSeqId;
-
-perform_work_item({ack, QK}, _Messages, Queues, NextSeqId) ->
- true = ets:delete(Queues, QK),
- NextSeqId.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 8b58b822a4..b9edad9a3a 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1898,7 +1898,7 @@ test_variable_queue_ack_limiting(VQ0) ->
VQ6 = check_variable_queue_status(
rabbit_variable_queue:set_ram_duration_target(0, VQ5),
[{len, Len div 2},
- {target_ram_item_count, 0},
+ {target_ram_count, 0},
{ram_msg_count, 0},
{ram_ack_count, 0}]),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 7c676164b6..0db5116559 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -31,7 +31,7 @@
-module(rabbit_variable_queue).
--export([init/5, init/3, terminate/1, delete_and_terminate/1,
+-export([init/3, terminate/1, delete_and_terminate/1,
purge/1, publish/3, publish_delivered/4, fetch/2, ack/2,
tx_publish/4, tx_ack/3, tx_rollback/2, tx_commit/4,
requeue/3, len/1, is_empty/1, dropwhile/2,
@@ -42,7 +42,7 @@
-export([start/1, stop/0]).
%% exported for testing only
--export([start_msg_store/2, stop_msg_store/0]).
+-export([start_msg_store/2, stop_msg_store/0, init/5]).
%%----------------------------------------------------------------------------
%% Definitions:
@@ -158,7 +158,7 @@
%% The conversion from alphas to betas is also chunked, but only to
%% ensure no more than ?IO_BATCH_SIZE alphas are converted to betas at
%% any one time. This further smooths the effects of changes to the
-%% target_ram_item_count and ensures the queue remains responsive
+%% target_ram_count and ensures the queue remains responsive
%% even when there is a large amount of IO work to do. The
%% idle_timeout callback is utilised to ensure that conversions are
%% done as promptly as possible whilst ensuring the queue remains
@@ -256,7 +256,7 @@
len,
persistent_count,
- target_ram_item_count,
+ target_ram_count,
ram_msg_count,
ram_msg_count_prev,
ram_ack_count_prev,
@@ -351,7 +351,7 @@
persistent_count :: non_neg_integer(),
transient_threshold :: non_neg_integer(),
- target_ram_item_count :: non_neg_integer() | 'infinity',
+ target_ram_count :: non_neg_integer() | 'infinity',
ram_msg_count :: non_neg_integer(),
ram_msg_count_prev :: non_neg_integer(),
ram_index_count :: non_neg_integer(),
@@ -734,26 +734,24 @@ len(#vqstate { len = Len }) -> Len.
is_empty(State) -> 0 == len(State).
-set_ram_duration_target(DurationTarget,
- State = #vqstate {
- rates =
- #rates { avg_egress = AvgEgressRate,
- avg_ingress = AvgIngressRate },
- ack_rates =
- #rates { avg_egress = AvgAckEgressRate,
- avg_ingress = AvgAckIngressRate },
- target_ram_item_count = TargetRamItemCount }) ->
+set_ram_duration_target(
+ DurationTarget, State = #vqstate {
+ rates = #rates { avg_egress = AvgEgressRate,
+ avg_ingress = AvgIngressRate },
+ ack_rates = #rates { avg_egress = AvgAckEgressRate,
+ avg_ingress = AvgAckIngressRate },
+ target_ram_count = TargetRamCount }) ->
Rate =
AvgEgressRate + AvgIngressRate + AvgAckEgressRate + AvgAckIngressRate,
- TargetRamItemCount1 =
+ TargetRamCount1 =
case DurationTarget of
infinity -> infinity;
_ -> trunc(DurationTarget * Rate) %% msgs = sec * msgs/sec
end,
- State1 = State #vqstate { target_ram_item_count = TargetRamItemCount1 },
- a(case TargetRamItemCount1 == infinity orelse
- (TargetRamItemCount =/= infinity andalso
- TargetRamItemCount1 >= TargetRamItemCount) of
+ State1 = State #vqstate { target_ram_count = TargetRamCount1 },
+ a(case TargetRamCount1 == infinity orelse
+ (TargetRamCount =/= infinity andalso
+ TargetRamCount1 >= TargetRamCount) of
true -> State1;
false -> reduce_memory_use(State1)
end).
@@ -829,40 +827,39 @@ idle_timeout(State) -> a(reduce_memory_use(tx_commit_index(State))).
handle_pre_hibernate(State = #vqstate { index_state = IndexState }) ->
State #vqstate { index_state = rabbit_queue_index:flush(IndexState) }.
-status(#vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
- len = Len,
- pending_ack = PA,
- ram_ack_index = RAI,
- on_sync = #sync { funs = From },
- target_ram_item_count = TargetRamItemCount,
- ram_msg_count = RamMsgCount,
- ram_index_count = RamIndexCount,
- next_seq_id = NextSeqId,
- persistent_count = PersistentCount,
- rates = #rates {
- avg_egress = AvgEgressRate,
- avg_ingress = AvgIngressRate },
- ack_rates = #rates {
- avg_egress = AvgAckEgressRate,
- avg_ingress = AvgAckIngressRate } }) ->
- [ {q1 , queue:len(Q1)},
- {q2 , bpqueue:len(Q2)},
- {delta , Delta},
- {q3 , bpqueue:len(Q3)},
- {q4 , queue:len(Q4)},
- {len , Len},
- {pending_acks , dict:size(PA)},
- {outstanding_txns , length(From)},
- {target_ram_item_count , TargetRamItemCount},
- {ram_msg_count , RamMsgCount},
- {ram_ack_count , gb_trees:size(RAI)},
- {ram_index_count , RamIndexCount},
- {next_seq_id , NextSeqId},
- {persistent_count , PersistentCount},
- {avg_ingress_rate , AvgIngressRate},
- {avg_egress_rate , AvgEgressRate},
- {avg_ack_ingress_rate , AvgAckIngressRate},
- {avg_ack_egress_rate , AvgAckEgressRate} ].
+status(#vqstate {
+ q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
+ len = Len,
+ pending_ack = PA,
+ ram_ack_index = RAI,
+ on_sync = #sync { funs = From },
+ target_ram_count = TargetRamCount,
+ ram_msg_count = RamMsgCount,
+ ram_index_count = RamIndexCount,
+ next_seq_id = NextSeqId,
+ persistent_count = PersistentCount,
+ rates = #rates { avg_egress = AvgEgressRate,
+ avg_ingress = AvgIngressRate },
+ ack_rates = #rates { avg_egress = AvgAckEgressRate,
+ avg_ingress = AvgAckIngressRate } }) ->
+ [ {q1 , queue:len(Q1)},
+ {q2 , bpqueue:len(Q2)},
+ {delta , Delta},
+ {q3 , bpqueue:len(Q3)},
+ {q4 , queue:len(Q4)},
+ {len , Len},
+ {pending_acks , dict:size(PA)},
+ {outstanding_txns , length(From)},
+ {target_ram_count , TargetRamCount},
+ {ram_msg_count , RamMsgCount},
+ {ram_ack_count , gb_trees:size(RAI)},
+ {ram_index_count , RamIndexCount},
+ {next_seq_id , NextSeqId},
+ {persistent_count , PersistentCount},
+ {avg_ingress_rate , AvgIngressRate},
+ {avg_egress_rate , AvgEgressRate},
+ {avg_ack_ingress_rate, AvgAckIngressRate},
+ {avg_ack_egress_rate , AvgAckEgressRate} ].
%%----------------------------------------------------------------------------
%% Minor helpers
@@ -1056,37 +1053,37 @@ init(IsDurable, IndexState, DeltaCount, Terms,
end,
Now = now(),
State = #vqstate {
- q1 = queue:new(),
- q2 = bpqueue:new(),
- delta = Delta,
- q3 = bpqueue:new(),
- q4 = queue:new(),
- next_seq_id = NextSeqId,
- pending_ack = dict:new(),
- ram_ack_index = gb_trees:empty(),
- index_state = IndexState1,
- msg_store_clients = {PersistentClient, TransientClient},
- on_sync = ?BLANK_SYNC,
- durable = IsDurable,
- transient_threshold = NextSeqId,
-
- len = DeltaCount1,
- persistent_count = DeltaCount1,
-
- target_ram_item_count = infinity,
- ram_msg_count = 0,
- ram_msg_count_prev = 0,
- ram_ack_count_prev = 0,
- ram_index_count = 0,
- out_counter = 0,
- in_counter = 0,
- msgs_on_disk = gb_sets:new(),
- msg_indices_on_disk = gb_sets:new(),
- unconfirmed = gb_sets:new(),
- ack_out_counter = 0,
- ack_in_counter = 0,
- rates = blank_rate(Now, DeltaCount1),
- ack_rates = blank_rate(Now, 0) },
+ q1 = queue:new(),
+ q2 = bpqueue:new(),
+ delta = Delta,
+ q3 = bpqueue:new(),
+ q4 = queue:new(),
+ next_seq_id = NextSeqId,
+ pending_ack = dict:new(),
+ ram_ack_index = gb_trees:empty(),
+ index_state = IndexState1,
+ msg_store_clients = {PersistentClient, TransientClient},
+ on_sync = ?BLANK_SYNC,
+ durable = IsDurable,
+ transient_threshold = NextSeqId,
+
+ len = DeltaCount1,
+ persistent_count = DeltaCount1,
+
+ target_ram_count = infinity,
+ ram_msg_count = 0,
+ ram_msg_count_prev = 0,
+ ram_ack_count_prev = 0,
+ ram_index_count = 0,
+ out_counter = 0,
+ in_counter = 0,
+ rates = blank_rate(Now, DeltaCount1),
+ msgs_on_disk = gb_sets:new(),
+ msg_indices_on_disk = gb_sets:new(),
+ unconfirmed = gb_sets:new(),
+ ack_out_counter = 0,
+ ack_in_counter = 0,
+ ack_rates = blank_rate(Now, 0) },
a(maybe_deltas_to_betas(State)).
blank_rate(Timestamp, IngressLength) ->
@@ -1443,7 +1440,7 @@ msg_indices_written_to_disk(QPid, GuidSet) ->
%% though the conversion function for that is called as necessary. The
%% reason is twofold. Firstly, this is safe because the conversion is
%% only ever necessary just after a transition to a
-%% target_ram_item_count of zero or after an incremental alpha->beta
+%% target_ram_count of zero or after an incremental alpha->beta
%% conversion. In the former case the conversion is performed straight
%% away (i.e. any betas present at the time are converted to deltas),
%% and in the latter case the need for a conversion is flagged up
@@ -1454,51 +1451,41 @@ msg_indices_written_to_disk(QPid, GuidSet) ->
%% perpetually reporting the need for a conversion when no such
%% conversion is needed. That in turn could cause an infinite loop.
reduce_memory_use(_AlphaBetaFun, _BetaGammaFun, _BetaDeltaFun, _AckFun,
- State = #vqstate {target_ram_item_count = infinity}) ->
+ State = #vqstate {target_ram_count = infinity}) ->
{false, State};
reduce_memory_use(AlphaBetaFun, BetaGammaFun, BetaDeltaFun, AckFun,
State = #vqstate {
- ram_ack_index = RamAckIndex,
- ram_msg_count = RamMsgCount,
- target_ram_item_count = TargetRamItemCount,
- rates = #rates {
- avg_ingress = AvgIngress,
- avg_egress = AvgEgress },
- ack_rates = #rates {
- avg_ingress = AvgAckIngress,
- avg_egress = AvgAckEgress } }) ->
+ ram_ack_index = RamAckIndex,
+ ram_msg_count = RamMsgCount,
+ target_ram_count = TargetRamCount,
+ rates = #rates { avg_ingress = AvgIngress,
+ avg_egress = AvgEgress },
+ ack_rates = #rates { avg_ingress = AvgAckIngress,
+ avg_egress = AvgAckEgress }
+ }) ->
{Reduce, State1} =
case chunk_size(RamMsgCount + gb_trees:size(RamAckIndex),
- TargetRamItemCount) of
- 0 ->
- {false, State};
- S1 ->
- ReduceFuns =
- case (AvgAckIngress - AvgAckEgress) >
- (AvgIngress - AvgEgress) of
- true ->
- %% ACKs are growing faster than the queue,
- %% push messages from there first.
- [AckFun, AlphaBetaFun];
- false ->
- %% The queue is growing faster than the
- %% acks, push queue messages first.
- [AlphaBetaFun, AckFun]
- end,
- {_, State2} =
- %% Both reduce functions get a chance to reduce
- %% memory. The second may very well get a quota of
- %% 0 if the first function managed to push out the
- %% maximum number of messages.
- lists:foldl(
- fun (ReduceFun, {QuotaN, StateN}) ->
- ReduceFun(QuotaN, StateN)
- end, {S1, State}, ReduceFuns),
- {true, State2}
+ TargetRamCount) of
+ 0 -> {false, State};
+ %% Reduce memory of pending acks and alphas. The order is
+ %% determined based on which is growing faster. Whichever
+ %% comes second may very well get a quota of 0 if the
+ %% first manages to push out the max number of messages.
+ S1 -> {_, State2} =
+ lists:foldl(fun (ReduceFun, {QuotaN, StateN}) ->
+ ReduceFun(QuotaN, StateN)
+ end,
+ {S1, State},
+ case (AvgAckIngress - AvgAckEgress) >
+ (AvgIngress - AvgEgress) of
+ true -> [AckFun, AlphaBetaFun];
+ false -> [AlphaBetaFun, AckFun]
+ end),
+ {true, State2}
end,
- case State1 #vqstate.target_ram_item_count of
+ case State1 #vqstate.target_ram_count of
0 -> {Reduce, BetaDeltaFun(State1)};
_ -> case chunk_size(State1 #vqstate.ram_index_count,
permitted_ram_index_count(State1)) of
@@ -1694,11 +1681,11 @@ maybe_push_q4_to_betas(Quota, State = #vqstate { q4 = Q4 }) ->
maybe_push_alphas_to_betas(_Generator, _Consumer, Quota, _Q,
State = #vqstate {
- ram_msg_count = RamMsgCount,
- target_ram_item_count = TargetRamItemCount })
+ ram_msg_count = RamMsgCount,
+ target_ram_count = TargetRamCount })
when Quota =:= 0 orelse
- TargetRamItemCount =:= infinity orelse
- TargetRamItemCount >= RamMsgCount ->
+ TargetRamCount =:= infinity orelse
+ TargetRamCount >= RamMsgCount ->
{Quota, State};
maybe_push_alphas_to_betas(Generator, Consumer, Quota, Q, State) ->
case Generator(Q) of
diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl
index 1b4710c6c3..0159609d9a 100644
--- a/src/rabbit_writer.erl
+++ b/src/rabbit_writer.erl
@@ -182,7 +182,7 @@ call(Pid, Msg) ->
%---------------------------------------------------------------------------
-assemble_frames(Channel, MethodRecord, Protocol) ->
+assemble_frame(Channel, MethodRecord, Protocol) ->
?LOGMESSAGE(out, Channel, MethodRecord, none),
rabbit_binary_generator:build_simple_method_frame(
Channel, MethodRecord, Protocol).
@@ -197,17 +197,34 @@ assemble_frames(Channel, MethodRecord, Content, FrameMax, Protocol) ->
Channel, Content, FrameMax, Protocol),
[MethodFrame | ContentFrames].
+%% We optimise delivery of small messages. Content-bearing methods
+%% require at least three frames. Small messages always fit into
+%% that. We hand their frames to the Erlang network functions in one
+%% go, which may lead to somewhat more efficient processing in the
+%% runtime and a greater chance of coalescing into fewer TCP packets.
+%%
+%% By contrast, for larger messages, split across many frames, we want
+%% to allow interleaving of frames on different channels. Hence we
+%% hand them to the Erlang network functions one frame at a time.
+send_frames(Fun, Sock, Frames) when length(Frames) =< 3 ->
+ Fun(Sock, Frames);
+send_frames(Fun, Sock, Frames) ->
+ lists:foldl(fun (Frame, ok) -> Fun(Sock, Frame);
+ (_Frame, Other) -> Other
+ end, ok, Frames).
+
tcp_send(Sock, Data) ->
rabbit_misc:throw_on_error(inet_error,
fun () -> rabbit_net:send(Sock, Data) end).
internal_send_command(Sock, Channel, MethodRecord, Protocol) ->
- ok = tcp_send(Sock, assemble_frames(Channel, MethodRecord, Protocol)).
+ ok = tcp_send(Sock, assemble_frame(Channel, MethodRecord, Protocol)).
internal_send_command(Sock, Channel, MethodRecord, Content, FrameMax,
Protocol) ->
- ok = tcp_send(Sock, assemble_frames(Channel, MethodRecord,
- Content, FrameMax, Protocol)).
+ ok = send_frames(fun tcp_send/2, Sock,
+ assemble_frames(Channel, MethodRecord,
+ Content, FrameMax, Protocol)).
%% gen_tcp:send/2 does a selective receive of {inet_reply, Sock,
%% Status} to obtain the result. That is bad when it is called from
@@ -231,19 +248,19 @@ internal_send_command_async(MethodRecord,
#wstate{sock = Sock,
channel = Channel,
protocol = Protocol}) ->
- true = port_cmd(Sock, assemble_frames(Channel, MethodRecord, Protocol)),
- ok.
+ ok = port_cmd(Sock, assemble_frame(Channel, MethodRecord, Protocol)).
internal_send_command_async(MethodRecord, Content,
#wstate{sock = Sock,
channel = Channel,
frame_max = FrameMax,
protocol = Protocol}) ->
- true = port_cmd(Sock, assemble_frames(Channel, MethodRecord,
- Content, FrameMax, Protocol)),
- ok.
+ ok = send_frames(fun port_cmd/2, Sock,
+ assemble_frames(Channel, MethodRecord,
+ Content, FrameMax, Protocol)).
port_cmd(Sock, Data) ->
- try rabbit_net:port_command(Sock, Data)
- catch error:Error -> exit({writer, send_failed, Error})
- end.
+ true = try rabbit_net:port_command(Sock, Data)
+ catch error:Error -> exit({writer, send_failed, Error})
+ end,
+ ok.