Diffstat (limited to 'deps/rabbit/src/rabbit_variable_queue.erl')
-rw-r--r--  deps/rabbit/src/rabbit_variable_queue.erl  3015
1 file changed, 3015 insertions, 0 deletions
diff --git a/deps/rabbit/src/rabbit_variable_queue.erl b/deps/rabbit/src/rabbit_variable_queue.erl
new file mode 100644
index 0000000000..cf6fa4a189
--- /dev/null
+++ b/deps/rabbit/src/rabbit_variable_queue.erl
@@ -0,0 +1,3015 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(rabbit_variable_queue).
+
+-export([init/3, terminate/2, delete_and_terminate/2, delete_crashed/1,
+ purge/1, purge_acks/1,
+ publish/6, publish_delivered/5,
+ batch_publish/4, batch_publish_delivered/4,
+ discard/4, drain_confirmed/1,
+ dropwhile/2, fetchwhile/4, fetch/2, drop/2, ack/2, requeue/2,
+ ackfold/4, fold/3, len/1, is_empty/1, depth/1,
+ set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1,
+ handle_pre_hibernate/1, resume/1, msg_rates/1,
+ info/2, invoke/3, is_duplicate/2, set_queue_mode/2,
+ zip_msgs_and_acks/4, multiple_routing_keys/0, handle_info/2]).
+
+-export([start/2, stop/1]).
+
+%% exported for testing only
+-export([start_msg_store/3, stop_msg_store/1, init/6]).
+
+-export([move_messages_to_vhost_store/0]).
+
+-export([migrate_queue/3, migrate_message/3, get_per_vhost_store_client/2,
+ get_global_store_client/1, log_upgrade_verbose/1,
+ log_upgrade_verbose/2]).
+
+-include_lib("stdlib/include/qlc.hrl").
+
+-define(QUEUE_MIGRATION_BATCH_SIZE, 100).
+-define(EMPTY_START_FUN_STATE, {fun (ok) -> finished end, ok}).
+
+%%----------------------------------------------------------------------------
+%% Messages, and their position in the queue, can be in memory or on
+%% disk, or both. Persistent messages will have both message and
+%% position pushed to disk as soon as they arrive; transient messages
+%% can be written to disk (and thus both types can be evicted from
+%% memory) under memory pressure. The question of whether a message is
+%% in RAM and whether it is persistent are orthogonal.
+%%
+%% Messages are persisted using the queue index and the message
+%% store. Normally the queue index holds the position of the message
+%% *within this queue* along with a couple of small bits of metadata,
+%% while the message store holds the message itself (including headers
+%% and other properties).
+%%
+%% However, as an optimisation, small messages can be embedded
+%% directly in the queue index and bypass the message store
+%% altogether.
+%%
+%% Definitions:
+%%
+%% alpha: this is a message where both the message itself, and its
+%% position within the queue are held in RAM
+%%
+%% beta: this is a message where the message itself is only held on
+%% disk (if persisted to the message store) but its position
+%% within the queue is held in RAM.
+%%
+%% gamma: this is a message where the message itself is only held on
+%% disk, but its position is both in RAM and on disk.
+%%
+%% delta: this is a collection of messages, represented by a single
+%% term, where the messages and their position are only held on
+%% disk.
+%%
+%% Note that for persistent messages, the message and its position
+%% within the queue are always held on disk, *in addition* to being in
+%% one of the above classifications.
+%%
+%% Also note that within this code the term gamma seldom appears;
+%% in practice a gamma is simply a beta whose queue position has
+%% also been recorded on disk.
+%%
+%% In general, messages move q1 -> q2 -> delta -> q3 -> q4, though
+%% many of these steps are frequently skipped. q1 and q4 only hold
+%% alphas, q2 and q3 hold both betas and gammas. When a message
+%% arrives, its classification is determined. It is then added to the
+%% rightmost appropriate queue.
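+%%
+%% As an illustration (a sketch, not a literal data structure):
+%%
+%%   publish -> q1 -> q2 -> delta -> q3 -> q4 -> deliver
+%%            alphas  betas/ disk-   betas/ alphas
+%%                    gammas only    gammas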
+%%
+%% If a new message is determined to be a beta or gamma, q1 is
+%% empty. If a new message is determined to be a delta, q1 and q2 are
+%% empty (and actually q4 too).
+%%
+%% When removing messages from a queue, if q4 is empty then q3 is read
+%% directly. If q3 becomes empty then the next segment's worth of
+%% messages from delta are read into q3, reducing the size of
+%% delta. If the queue is non-empty, either q4 or q3 contains
+%% entries. It is never permitted for delta to hold all the messages
+%% in the queue.
+%%
+%% The duration indicated to us by the memory_monitor is used to
+%% calculate, given our current ingress and egress rates, how many
+%% messages we should hold in RAM (i.e. as alphas). We track the
+%% ingress and egress rates for both messages and pending acks and
+%% rates for both are considered when calculating the number of
+%% messages to hold in RAM. When we need to push alphas to betas or
+%% betas to gammas, we favour writing out messages that are further
+%% from the head of the queue. This minimises writes to disk, as the
+%% messages closer to the tail of the queue stay in the queue for
+%% longer, thus do not need to be replaced as quickly by sending other
+%% messages to disk.
+%%
+%% Whilst messages are pushed to disk and forgotten from RAM as soon
+%% as requested by a new setting of the queue RAM duration, the
+%% inverse is not true: we only load messages back into RAM as
+%% demanded as the queue is read from. Thus only publishes to the
+%% queue will take up available spare capacity.
+%%
+%% When we report our duration to the memory monitor, we calculate
+%% average ingress and egress rates over the last two samples, and
+%% then calculate our duration based on the sum of the ingress and
+%% egress rates. More than two samples could be used, but it's a
+%% balance between responding quickly enough to changes in
+%% producers/consumers versus ignoring temporary blips. The problem
+%% with temporary blips is that with just a few queues, they can have
+%% substantial impact on the calculation of the average duration and
+%% hence cause unnecessary I/O. Another alternative is to increase the
+%% amqqueue_process:RAM_DURATION_UPDATE_PERIOD to beyond 5
+%% seconds. However, that then runs the risk of being too slow to
+%% inform the memory monitor of changes. Thus a 5 second interval,
+%% plus a rolling average over the last two samples seems to work
+%% well in practice.
+%%
+%% The sum of the ingress and egress rates is used because the egress
+%% rate alone is not sufficient. Adding in the ingress rate means that
+%% queues which are being flooded by messages are given more memory,
+%% resulting in them being able to process the messages faster (by
+%% doing less I/O, or at least deferring it) and thus helping keep
+%% their mailboxes empty and thus the queue as a whole is more
+%% responsive. If such a queue also has fast but previously idle
+%% consumers, the consumer can then start to be driven as fast as it
+%% can go, whereas if only egress rate was being used, the incoming
+%% messages may have to be written to disk and then read back in,
+%% resulting in the hard disk being a bottleneck in driving the
+%% consumers. Generally, we want to give Rabbit every chance of
+%% getting rid of messages as fast as possible and remaining
+%% responsive, and using only the egress rate impacts that goal.
+%%
+%% Once the queue has more alphas than the target_ram_count, the
+%% surplus must be converted to betas, if not gammas, if not rolled
+%% into delta. The conditions under which these transitions occur
+%% reflect the conflicting goals of minimising RAM cost per msg, and
+%% minimising CPU cost per msg. Once the msg has become a beta, its
+%% payload is no longer in RAM, thus a read from the msg_store must
+%% occur before the msg can be delivered, but the RAM cost of a beta
+%% is the same as a gamma, so converting a beta to gamma will not free
+%% up any further RAM. To reduce the RAM cost further, the gamma must
+%% be rolled into delta. Whilst recovering a beta or a gamma to an
+%% alpha requires only one disk read (from the msg_store), recovering
+%% a msg from within delta will require two reads (queue_index and
+%% then msg_store). But delta has a near-0 per-msg RAM cost. So the
+%% conflict is between using delta more, which will free up more
+%% memory, but require additional CPU and disk ops, versus using delta
+%% less and gammas and betas more, which will cost more memory, but
+%% require fewer disk ops and less CPU overhead.
+%%
+%% In the case of a persistent msg published to a durable queue, the
+%% msg is immediately written to the msg_store and queue_index. If
+%% then additionally converted from an alpha, it'll immediately go to
+%% a gamma (as it's already in queue_index), and cannot exist as a
+%% beta. Thus a durable queue with a mixture of persistent and
+%% transient msgs in it which has more messages than permitted by the
+%% target_ram_count may contain an interspersed mixture of betas and
+%% gammas in q2 and q3.
+%%
+%% There is then a ratio that controls how many betas and gammas there
+%% can be. This is based on the target_ram_count and thus expresses
+%% the fact that as the number of permitted alphas in the queue falls,
+%% so should the number of betas and gammas fall (i.e. delta
+%% grows). If q2 and q3 contain more than the permitted number of
+%% betas and gammas, then the surplus are forcibly converted to gammas
+%% (as necessary) and then rolled into delta. The ratio is that
+%% delta/(betas+gammas+delta) equals
+%% (betas+gammas+delta)/(target_ram_count+betas+gammas+delta). I.e. as
+%% the target_ram_count shrinks to 0, so must betas and gammas.
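+%%
+%% For example (illustrative numbers): with target_ram_count = 1000
+%% and betas+gammas+delta = 4000, the ratio requires
+%% delta/4000 = 4000/(1000+4000) = 0.8, i.e. delta = 3200 and at
+%% most 800 betas and gammas.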
+%%
+%% The conversion of betas to deltas is done if there are at least
+%% ?IO_BATCH_SIZE betas in q2 & q3. This value should not be too small,
+%% otherwise the frequent operations on the queues of q2 and q3 will not be
+%% effectively amortised (switching the direction of queue access defeats
+%% amortisation). Note that there is a natural upper bound due to credit_flow
+%% limits on the alpha to beta conversion.
+%%
+%% The conversion from alphas to betas is chunked due to the
+%% credit_flow limits of the msg_store. This further smooths the
+%% effects of changes to the target_ram_count and ensures the queue
+%% remains responsive even when there is a large amount of IO work to
+%% do. The 'resume' callback is utilised to ensure that conversions
+%% are done as promptly as possible whilst ensuring the queue remains
+%% responsive.
+%%
+%% In the queue we keep track of both messages that are pending
+%% delivery and messages that are pending acks. In the event of a
+%% queue purge, we only need to load qi segments if the queue has
+%% elements in delta (i.e. it came under significant memory
+%% pressure). In the event of a queue deletion, in addition to the
+%% preceding, by keeping track of pending acks in RAM, we do not need
+%% to search through qi segments looking for messages that are yet to
+%% be acknowledged.
+%%
+%% Pending acks are recorded in memory by storing the message itself.
+%% If the message has been sent to disk, we do not store the message
+%% content. During memory reduction, pending acks containing message
+%% content have that content removed and the corresponding messages
+%% are pushed out to disk.
+%%
+%% Messages from pending acks are returned to q4, q3 and delta during
+%% requeue, based on the limits of seq_id contained in each. Requeued
+%% messages retain their original seq_id, maintaining order
+%% when requeued.
+%%
+%% The order in which alphas are pushed to betas and pending acks
+%% are pushed to disk is determined dynamically. We always prefer to
+%% push messages for the source (alphas or acks) that is growing the
+%% fastest (with growth measured as avg. ingress - avg. egress).
+%%
+%% Notes on Clean Shutdown
+%% (This documents behaviour in variable_queue, queue_index and
+%% msg_store.)
+%%
+%% In order to try to achieve as fast a start-up as possible, if a
+%% clean shutdown occurs, we try to save out state to disk to reduce
+%% work on startup. In the msg_store this takes the form of the
+%% index_module's state, plus the file_summary ets table, and client
+%% refs. In the VQ, this takes the form of the count of persistent
+%% messages in the queue and references into the msg_stores. The
+%% queue_index adds to these terms the details of its segments and
+%% stores the terms in the queue directory.
+%%
+%% Two message stores are used. One is created for persistent messages
+%% to durable queues that must survive restarts, and the other is used
+%% for all other messages that just happen to need to be written to
+%% disk. On start up we can therefore nuke the transient message
+%% store, and be sure that the messages in the persistent store are
+%% all that we need.
+%%
+%% The references to the msg_stores are there so that the msg_store
+%% knows to only trust its saved state if all of the queues it was
+%% previously talking to come up cleanly. Likewise, the queues
+%% themselves (esp. the queue_index) skip work in init if all the queues
+%% and msg_store were shutdown cleanly. This gives both good speed
+%% improvements and also robustness so that if anything possibly went
+%% wrong in shutdown (or there was subsequent manual tampering), all
+%% messages and queues that can be recovered are recovered, safely.
+%%
+%% To delete transient messages lazily, the variable_queue, on
+%% startup, stores the next_seq_id reported by the queue_index as the
+%% transient_threshold. From that point on, whenever it's reading a
+%% message off disk via the queue_index, if the seq_id is below this
+%% threshold and the message is transient then it drops the message
+%% (the message itself won't exist on disk because it would have been
+%% stored in the transient msg_store which would have had its saved
+%% state nuked on startup). This avoids the expensive operation of
+%% scanning the entire queue on startup in order to delete transient
+%% messages that were only pushed to disk to save memory.
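+%%
+%% Concretely, the drop condition applied to an index entry read
+%% back from disk (see betas_from_index_entries/4 below) is:
+%%
+%%   SeqId < TransientThreshold andalso not IsPersistent
+%%
+%% and matching entries are acked in the index instead of being
+%% turned into betas.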
+%%
+%%----------------------------------------------------------------------------
+
+-behaviour(rabbit_backing_queue).
+
+-record(vqstate,
+ { q1,
+ q2,
+ delta,
+ q3,
+ q4,
+ next_seq_id,
+ ram_pending_ack, %% msgs using store, still in RAM
+ disk_pending_ack, %% msgs in store, paged out
+ qi_pending_ack, %% msgs using qi, *can't* be paged out
+ index_state,
+ msg_store_clients,
+ durable,
+ transient_threshold,
+ qi_embed_msgs_below,
+
+ len, %% w/o unacked
+ bytes, %% w/o unacked
+ unacked_bytes,
+ persistent_count, %% w unacked
+ persistent_bytes, %% w unacked
+ delta_transient_bytes, %%
+
+ target_ram_count,
+ ram_msg_count, %% w/o unacked
+ ram_msg_count_prev,
+ ram_ack_count_prev,
+ ram_bytes, %% w unacked
+ out_counter,
+ in_counter,
+ rates,
+ msgs_on_disk,
+ msg_indices_on_disk,
+ unconfirmed,
+ confirmed,
+ ack_out_counter,
+ ack_in_counter,
+ %% Unlike the other counters, these two do not feed into
+ %% #rates{} and are never reset
+ disk_read_count,
+ disk_write_count,
+
+ io_batch_size,
+
+ %% default queue or lazy queue
+ mode,
+ %% number of reduce_memory_use executions; once it
+ %% reaches a threshold the queue will explicitly trigger a runtime GC
+ %% see: maybe_execute_gc/1
+ memory_reduction_run_count,
+ %% Queue data is grouped by vhost, so we need to remember the
+ %% vhost in order to work with the queue index.
+ virtual_host,
+ waiting_bump = false
+ }).
+
+-record(rates, { in, out, ack_in, ack_out, timestamp }).
+
+-record(msg_status,
+ { seq_id,
+ msg_id,
+ msg,
+ is_persistent,
+ is_delivered,
+ msg_in_store,
+ index_on_disk,
+ persist_to,
+ msg_props
+ }).
+
+-record(delta,
+ { start_seq_id, %% start_seq_id is inclusive
+ count,
+ transient,
+ end_seq_id %% end_seq_id is exclusive
+ }).
+
+-define(HEADER_GUESS_SIZE, 100). %% see determine_persist_to/2
+-define(PERSISTENT_MSG_STORE, msg_store_persistent).
+-define(TRANSIENT_MSG_STORE, msg_store_transient).
+
+-define(QUEUE, lqueue).
+
+-include_lib("rabbit_common/include/rabbit.hrl").
+-include_lib("rabbit_common/include/rabbit_framing.hrl").
+-include("amqqueue.hrl").
+
+%%----------------------------------------------------------------------------
+
+-rabbit_upgrade({multiple_routing_keys, local, []}).
+-rabbit_upgrade({move_messages_to_vhost_store, message_store, []}).
+
+-type seq_id() :: non_neg_integer().
+
+-type rates() :: #rates { in :: float(),
+ out :: float(),
+ ack_in :: float(),
+ ack_out :: float(),
+ timestamp :: rabbit_types:timestamp()}.
+
+-type delta() :: #delta { start_seq_id :: non_neg_integer(),
+ count :: non_neg_integer(),
+ end_seq_id :: non_neg_integer() }.
+
+%% The compiler (rightfully) complains that ack() and state() are
+%% unused. For this reason we duplicate a -spec from
+%% rabbit_backing_queue with the only intent being to remove
+%% warnings. The problem here is that we can't parameterise the BQ
+%% behaviour by these two types as we would like to. We still leave
+%% these here for documentation purposes.
+-type ack() :: seq_id().
+-type state() :: #vqstate {
+ q1 :: ?QUEUE:?QUEUE(),
+ q2 :: ?QUEUE:?QUEUE(),
+ delta :: delta(),
+ q3 :: ?QUEUE:?QUEUE(),
+ q4 :: ?QUEUE:?QUEUE(),
+ next_seq_id :: seq_id(),
+ ram_pending_ack :: gb_trees:tree(),
+ disk_pending_ack :: gb_trees:tree(),
+ qi_pending_ack :: gb_trees:tree(),
+ index_state :: any(),
+ msg_store_clients :: 'undefined' | {{any(), binary()},
+ {any(), binary()}},
+ durable :: boolean(),
+ transient_threshold :: non_neg_integer(),
+ qi_embed_msgs_below :: non_neg_integer(),
+
+ len :: non_neg_integer(),
+ bytes :: non_neg_integer(),
+ unacked_bytes :: non_neg_integer(),
+
+ persistent_count :: non_neg_integer(),
+ persistent_bytes :: non_neg_integer(),
+
+ target_ram_count :: non_neg_integer() | 'infinity',
+ ram_msg_count :: non_neg_integer(),
+ ram_msg_count_prev :: non_neg_integer(),
+ ram_ack_count_prev :: non_neg_integer(),
+ ram_bytes :: non_neg_integer(),
+ out_counter :: non_neg_integer(),
+ in_counter :: non_neg_integer(),
+ rates :: rates(),
+ msgs_on_disk :: gb_sets:set(),
+ msg_indices_on_disk :: gb_sets:set(),
+ unconfirmed :: gb_sets:set(),
+ confirmed :: gb_sets:set(),
+ ack_out_counter :: non_neg_integer(),
+ ack_in_counter :: non_neg_integer(),
+ disk_read_count :: non_neg_integer(),
+ disk_write_count :: non_neg_integer(),
+
+ io_batch_size :: pos_integer(),
+ mode :: 'default' | 'lazy',
+ memory_reduction_run_count :: non_neg_integer()}.
+
+-define(BLANK_DELTA, #delta { start_seq_id = undefined,
+ count = 0,
+ transient = 0,
+ end_seq_id = undefined }).
+-define(BLANK_DELTA_PATTERN(Z), #delta { start_seq_id = Z,
+ count = 0,
+ transient = 0,
+ end_seq_id = Z }).
+
+-define(MICROS_PER_SECOND, 1000000.0).
+
+%% We're sampling every 5s for RAM duration; a half life that is of
+%% the same order of magnitude is probably about right.
+-define(RATE_AVG_HALF_LIFE, 5.0).
+
+%% We will recalculate the #rates{} every time we get asked for our
+%% RAM duration, or every N messages published, whichever is
+%% sooner. We do this since the priority calculations in
+%% rabbit_amqqueue_process need fairly fresh rates.
+-define(MSGS_PER_RATE_CALC, 100).
+
+%% We define the garbage collection threshold: it tunes how many
+%% `reduce_memory_use` runs happen between explicit runtime GCs.
+%% see: rabbitmq-server-973 and rabbitmq-server-964
+-define(DEFAULT_EXPLICIT_GC_RUN_OP_THRESHOLD, 1000).
+-define(EXPLICIT_GC_RUN_OP_THRESHOLD(Mode),
+ case get(explicit_gc_run_operation_threshold) of
+ undefined ->
+ Val = explicit_gc_run_operation_threshold_for_mode(Mode),
+ put(explicit_gc_run_operation_threshold, Val),
+ Val;
+ Val -> Val
+ end).
+
+explicit_gc_run_operation_threshold_for_mode(Mode) ->
+ {Key, Fallback} = case Mode of
+ lazy -> {lazy_queue_explicit_gc_run_operation_threshold,
+ ?DEFAULT_EXPLICIT_GC_RUN_OP_THRESHOLD};
+ _ -> {queue_explicit_gc_run_operation_threshold,
+ ?DEFAULT_EXPLICIT_GC_RUN_OP_THRESHOLD}
+ end,
+ rabbit_misc:get_env(rabbit, Key, Fallback).
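+
+%% The thresholds above can be overridden via the rabbit application
+%% environment, e.g. in advanced.config (illustrative values):
+%%
+%%   [{rabbit, [{queue_explicit_gc_run_operation_threshold, 2000},
+%%              {lazy_queue_explicit_gc_run_operation_threshold, 500}]}].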
+
+%%----------------------------------------------------------------------------
+%% Public API
+%%----------------------------------------------------------------------------
+
+start(VHost, DurableQueues) ->
+ {AllTerms, StartFunState} = rabbit_queue_index:start(VHost, DurableQueues),
+ %% Collect the persistent client refs of queues that shut down cleanly.
+ ClientRefs = [Ref || Terms <- AllTerms,
+ Terms /= non_clean_shutdown,
+ begin
+ Ref = proplists:get_value(persistent_ref, Terms),
+ Ref =/= undefined
+ end],
+ start_msg_store(VHost, ClientRefs, StartFunState),
+ {ok, AllTerms}.
+
+stop(VHost) ->
+ ok = stop_msg_store(VHost),
+ ok = rabbit_queue_index:stop(VHost).
+
+start_msg_store(VHost, Refs, StartFunState) when is_list(Refs); Refs == undefined ->
+ rabbit_log:info("Starting message stores for vhost '~s'~n", [VHost]),
+ do_start_msg_store(VHost, ?TRANSIENT_MSG_STORE, undefined, ?EMPTY_START_FUN_STATE),
+ do_start_msg_store(VHost, ?PERSISTENT_MSG_STORE, Refs, StartFunState),
+ ok.
+
+do_start_msg_store(VHost, Type, Refs, StartFunState) ->
+ case rabbit_vhost_msg_store:start(VHost, Type, Refs, StartFunState) of
+ {ok, _} ->
+ rabbit_log:info("Started message store of type ~s for vhost '~s'~n", [abbreviated_type(Type), VHost]);
+ {error, {no_such_vhost, VHost}} = Err ->
+ rabbit_log:error("Failed to start message store of type ~s for vhost '~s': the vhost no longer exists!~n",
+ [Type, VHost]),
+ exit(Err);
+ {error, Error} ->
+ rabbit_log:error("Failed to start message store of type ~s for vhost '~s': ~p~n",
+ [Type, VHost, Error]),
+ exit({error, Error})
+ end.
+
+abbreviated_type(?TRANSIENT_MSG_STORE) -> transient;
+abbreviated_type(?PERSISTENT_MSG_STORE) -> persistent.
+
+stop_msg_store(VHost) ->
+ rabbit_vhost_msg_store:stop(VHost, ?TRANSIENT_MSG_STORE),
+ rabbit_vhost_msg_store:stop(VHost, ?PERSISTENT_MSG_STORE),
+ ok.
+
+init(Queue, Recover, Callback) ->
+ init(
+ Queue, Recover, Callback,
+ fun (MsgIds, ActionTaken) ->
+ msgs_written_to_disk(Callback, MsgIds, ActionTaken)
+ end,
+ fun (MsgIds) -> msg_indices_written_to_disk(Callback, MsgIds) end,
+ fun (MsgIds) -> msgs_and_indices_written_to_disk(Callback, MsgIds) end).
+
+init(Q, new, AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun) when ?is_amqqueue(Q) ->
+ QueueName = amqqueue:get_name(Q),
+ IsDurable = amqqueue:is_durable(Q),
+ IndexState = rabbit_queue_index:init(QueueName,
+ MsgIdxOnDiskFun, MsgAndIdxOnDiskFun),
+ VHost = QueueName#resource.virtual_host,
+ init(IsDurable, IndexState, 0, 0, [],
+ case IsDurable of
+ true -> msg_store_client_init(?PERSISTENT_MSG_STORE,
+ MsgOnDiskFun, AsyncCallback, VHost);
+ false -> undefined
+ end,
+ msg_store_client_init(?TRANSIENT_MSG_STORE, undefined,
+ AsyncCallback, VHost), VHost);
+
+%% We can be recovering a transient queue if it crashed
+init(Q, Terms, AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun) when ?is_amqqueue(Q) ->
+ QueueName = amqqueue:get_name(Q),
+ IsDurable = amqqueue:is_durable(Q),
+ {PRef, RecoveryTerms} = process_recovery_terms(Terms),
+ VHost = QueueName#resource.virtual_host,
+ {PersistentClient, ContainsCheckFun} =
+ case IsDurable of
+ true -> C = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef,
+ MsgOnDiskFun, AsyncCallback,
+ VHost),
+ {C, fun (MsgId) when is_binary(MsgId) ->
+ rabbit_msg_store:contains(MsgId, C);
+ (#basic_message{is_persistent = Persistent}) ->
+ Persistent
+ end};
+ false -> {undefined, fun(_MsgId) -> false end}
+ end,
+ TransientClient = msg_store_client_init(?TRANSIENT_MSG_STORE,
+ undefined, AsyncCallback,
+ VHost),
+ {DeltaCount, DeltaBytes, IndexState} =
+ rabbit_queue_index:recover(
+ QueueName, RecoveryTerms,
+ rabbit_vhost_msg_store:successfully_recovered_state(
+ VHost,
+ ?PERSISTENT_MSG_STORE),
+ ContainsCheckFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun),
+ init(IsDurable, IndexState, DeltaCount, DeltaBytes, RecoveryTerms,
+ PersistentClient, TransientClient, VHost).
+
+process_recovery_terms(Terms=non_clean_shutdown) ->
+ {rabbit_guid:gen(), Terms};
+process_recovery_terms(Terms) ->
+ case proplists:get_value(persistent_ref, Terms) of
+ undefined -> {rabbit_guid:gen(), []};
+ PRef -> {PRef, Terms}
+ end.
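+
+%% For example, a clean shutdown leaves behind a proplist such as
+%% [{persistent_ref, PRef}, {persistent_count, N},
+%% {persistent_bytes, B}] (see terminate/2) and yields {PRef, Terms};
+%% non_clean_shutdown yields a freshly generated ref.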
+
+terminate(_Reason, State) ->
+ State1 = #vqstate { virtual_host = VHost,
+ persistent_count = PCount,
+ persistent_bytes = PBytes,
+ index_state = IndexState,
+ msg_store_clients = {MSCStateP, MSCStateT} } =
+ purge_pending_ack(true, State),
+ PRef = case MSCStateP of
+ undefined -> undefined;
+ _ -> ok = maybe_client_terminate(MSCStateP),
+ rabbit_msg_store:client_ref(MSCStateP)
+ end,
+ ok = rabbit_msg_store:client_delete_and_terminate(MSCStateT),
+ Terms = [{persistent_ref, PRef},
+ {persistent_count, PCount},
+ {persistent_bytes, PBytes}],
+ a(State1#vqstate {
+ index_state = rabbit_queue_index:terminate(VHost, Terms, IndexState),
+ msg_store_clients = undefined }).
+
+%% the only difference between purge and delete is that delete also
+%% needs to delete everything that's been delivered and not ack'd.
+delete_and_terminate(_Reason, State) ->
+ %% Normally when we purge messages we interact with the qi by
+ %% issuing delivers and acks for every purged message. In this case
+ %% we don't need to do that, so we just delete the qi.
+ State1 = purge_and_index_reset(State),
+ State2 = #vqstate { msg_store_clients = {MSCStateP, MSCStateT} } =
+ purge_pending_ack_delete_and_terminate(State1),
+ case MSCStateP of
+ undefined -> ok;
+ _ -> rabbit_msg_store:client_delete_and_terminate(MSCStateP)
+ end,
+ rabbit_msg_store:client_delete_and_terminate(MSCStateT),
+ a(State2 #vqstate { msg_store_clients = undefined }).
+
+delete_crashed(Q) when ?is_amqqueue(Q) ->
+ QName = amqqueue:get_name(Q),
+ ok = rabbit_queue_index:erase(QName).
+
+purge(State = #vqstate { len = Len }) ->
+ case is_pending_ack_empty(State) and is_unconfirmed_empty(State) of
+ true ->
+ {Len, purge_and_index_reset(State)};
+ false ->
+ {Len, purge_when_pending_acks(State)}
+ end.
+
+purge_acks(State) -> a(purge_pending_ack(false, State)).
+
+publish(Msg, MsgProps, IsDelivered, ChPid, Flow, State) ->
+ State1 =
+ publish1(Msg, MsgProps, IsDelivered, ChPid, Flow,
+ fun maybe_write_to_disk/4,
+ State),
+ a(maybe_reduce_memory_use(maybe_update_rates(State1))).
+
+batch_publish(Publishes, ChPid, Flow, State) ->
+ {ChPid, Flow, State1} =
+ lists:foldl(fun batch_publish1/2, {ChPid, Flow, State}, Publishes),
+ State2 = ui(State1),
+ a(maybe_reduce_memory_use(maybe_update_rates(State2))).
+
+publish_delivered(Msg, MsgProps, ChPid, Flow, State) ->
+ {SeqId, State1} =
+ publish_delivered1(Msg, MsgProps, ChPid, Flow,
+ fun maybe_write_to_disk/4,
+ State),
+ {SeqId, a(maybe_reduce_memory_use(maybe_update_rates(State1)))}.
+
+batch_publish_delivered(Publishes, ChPid, Flow, State) ->
+ {ChPid, Flow, SeqIds, State1} =
+ lists:foldl(fun batch_publish_delivered1/2,
+ {ChPid, Flow, [], State}, Publishes),
+ State2 = ui(State1),
+ {lists:reverse(SeqIds), a(maybe_reduce_memory_use(maybe_update_rates(State2)))}.
+
+discard(_MsgId, _ChPid, _Flow, State) -> State.
+
+drain_confirmed(State = #vqstate { confirmed = C }) ->
+ case gb_sets:is_empty(C) of
+ true -> {[], State}; %% common case
+ false -> {gb_sets:to_list(C), State #vqstate {
+ confirmed = gb_sets:new() }}
+ end.
+
+dropwhile(Pred, State) ->
+ {MsgProps, State1} =
+ remove_by_predicate(Pred, State),
+ {MsgProps, a(State1)}.
+
+fetchwhile(Pred, Fun, Acc, State) ->
+ {MsgProps, Acc1, State1} =
+ fetch_by_predicate(Pred, Fun, Acc, State),
+ {MsgProps, Acc1, a(State1)}.
+
+fetch(AckRequired, State) ->
+ case queue_out(State) of
+ {empty, State1} ->
+ {empty, a(State1)};
+ {{value, MsgStatus}, State1} ->
+ %% the message may not have been read from disk at this
+ %% point, so read it in.
+ {Msg, State2} = read_msg(MsgStatus, State1),
+ {AckTag, State3} = remove(AckRequired, MsgStatus, State2),
+ {{Msg, MsgStatus#msg_status.is_delivered, AckTag}, a(State3)}
+ end.
+
+drop(AckRequired, State) ->
+ case queue_out(State) of
+ {empty, State1} ->
+ {empty, a(State1)};
+ {{value, MsgStatus}, State1} ->
+ {AckTag, State2} = remove(AckRequired, MsgStatus, State1),
+ {{MsgStatus#msg_status.msg_id, AckTag}, a(State2)}
+ end.
+
+%% Duplicated from rabbit_backing_queue
+-spec ack([ack()], state()) -> {[rabbit_guid:guid()], state()}.
+
+ack([], State) ->
+ {[], State};
+%% optimisation: this head is essentially a partial evaluation of the
+%% general case below, for the single-ack case.
+ack([SeqId], State) ->
+ case remove_pending_ack(true, SeqId, State) of
+ {none, _} ->
+ {[], State};
+ {#msg_status { msg_id = MsgId,
+ is_persistent = IsPersistent,
+ msg_in_store = MsgInStore,
+ index_on_disk = IndexOnDisk },
+ State1 = #vqstate { index_state = IndexState,
+ msg_store_clients = MSCState,
+ ack_out_counter = AckOutCount }} ->
+ IndexState1 = case IndexOnDisk of
+ true -> rabbit_queue_index:ack([SeqId], IndexState);
+ false -> IndexState
+ end,
+ case MsgInStore of
+ true -> ok = msg_store_remove(MSCState, IsPersistent, [MsgId]);
+ false -> ok
+ end,
+ {[MsgId],
+ a(State1 #vqstate { index_state = IndexState1,
+ ack_out_counter = AckOutCount + 1 })}
+ end;
+ack(AckTags, State) ->
+ {{IndexOnDiskSeqIds, MsgIdsByStore, AllMsgIds},
+ State1 = #vqstate { index_state = IndexState,
+ msg_store_clients = MSCState,
+ ack_out_counter = AckOutCount }} =
+ lists:foldl(
+ fun (SeqId, {Acc, State2}) ->
+ case remove_pending_ack(true, SeqId, State2) of
+ {none, _} ->
+ {Acc, State2};
+ {MsgStatus, State3} ->
+ {accumulate_ack(MsgStatus, Acc), State3}
+ end
+ end, {accumulate_ack_init(), State}, AckTags),
+ IndexState1 = rabbit_queue_index:ack(IndexOnDiskSeqIds, IndexState),
+ remove_msgs_by_id(MsgIdsByStore, MSCState),
+ {lists:reverse(AllMsgIds),
+ a(State1 #vqstate { index_state = IndexState1,
+ ack_out_counter = AckOutCount + length(AckTags) })}.
+
+requeue(AckTags, #vqstate { mode = default,
+ delta = Delta,
+ q3 = Q3,
+ q4 = Q4,
+ in_counter = InCounter,
+ len = Len } = State) ->
+ {SeqIds, Q4a, MsgIds, State1} = queue_merge(lists:sort(AckTags), Q4, [],
+ beta_limit(Q3),
+ fun publish_alpha/2, State),
+ {SeqIds1, Q3a, MsgIds1, State2} = queue_merge(SeqIds, Q3, MsgIds,
+ delta_limit(Delta),
+ fun publish_beta/2, State1),
+ {Delta1, MsgIds2, State3} = delta_merge(SeqIds1, Delta, MsgIds1,
+ State2),
+ MsgCount = length(MsgIds2),
+ {MsgIds2, a(maybe_reduce_memory_use(
+ maybe_update_rates(ui(
+ State3 #vqstate { delta = Delta1,
+ q3 = Q3a,
+ q4 = Q4a,
+ in_counter = InCounter + MsgCount,
+ len = Len + MsgCount }))))};
+requeue(AckTags, #vqstate { mode = lazy,
+ delta = Delta,
+ q3 = Q3,
+ in_counter = InCounter,
+ len = Len } = State) ->
+ {SeqIds, Q3a, MsgIds, State1} = queue_merge(lists:sort(AckTags), Q3, [],
+ delta_limit(Delta),
+ fun publish_beta/2, State),
+ {Delta1, MsgIds1, State2} = delta_merge(SeqIds, Delta, MsgIds,
+ State1),
+ MsgCount = length(MsgIds1),
+ {MsgIds1, a(maybe_reduce_memory_use(
+ maybe_update_rates(ui(
+ State2 #vqstate { delta = Delta1,
+ q3 = Q3a,
+ in_counter = InCounter + MsgCount,
+ len = Len + MsgCount }))))}.
+
+ackfold(MsgFun, Acc, State, AckTags) ->
+ {AccN, StateN} =
+ lists:foldl(fun(SeqId, {Acc0, State0}) ->
+ MsgStatus = lookup_pending_ack(SeqId, State0),
+ {Msg, State1} = read_msg(MsgStatus, State0),
+ {MsgFun(Msg, SeqId, Acc0), State1}
+ end, {Acc, State}, AckTags),
+ {AccN, a(StateN)}.
+
+fold(Fun, Acc, State = #vqstate{index_state = IndexState}) ->
+ {Its, IndexState1} = lists:foldl(fun inext/2, {[], IndexState},
+ [msg_iterator(State),
+ disk_ack_iterator(State),
+ ram_ack_iterator(State),
+ qi_ack_iterator(State)]),
+ ifold(Fun, Acc, Its, State#vqstate{index_state = IndexState1}).
+
+len(#vqstate { len = Len }) -> Len.
+
+is_empty(State) -> 0 == len(State).
+
+depth(State) ->
+ len(State) + count_pending_acks(State).
+
+set_ram_duration_target(
+ DurationTarget, State = #vqstate {
+ rates = #rates { in = AvgIngressRate,
+ out = AvgEgressRate,
+ ack_in = AvgAckIngressRate,
+ ack_out = AvgAckEgressRate },
+ target_ram_count = TargetRamCount }) ->
+ Rate =
+ AvgEgressRate + AvgIngressRate + AvgAckEgressRate + AvgAckIngressRate,
+ TargetRamCount1 =
+ case DurationTarget of
+ infinity -> infinity;
+ _ -> trunc(DurationTarget * Rate) %% msgs = sec * msgs/sec
+ end,
+ State1 = State #vqstate { target_ram_count = TargetRamCount1 },
+ a(case TargetRamCount1 == infinity orelse
+ (TargetRamCount =/= infinity andalso
+ TargetRamCount1 >= TargetRamCount) of
+ true -> State1;
+ false -> reduce_memory_use(State1)
+ end).
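+
+%% For example (illustrative numbers): with a summed rate of 400
+%% msgs/sec, a DurationTarget of 2.5 seconds gives
+%% trunc(2.5 * 400) = 1000 messages as the new target_ram_count.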
+
+maybe_update_rates(State = #vqstate{ in_counter = InCount,
+ out_counter = OutCount })
+ when InCount + OutCount > ?MSGS_PER_RATE_CALC ->
+ update_rates(State);
+maybe_update_rates(State) ->
+ State.
+
+update_rates(State = #vqstate{ in_counter = InCount,
+ out_counter = OutCount,
+ ack_in_counter = AckInCount,
+ ack_out_counter = AckOutCount,
+ rates = #rates{ in = InRate,
+ out = OutRate,
+ ack_in = AckInRate,
+ ack_out = AckOutRate,
+ timestamp = TS }}) ->
+ Now = erlang:monotonic_time(),
+
+ Rates = #rates { in = update_rate(Now, TS, InCount, InRate),
+ out = update_rate(Now, TS, OutCount, OutRate),
+ ack_in = update_rate(Now, TS, AckInCount, AckInRate),
+ ack_out = update_rate(Now, TS, AckOutCount, AckOutRate),
+ timestamp = Now },
+
+ State#vqstate{ in_counter = 0,
+ out_counter = 0,
+ ack_in_counter = 0,
+ ack_out_counter = 0,
+ rates = Rates }.
+
+update_rate(Now, TS, Count, Rate) ->
+ Time = erlang:convert_time_unit(Now - TS, native, micro_seconds) /
+ ?MICROS_PER_SECOND,
+ if
+ Time == 0 -> Rate;
+ true -> rabbit_misc:moving_average(Time, ?RATE_AVG_HALF_LIFE,
+ Count / Time, Rate)
+ end.
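+
+%% rabbit_misc:moving_average/4 is assumed here to compute an
+%% exponentially-weighted moving average with the given half life; a
+%% sketch of that computation (not necessarily the exact rabbit_misc
+%% code) is:
+%%
+%%   Weight = math:pow(0.5, Time / HalfLife),
+%%   AvgRate1 = Sample * (1 - Weight) + AvgRate * Weight.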
+
+ram_duration(State) ->
+ State1 = #vqstate { rates = #rates { in = AvgIngressRate,
+ out = AvgEgressRate,
+ ack_in = AvgAckIngressRate,
+ ack_out = AvgAckEgressRate },
+ ram_msg_count = RamMsgCount,
+ ram_msg_count_prev = RamMsgCountPrev,
+ ram_pending_ack = RPA,
+ qi_pending_ack = QPA,
+ ram_ack_count_prev = RamAckCountPrev } =
+ update_rates(State),
+
+ RamAckCount = gb_trees:size(RPA) + gb_trees:size(QPA),
+
+ Duration = %% msgs+acks / (msgs+acks/sec) == sec
+ case lists:all(fun (X) -> X < 0.01 end,
+ [AvgEgressRate, AvgIngressRate,
+ AvgAckEgressRate, AvgAckIngressRate]) of
+ true -> infinity;
+ false -> (RamMsgCountPrev + RamMsgCount +
+ RamAckCount + RamAckCountPrev) /
+ (4 * (AvgEgressRate + AvgIngressRate +
+ AvgAckEgressRate + AvgAckIngressRate))
+ end,
+
+ {Duration, State1}.
+
+needs_timeout(#vqstate { index_state = IndexState }) ->
+ case rabbit_queue_index:needs_sync(IndexState) of
+ confirms -> timed;
+ other -> idle;
+ false -> false
+ end.
+
+timeout(State = #vqstate { index_state = IndexState }) ->
+ State #vqstate { index_state = rabbit_queue_index:sync(IndexState) }.
+
+handle_pre_hibernate(State = #vqstate { index_state = IndexState }) ->
+ State #vqstate { index_state = rabbit_queue_index:flush(IndexState) }.
+
+handle_info(bump_reduce_memory_use, State = #vqstate{ waiting_bump = true }) ->
+ State#vqstate{ waiting_bump = false };
+handle_info(bump_reduce_memory_use, State) ->
+ State.
+
+resume(State) -> a(reduce_memory_use(State)).
+
+msg_rates(#vqstate { rates = #rates { in = AvgIngressRate,
+ out = AvgEgressRate } }) ->
+ {AvgIngressRate, AvgEgressRate}.
+
+info(messages_ready_ram, #vqstate{ram_msg_count = RamMsgCount}) ->
+ RamMsgCount;
+info(messages_unacknowledged_ram, #vqstate{ram_pending_ack = RPA,
+ qi_pending_ack = QPA}) ->
+ gb_trees:size(RPA) + gb_trees:size(QPA);
+info(messages_ram, State) ->
+ info(messages_ready_ram, State) + info(messages_unacknowledged_ram, State);
+info(messages_persistent, #vqstate{persistent_count = PersistentCount}) ->
+ PersistentCount;
+info(messages_paged_out, #vqstate{delta = #delta{transient = Count}}) ->
+ Count;
+info(message_bytes, #vqstate{bytes = Bytes,
+ unacked_bytes = UBytes}) ->
+ Bytes + UBytes;
+info(message_bytes_ready, #vqstate{bytes = Bytes}) ->
+ Bytes;
+info(message_bytes_unacknowledged, #vqstate{unacked_bytes = UBytes}) ->
+ UBytes;
+info(message_bytes_ram, #vqstate{ram_bytes = RamBytes}) ->
+ RamBytes;
+info(message_bytes_persistent, #vqstate{persistent_bytes = PersistentBytes}) ->
+ PersistentBytes;
+info(message_bytes_paged_out, #vqstate{delta_transient_bytes = PagedOutBytes}) ->
+ PagedOutBytes;
+info(head_message_timestamp, #vqstate{
+ q3 = Q3,
+ q4 = Q4,
+ ram_pending_ack = RPA,
+ qi_pending_ack = QPA}) ->
+ head_message_timestamp(Q3, Q4, RPA, QPA);
+info(disk_reads, #vqstate{disk_read_count = Count}) ->
+ Count;
+info(disk_writes, #vqstate{disk_write_count = Count}) ->
+ Count;
+info(backing_queue_status, #vqstate {
+ q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
+ mode = Mode,
+ len = Len,
+ target_ram_count = TargetRamCount,
+ next_seq_id = NextSeqId,
+ rates = #rates { in = AvgIngressRate,
+ out = AvgEgressRate,
+ ack_in = AvgAckIngressRate,
+ ack_out = AvgAckEgressRate }}) ->
+
+ [ {mode , Mode},
+ {q1 , ?QUEUE:len(Q1)},
+ {q2 , ?QUEUE:len(Q2)},
+ {delta , Delta},
+ {q3 , ?QUEUE:len(Q3)},
+ {q4 , ?QUEUE:len(Q4)},
+ {len , Len},
+ {target_ram_count , TargetRamCount},
+ {next_seq_id , NextSeqId},
+ {avg_ingress_rate , AvgIngressRate},
+ {avg_egress_rate , AvgEgressRate},
+ {avg_ack_ingress_rate, AvgAckIngressRate},
+ {avg_ack_egress_rate , AvgAckEgressRate} ];
+info(_, _) ->
+ ''.
+
+invoke(?MODULE, Fun, State) -> Fun(?MODULE, State);
+invoke( _, _, State) -> State.
+
+is_duplicate(_Msg, State) -> {false, State}.
+
+set_queue_mode(Mode, State = #vqstate { mode = Mode }) ->
+ State;
+set_queue_mode(lazy, State = #vqstate {
+ target_ram_count = TargetRamCount }) ->
+ %% To become a lazy queue we need to page everything to disk first.
+ State1 = convert_to_lazy(State),
+ %% restore the original target_ram_count
+ a(State1 #vqstate { mode = lazy, target_ram_count = TargetRamCount });
+set_queue_mode(default, State) ->
+ %% becoming a default queue means loading messages from disk like
+ %% when a queue is recovered.
+ a(maybe_deltas_to_betas(State #vqstate { mode = default }));
+set_queue_mode(_, State) ->
+ State.
+
+zip_msgs_and_acks(Msgs, AckTags, Accumulator, _State) ->
+ lists:foldl(fun ({{#basic_message{ id = Id }, _Props}, AckTag}, Acc) ->
+ [{Id, AckTag} | Acc]
+ end, Accumulator, lists:zip(Msgs, AckTags)).
+
+convert_to_lazy(State) ->
+ State1 = #vqstate { delta = Delta, q3 = Q3, len = Len } =
+ set_ram_duration_target(0, State),
+ case Delta#delta.count + ?QUEUE:len(Q3) == Len of
+ true ->
+ State1;
+ false ->
+ %% When pushing messages to disk, we might have been
+ %% blocked by the msg_store, so we need to see if we have
+ %% to wait for more credit, and then keep paging messages.
+ %%
+ %% The amqqueue_process could have taken care of this, but
+ %% between the time it receives the bump_credit msg and
+ %% calls BQ:resume to keep paging messages to disk, some
+ %% other request may arrive to the BQ which at this moment
+ %% is not in a proper state for a lazy BQ (unless all
+ %% messages have been paged to disk already).
+ wait_for_msg_store_credit(),
+ convert_to_lazy(resume(State1))
+ end.
+
+wait_for_msg_store_credit() ->
+ case credit_flow:blocked() of
+ true -> receive
+ {bump_credit, Msg} ->
+ credit_flow:handle_bump_msg(Msg)
+ end;
+ false -> ok
+ end.
+
+%% Get the Timestamp property of the first msg, if present. This is
+%% the one with the oldest timestamp among the heads of the pending
+%% acks and unread queues. We can't check disk_pending_ack as those
+%% msgs are paged out - we assume some will soon be paged in rather
+%% than forcing it to happen. Pending ack msgs are included as they
+%% are regarded as unprocessed until acked; this also prevents the
+%% result from apparently oscillating during repeated rejects. Q3 is
+%% only checked when Q4 is empty as any Q4 msg will be earlier.
+head_message_timestamp(Q3, Q4, RPA, QPA) ->
+ HeadMsgs = [ HeadMsgStatus#msg_status.msg ||
+ HeadMsgStatus <-
+ [ get_qs_head([Q4, Q3]),
+ get_pa_head(RPA),
+ get_pa_head(QPA) ],
+ HeadMsgStatus /= undefined,
+ HeadMsgStatus#msg_status.msg /= undefined ],
+
+ Timestamps =
+ [Timestamp || HeadMsg <- HeadMsgs,
+ Timestamp <- [rabbit_basic:extract_timestamp(
+ HeadMsg#basic_message.content)],
+ Timestamp /= undefined
+ ],
+
+ case Timestamps == [] of
+ true -> '';
+ false -> lists:min(Timestamps)
+ end.
+
+get_qs_head(Qs) ->
+ catch lists:foldl(
+ fun (Q, Acc) ->
+ case get_q_head(Q) of
+ undefined -> Acc;
+ Val -> throw(Val)
+ end
+ end, undefined, Qs).
+
+get_q_head(Q) ->
+ get_collection_head(Q, fun ?QUEUE:is_empty/1, fun ?QUEUE:peek/1).
+
+get_pa_head(PA) ->
+ get_collection_head(PA, fun gb_trees:is_empty/1, fun gb_trees:smallest/1).
+
+get_collection_head(Col, IsEmpty, GetVal) ->
+ case IsEmpty(Col) of
+ false ->
+ {_, MsgStatus} = GetVal(Col),
+ MsgStatus;
+ true -> undefined
+ end.
+
+%%----------------------------------------------------------------------------
+%% Minor helpers
+%%----------------------------------------------------------------------------
+a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
+ mode = default,
+ len = Len,
+ bytes = Bytes,
+ unacked_bytes = UnackedBytes,
+ persistent_count = PersistentCount,
+ persistent_bytes = PersistentBytes,
+ ram_msg_count = RamMsgCount,
+ ram_bytes = RamBytes}) ->
+ E1 = ?QUEUE:is_empty(Q1),
+ E2 = ?QUEUE:is_empty(Q2),
+ ED = Delta#delta.count == 0,
+ E3 = ?QUEUE:is_empty(Q3),
+ E4 = ?QUEUE:is_empty(Q4),
+ LZ = Len == 0,
+
+ %% if q1 has messages then q3 cannot be empty. See publish/6.
+ true = E1 or not E3,
+ %% if q2 has messages then we have messages in delta (paged to
+ %% disk). See push_alphas_to_betas/2.
+ true = E2 or not ED,
+ %% if delta has messages then q3 cannot be empty. This is enforced
+ %% by paging, where min([?SEGMENT_ENTRY_COUNT, len(q3)]) messages
+ %% are always kept in RAM.
+ true = ED or not E3,
+ %% if the queue length is 0, then q3 and q4 must be empty.
+ true = LZ == (E3 and E4),
+
+ true = Len >= 0,
+ true = Bytes >= 0,
+ true = UnackedBytes >= 0,
+ true = PersistentCount >= 0,
+ true = PersistentBytes >= 0,
+ true = RamMsgCount >= 0,
+ true = RamMsgCount =< Len,
+ true = RamBytes >= 0,
+ true = RamBytes =< Bytes + UnackedBytes,
+
+ State;
+a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
+ mode = lazy,
+ len = Len,
+ bytes = Bytes,
+ unacked_bytes = UnackedBytes,
+ persistent_count = PersistentCount,
+ persistent_bytes = PersistentBytes,
+ ram_msg_count = RamMsgCount,
+ ram_bytes = RamBytes}) ->
+ E1 = ?QUEUE:is_empty(Q1),
+ E2 = ?QUEUE:is_empty(Q2),
+ ED = Delta#delta.count == 0,
+ E3 = ?QUEUE:is_empty(Q3),
+ E4 = ?QUEUE:is_empty(Q4),
+ LZ = Len == 0,
+ L3 = ?QUEUE:len(Q3),
+
+ %% q1 must always be empty, since q1 only gets messages during
+ %% publish, but for lazy queues messages go straight to delta.
+ true = E1,
+
+ %% q2 only gets messages from q1 when push_alphas_to_betas is
+ %% called for a non empty delta, which won't be the case for a
+ %% lazy queue. This means q2 must always be empty.
+ true = E2,
+
+ %% q4 must always be empty: in lazy mode messages are only ever
+ %% read out via q3, so nothing is moved into q4.
+ true = E4,
+
+ %% if the queue is empty, then delta is empty and q3 is empty.
+ true = LZ == (ED and E3),
+
+ %% with no messages in q1, q2 and q4, delta and q3 must
+ %% account for the whole queue length
+ true = Delta#delta.count + L3 == Len,
+
+ true = Len >= 0,
+ true = Bytes >= 0,
+ true = UnackedBytes >= 0,
+ true = PersistentCount >= 0,
+ true = PersistentBytes >= 0,
+ true = RamMsgCount >= 0,
+ true = RamMsgCount =< Len,
+ true = RamBytes >= 0,
+ true = RamBytes =< Bytes + UnackedBytes,
+
+ State.
+
+d(Delta = #delta { start_seq_id = Start, count = Count, end_seq_id = End })
+ when Start + Count =< End ->
+ Delta.
+
+m(MsgStatus = #msg_status { is_persistent = IsPersistent,
+ msg_in_store = MsgInStore,
+ index_on_disk = IndexOnDisk }) ->
+ true = (not IsPersistent) or IndexOnDisk,
+ true = msg_in_ram(MsgStatus) or MsgInStore,
+ MsgStatus.
+
+one_if(true ) -> 1;
+one_if(false) -> 0.
+
+cons_if(true, E, L) -> [E | L];
+cons_if(false, _E, L) -> L.
+
+gb_sets_maybe_insert(false, _Val, Set) -> Set;
+gb_sets_maybe_insert(true, Val, Set) -> gb_sets:add(Val, Set).
+
+msg_status(IsPersistent, IsDelivered, SeqId,
+ Msg = #basic_message {id = MsgId}, MsgProps, IndexMaxSize) ->
+ #msg_status{seq_id = SeqId,
+ msg_id = MsgId,
+ msg = Msg,
+ is_persistent = IsPersistent,
+ is_delivered = IsDelivered,
+ msg_in_store = false,
+ index_on_disk = false,
+ persist_to = determine_persist_to(Msg, MsgProps, IndexMaxSize),
+ msg_props = MsgProps}.
+
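+%% The two beta_msg_status/1 clauses below mirror the two shapes of
+%% queue index entries: {Msg, SeqId, MsgProps, IsPersistent,
+%% IsDelivered} for messages embedded in the index, and {MsgId,
+%% SeqId, MsgProps, IsPersistent, IsDelivered} for messages kept in
+%% the message store.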
+beta_msg_status({Msg = #basic_message{id = MsgId},
+ SeqId, MsgProps, IsPersistent, IsDelivered}) ->
+ MS0 = beta_msg_status0(SeqId, MsgProps, IsPersistent, IsDelivered),
+ MS0#msg_status{msg_id = MsgId,
+ msg = Msg,
+ persist_to = queue_index,
+ msg_in_store = false};
+
+beta_msg_status({MsgId, SeqId, MsgProps, IsPersistent, IsDelivered}) ->
+ MS0 = beta_msg_status0(SeqId, MsgProps, IsPersistent, IsDelivered),
+ MS0#msg_status{msg_id = MsgId,
+ msg = undefined,
+ persist_to = msg_store,
+ msg_in_store = true}.
+
+beta_msg_status0(SeqId, MsgProps, IsPersistent, IsDelivered) ->
+ #msg_status{seq_id = SeqId,
+ msg = undefined,
+ is_persistent = IsPersistent,
+ is_delivered = IsDelivered,
+ index_on_disk = true,
+ msg_props = MsgProps}.
+
+trim_msg_status(MsgStatus) ->
+ case persist_to(MsgStatus) of
+ msg_store -> MsgStatus#msg_status{msg = undefined};
+ queue_index -> MsgStatus
+ end.
+
+with_msg_store_state({MSCStateP, MSCStateT}, true, Fun) ->
+ {Result, MSCStateP1} = Fun(MSCStateP),
+ {Result, {MSCStateP1, MSCStateT}};
+with_msg_store_state({MSCStateP, MSCStateT}, false, Fun) ->
+ {Result, MSCStateT1} = Fun(MSCStateT),
+ {Result, {MSCStateP, MSCStateT1}}.
+
+with_immutable_msg_store_state(MSCState, IsPersistent, Fun) ->
+ {Res, MSCState} = with_msg_store_state(MSCState, IsPersistent,
+ fun (MSCState1) ->
+ {Fun(MSCState1), MSCState1}
+ end),
+ Res.
+
+msg_store_client_init(MsgStore, MsgOnDiskFun, Callback, VHost) ->
+ msg_store_client_init(MsgStore, rabbit_guid:gen(), MsgOnDiskFun,
+ Callback, VHost).
+
+msg_store_client_init(MsgStore, Ref, MsgOnDiskFun, Callback, VHost) ->
+ CloseFDsFun = msg_store_close_fds_fun(MsgStore =:= ?PERSISTENT_MSG_STORE),
+ rabbit_vhost_msg_store:client_init(VHost, MsgStore,
+ Ref, MsgOnDiskFun,
+ fun () ->
+ Callback(?MODULE, CloseFDsFun)
+ end).
+
+msg_store_write(MSCState, IsPersistent, MsgId, Msg) ->
+ with_immutable_msg_store_state(
+ MSCState, IsPersistent,
+ fun (MSCState1) ->
+ rabbit_msg_store:write_flow(MsgId, Msg, MSCState1)
+ end).
+
+msg_store_read(MSCState, IsPersistent, MsgId) ->
+ with_msg_store_state(
+ MSCState, IsPersistent,
+ fun (MSCState1) ->
+ rabbit_msg_store:read(MsgId, MSCState1)
+ end).
+
+msg_store_remove(MSCState, IsPersistent, MsgIds) ->
+ with_immutable_msg_store_state(
+ MSCState, IsPersistent,
+ fun (MCSState1) ->
+ rabbit_msg_store:remove(MsgIds, MCSState1)
+ end).
+
+msg_store_close_fds(MSCState, IsPersistent) ->
+ with_msg_store_state(
+ MSCState, IsPersistent,
+ fun (MSCState1) -> rabbit_msg_store:close_all_indicated(MSCState1) end).
+
+msg_store_close_fds_fun(IsPersistent) ->
+ fun (?MODULE, State = #vqstate { msg_store_clients = MSCState }) ->
+ {ok, MSCState1} = msg_store_close_fds(MSCState, IsPersistent),
+ State #vqstate { msg_store_clients = MSCState1 }
+ end.
+
+maybe_write_delivered(false, _SeqId, IndexState) ->
+ IndexState;
+maybe_write_delivered(true, SeqId, IndexState) ->
+ rabbit_queue_index:deliver([SeqId], IndexState).
+
+betas_from_index_entries(List, TransientThreshold, DelsAndAcksFun, State) ->
+ {Filtered, Delivers, Acks, RamReadyCount, RamBytes, TransientCount, TransientBytes} =
+ lists:foldr(
+ fun ({_MsgOrId, SeqId, _MsgProps, IsPersistent, IsDelivered} = M,
+ {Filtered1, Delivers1, Acks1, RRC, RB, TC, TB} = Acc) ->
+ case SeqId < TransientThreshold andalso not IsPersistent of
+ true -> {Filtered1,
+ cons_if(not IsDelivered, SeqId, Delivers1),
+ [SeqId | Acks1], RRC, RB, TC, TB};
+ false -> MsgStatus = m(beta_msg_status(M)),
+ HaveMsg = msg_in_ram(MsgStatus),
+ Size = msg_size(MsgStatus),
+ case is_msg_in_pending_acks(SeqId, State) of
+ false -> {?QUEUE:in_r(MsgStatus, Filtered1),
+ Delivers1, Acks1,
+ RRC + one_if(HaveMsg),
+ RB + one_if(HaveMsg) * Size,
+ TC + one_if(not IsPersistent),
+ TB + one_if(not IsPersistent) * Size};
+ true -> Acc %% [0]
+ end
+ end
+ end, {?QUEUE:new(), [], [], 0, 0, 0, 0}, List),
+ {Filtered, RamReadyCount, RamBytes, DelsAndAcksFun(Delivers, Acks, State),
+ TransientCount, TransientBytes}.
+%% [0] We don't increase RamBytes here, even though it pertains to
+%% unacked messages too, since if HaveMsg then the message must have
+%% been stored in the QI, thus the message must have been in
+%% qi_pending_ack, thus it must already have been in RAM.
+
+is_msg_in_pending_acks(SeqId, #vqstate { ram_pending_ack = RPA,
+ disk_pending_ack = DPA,
+ qi_pending_ack = QPA }) ->
+ (gb_trees:is_defined(SeqId, RPA) orelse
+ gb_trees:is_defined(SeqId, DPA) orelse
+ gb_trees:is_defined(SeqId, QPA)).
+
+expand_delta(SeqId, ?BLANK_DELTA_PATTERN(X), IsPersistent) ->
+ d(#delta { start_seq_id = SeqId, count = 1, end_seq_id = SeqId + 1,
+ transient = one_if(not IsPersistent)});
+expand_delta(SeqId, #delta { start_seq_id = StartSeqId,
+ count = Count,
+ transient = Transient } = Delta,
+ IsPersistent )
+ when SeqId < StartSeqId ->
+ d(Delta #delta { start_seq_id = SeqId, count = Count + 1,
+ transient = Transient + one_if(not IsPersistent)});
+expand_delta(SeqId, #delta { count = Count,
+ end_seq_id = EndSeqId,
+ transient = Transient } = Delta,
+ IsPersistent)
+ when SeqId >= EndSeqId ->
+ d(Delta #delta { count = Count + 1, end_seq_id = SeqId + 1,
+ transient = Transient + one_if(not IsPersistent)});
+expand_delta(_SeqId, #delta { count = Count,
+ transient = Transient } = Delta,
+ IsPersistent ) ->
+ d(Delta #delta { count = Count + 1,
+ transient = Transient + one_if(not IsPersistent) }).
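+
+%% For example (illustrative only): starting from a blank delta,
+%% expand_delta(10, Delta0, false) yields
+%%   #delta { start_seq_id = 10, count = 1, transient = 1,
+%%            end_seq_id = 11 }
+%% and a subsequent expand_delta(7, Delta1, true) extends it to
+%%   #delta { start_seq_id = 7, count = 2, transient = 1,
+%%            end_seq_id = 11 }.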
+
+%%----------------------------------------------------------------------------
+%% Internal major helpers for Public API
+%%----------------------------------------------------------------------------
+
+init(IsDurable, IndexState, DeltaCount, DeltaBytes, Terms,
+ PersistentClient, TransientClient, VHost) ->
+ {LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState),
+
+ {DeltaCount1, DeltaBytes1} =
+ case Terms of
+ non_clean_shutdown -> {DeltaCount, DeltaBytes};
+ _ -> {proplists:get_value(persistent_count,
+ Terms, DeltaCount),
+ proplists:get_value(persistent_bytes,
+ Terms, DeltaBytes)}
+ end,
+ Delta = case DeltaCount1 == 0 andalso DeltaCount /= undefined of
+ true -> ?BLANK_DELTA;
+ false -> d(#delta { start_seq_id = LowSeqId,
+ count = DeltaCount1,
+ transient = 0,
+ end_seq_id = NextSeqId })
+ end,
+ Now = erlang:monotonic_time(),
+ IoBatchSize = rabbit_misc:get_env(rabbit, msg_store_io_batch_size,
+ ?IO_BATCH_SIZE),
+
+ {ok, IndexMaxSize} = application:get_env(
+ rabbit, queue_index_embed_msgs_below),
+ State = #vqstate {
+ q1 = ?QUEUE:new(),
+ q2 = ?QUEUE:new(),
+ delta = Delta,
+ q3 = ?QUEUE:new(),
+ q4 = ?QUEUE:new(),
+ next_seq_id = NextSeqId,
+ ram_pending_ack = gb_trees:empty(),
+ disk_pending_ack = gb_trees:empty(),
+ qi_pending_ack = gb_trees:empty(),
+ index_state = IndexState1,
+ msg_store_clients = {PersistentClient, TransientClient},
+ durable = IsDurable,
+ transient_threshold = NextSeqId,
+ qi_embed_msgs_below = IndexMaxSize,
+
+ len = DeltaCount1,
+ persistent_count = DeltaCount1,
+ bytes = DeltaBytes1,
+ persistent_bytes = DeltaBytes1,
+ delta_transient_bytes = 0,
+
+ target_ram_count = infinity,
+ ram_msg_count = 0,
+ ram_msg_count_prev = 0,
+ ram_ack_count_prev = 0,
+ ram_bytes = 0,
+ unacked_bytes = 0,
+ out_counter = 0,
+ in_counter = 0,
+ rates = blank_rates(Now),
+ msgs_on_disk = gb_sets:new(),
+ msg_indices_on_disk = gb_sets:new(),
+ unconfirmed = gb_sets:new(),
+ confirmed = gb_sets:new(),
+ ack_out_counter = 0,
+ ack_in_counter = 0,
+ disk_read_count = 0,
+ disk_write_count = 0,
+
+ io_batch_size = IoBatchSize,
+
+ mode = default,
+ memory_reduction_run_count = 0,
+ virtual_host = VHost},
+ a(maybe_deltas_to_betas(State)).
+
+blank_rates(Now) ->
+ #rates { in = 0.0,
+ out = 0.0,
+ ack_in = 0.0,
+ ack_out = 0.0,
+ timestamp = Now}.
+
+in_r(MsgStatus = #msg_status { msg = undefined },
+ State = #vqstate { mode = default, q3 = Q3, q4 = Q4 }) ->
+ case ?QUEUE:is_empty(Q4) of
+ true -> State #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3) };
+ false -> {Msg, State1 = #vqstate { q4 = Q4a }} =
+ read_msg(MsgStatus, State),
+ MsgStatus1 = MsgStatus#msg_status{msg = Msg},
+ stats(ready0, {MsgStatus, MsgStatus1}, 0,
+ State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus1, Q4a) })
+ end;
+in_r(MsgStatus,
+ State = #vqstate { mode = default, q4 = Q4 }) ->
+ State #vqstate { q4 = ?QUEUE:in_r(MsgStatus, Q4) };
+%% lazy queues
+in_r(MsgStatus = #msg_status { seq_id = SeqId, is_persistent = IsPersistent },
+ State = #vqstate { mode = lazy, q3 = Q3, delta = Delta}) ->
+ case ?QUEUE:is_empty(Q3) of
+ true ->
+ {_MsgStatus1, State1} =
+ maybe_write_to_disk(true, true, MsgStatus, State),
+ State2 = stats(ready0, {MsgStatus, none}, 1, State1),
+ Delta1 = expand_delta(SeqId, Delta, IsPersistent),
+ State2 #vqstate{ delta = Delta1};
+ false ->
+ State #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3) }
+ end.
+
+queue_out(State = #vqstate { mode = default, q4 = Q4 }) ->
+ case ?QUEUE:out(Q4) of
+ {empty, _Q4} ->
+ case fetch_from_q3(State) of
+ {empty, _State1} = Result -> Result;
+ {loaded, {MsgStatus, State1}} -> {{value, MsgStatus}, State1}
+ end;
+ {{value, MsgStatus}, Q4a} ->
+ {{value, MsgStatus}, State #vqstate { q4 = Q4a }}
+ end;
+%% lazy queues
+queue_out(State = #vqstate { mode = lazy }) ->
+ case fetch_from_q3(State) of
+ {empty, _State1} = Result -> Result;
+ {loaded, {MsgStatus, State1}} -> {{value, MsgStatus}, State1}
+ end.
+
+read_msg(#msg_status{msg = undefined,
+ msg_id = MsgId,
+ is_persistent = IsPersistent}, State) ->
+ read_msg(MsgId, IsPersistent, State);
+read_msg(#msg_status{msg = Msg}, State) ->
+ {Msg, State}.
+
+read_msg(MsgId, IsPersistent, State = #vqstate{msg_store_clients = MSCState,
+ disk_read_count = Count}) ->
+ {{ok, Msg = #basic_message {}}, MSCState1} =
+ msg_store_read(MSCState, IsPersistent, MsgId),
+ {Msg, State #vqstate {msg_store_clients = MSCState1,
+ disk_read_count = Count + 1}}.
+
+stats(Signs, Statuses, DeltaPaged, State) ->
+ stats0(expand_signs(Signs), expand_statuses(Statuses), DeltaPaged, State).
+
+expand_signs(ready0) -> {0, 0, true};
+expand_signs(lazy_pub) -> {1, 0, true};
+expand_signs({A, B}) -> {A, B, false}.
+
+expand_statuses({none, A}) -> {false, msg_in_ram(A), A};
+expand_statuses({B, none}) -> {msg_in_ram(B), false, B};
+expand_statuses({lazy, A}) -> {false , false, A};
+expand_statuses({B, A}) -> {msg_in_ram(B), msg_in_ram(A), B}.
+
+%% In this function at least, we are religious: the variable name
+%% contains "Ready" or "Unacked" iff that is what it counts. If
+%% neither is present it counts both.
+stats0({DeltaReady, DeltaUnacked, ReadyMsgPaged},
+ {InRamBefore, InRamAfter, MsgStatus}, DeltaPaged,
+ State = #vqstate{len = ReadyCount,
+ bytes = ReadyBytes,
+ ram_msg_count = RamReadyCount,
+ persistent_count = PersistentCount,
+ unacked_bytes = UnackedBytes,
+ ram_bytes = RamBytes,
+ delta_transient_bytes = DeltaBytes,
+ persistent_bytes = PersistentBytes}) ->
+ S = msg_size(MsgStatus),
+ DeltaTotal = DeltaReady + DeltaUnacked,
+ DeltaRam = case {InRamBefore, InRamAfter} of
+ {false, false} -> 0;
+ {false, true} -> 1;
+ {true, false} -> -1;
+ {true, true} -> 0
+ end,
+ DeltaRamReady = case DeltaReady of
+ 1 -> one_if(InRamAfter);
+ -1 -> -one_if(InRamBefore);
+ 0 when ReadyMsgPaged -> DeltaRam;
+ 0 -> 0
+ end,
+ DeltaPersistent = DeltaTotal * one_if(MsgStatus#msg_status.is_persistent),
+ State#vqstate{len = ReadyCount + DeltaReady,
+ ram_msg_count = RamReadyCount + DeltaRamReady,
+ persistent_count = PersistentCount + DeltaPersistent,
+ bytes = ReadyBytes + DeltaReady * S,
+ unacked_bytes = UnackedBytes + DeltaUnacked * S,
+ ram_bytes = RamBytes + DeltaRam * S,
+ persistent_bytes = PersistentBytes + DeltaPersistent * S,
+ delta_transient_bytes = DeltaBytes + DeltaPaged * one_if(not MsgStatus#msg_status.is_persistent) * S}.
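+
+%% For example, remove/3 with AckRequired = true calls
+%% stats({-1, 1}, {MsgStatus, MsgStatus}, 0, State): len and bytes
+%% drop by one message / S bytes, unacked_bytes grows by S,
+%% ram_msg_count drops by one if the message was in RAM, and
+%% ram_bytes and persistent_count are unchanged since the message
+%% neither left RAM nor left the queue as a whole.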
+
+msg_size(#msg_status{msg_props = #message_properties{size = Size}}) -> Size.
+
+msg_in_ram(#msg_status{msg = Msg}) -> Msg =/= undefined.
+
+%% first param: AckRequired
+remove(true, MsgStatus = #msg_status {
+ seq_id = SeqId,
+ is_delivered = IsDelivered,
+ index_on_disk = IndexOnDisk },
+ State = #vqstate {out_counter = OutCount,
+ index_state = IndexState}) ->
+ %% Mark it delivered if necessary
+ IndexState1 = maybe_write_delivered(
+ IndexOnDisk andalso not IsDelivered,
+ SeqId, IndexState),
+
+ State1 = record_pending_ack(
+ MsgStatus #msg_status {
+ is_delivered = true }, State),
+
+ State2 = stats({-1, 1}, {MsgStatus, MsgStatus}, 0, State1),
+
+ {SeqId, maybe_update_rates(
+ State2 #vqstate {out_counter = OutCount + 1,
+ index_state = IndexState1})};
+
+%% This function body has the same behaviour as remove_queue_entries/3
+%% but instead of removing a whole ?QUEUE of messages, it removes just
+%% one: the message referenced by the MsgStatus provided.
+remove(false, MsgStatus = #msg_status {
+ seq_id = SeqId,
+ msg_id = MsgId,
+ is_persistent = IsPersistent,
+ is_delivered = IsDelivered,
+ msg_in_store = MsgInStore,
+ index_on_disk = IndexOnDisk },
+ State = #vqstate {out_counter = OutCount,
+ index_state = IndexState,
+ msg_store_clients = MSCState}) ->
+ %% Mark it delivered if necessary
+ IndexState1 = maybe_write_delivered(
+ IndexOnDisk andalso not IsDelivered,
+ SeqId, IndexState),
+
+ %% Remove from msg_store and queue index, if necessary
+ case MsgInStore of
+ true -> ok = msg_store_remove(MSCState, IsPersistent, [MsgId]);
+ false -> ok
+ end,
+
+ IndexState2 =
+ case IndexOnDisk of
+ true -> rabbit_queue_index:ack([SeqId], IndexState1);
+ false -> IndexState1
+ end,
+
+ State1 = stats({-1, 0}, {MsgStatus, none}, 0, State),
+
+ {undefined, maybe_update_rates(
+ State1 #vqstate {out_counter = OutCount + 1,
+ index_state = IndexState2})}.
+
+%% This function exists as a way to improve dropwhile/2
+%% performance. The idea of having this function is to optimise calls
+%% to rabbit_queue_index by batching delivers and acks, instead of
+%% sending them one by one.
+%%
+%% Instead of removing every message as they are popped from the
+%% queue, it first accumulates them and then removes them by calling
+%% remove_queue_entries/3, since the behaviour of
+%% remove_queue_entries/3 when used with
+%% process_delivers_and_acks_fun(deliver_and_ack) is the same as
+%% calling remove(false, MsgStatus, State).
+%%
+%% remove/3 also updates the out_counter in every call, but here we do
+%% it just once at the end.
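+%%
+%% Pred is the predicate passed to dropwhile/2, a fun taking the
+%% #message_properties{} of the head message and returning a boolean.
+%% An illustrative sketch (not the actual caller's code) of a
+%% predicate that drops expired messages might be
+%%
+%%     fun (#message_properties { expiry = Expiry }) -> Now >= Expiry end
+%%
+%% where Now is a timestamp captured by the caller.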
+remove_by_predicate(Pred, State = #vqstate {out_counter = OutCount}) ->
+ {MsgProps, QAcc, State1} =
+ collect_by_predicate(Pred, ?QUEUE:new(), State),
+ State2 =
+ remove_queue_entries(
+ QAcc, process_delivers_and_acks_fun(deliver_and_ack), State1),
+    %% maybe_update_rates/1 is called in remove/3 for every
+    %% message. Since we update out_counter only once, we also call
+    %% it just once, here.
+ {MsgProps, maybe_update_rates(
+ State2 #vqstate {
+ out_counter = OutCount + ?QUEUE:len(QAcc)})}.
+
+%% This function exists as a way to improve fetchwhile/4
+%% performance. The idea of having this function is to optimise calls
+%% to rabbit_queue_index by batching delivers, instead of sending them
+%% one by one.
+%%
+%% Fun is the function passed to fetchwhile/4 that's
+%% applied to every fetched message and used to build the fetchwhile/4
+%% result accumulator FetchAcc.
+fetch_by_predicate(Pred, Fun, FetchAcc,
+ State = #vqstate {
+ index_state = IndexState,
+ out_counter = OutCount}) ->
+ {MsgProps, QAcc, State1} =
+ collect_by_predicate(Pred, ?QUEUE:new(), State),
+
+ {Delivers, FetchAcc1, State2} =
+ process_queue_entries(QAcc, Fun, FetchAcc, State1),
+
+ IndexState1 = rabbit_queue_index:deliver(Delivers, IndexState),
+
+ {MsgProps, FetchAcc1, maybe_update_rates(
+ State2 #vqstate {
+ index_state = IndexState1,
+ out_counter = OutCount + ?QUEUE:len(QAcc)})}.
+
+%% Here we try to do the same as what remove(true, MsgStatus, State)
+%% does, but processing several messages at the same time. The idea is
+%% to optimise rabbit_queue_index:deliver/2 calls by sending a list of
+%% SeqIds instead of one at a time: process_queue_entries1 will
+%% accumulate the required deliveries, record_pending_ack for each
+%% message, and update stats, like remove/3 does.
+%%
+%% For the meaning of Fun and FetchAcc arguments see
+%% fetch_by_predicate/4 above.
+process_queue_entries(Q, Fun, FetchAcc, State) ->
+ ?QUEUE:foldl(fun (MsgStatus, Acc) ->
+ process_queue_entries1(MsgStatus, Fun, Acc)
+ end,
+ {[], FetchAcc, State}, Q).
+
+process_queue_entries1(
+ #msg_status { seq_id = SeqId, is_delivered = IsDelivered,
+ index_on_disk = IndexOnDisk} = MsgStatus,
+ Fun,
+ {Delivers, FetchAcc, State}) ->
+ {Msg, State1} = read_msg(MsgStatus, State),
+ State2 = record_pending_ack(
+ MsgStatus #msg_status {
+ is_delivered = true }, State1),
+ {cons_if(IndexOnDisk andalso not IsDelivered, SeqId, Delivers),
+ Fun(Msg, SeqId, FetchAcc),
+ stats({-1, 1}, {MsgStatus, MsgStatus}, 0, State2)}.
+
+collect_by_predicate(Pred, QAcc, State) ->
+ case queue_out(State) of
+ {empty, State1} ->
+ {undefined, QAcc, State1};
+ {{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} ->
+ case Pred(MsgProps) of
+ true -> collect_by_predicate(Pred, ?QUEUE:in(MsgStatus, QAcc),
+ State1);
+ false -> {MsgProps, QAcc, in_r(MsgStatus, State1)}
+ end
+ end.
+
+%%----------------------------------------------------------------------------
+%% Helpers for Public API purge/1 function
+%%----------------------------------------------------------------------------
+
+%% The difference between purge_when_pending_acks/1
+%% vs. purge_and_index_reset/1 is that the first one issues a deliver
+%% and an ack to the queue index for every message that's being
+%% removed, while the latter just resets the queue index state.
+purge_when_pending_acks(State) ->
+ State1 = purge1(process_delivers_and_acks_fun(deliver_and_ack), State),
+ a(State1).
+
+purge_and_index_reset(State) ->
+ State1 = purge1(process_delivers_and_acks_fun(none), State),
+ a(reset_qi_state(State1)).
+
+%% This function removes messages from each of {q1, q2, q3, q4}.
+%%
+%% With remove_queue_entries/3 q1 and q4 are emptied, while q2 and q3
+%% are specially handled by purge_betas_and_deltas/2.
+%%
+%% purge_betas_and_deltas/2 loads messages from the queue index,
+%% filling up q3 and in some cases moving messages from q2 to q3 while
+%% resetting q2 to an empty queue (see maybe_deltas_to_betas/2). The
+%% messages loaded into q3 are removed by calling
+%% remove_queue_entries/3 until there are no more messages to be read
+%% from the queue index. Messages are read in batches from the queue
+%% index.
+purge1(AfterFun, State = #vqstate { q4 = Q4}) ->
+ State1 = remove_queue_entries(Q4, AfterFun, State),
+
+ State2 = #vqstate {q1 = Q1} =
+ purge_betas_and_deltas(AfterFun, State1#vqstate{q4 = ?QUEUE:new()}),
+
+ State3 = remove_queue_entries(Q1, AfterFun, State2),
+
+ a(State3#vqstate{q1 = ?QUEUE:new()}).
+
+reset_qi_state(State = #vqstate{index_state = IndexState}) ->
+ State#vqstate{index_state =
+ rabbit_queue_index:reset_state(IndexState)}.
+
+is_pending_ack_empty(State) ->
+ count_pending_acks(State) =:= 0.
+
+is_unconfirmed_empty(#vqstate { unconfirmed = UC }) ->
+ gb_sets:is_empty(UC).
+
+count_pending_acks(#vqstate { ram_pending_ack = RPA,
+ disk_pending_ack = DPA,
+ qi_pending_ack = QPA }) ->
+ gb_trees:size(RPA) + gb_trees:size(DPA) + gb_trees:size(QPA).
+
+purge_betas_and_deltas(DelsAndAcksFun, State = #vqstate { mode = Mode }) ->
+ State0 = #vqstate { q3 = Q3 } =
+ case Mode of
+ lazy -> maybe_deltas_to_betas(DelsAndAcksFun, State);
+ _ -> State
+ end,
+
+ case ?QUEUE:is_empty(Q3) of
+ true -> State0;
+ false -> State1 = remove_queue_entries(Q3, DelsAndAcksFun, State0),
+ purge_betas_and_deltas(DelsAndAcksFun,
+ maybe_deltas_to_betas(
+ DelsAndAcksFun,
+ State1#vqstate{q3 = ?QUEUE:new()}))
+ end.
+
+remove_queue_entries(Q, DelsAndAcksFun,
+ State = #vqstate{msg_store_clients = MSCState}) ->
+ {MsgIdsByStore, Delivers, Acks, State1} =
+ ?QUEUE:foldl(fun remove_queue_entries1/2,
+ {maps:new(), [], [], State}, Q),
+ remove_msgs_by_id(MsgIdsByStore, MSCState),
+ DelsAndAcksFun(Delivers, Acks, State1).
+
+remove_queue_entries1(
+ #msg_status { msg_id = MsgId, seq_id = SeqId, is_delivered = IsDelivered,
+ msg_in_store = MsgInStore, index_on_disk = IndexOnDisk,
+ is_persistent = IsPersistent} = MsgStatus,
+ {MsgIdsByStore, Delivers, Acks, State}) ->
+ {case MsgInStore of
+ true -> rabbit_misc:maps_cons(IsPersistent, MsgId, MsgIdsByStore);
+ false -> MsgIdsByStore
+ end,
+ cons_if(IndexOnDisk andalso not IsDelivered, SeqId, Delivers),
+ cons_if(IndexOnDisk, SeqId, Acks),
+ stats({-1, 0}, {MsgStatus, none}, 0, State)}.
+
+process_delivers_and_acks_fun(deliver_and_ack) ->
+ fun (Delivers, Acks, State = #vqstate { index_state = IndexState }) ->
+ IndexState1 =
+ rabbit_queue_index:ack(
+ Acks, rabbit_queue_index:deliver(Delivers, IndexState)),
+ State #vqstate { index_state = IndexState1 }
+ end;
+process_delivers_and_acks_fun(_) ->
+ fun (_, _, State) ->
+ State
+ end.
+
+%%----------------------------------------------------------------------------
+%% Internal gubbins for publishing
+%%----------------------------------------------------------------------------
+
+publish1(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
+ MsgProps = #message_properties { needs_confirming = NeedsConfirming },
+ IsDelivered, _ChPid, _Flow, PersistFun,
+ State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4,
+ mode = default,
+ qi_embed_msgs_below = IndexMaxSize,
+ next_seq_id = SeqId,
+ in_counter = InCount,
+ durable = IsDurable,
+ unconfirmed = UC }) ->
+ IsPersistent1 = IsDurable andalso IsPersistent,
+ MsgStatus = msg_status(IsPersistent1, IsDelivered, SeqId, Msg, MsgProps, IndexMaxSize),
+ {MsgStatus1, State1} = PersistFun(false, false, MsgStatus, State),
+ State2 = case ?QUEUE:is_empty(Q3) of
+ false -> State1 #vqstate { q1 = ?QUEUE:in(m(MsgStatus1), Q1) };
+ true -> State1 #vqstate { q4 = ?QUEUE:in(m(MsgStatus1), Q4) }
+ end,
+ InCount1 = InCount + 1,
+ UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
+ stats({1, 0}, {none, MsgStatus1}, 0,
+ State2#vqstate{ next_seq_id = SeqId + 1,
+ in_counter = InCount1,
+ unconfirmed = UC1 });
+publish1(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
+ MsgProps = #message_properties { needs_confirming = NeedsConfirming },
+ IsDelivered, _ChPid, _Flow, PersistFun,
+ State = #vqstate { mode = lazy,
+ qi_embed_msgs_below = IndexMaxSize,
+ next_seq_id = SeqId,
+ in_counter = InCount,
+ durable = IsDurable,
+ unconfirmed = UC,
+ delta = Delta}) ->
+ IsPersistent1 = IsDurable andalso IsPersistent,
+ MsgStatus = msg_status(IsPersistent1, IsDelivered, SeqId, Msg, MsgProps, IndexMaxSize),
+ {MsgStatus1, State1} = PersistFun(true, true, MsgStatus, State),
+ Delta1 = expand_delta(SeqId, Delta, IsPersistent),
+ UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
+ stats(lazy_pub, {lazy, m(MsgStatus1)}, 1,
+ State1#vqstate{ delta = Delta1,
+ next_seq_id = SeqId + 1,
+ in_counter = InCount + 1,
+ unconfirmed = UC1}).
+
+batch_publish1({Msg, MsgProps, IsDelivered}, {ChPid, Flow, State}) ->
+ {ChPid, Flow, publish1(Msg, MsgProps, IsDelivered, ChPid, Flow,
+ fun maybe_prepare_write_to_disk/4, State)}.
+
+publish_delivered1(Msg = #basic_message { is_persistent = IsPersistent,
+ id = MsgId },
+ MsgProps = #message_properties {
+ needs_confirming = NeedsConfirming },
+ _ChPid, _Flow, PersistFun,
+ State = #vqstate { mode = default,
+ qi_embed_msgs_below = IndexMaxSize,
+ next_seq_id = SeqId,
+ out_counter = OutCount,
+ in_counter = InCount,
+ durable = IsDurable,
+ unconfirmed = UC }) ->
+ IsPersistent1 = IsDurable andalso IsPersistent,
+ MsgStatus = msg_status(IsPersistent1, true, SeqId, Msg, MsgProps, IndexMaxSize),
+ {MsgStatus1, State1} = PersistFun(false, false, MsgStatus, State),
+ State2 = record_pending_ack(m(MsgStatus1), State1),
+ UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
+ State3 = stats({0, 1}, {none, MsgStatus1}, 0,
+ State2 #vqstate { next_seq_id = SeqId + 1,
+ out_counter = OutCount + 1,
+ in_counter = InCount + 1,
+ unconfirmed = UC1 }),
+ {SeqId, State3};
+publish_delivered1(Msg = #basic_message { is_persistent = IsPersistent,
+ id = MsgId },
+ MsgProps = #message_properties {
+ needs_confirming = NeedsConfirming },
+ _ChPid, _Flow, PersistFun,
+ State = #vqstate { mode = lazy,
+ qi_embed_msgs_below = IndexMaxSize,
+ next_seq_id = SeqId,
+ out_counter = OutCount,
+ in_counter = InCount,
+ durable = IsDurable,
+ unconfirmed = UC }) ->
+ IsPersistent1 = IsDurable andalso IsPersistent,
+ MsgStatus = msg_status(IsPersistent1, true, SeqId, Msg, MsgProps, IndexMaxSize),
+ {MsgStatus1, State1} = PersistFun(true, true, MsgStatus, State),
+ State2 = record_pending_ack(m(MsgStatus1), State1),
+ UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
+ State3 = stats({0, 1}, {none, MsgStatus1}, 0,
+ State2 #vqstate { next_seq_id = SeqId + 1,
+ out_counter = OutCount + 1,
+ in_counter = InCount + 1,
+ unconfirmed = UC1 }),
+ {SeqId, State3}.
+
+batch_publish_delivered1({Msg, MsgProps}, {ChPid, Flow, SeqIds, State}) ->
+ {SeqId, State1} =
+ publish_delivered1(Msg, MsgProps, ChPid, Flow,
+ fun maybe_prepare_write_to_disk/4,
+ State),
+ {ChPid, Flow, [SeqId | SeqIds], State1}.
+
+maybe_write_msg_to_disk(_Force, MsgStatus = #msg_status {
+ msg_in_store = true }, State) ->
+ {MsgStatus, State};
+maybe_write_msg_to_disk(Force, MsgStatus = #msg_status {
+ msg = Msg, msg_id = MsgId,
+ is_persistent = IsPersistent },
+ State = #vqstate{ msg_store_clients = MSCState,
+ disk_write_count = Count})
+ when Force orelse IsPersistent ->
+ case persist_to(MsgStatus) of
+ msg_store -> ok = msg_store_write(MSCState, IsPersistent, MsgId,
+ prepare_to_store(Msg)),
+ {MsgStatus#msg_status{msg_in_store = true},
+ State#vqstate{disk_write_count = Count + 1}};
+ queue_index -> {MsgStatus, State}
+ end;
+maybe_write_msg_to_disk(_Force, MsgStatus, State) ->
+ {MsgStatus, State}.
+
+%% Due to certain optimisations made inside
+%% rabbit_queue_index:pre_publish/7 we need to have two separate
+%% functions for index persistence. This one is only used when paging
+%% during memory pressure. We didn't want to modify
+%% maybe_write_index_to_disk/3 because that function is used in other
+%% places.
+maybe_batch_write_index_to_disk(_Force,
+ MsgStatus = #msg_status {
+ index_on_disk = true }, State) ->
+ {MsgStatus, State};
+maybe_batch_write_index_to_disk(Force,
+ MsgStatus = #msg_status {
+ msg = Msg,
+ msg_id = MsgId,
+ seq_id = SeqId,
+ is_persistent = IsPersistent,
+ is_delivered = IsDelivered,
+ msg_props = MsgProps},
+ State = #vqstate {
+ target_ram_count = TargetRamCount,
+ disk_write_count = DiskWriteCount,
+ index_state = IndexState})
+ when Force orelse IsPersistent ->
+ {MsgOrId, DiskWriteCount1} =
+ case persist_to(MsgStatus) of
+ msg_store -> {MsgId, DiskWriteCount};
+ queue_index -> {prepare_to_store(Msg), DiskWriteCount + 1}
+ end,
+ IndexState1 = rabbit_queue_index:pre_publish(
+ MsgOrId, SeqId, MsgProps, IsPersistent, IsDelivered,
+ TargetRamCount, IndexState),
+ {MsgStatus#msg_status{index_on_disk = true},
+ State#vqstate{index_state = IndexState1,
+ disk_write_count = DiskWriteCount1}};
+maybe_batch_write_index_to_disk(_Force, MsgStatus, State) ->
+ {MsgStatus, State}.
+
+maybe_write_index_to_disk(_Force, MsgStatus = #msg_status {
+ index_on_disk = true }, State) ->
+ {MsgStatus, State};
+maybe_write_index_to_disk(Force, MsgStatus = #msg_status {
+ msg = Msg,
+ msg_id = MsgId,
+ seq_id = SeqId,
+ is_persistent = IsPersistent,
+ is_delivered = IsDelivered,
+ msg_props = MsgProps},
+ State = #vqstate{target_ram_count = TargetRamCount,
+ disk_write_count = DiskWriteCount,
+ index_state = IndexState})
+ when Force orelse IsPersistent ->
+ {MsgOrId, DiskWriteCount1} =
+ case persist_to(MsgStatus) of
+ msg_store -> {MsgId, DiskWriteCount};
+ queue_index -> {prepare_to_store(Msg), DiskWriteCount + 1}
+ end,
+ IndexState1 = rabbit_queue_index:publish(
+ MsgOrId, SeqId, MsgProps, IsPersistent, TargetRamCount,
+ IndexState),
+ IndexState2 = maybe_write_delivered(IsDelivered, SeqId, IndexState1),
+ {MsgStatus#msg_status{index_on_disk = true},
+ State#vqstate{index_state = IndexState2,
+ disk_write_count = DiskWriteCount1}};
+maybe_write_index_to_disk(_Force, MsgStatus, State) ->
+ {MsgStatus, State}.
+
+maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus, State) ->
+ {MsgStatus1, State1} = maybe_write_msg_to_disk(ForceMsg, MsgStatus, State),
+ maybe_write_index_to_disk(ForceIndex, MsgStatus1, State1).
+
+maybe_prepare_write_to_disk(ForceMsg, ForceIndex, MsgStatus, State) ->
+ {MsgStatus1, State1} = maybe_write_msg_to_disk(ForceMsg, MsgStatus, State),
+ maybe_batch_write_index_to_disk(ForceIndex, MsgStatus1, State1).
+
+determine_persist_to(#basic_message{
+ content = #content{properties = Props,
+ properties_bin = PropsBin}},
+ #message_properties{size = BodySize},
+ IndexMaxSize) ->
+ %% The >= is so that you can set the env to 0 and never persist
+ %% to the index.
+ %%
+ %% We want this to be fast, so we avoid size(term_to_binary())
+ %% here, or using the term size estimation from truncate.erl, both
+ %% of which are too slow. So instead, if the message body size
+ %% goes over the limit then we avoid any other checks.
+ %%
+ %% If it doesn't we need to decide if the properties will push
+ %% it past the limit. If we have the encoded properties (usual
+ %% case) we can just check their size. If we don't (message came
+ %% via the direct client), we make a guess based on the number of
+ %% headers.
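+    %%
+    %% A worked example with illustrative numbers: with IndexMaxSize =
+    %% 4096, a message with a 1000 byte body and 200 bytes of encoded
+    %% properties gives Est = 1200 < 4096 and goes to the queue index,
+    %% whereas a 5000 byte body short-circuits straight to the message
+    %% store without estimating the properties at all.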
+ case BodySize >= IndexMaxSize of
+ true -> msg_store;
+ false -> Est = case is_binary(PropsBin) of
+ true -> BodySize + size(PropsBin);
+ false -> #'P_basic'{headers = Hs} = Props,
+ case Hs of
+ undefined -> 0;
+ _ -> length(Hs)
+ end * ?HEADER_GUESS_SIZE + BodySize
+ end,
+ case Est >= IndexMaxSize of
+ true -> msg_store;
+ false -> queue_index
+ end
+ end.
+
+persist_to(#msg_status{persist_to = To}) -> To.
+
+prepare_to_store(Msg) ->
+ Msg#basic_message{
+ %% don't persist any recoverable decoded properties
+ content = rabbit_binary_parser:clear_decoded_content(
+ Msg #basic_message.content)}.
+
+%%----------------------------------------------------------------------------
+%% Internal gubbins for acks
+%%----------------------------------------------------------------------------
+
+record_pending_ack(#msg_status { seq_id = SeqId } = MsgStatus,
+ State = #vqstate { ram_pending_ack = RPA,
+ disk_pending_ack = DPA,
+ qi_pending_ack = QPA,
+ ack_in_counter = AckInCount}) ->
+ Insert = fun (Tree) -> gb_trees:insert(SeqId, MsgStatus, Tree) end,
+ {RPA1, DPA1, QPA1} =
+ case {msg_in_ram(MsgStatus), persist_to(MsgStatus)} of
+ {false, _} -> {RPA, Insert(DPA), QPA};
+ {_, queue_index} -> {RPA, DPA, Insert(QPA)};
+ {_, msg_store} -> {Insert(RPA), DPA, QPA}
+ end,
+ State #vqstate { ram_pending_ack = RPA1,
+ disk_pending_ack = DPA1,
+ qi_pending_ack = QPA1,
+ ack_in_counter = AckInCount + 1}.
+
+lookup_pending_ack(SeqId, #vqstate { ram_pending_ack = RPA,
+ disk_pending_ack = DPA,
+ qi_pending_ack = QPA}) ->
+ case gb_trees:lookup(SeqId, RPA) of
+ {value, V} -> V;
+ none -> case gb_trees:lookup(SeqId, DPA) of
+ {value, V} -> V;
+ none -> gb_trees:get(SeqId, QPA)
+ end
+ end.
+
+%% First parameter = UpdateStats
+remove_pending_ack(true, SeqId, State) ->
+ case remove_pending_ack(false, SeqId, State) of
+ {none, _} ->
+ {none, State};
+ {MsgStatus, State1} ->
+ {MsgStatus, stats({0, -1}, {MsgStatus, none}, 0, State1)}
+ end;
+remove_pending_ack(false, SeqId, State = #vqstate{ram_pending_ack = RPA,
+ disk_pending_ack = DPA,
+ qi_pending_ack = QPA}) ->
+ case gb_trees:lookup(SeqId, RPA) of
+ {value, V} -> RPA1 = gb_trees:delete(SeqId, RPA),
+ {V, State #vqstate { ram_pending_ack = RPA1 }};
+ none -> case gb_trees:lookup(SeqId, DPA) of
+ {value, V} ->
+ DPA1 = gb_trees:delete(SeqId, DPA),
+ {V, State#vqstate{disk_pending_ack = DPA1}};
+ none ->
+ case gb_trees:lookup(SeqId, QPA) of
+ {value, V} ->
+ QPA1 = gb_trees:delete(SeqId, QPA),
+ {V, State#vqstate{qi_pending_ack = QPA1}};
+ none ->
+ {none, State}
+ end
+ end
+ end.
+
+purge_pending_ack(KeepPersistent,
+ State = #vqstate { index_state = IndexState,
+ msg_store_clients = MSCState }) ->
+ {IndexOnDiskSeqIds, MsgIdsByStore, State1} = purge_pending_ack1(State),
+ case KeepPersistent of
+ true -> remove_transient_msgs_by_id(MsgIdsByStore, MSCState),
+ State1;
+ false -> IndexState1 =
+ rabbit_queue_index:ack(IndexOnDiskSeqIds, IndexState),
+ remove_msgs_by_id(MsgIdsByStore, MSCState),
+ State1 #vqstate { index_state = IndexState1 }
+ end.
+
+purge_pending_ack_delete_and_terminate(
+ State = #vqstate { index_state = IndexState,
+ msg_store_clients = MSCState }) ->
+ {_, MsgIdsByStore, State1} = purge_pending_ack1(State),
+ IndexState1 = rabbit_queue_index:delete_and_terminate(IndexState),
+ remove_msgs_by_id(MsgIdsByStore, MSCState),
+ State1 #vqstate { index_state = IndexState1 }.
+
+purge_pending_ack1(State = #vqstate { ram_pending_ack = RPA,
+ disk_pending_ack = DPA,
+ qi_pending_ack = QPA }) ->
+ F = fun (_SeqId, MsgStatus, Acc) -> accumulate_ack(MsgStatus, Acc) end,
+ {IndexOnDiskSeqIds, MsgIdsByStore, _AllMsgIds} =
+ rabbit_misc:gb_trees_fold(
+ F, rabbit_misc:gb_trees_fold(
+ F, rabbit_misc:gb_trees_fold(
+ F, accumulate_ack_init(), RPA), DPA), QPA),
+ State1 = State #vqstate { ram_pending_ack = gb_trees:empty(),
+ disk_pending_ack = gb_trees:empty(),
+ qi_pending_ack = gb_trees:empty()},
+ {IndexOnDiskSeqIds, MsgIdsByStore, State1}.
+
+%% MsgIdsByStore is a map with two possible keys:
+%%
+%%  true: holds a list of persistent message ids.
+%%  false: holds a list of transient message ids.
+%%
+%% When we call maps:to_list/1 we get two sets of msg ids, where
+%% IsPersistent is either true for persistent messages or false for
+%% transient ones. The msg_store_remove/3 function takes this boolean
+%% flag to determine from which store the messages should be removed.
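+%%
+%% e.g. a possible shape of the map (msg ids abbreviated for
+%% illustration):
+%%
+%%     #{true => [MsgIdA], false => [MsgIdB, MsgIdC]}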
+remove_msgs_by_id(MsgIdsByStore, MSCState) ->
+ [ok = msg_store_remove(MSCState, IsPersistent, MsgIds)
+ || {IsPersistent, MsgIds} <- maps:to_list(MsgIdsByStore)].
+
+remove_transient_msgs_by_id(MsgIdsByStore, MSCState) ->
+ case maps:find(false, MsgIdsByStore) of
+ error -> ok;
+ {ok, MsgIds} -> ok = msg_store_remove(MSCState, false, MsgIds)
+ end.
+
+accumulate_ack_init() -> {[], maps:new(), []}.
+
+accumulate_ack(#msg_status { seq_id = SeqId,
+ msg_id = MsgId,
+ is_persistent = IsPersistent,
+ msg_in_store = MsgInStore,
+ index_on_disk = IndexOnDisk },
+ {IndexOnDiskSeqIdsAcc, MsgIdsByStore, AllMsgIds}) ->
+ {cons_if(IndexOnDisk, SeqId, IndexOnDiskSeqIdsAcc),
+ case MsgInStore of
+ true -> rabbit_misc:maps_cons(IsPersistent, MsgId, MsgIdsByStore);
+ false -> MsgIdsByStore
+ end,
+ [MsgId | AllMsgIds]}.
+
+%%----------------------------------------------------------------------------
+%% Internal plumbing for confirms (aka publisher acks)
+%%----------------------------------------------------------------------------
+
+record_confirms(MsgIdSet, State = #vqstate { msgs_on_disk = MOD,
+ msg_indices_on_disk = MIOD,
+ unconfirmed = UC,
+ confirmed = C }) ->
+ State #vqstate {
+ msgs_on_disk = rabbit_misc:gb_sets_difference(MOD, MsgIdSet),
+ msg_indices_on_disk = rabbit_misc:gb_sets_difference(MIOD, MsgIdSet),
+ unconfirmed = rabbit_misc:gb_sets_difference(UC, MsgIdSet),
+ confirmed = gb_sets:union(C, MsgIdSet) }.
+
+msgs_written_to_disk(Callback, MsgIdSet, ignored) ->
+ Callback(?MODULE,
+ fun (?MODULE, State) -> record_confirms(MsgIdSet, State) end);
+msgs_written_to_disk(Callback, MsgIdSet, written) ->
+ Callback(?MODULE,
+ fun (?MODULE, State = #vqstate { msgs_on_disk = MOD,
+ msg_indices_on_disk = MIOD,
+ unconfirmed = UC }) ->
+ Confirmed = gb_sets:intersection(UC, MsgIdSet),
+ record_confirms(gb_sets:intersection(MsgIdSet, MIOD),
+ State #vqstate {
+ msgs_on_disk =
+ gb_sets:union(MOD, Confirmed) })
+ end).
+
+msg_indices_written_to_disk(Callback, MsgIdSet) ->
+ Callback(?MODULE,
+ fun (?MODULE, State = #vqstate { msgs_on_disk = MOD,
+ msg_indices_on_disk = MIOD,
+ unconfirmed = UC }) ->
+ Confirmed = gb_sets:intersection(UC, MsgIdSet),
+ record_confirms(gb_sets:intersection(MsgIdSet, MOD),
+ State #vqstate {
+ msg_indices_on_disk =
+ gb_sets:union(MIOD, Confirmed) })
+ end).
+
+msgs_and_indices_written_to_disk(Callback, MsgIdSet) ->
+ Callback(?MODULE,
+ fun (?MODULE, State) -> record_confirms(MsgIdSet, State) end).
+
+%%----------------------------------------------------------------------------
+%% Internal plumbing for requeue
+%%----------------------------------------------------------------------------
+
+publish_alpha(#msg_status { msg = undefined } = MsgStatus, State) ->
+ {Msg, State1} = read_msg(MsgStatus, State),
+ MsgStatus1 = MsgStatus#msg_status { msg = Msg },
+ {MsgStatus1, stats({1, -1}, {MsgStatus, MsgStatus1}, 0, State1)};
+publish_alpha(MsgStatus, State) ->
+ {MsgStatus, stats({1, -1}, {MsgStatus, MsgStatus}, 0, State)}.
+
+publish_beta(MsgStatus, State) ->
+ {MsgStatus1, State1} = maybe_prepare_write_to_disk(true, false, MsgStatus, State),
+ MsgStatus2 = m(trim_msg_status(MsgStatus1)),
+ {MsgStatus2, stats({1, -1}, {MsgStatus, MsgStatus2}, 0, State1)}.
+
+%% Rebuild queue, inserting sequence ids to maintain ordering
+queue_merge(SeqIds, Q, MsgIds, Limit, PubFun, State) ->
+ queue_merge(SeqIds, Q, ?QUEUE:new(), MsgIds,
+ Limit, PubFun, State).
+
+queue_merge([SeqId | Rest] = SeqIds, Q, Front, MsgIds,
+ Limit, PubFun, State)
+ when Limit == undefined orelse SeqId < Limit ->
+ case ?QUEUE:out(Q) of
+ {{value, #msg_status { seq_id = SeqIdQ } = MsgStatus}, Q1}
+ when SeqIdQ < SeqId ->
+ %% enqueue from the remaining queue
+ queue_merge(SeqIds, Q1, ?QUEUE:in(MsgStatus, Front), MsgIds,
+ Limit, PubFun, State);
+ {_, _Q1} ->
+ %% enqueue from the remaining list of sequence ids
+ case msg_from_pending_ack(SeqId, State) of
+ {none, _} ->
+ queue_merge(Rest, Q, Front, MsgIds, Limit, PubFun, State);
+ {MsgStatus, State1} ->
+ {#msg_status { msg_id = MsgId } = MsgStatus1, State2} =
+ PubFun(MsgStatus, State1),
+ queue_merge(Rest, Q, ?QUEUE:in(MsgStatus1, Front), [MsgId | MsgIds],
+ Limit, PubFun, State2)
+ end
+ end;
+queue_merge(SeqIds, Q, Front, MsgIds,
+ _Limit, _PubFun, State) ->
+ {SeqIds, ?QUEUE:join(Front, Q), MsgIds, State}.
+
+delta_merge([], Delta, MsgIds, State) ->
+ {Delta, MsgIds, State};
+delta_merge(SeqIds, Delta, MsgIds, State) ->
+ lists:foldl(fun (SeqId, {Delta0, MsgIds0, State0} = Acc) ->
+ case msg_from_pending_ack(SeqId, State0) of
+ {none, _} ->
+ Acc;
+ {#msg_status { msg_id = MsgId,
+ is_persistent = IsPersistent } = MsgStatus, State1} ->
+ {_MsgStatus, State2} =
+ maybe_prepare_write_to_disk(true, true, MsgStatus, State1),
+ {expand_delta(SeqId, Delta0, IsPersistent), [MsgId | MsgIds0],
+ stats({1, -1}, {MsgStatus, none}, 1, State2)}
+ end
+ end, {Delta, MsgIds, State}, SeqIds).
+
+%% Mostly opposite of record_pending_ack/2
+msg_from_pending_ack(SeqId, State) ->
+ case remove_pending_ack(false, SeqId, State) of
+ {none, _} ->
+ {none, State};
+ {#msg_status { msg_props = MsgProps } = MsgStatus, State1} ->
+ {MsgStatus #msg_status {
+ msg_props = MsgProps #message_properties { needs_confirming = false } },
+ State1}
+ end.
+
+beta_limit(Q) ->
+ case ?QUEUE:peek(Q) of
+ {value, #msg_status { seq_id = SeqId }} -> SeqId;
+ empty -> undefined
+ end.
+
+delta_limit(?BLANK_DELTA_PATTERN(_X)) -> undefined;
+delta_limit(#delta { start_seq_id = StartSeqId }) -> StartSeqId.
+
+%%----------------------------------------------------------------------------
+%% Iterator
+%%----------------------------------------------------------------------------
+
+ram_ack_iterator(State) ->
+ {ack, gb_trees:iterator(State#vqstate.ram_pending_ack)}.
+
+disk_ack_iterator(State) ->
+ {ack, gb_trees:iterator(State#vqstate.disk_pending_ack)}.
+
+qi_ack_iterator(State) ->
+ {ack, gb_trees:iterator(State#vqstate.qi_pending_ack)}.
+
+msg_iterator(State) -> istate(start, State).
+
+istate(start, State) -> {q4, State#vqstate.q4, State};
+istate(q4, State) -> {q3, State#vqstate.q3, State};
+istate(q3, State) -> {delta, State#vqstate.delta, State};
+istate(delta, State) -> {q2, State#vqstate.q2, State};
+istate(q2, State) -> {q1, State#vqstate.q1, State};
+istate(q1, _State) -> done.
+
+next({ack, It}, IndexState) ->
+ case gb_trees:next(It) of
+ none -> {empty, IndexState};
+ {_SeqId, MsgStatus, It1} -> Next = {ack, It1},
+ {value, MsgStatus, true, Next, IndexState}
+ end;
+next(done, IndexState) -> {empty, IndexState};
+next({delta, #delta{start_seq_id = SeqId,
+ end_seq_id = SeqId}, State}, IndexState) ->
+ next(istate(delta, State), IndexState);
+next({delta, #delta{start_seq_id = SeqId,
+ end_seq_id = SeqIdEnd} = Delta, State}, IndexState) ->
+ SeqIdB = rabbit_queue_index:next_segment_boundary(SeqId),
+ SeqId1 = lists:min([SeqIdB, SeqIdEnd]),
+ {List, IndexState1} = rabbit_queue_index:read(SeqId, SeqId1, IndexState),
+ next({delta, Delta#delta{start_seq_id = SeqId1}, List, State}, IndexState1);
+next({delta, Delta, [], State}, IndexState) ->
+ next({delta, Delta, State}, IndexState);
+next({delta, Delta, [{_, SeqId, _, _, _} = M | Rest], State}, IndexState) ->
+ case is_msg_in_pending_acks(SeqId, State) of
+ false -> Next = {delta, Delta, Rest, State},
+ {value, beta_msg_status(M), false, Next, IndexState};
+ true -> next({delta, Delta, Rest, State}, IndexState)
+ end;
+next({Key, Q, State}, IndexState) ->
+ case ?QUEUE:out(Q) of
+ {empty, _Q} -> next(istate(Key, State), IndexState);
+ {{value, MsgStatus}, QN} -> Next = {Key, QN, State},
+ {value, MsgStatus, false, Next, IndexState}
+ end.
+
+inext(It, {Its, IndexState}) ->
+ case next(It, IndexState) of
+ {empty, IndexState1} ->
+ {Its, IndexState1};
+ {value, MsgStatus1, Unacked, It1, IndexState1} ->
+ {[{MsgStatus1, Unacked, It1} | Its], IndexState1}
+ end.
+
+ifold(_Fun, Acc, [], State0) ->
+ {Acc, State0};
+ifold(Fun, Acc, Its0, State0) ->
+ [{MsgStatus, Unacked, It} | Rest] =
+ lists:sort(fun ({#msg_status{seq_id = SeqId1}, _, _},
+ {#msg_status{seq_id = SeqId2}, _, _}) ->
+ SeqId1 =< SeqId2
+ end, Its0),
+ {Msg, State1} = read_msg(MsgStatus, State0),
+ case Fun(Msg, MsgStatus#msg_status.msg_props, Unacked, Acc) of
+ {stop, Acc1} ->
+ {Acc1, State1};
+ {cont, Acc1} ->
+ IndexState0 = State1#vqstate.index_state,
+ {Its1, IndexState1} = inext(It, {Rest, IndexState0}),
+ State2 = State1#vqstate{index_state = IndexState1},
+ ifold(Fun, Acc1, Its1, State2)
+ end.
+
+%%----------------------------------------------------------------------------
+%% Phase changes
+%%----------------------------------------------------------------------------
+
+maybe_reduce_memory_use(State = #vqstate {memory_reduction_run_count = MRedRunCount,
+ mode = Mode}) ->
+ case MRedRunCount >= ?EXPLICIT_GC_RUN_OP_THRESHOLD(Mode) of
+ true -> State1 = reduce_memory_use(State),
+ State1#vqstate{memory_reduction_run_count = 0};
+ false -> State#vqstate{memory_reduction_run_count = MRedRunCount + 1}
+ end.
+
+reduce_memory_use(State = #vqstate { target_ram_count = infinity }) ->
+ State;
+reduce_memory_use(State = #vqstate {
+ mode = default,
+ ram_pending_ack = RPA,
+ ram_msg_count = RamMsgCount,
+ target_ram_count = TargetRamCount,
+ io_batch_size = IoBatchSize,
+ rates = #rates { in = AvgIngress,
+ out = AvgEgress,
+ ack_in = AvgAckIngress,
+ ack_out = AvgAckEgress } }) ->
+    {CreditDiscBound, _} = rabbit_misc:get_env(rabbit,
+                                               msg_store_credit_disc_bound,
+                                               ?CREDIT_DISC_BOUND),
+ {NeedResumeA2B, State1} = {_, #vqstate { q2 = Q2, q3 = Q3 }} =
+ case chunk_size(RamMsgCount + gb_trees:size(RPA), TargetRamCount) of
+ 0 -> {false, State};
+ %% Reduce memory of pending acks and alphas. The order is
+ %% determined based on which is growing faster. Whichever
+ %% comes second may very well get a quota of 0 if the
+ %% first manages to push out the max number of messages.
+ A2BChunk ->
+                %% In case there are few messages to be sent to the
+                %% message store and many messages to be embedded in the
+                %% queue index, we should limit the number of messages
+                %% to be flushed to avoid blocking the process.
+ A2BChunkActual = case A2BChunk > CreditDiscBound * 2 of
+ true -> CreditDiscBound * 2;
+ false -> A2BChunk
+ end,
+ Funs = case ((AvgAckIngress - AvgAckEgress) >
+ (AvgIngress - AvgEgress)) of
+ true -> [fun limit_ram_acks/2,
+ fun push_alphas_to_betas/2];
+ false -> [fun push_alphas_to_betas/2,
+ fun limit_ram_acks/2]
+ end,
+ {Quota, State2} = lists:foldl(fun (ReduceFun, {QuotaN, StateN}) ->
+ ReduceFun(QuotaN, StateN)
+ end, {A2BChunkActual, State}, Funs),
+ {(Quota == 0) andalso (A2BChunk > A2BChunkActual), State2}
+ end,
+ Permitted = permitted_beta_count(State1),
+ {NeedResumeB2D, State3} =
+ %% If there are more messages with their queue position held in RAM,
+ %% a.k.a. betas, in Q2 & Q3 than IoBatchSize,
+ %% write their queue position to disk, a.k.a. push_betas_to_deltas
+ case chunk_size(?QUEUE:len(Q2) + ?QUEUE:len(Q3),
+ Permitted) of
+ B2DChunk when B2DChunk >= IoBatchSize ->
+                    %% Same as for alphas to betas. Limit the number of
+                    %% messages to be flushed to disk at once to avoid
+                    %% blocking the process.
+ B2DChunkActual = case B2DChunk > CreditDiscBound * 2 of
+ true -> CreditDiscBound * 2;
+ false -> B2DChunk
+ end,
+ StateBD = push_betas_to_deltas(B2DChunkActual, State1),
+ {B2DChunk > B2DChunkActual, StateBD};
+ _ ->
+ {false, State1}
+ end,
+ %% We can be blocked by the credit flow, or limited by a batch size,
+ %% or finished with flushing.
+ %% If blocked by the credit flow - the credit grant will resume processing,
+ %% if limited by a batch - the batch continuation message should be sent.
+ %% The continuation message will be prioritised over publishes,
+    %% but not consumptions, so the queue can make progress.
+ Blocked = credit_flow:blocked(),
+ case {Blocked, NeedResumeA2B orelse NeedResumeB2D} of
+ %% Credit bump will continue paging
+ {true, _} -> State3;
+ %% Finished with paging
+ {false, false} -> State3;
+ %% Planning next batch
+ {false, true} ->
+ %% We don't want to use self-credit-flow, because it's harder to
+ %% reason about. So the process sends a (prioritised) message to
+ %% itself and sets a waiting_bump value to keep the message box clean
+ maybe_bump_reduce_memory_use(State3)
+ end;
+%% When using lazy queues, there are no alphas, so we don't need to
+%% call push_alphas_to_betas/2.
+reduce_memory_use(State = #vqstate {
+ mode = lazy,
+ ram_pending_ack = RPA,
+ ram_msg_count = RamMsgCount,
+ target_ram_count = TargetRamCount }) ->
+ State1 = #vqstate { q3 = Q3 } =
+ case chunk_size(RamMsgCount + gb_trees:size(RPA), TargetRamCount) of
+ 0 -> State;
+ S1 -> {_, State2} = limit_ram_acks(S1, State),
+ State2
+ end,
+
+ State3 =
+ case chunk_size(?QUEUE:len(Q3),
+ permitted_beta_count(State1)) of
+ 0 ->
+ State1;
+ S2 ->
+ push_betas_to_deltas(S2, State1)
+ end,
+ garbage_collect(),
+ State3.
+
+maybe_bump_reduce_memory_use(State = #vqstate{ waiting_bump = true }) ->
+ State;
+maybe_bump_reduce_memory_use(State) ->
+ self() ! bump_reduce_memory_use,
+ State#vqstate{ waiting_bump = true }.
+
+limit_ram_acks(0, State) ->
+ {0, ui(State)};
+limit_ram_acks(Quota, State = #vqstate { ram_pending_ack = RPA,
+ disk_pending_ack = DPA }) ->
+ case gb_trees:is_empty(RPA) of
+ true ->
+ {Quota, ui(State)};
+ false ->
+ {SeqId, MsgStatus, RPA1} = gb_trees:take_largest(RPA),
+ {MsgStatus1, State1} =
+ maybe_prepare_write_to_disk(true, false, MsgStatus, State),
+ MsgStatus2 = m(trim_msg_status(MsgStatus1)),
+ DPA1 = gb_trees:insert(SeqId, MsgStatus2, DPA),
+ limit_ram_acks(Quota - 1,
+ stats({0, 0}, {MsgStatus, MsgStatus2}, 0,
+ State1 #vqstate { ram_pending_ack = RPA1,
+ disk_pending_ack = DPA1 }))
+ end.
+
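+%% How many betas are we prepared to keep around? For lazy queues this
+%% is simply the target RAM count. Otherwise the last clause below
+%% permits BetaDelta - BetaDelta^2 div (BetaDelta + TargetRamCount)
+%% betas. As a worked example with illustrative numbers, BetaDelta =
+%% 1000 and TargetRamCount = 1000 permit 1000 - (1000 * 1000) div
+%% (1000 + 1000) = 500 betas, subject to the
+%% rabbit_queue_index:next_segment_boundary(0) lower bound.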
+permitted_beta_count(#vqstate { len = 0 }) ->
+ infinity;
+permitted_beta_count(#vqstate { mode = lazy,
+ target_ram_count = TargetRamCount}) ->
+ TargetRamCount;
+permitted_beta_count(#vqstate { target_ram_count = 0, q3 = Q3 }) ->
+ lists:min([?QUEUE:len(Q3), rabbit_queue_index:next_segment_boundary(0)]);
+permitted_beta_count(#vqstate { q1 = Q1,
+ q4 = Q4,
+ target_ram_count = TargetRamCount,
+ len = Len }) ->
+ BetaDelta = Len - ?QUEUE:len(Q1) - ?QUEUE:len(Q4),
+ lists:max([rabbit_queue_index:next_segment_boundary(0),
+ BetaDelta - ((BetaDelta * BetaDelta) div
+ (BetaDelta + TargetRamCount))]).
+
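+%% How much of Current sticks out above Permitted, i.e. the number of
+%% messages that must be pushed out to get back under the permitted
+%% level; e.g. chunk_size(100, 60) = 40, while chunk_size(50, infinity)
+%% = 0 (nothing needs to move).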
+chunk_size(Current, Permitted)
+ when Permitted =:= infinity orelse Permitted >= Current ->
+ 0;
+chunk_size(Current, Permitted) ->
+ Current - Permitted.
+
+fetch_from_q3(State = #vqstate { mode = default,
+ q1 = Q1,
+ q2 = Q2,
+ delta = #delta { count = DeltaCount },
+ q3 = Q3,
+ q4 = Q4 }) ->
+ case ?QUEUE:out(Q3) of
+ {empty, _Q3} ->
+ {empty, State};
+ {{value, MsgStatus}, Q3a} ->
+ State1 = State #vqstate { q3 = Q3a },
+ State2 = case {?QUEUE:is_empty(Q3a), 0 == DeltaCount} of
+ {true, true} ->
+ %% q3 is now empty, it wasn't before;
+ %% delta is still empty. So q2 must be
+ %% empty, and we know q4 is empty
+ %% otherwise we wouldn't be loading from
+ %% q3. As such, we can just set q4 to Q1.
+ true = ?QUEUE:is_empty(Q2), %% ASSERTION
+ true = ?QUEUE:is_empty(Q4), %% ASSERTION
+ State1 #vqstate { q1 = ?QUEUE:new(), q4 = Q1 };
+ {true, false} ->
+ maybe_deltas_to_betas(State1);
+ {false, _} ->
+ %% q3 still isn't empty, we've not
+ %% touched delta, so the invariants
+ %% between q1, q2, delta and q3 are
+ %% maintained
+ State1
+ end,
+ {loaded, {MsgStatus, State2}}
+ end;
+%% lazy queues
+fetch_from_q3(State = #vqstate { mode = lazy,
+ delta = #delta { count = DeltaCount },
+ q3 = Q3 }) ->
+ case ?QUEUE:out(Q3) of
+ {empty, _Q3} when DeltaCount =:= 0 ->
+ {empty, State};
+ {empty, _Q3} ->
+ fetch_from_q3(maybe_deltas_to_betas(State));
+ {{value, MsgStatus}, Q3a} ->
+ State1 = State #vqstate { q3 = Q3a },
+ {loaded, {MsgStatus, State1}}
+ end.
+
+maybe_deltas_to_betas(State) ->
+ AfterFun = process_delivers_and_acks_fun(deliver_and_ack),
+ maybe_deltas_to_betas(AfterFun, State).
+
+maybe_deltas_to_betas(_DelsAndAcksFun,
+ State = #vqstate {delta = ?BLANK_DELTA_PATTERN(X) }) ->
+ State;
+maybe_deltas_to_betas(DelsAndAcksFun,
+ State = #vqstate {
+ q2 = Q2,
+ delta = Delta,
+ q3 = Q3,
+ index_state = IndexState,
+ ram_msg_count = RamMsgCount,
+ ram_bytes = RamBytes,
+ disk_read_count = DiskReadCount,
+ delta_transient_bytes = DeltaTransientBytes,
+ transient_threshold = TransientThreshold }) ->
+ #delta { start_seq_id = DeltaSeqId,
+ count = DeltaCount,
+ transient = Transient,
+ end_seq_id = DeltaSeqIdEnd } = Delta,
+ DeltaSeqId1 =
+ lists:min([rabbit_queue_index:next_segment_boundary(DeltaSeqId),
+ DeltaSeqIdEnd]),
+ {List, IndexState1} = rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1,
+ IndexState),
+ {Q3a, RamCountsInc, RamBytesInc, State1, TransientCount, TransientBytes} =
+ betas_from_index_entries(List, TransientThreshold,
+ DelsAndAcksFun,
+ State #vqstate { index_state = IndexState1 }),
+ State2 = State1 #vqstate { ram_msg_count = RamMsgCount + RamCountsInc,
+ ram_bytes = RamBytes + RamBytesInc,
+ disk_read_count = DiskReadCount + RamCountsInc },
+ case ?QUEUE:len(Q3a) of
+ 0 ->
+            %% we ignored every message in the segment because they
+            %% were all transient and below the threshold
+ maybe_deltas_to_betas(
+ DelsAndAcksFun,
+ State2 #vqstate {
+ delta = d(Delta #delta { start_seq_id = DeltaSeqId1 })});
+ Q3aLen ->
+ Q3b = ?QUEUE:join(Q3, Q3a),
+ case DeltaCount - Q3aLen of
+ 0 ->
+ %% delta is now empty, but it wasn't before, so
+ %% can now join q2 onto q3
+ State2 #vqstate { q2 = ?QUEUE:new(),
+ delta = ?BLANK_DELTA,
+ q3 = ?QUEUE:join(Q3b, Q2),
+ delta_transient_bytes = 0};
+ N when N > 0 ->
+ Delta1 = d(#delta { start_seq_id = DeltaSeqId1,
+ count = N,
+ transient = Transient - TransientCount,
+ end_seq_id = DeltaSeqIdEnd }),
+ State2 #vqstate { delta = Delta1,
+ q3 = Q3b,
+ delta_transient_bytes = DeltaTransientBytes - TransientBytes }
+ end
+ end.
+
+push_alphas_to_betas(Quota, State) ->
+ {Quota1, State1} =
+ push_alphas_to_betas(
+ fun ?QUEUE:out/1,
+ fun (MsgStatus, Q1a,
+ State0 = #vqstate { q3 = Q3, delta = #delta { count = 0,
+ transient = 0 } }) ->
+ State0 #vqstate { q1 = Q1a, q3 = ?QUEUE:in(MsgStatus, Q3) };
+ (MsgStatus, Q1a, State0 = #vqstate { q2 = Q2 }) ->
+ State0 #vqstate { q1 = Q1a, q2 = ?QUEUE:in(MsgStatus, Q2) }
+ end, Quota, State #vqstate.q1, State),
+ {Quota2, State2} =
+ push_alphas_to_betas(
+ fun ?QUEUE:out_r/1,
+ fun (MsgStatus, Q4a, State0 = #vqstate { q3 = Q3 }) ->
+ State0 #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3), q4 = Q4a }
+ end, Quota1, State1 #vqstate.q4, State1),
+ {Quota2, State2}.
+
+push_alphas_to_betas(_Generator, _Consumer, Quota, _Q,
+ State = #vqstate { ram_msg_count = RamMsgCount,
+ target_ram_count = TargetRamCount })
+ when Quota =:= 0 orelse
+ TargetRamCount =:= infinity orelse
+ TargetRamCount >= RamMsgCount ->
+ {Quota, ui(State)};
+push_alphas_to_betas(Generator, Consumer, Quota, Q, State) ->
+ %% We consume credits from the message_store whenever we need to
+ %% persist a message to disk. See:
+ %% rabbit_variable_queue:msg_store_write/4. So perhaps the
+ %% msg_store is trying to throttle down our queue.
+ case credit_flow:blocked() of
+ true -> {Quota, ui(State)};
+ false -> case Generator(Q) of
+ {empty, _Q} ->
+ {Quota, ui(State)};
+ {{value, MsgStatus}, Qa} ->
+ {MsgStatus1, State1} =
+ maybe_prepare_write_to_disk(true, false, MsgStatus,
+ State),
+ MsgStatus2 = m(trim_msg_status(MsgStatus1)),
+ State2 = stats(
+ ready0, {MsgStatus, MsgStatus2}, 0, State1),
+ State3 = Consumer(MsgStatus2, Qa, State2),
+ push_alphas_to_betas(Generator, Consumer, Quota - 1,
+ Qa, State3)
+ end
+ end.
+
+push_betas_to_deltas(Quota, State = #vqstate { mode = default,
+ q2 = Q2,
+ delta = Delta,
+ q3 = Q3}) ->
+ PushState = {Quota, Delta, State},
+ {Q3a, PushState1} = push_betas_to_deltas(
+ fun ?QUEUE:out_r/1,
+ fun rabbit_queue_index:next_segment_boundary/1,
+ Q3, PushState),
+ {Q2a, PushState2} = push_betas_to_deltas(
+ fun ?QUEUE:out/1,
+ fun (Q2MinSeqId) -> Q2MinSeqId end,
+ Q2, PushState1),
+ {_, Delta1, State1} = PushState2,
+ State1 #vqstate { q2 = Q2a,
+ delta = Delta1,
+ q3 = Q3a };
+%% In the case of lazy queues we want to page as many messages as
+%% possible from q3.
+push_betas_to_deltas(Quota, State = #vqstate { mode = lazy,
+ delta = Delta,
+ q3 = Q3}) ->
+ PushState = {Quota, Delta, State},
+ {Q3a, PushState1} = push_betas_to_deltas(
+ fun ?QUEUE:out_r/1,
+ fun (Q2MinSeqId) -> Q2MinSeqId end,
+ Q3, PushState),
+ {_, Delta1, State1} = PushState1,
+ State1 #vqstate { delta = Delta1,
+ q3 = Q3a }.
+
+push_betas_to_deltas(Generator, LimitFun, Q, PushState) ->
+ case ?QUEUE:is_empty(Q) of
+ true ->
+ {Q, PushState};
+ false ->
+ {value, #msg_status { seq_id = MinSeqId }} = ?QUEUE:peek(Q),
+ {value, #msg_status { seq_id = MaxSeqId }} = ?QUEUE:peek_r(Q),
+ Limit = LimitFun(MinSeqId),
+ case MaxSeqId < Limit of
+ true -> {Q, PushState};
+ false -> push_betas_to_deltas1(Generator, Limit, Q, PushState)
+ end
+ end.
+
+push_betas_to_deltas1(_Generator, _Limit, Q, {0, Delta, State}) ->
+ {Q, {0, Delta, ui(State)}};
+push_betas_to_deltas1(Generator, Limit, Q, {Quota, Delta, State}) ->
+ case Generator(Q) of
+ {empty, _Q} ->
+ {Q, {Quota, Delta, ui(State)}};
+ {{value, #msg_status { seq_id = SeqId }}, _Qa}
+ when SeqId < Limit ->
+ {Q, {Quota, Delta, ui(State)}};
+ {{value, MsgStatus = #msg_status { seq_id = SeqId }}, Qa} ->
+ {#msg_status { index_on_disk = true,
+ is_persistent = IsPersistent }, State1} =
+ maybe_batch_write_index_to_disk(true, MsgStatus, State),
+ State2 = stats(ready0, {MsgStatus, none}, 1, State1),
+ Delta1 = expand_delta(SeqId, Delta, IsPersistent),
+ push_betas_to_deltas1(Generator, Limit, Qa,
+ {Quota - 1, Delta1, State2})
+ end.
+
+%% Flushes queue index batch caches and updates queue index state.
+ui(#vqstate{index_state = IndexState,
+ target_ram_count = TargetRamCount} = State) ->
+ IndexState1 = rabbit_queue_index:flush_pre_publish_cache(
+ TargetRamCount, IndexState),
+ State#vqstate{index_state = IndexState1}.
+
+%%----------------------------------------------------------------------------
+%% Upgrading
+%%----------------------------------------------------------------------------
+
+-spec multiple_routing_keys() -> 'ok'.
+
+multiple_routing_keys() ->
+ transform_storage(
+    fun ({basic_message, ExchangeName, RoutingKey, Content,
+          MsgId, Persistent}) ->
+            {ok, {basic_message, ExchangeName, [RoutingKey], Content,
+                  MsgId, Persistent}};
+ (_) -> {error, corrupt_message}
+ end),
+ ok.
+
+%% Assumes message store is not running
+transform_storage(TransformFun) ->
+ transform_store(?PERSISTENT_MSG_STORE, TransformFun),
+ transform_store(?TRANSIENT_MSG_STORE, TransformFun).
+
+transform_store(Store, TransformFun) ->
+ rabbit_msg_store:force_recovery(rabbit_mnesia:dir(), Store),
+ rabbit_msg_store:transform_dir(rabbit_mnesia:dir(), Store, TransformFun).
+
+move_messages_to_vhost_store() ->
+ case list_persistent_queues() of
+ [] ->
+ log_upgrade("No durable queues found."
+ " Skipping message store migration"),
+ ok;
+ Queues ->
+ move_messages_to_vhost_store(Queues)
+ end,
+ ok = delete_old_store(),
+ ok = rabbit_queue_index:cleanup_global_recovery_terms().
+
+move_messages_to_vhost_store(Queues) ->
+ log_upgrade("Moving messages to per-vhost message store"),
+ %% Move the queue index for each persistent queue to the new store
+ lists:foreach(
+ fun(Queue) ->
+ QueueName = amqqueue:get_name(Queue),
+ rabbit_queue_index:move_to_per_vhost_stores(QueueName)
+ end,
+ Queues),
+ %% Legacy (global) msg_store may require recovery.
+ %% This upgrade step should only be started
+ %% if we are upgrading from a pre-3.7.0 version.
+ {QueuesWithTerms, RecoveryRefs, StartFunState} = read_old_recovery_terms(Queues),
+
+ OldStore = run_old_persistent_store(RecoveryRefs, StartFunState),
+
+ VHosts = rabbit_vhost:list_names(),
+
+ %% New store should not be recovered.
+ NewMsgStore = start_new_store(VHosts),
+ %% Recovery terms should be started for all vhosts for new store.
+ [ok = rabbit_recovery_terms:open_table(VHost) || VHost <- VHosts],
+
+ MigrationBatchSize = application:get_env(rabbit, queue_migration_batch_size,
+ ?QUEUE_MIGRATION_BATCH_SIZE),
+ in_batches(MigrationBatchSize,
+ {rabbit_variable_queue, migrate_queue, [OldStore, NewMsgStore]},
+ QueuesWithTerms,
+ "message_store upgrades: Migrating batch ~p of ~p queues. Out of total ~p ~n",
+ "message_store upgrades: Batch ~p of ~p queues migrated ~n. ~p total left"),
+
+ log_upgrade("Message store migration finished"),
+ ok = rabbit_sup:stop_child(OldStore),
+    [ok = rabbit_recovery_terms:close_table(VHost) || VHost <- VHosts],
+ ok = stop_new_store(NewMsgStore).
+
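+%% Applies {M, F, A} to the elements of List, Size elements per batch,
+%% fanning each batch out with rpc:async_call/4 on the local node and
+%% collecting the results before starting the next batch; e.g. with
+%% Size = 2 and List = [Q1, Q2, Q3] the batches are [Q1, Q2], then [Q3].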
+in_batches(Size, MFA, List, MessageStart, MessageEnd) ->
+ in_batches(Size, 1, MFA, List, MessageStart, MessageEnd).
+
+in_batches(_, _, _, [], _, _) -> ok;
+in_batches(Size, BatchNum, MFA, List, MessageStart, MessageEnd) ->
+ Length = length(List),
+ {Batch, Tail} = case Size > Length of
+ true -> {List, []};
+ false -> lists:split(Size, List)
+ end,
+ ProcessedLength = (BatchNum - 1) * Size,
+ rabbit_log:info(MessageStart, [BatchNum, Size, ProcessedLength + Length]),
+ {M, F, A} = MFA,
+ Keys = [ rpc:async_call(node(), M, F, [El | A]) || El <- Batch ],
+ lists:foreach(fun(Key) ->
+ case rpc:yield(Key) of
+ {badrpc, Err} -> throw(Err);
+ _ -> ok
+ end
+ end,
+ Keys),
+ rabbit_log:info(MessageEnd, [BatchNum, Size, length(Tail)]),
+ in_batches(Size, BatchNum + 1, MFA, Tail, MessageStart, MessageEnd).
+
+migrate_queue({QueueName = #resource{virtual_host = VHost, name = Name},
+ RecoveryTerm},
+ OldStore, NewStore) ->
+ log_upgrade_verbose(
+ "Migrating messages in queue ~s in vhost ~s to per-vhost message store~n",
+ [Name, VHost]),
+ OldStoreClient = get_global_store_client(OldStore),
+ NewStoreClient = get_per_vhost_store_client(QueueName, NewStore),
+    %% WARNING: during scan_queue_segments the queue index state is
+    %% recovered and then terminated. This can cause side effects!
+ rabbit_queue_index:scan_queue_segments(
+ %% We migrate only persistent messages which are found in message store
+ %% and are not acked yet
+ fun (_SeqId, MsgId, _MsgProps, true, _IsDelivered, no_ack, OldC)
+ when is_binary(MsgId) ->
+ migrate_message(MsgId, OldC, NewStoreClient);
+ (_SeqId, _MsgId, _MsgProps,
+ _IsPersistent, _IsDelivered, _IsAcked, OldC) ->
+ OldC
+ end,
+ OldStoreClient,
+ QueueName),
+ rabbit_msg_store:client_terminate(OldStoreClient),
+ rabbit_msg_store:client_terminate(NewStoreClient),
+ NewClientRef = rabbit_msg_store:client_ref(NewStoreClient),
+ case RecoveryTerm of
+ non_clean_shutdown -> ok;
+ Term when is_list(Term) ->
+ NewRecoveryTerm = lists:keyreplace(persistent_ref, 1, RecoveryTerm,
+ {persistent_ref, NewClientRef}),
+ rabbit_queue_index:update_recovery_term(QueueName, NewRecoveryTerm)
+ end,
+ log_upgrade_verbose("Finished migrating queue ~s in vhost ~s", [Name, VHost]),
+ {QueueName, NewClientRef}.
+
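+%% Copies a single message from the old (global) store to the new
+%% per-vhost store. Messages that cannot be read from the old store
+%% are skipped.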
+migrate_message(MsgId, OldC, NewC) ->
+ case rabbit_msg_store:read(MsgId, OldC) of
+ {{ok, Msg}, OldC1} ->
+ ok = rabbit_msg_store:write(MsgId, Msg, NewC),
+ OldC1;
+ _ -> OldC
+ end.
+
+get_per_vhost_store_client(#resource{virtual_host = VHost}, NewStore) ->
+ {VHost, StorePid} = lists:keyfind(VHost, 1, NewStore),
+ rabbit_msg_store:client_init(StorePid, rabbit_guid:gen(),
+ fun(_,_) -> ok end, fun() -> ok end).
+
+get_global_store_client(OldStore) ->
+ rabbit_msg_store:client_init(OldStore,
+ rabbit_guid:gen(),
+ fun(_,_) -> ok end,
+ fun() -> ok end).
+
+list_persistent_queues() ->
+ Node = node(),
+ mnesia:async_dirty(
+ fun () ->
+ qlc:e(qlc:q([Q || Q <- mnesia:table(rabbit_durable_queue),
+ ?amqqueue_is_classic(Q),
+ amqqueue:qnode(Q) == Node,
+ mnesia:read(rabbit_queue, amqqueue:get_name(Q), read) =:= []]))
+ end).
+
+read_old_recovery_terms([]) ->
+ {[], [], ?EMPTY_START_FUN_STATE};
+read_old_recovery_terms(Queues) ->
+ QueueNames = [amqqueue:get_name(Q) || Q <- Queues],
+ {AllTerms, StartFunState} = rabbit_queue_index:read_global_recovery_terms(QueueNames),
+ Refs = [Ref || Terms <- AllTerms,
+ Terms /= non_clean_shutdown,
+ begin
+ Ref = proplists:get_value(persistent_ref, Terms),
+ Ref =/= undefined
+ end],
+ {lists:zip(QueueNames, AllTerms), Refs, StartFunState}.
+
+run_old_persistent_store(Refs, StartFunState) ->
+ OldStoreName = ?PERSISTENT_MSG_STORE,
+ ok = rabbit_sup:start_child(OldStoreName, rabbit_msg_store, start_global_store_link,
+ [OldStoreName, rabbit_mnesia:dir(),
+ Refs, StartFunState]),
+ OldStoreName.
+
+start_new_store(VHosts) ->
+ %% Ensure vhost supervisor is started, so we can add vhosts to it.
+ lists:map(fun(VHost) ->
+ VHostDir = rabbit_vhost:msg_store_dir_path(VHost),
+ {ok, Pid} = rabbit_msg_store:start_link(?PERSISTENT_MSG_STORE,
+ VHostDir,
+ undefined,
+ ?EMPTY_START_FUN_STATE),
+ {VHost, Pid}
+ end,
+ VHosts).
+
+stop_new_store(NewStore) ->
+ lists:foreach(fun({_VHost, StorePid}) ->
+ unlink(StorePid),
+ exit(StorePid, shutdown)
+ end,
+ NewStore),
+ ok.
+
+delete_old_store() ->
+ log_upgrade("Removing the old message store data"),
+ rabbit_file:recursive_delete(
+ [filename:join([rabbit_mnesia:dir(), ?PERSISTENT_MSG_STORE])]),
+ %% Delete old transient store as well
+ rabbit_file:recursive_delete(
+ [filename:join([rabbit_mnesia:dir(), ?TRANSIENT_MSG_STORE])]),
+ ok.
+
+log_upgrade(Msg) ->
+ log_upgrade(Msg, []).
+
+log_upgrade(Msg, Args) ->
+ rabbit_log:info("message_store upgrades: " ++ Msg, Args).
+
+log_upgrade_verbose(Msg) ->
+ log_upgrade_verbose(Msg, []).
+
+log_upgrade_verbose(Msg, Args) ->
+ rabbit_log_upgrade:info(Msg, Args).
+
+maybe_client_terminate(MSCStateP) ->
+    %% The queue might have been asked to stop by the supervisor; it needs
+    %% a clean shutdown in order for the supervision strategy to work - if
+    %% it reaches the maximum number of restarts it might bring the vhost
+    %% down.
+ try
+ rabbit_msg_store:client_terminate(MSCStateP)
+ catch
+ _:_ ->
+ ok
+ end.