summaryrefslogtreecommitdiff
path: root/src/rabbit_variable_queue.erl
diff options
context:
space:
mode:
authordcorbacho <dparracorbacho@piotal.io>2020-11-18 14:27:41 +0000
committerdcorbacho <dparracorbacho@piotal.io>2020-11-18 14:27:41 +0000
commitf23a51261d9502ec39df0f8db47ba6b22aa7659f (patch)
tree53dcdf46e7dc2c14e81ee960bce8793879b488d3 /src/rabbit_variable_queue.erl
parentafa2c2bf6c7e0e9b63f4fb53dc931c70388e1c82 (diff)
parent9f6d64ec4a4b1eeac24d7846c5c64fd96798d892 (diff)
downloadrabbitmq-server-git-stream-timestamp-offset.tar.gz
Merge remote-tracking branch 'origin/master' into stream-timestamp-offsetstream-timestamp-offset
Diffstat (limited to 'src/rabbit_variable_queue.erl')
-rw-r--r--src/rabbit_variable_queue.erl3015
1 files changed, 0 insertions, 3015 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
deleted file mode 100644
index cf6fa4a189..0000000000
--- a/src/rabbit_variable_queue.erl
+++ /dev/null
@@ -1,3015 +0,0 @@
-%% This Source Code Form is subject to the terms of the Mozilla Public
-%% License, v. 2.0. If a copy of the MPL was not distributed with this
-%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
-%%
-%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
-%%
-
--module(rabbit_variable_queue).
-
--export([init/3, terminate/2, delete_and_terminate/2, delete_crashed/1,
- purge/1, purge_acks/1,
- publish/6, publish_delivered/5,
- batch_publish/4, batch_publish_delivered/4,
- discard/4, drain_confirmed/1,
- dropwhile/2, fetchwhile/4, fetch/2, drop/2, ack/2, requeue/2,
- ackfold/4, fold/3, len/1, is_empty/1, depth/1,
- set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1,
- handle_pre_hibernate/1, resume/1, msg_rates/1,
- info/2, invoke/3, is_duplicate/2, set_queue_mode/2,
- zip_msgs_and_acks/4, multiple_routing_keys/0, handle_info/2]).
-
--export([start/2, stop/1]).
-
-%% exported for testing only
--export([start_msg_store/3, stop_msg_store/1, init/6]).
-
--export([move_messages_to_vhost_store/0]).
-
--export([migrate_queue/3, migrate_message/3, get_per_vhost_store_client/2,
- get_global_store_client/1, log_upgrade_verbose/1,
- log_upgrade_verbose/2]).
-
--include_lib("stdlib/include/qlc.hrl").
-
--define(QUEUE_MIGRATION_BATCH_SIZE, 100).
--define(EMPTY_START_FUN_STATE, {fun (ok) -> finished end, ok}).
-
-%%----------------------------------------------------------------------------
-%% Messages, and their position in the queue, can be in memory or on
-%% disk, or both. Persistent messages will have both message and
-%% position pushed to disk as soon as they arrive; transient messages
-%% can be written to disk (and thus both types can be evicted from
-%% memory) under memory pressure. The question of whether a message is
-%% in RAM and whether it is persistent are orthogonal.
-%%
-%% Messages are persisted using the queue index and the message
-%% store. Normally the queue index holds the position of the message
-%% *within this queue* along with a couple of small bits of metadata,
-%% while the message store holds the message itself (including headers
-%% and other properties).
-%%
-%% However, as an optimisation, small messages can be embedded
-%% directly in the queue index and bypass the message store
-%% altogether.
-%%
-%% Definitions:
-%%
-%% alpha: this is a message where both the message itself, and its
-%% position within the queue are held in RAM
-%%
-%% beta: this is a message where the message itself is only held on
-%% disk (if persisted to the message store) but its position
-%% within the queue is held in RAM.
-%%
-%% gamma: this is a message where the message itself is only held on
-%% disk, but its position is both in RAM and on disk.
-%%
-%% delta: this is a collection of messages, represented by a single
-%% term, where the messages and their position are only held on
-%% disk.
-%%
-%% Note that for persistent messages, the message and its position
-%% within the queue are always held on disk, *in addition* to being in
-%% one of the above classifications.
-%%
-%% Also note that within this code, the term gamma seldom
-%% appears. It's frequently the case that gammas are defined by betas
-%% who have had their queue position recorded on disk.
-%%
-%% In general, messages move q1 -> q2 -> delta -> q3 -> q4, though
-%% many of these steps are frequently skipped. q1 and q4 only hold
-%% alphas, q2 and q3 hold both betas and gammas. When a message
-%% arrives, its classification is determined. It is then added to the
-%% rightmost appropriate queue.
-%%
-%% If a new message is determined to be a beta or gamma, q1 is
-%% empty. If a new message is determined to be a delta, q1 and q2 are
-%% empty (and actually q4 too).
-%%
-%% When removing messages from a queue, if q4 is empty then q3 is read
-%% directly. If q3 becomes empty then the next segment's worth of
-%% messages from delta are read into q3, reducing the size of
-%% delta. If the queue is non empty, either q4 or q3 contain
-%% entries. It is never permitted for delta to hold all the messages
-%% in the queue.
-%%
-%% The duration indicated to us by the memory_monitor is used to
-%% calculate, given our current ingress and egress rates, how many
-%% messages we should hold in RAM (i.e. as alphas). We track the
-%% ingress and egress rates for both messages and pending acks and
-%% rates for both are considered when calculating the number of
-%% messages to hold in RAM. When we need to push alphas to betas or
-%% betas to gammas, we favour writing out messages that are further
-%% from the head of the queue. This minimises writes to disk, as the
-%% messages closer to the tail of the queue stay in the queue for
-%% longer, thus do not need to be replaced as quickly by sending other
-%% messages to disk.
-%%
-%% Whilst messages are pushed to disk and forgotten from RAM as soon
-%% as requested by a new setting of the queue RAM duration, the
-%% inverse is not true: we only load messages back into RAM as
-%% demanded as the queue is read from. Thus only publishes to the
-%% queue will take up available spare capacity.
-%%
-%% When we report our duration to the memory monitor, we calculate
-%% average ingress and egress rates over the last two samples, and
-%% then calculate our duration based on the sum of the ingress and
-%% egress rates. More than two samples could be used, but it's a
-%% balance between responding quickly enough to changes in
-%% producers/consumers versus ignoring temporary blips. The problem
-%% with temporary blips is that with just a few queues, they can have
-%% substantial impact on the calculation of the average duration and
-%% hence cause unnecessary I/O. Another alternative is to increase the
-%% amqqueue_process:RAM_DURATION_UPDATE_PERIOD to beyond 5
-%% seconds. However, that then runs the risk of being too slow to
-%% inform the memory monitor of changes. Thus a 5 second interval,
-%% plus a rolling average over the last two samples seems to work
-%% well in practice.
-%%
-%% The sum of the ingress and egress rates is used because the egress
-%% rate alone is not sufficient. Adding in the ingress rate means that
-%% queues which are being flooded by messages are given more memory,
-%% resulting in them being able to process the messages faster (by
-%% doing less I/O, or at least deferring it) and thus helping keep
-%% their mailboxes empty and thus the queue as a whole is more
-%% responsive. If such a queue also has fast but previously idle
-%% consumers, the consumer can then start to be driven as fast as it
-%% can go, whereas if only egress rate was being used, the incoming
-%% messages may have to be written to disk and then read back in,
-%% resulting in the hard disk being a bottleneck in driving the
-%% consumers. Generally, we want to give Rabbit every chance of
-%% getting rid of messages as fast as possible and remaining
-%% responsive, and using only the egress rate impacts that goal.
-%%
-%% Once the queue has more alphas than the target_ram_count, the
-%% surplus must be converted to betas, if not gammas, if not rolled
-%% into delta. The conditions under which these transitions occur
-%% reflect the conflicting goals of minimising RAM cost per msg, and
-%% minimising CPU cost per msg. Once the msg has become a beta, its
-%% payload is no longer in RAM, thus a read from the msg_store must
-%% occur before the msg can be delivered, but the RAM cost of a beta
-%% is the same as a gamma, so converting a beta to gamma will not free
-%% up any further RAM. To reduce the RAM cost further, the gamma must
-%% be rolled into delta. Whilst recovering a beta or a gamma to an
-%% alpha requires only one disk read (from the msg_store), recovering
-%% a msg from within delta will require two reads (queue_index and
-%% then msg_store). But delta has a near-0 per-msg RAM cost. So the
-%% conflict is between using delta more, which will free up more
-%% memory, but require additional CPU and disk ops, versus using delta
-%% less and gammas and betas more, which will cost more memory, but
-%% require fewer disk ops and less CPU overhead.
-%%
-%% In the case of a persistent msg published to a durable queue, the
-%% msg is immediately written to the msg_store and queue_index. If
-%% then additionally converted from an alpha, it'll immediately go to
-%% a gamma (as it's already in queue_index), and cannot exist as a
-%% beta. Thus a durable queue with a mixture of persistent and
-%% transient msgs in it which has more messages than permitted by the
-%% target_ram_count may contain an interspersed mixture of betas and
-%% gammas in q2 and q3.
-%%
-%% There is then a ratio that controls how many betas and gammas there
-%% can be. This is based on the target_ram_count and thus expresses
-%% the fact that as the number of permitted alphas in the queue falls,
-%% so should the number of betas and gammas fall (i.e. delta
-%% grows). If q2 and q3 contain more than the permitted number of
-%% betas and gammas, then the surplus are forcibly converted to gammas
-%% (as necessary) and then rolled into delta. The ratio is that
-%% delta/(betas+gammas+delta) equals
-%% (betas+gammas+delta)/(target_ram_count+betas+gammas+delta). I.e. as
-%% the target_ram_count shrinks to 0, so must betas and gammas.
-%%
-%% The conversion of betas to deltas is done if there are at least
-%% ?IO_BATCH_SIZE betas in q2 & q3. This value should not be too small,
-%% otherwise the frequent operations on the queues of q2 and q3 will not be
-%% effectively amortised (switching the direction of queue access defeats
-%% amortisation). Note that there is a natural upper bound due to credit_flow
-%% limits on the alpha to beta conversion.
-%%
-%% The conversion from alphas to betas is chunked due to the
-%% credit_flow limits of the msg_store. This further smooths the
-%% effects of changes to the target_ram_count and ensures the queue
-%% remains responsive even when there is a large amount of IO work to
-%% do. The 'resume' callback is utilised to ensure that conversions
-%% are done as promptly as possible whilst ensuring the queue remains
-%% responsive.
-%%
-%% In the queue we keep track of both messages that are pending
-%% delivery and messages that are pending acks. In the event of a
-%% queue purge, we only need to load qi segments if the queue has
-%% elements in deltas (i.e. it came under significant memory
-%% pressure). In the event of a queue deletion, in addition to the
-%% preceding, by keeping track of pending acks in RAM, we do not need
-%% to search through qi segments looking for messages that are yet to
-%% be acknowledged.
-%%
-%% Pending acks are recorded in memory by storing the message itself.
-%% If the message has been sent to disk, we do not store the message
-%% content. During memory reduction, pending acks containing message
-%% content have that content removed and the corresponding messages
-%% are pushed out to disk.
-%%
-%% Messages from pending acks are returned to q4, q3 and delta during
-%% requeue, based on the limits of seq_id contained in each. Requeued
-%% messages retain their original seq_id, maintaining order
-%% when requeued.
-%%
-%% The order in which alphas are pushed to betas and pending acks
-%% are pushed to disk is determined dynamically. We always prefer to
-%% push messages for the source (alphas or acks) that is growing the
-%% fastest (with growth measured as avg. ingress - avg. egress).
-%%
-%% Notes on Clean Shutdown
-%% (This documents behaviour in variable_queue, queue_index and
-%% msg_store.)
-%%
-%% In order to try to achieve as fast a start-up as possible, if a
-%% clean shutdown occurs, we try to save out state to disk to reduce
-%% work on startup. In the msg_store this takes the form of the
-%% index_module's state, plus the file_summary ets table, and client
-%% refs. In the VQ, this takes the form of the count of persistent
-%% messages in the queue and references into the msg_stores. The
-%% queue_index adds to these terms the details of its segments and
-%% stores the terms in the queue directory.
-%%
-%% Two message stores are used. One is created for persistent messages
-%% to durable queues that must survive restarts, and the other is used
-%% for all other messages that just happen to need to be written to
-%% disk. On start up we can therefore nuke the transient message
-%% store, and be sure that the messages in the persistent store are
-%% all that we need.
-%%
-%% The references to the msg_stores are there so that the msg_store
-%% knows to only trust its saved state if all of the queues it was
-%% previously talking to come up cleanly. Likewise, the queues
-%% themselves (esp queue_index) skips work in init if all the queues
-%% and msg_store were shutdown cleanly. This gives both good speed
-%% improvements and also robustness so that if anything possibly went
-%% wrong in shutdown (or there was subsequent manual tampering), all
-%% messages and queues that can be recovered are recovered, safely.
-%%
-%% To delete transient messages lazily, the variable_queue, on
-%% startup, stores the next_seq_id reported by the queue_index as the
-%% transient_threshold. From that point on, whenever it's reading a
-%% message off disk via the queue_index, if the seq_id is below this
-%% threshold and the message is transient then it drops the message
-%% (the message itself won't exist on disk because it would have been
-%% stored in the transient msg_store which would have had its saved
-%% state nuked on startup). This avoids the expensive operation of
-%% scanning the entire queue on startup in order to delete transient
-%% messages that were only pushed to disk to save memory.
-%%
-%%----------------------------------------------------------------------------
-
--behaviour(rabbit_backing_queue).
-
--record(vqstate,
- { q1,
- q2,
- delta,
- q3,
- q4,
- next_seq_id,
- ram_pending_ack, %% msgs using store, still in RAM
- disk_pending_ack, %% msgs in store, paged out
- qi_pending_ack, %% msgs using qi, *can't* be paged out
- index_state,
- msg_store_clients,
- durable,
- transient_threshold,
- qi_embed_msgs_below,
-
- len, %% w/o unacked
- bytes, %% w/o unacked
- unacked_bytes,
- persistent_count, %% w unacked
- persistent_bytes, %% w unacked
- delta_transient_bytes, %%
-
- target_ram_count,
- ram_msg_count, %% w/o unacked
- ram_msg_count_prev,
- ram_ack_count_prev,
- ram_bytes, %% w unacked
- out_counter,
- in_counter,
- rates,
- msgs_on_disk,
- msg_indices_on_disk,
- unconfirmed,
- confirmed,
- ack_out_counter,
- ack_in_counter,
- %% Unlike the other counters these two do not feed into
- %% #rates{} and get reset
- disk_read_count,
- disk_write_count,
-
- io_batch_size,
-
- %% default queue or lazy queue
- mode,
- %% number of reduce_memory_usage executions, once it
- %% reaches a threshold the queue will manually trigger a runtime GC
- %% see: maybe_execute_gc/1
- memory_reduction_run_count,
- %% Queue data is grouped by VHost. We need to store it
- %% to work with queue index.
- virtual_host,
- waiting_bump = false
- }).
-
--record(rates, { in, out, ack_in, ack_out, timestamp }).
-
--record(msg_status,
- { seq_id,
- msg_id,
- msg,
- is_persistent,
- is_delivered,
- msg_in_store,
- index_on_disk,
- persist_to,
- msg_props
- }).
-
--record(delta,
- { start_seq_id, %% start_seq_id is inclusive
- count,
- transient,
- end_seq_id %% end_seq_id is exclusive
- }).
-
--define(HEADER_GUESS_SIZE, 100). %% see determine_persist_to/2
--define(PERSISTENT_MSG_STORE, msg_store_persistent).
--define(TRANSIENT_MSG_STORE, msg_store_transient).
-
--define(QUEUE, lqueue).
-
--include_lib("rabbit_common/include/rabbit.hrl").
--include_lib("rabbit_common/include/rabbit_framing.hrl").
--include("amqqueue.hrl").
-
-%%----------------------------------------------------------------------------
-
--rabbit_upgrade({multiple_routing_keys, local, []}).
--rabbit_upgrade({move_messages_to_vhost_store, message_store, []}).
-
--type seq_id() :: non_neg_integer().
-
--type rates() :: #rates { in :: float(),
- out :: float(),
- ack_in :: float(),
- ack_out :: float(),
- timestamp :: rabbit_types:timestamp()}.
-
--type delta() :: #delta { start_seq_id :: non_neg_integer(),
- count :: non_neg_integer(),
- end_seq_id :: non_neg_integer() }.
-
-%% The compiler (rightfully) complains that ack() and state() are
-%% unused. For this reason we duplicate a -spec from
-%% rabbit_backing_queue with the only intent being to remove
-%% warnings. The problem here is that we can't parameterise the BQ
-%% behaviour by these two types as we would like to. We still leave
-%% these here for documentation purposes.
--type ack() :: seq_id().
--type state() :: #vqstate {
- q1 :: ?QUEUE:?QUEUE(),
- q2 :: ?QUEUE:?QUEUE(),
- delta :: delta(),
- q3 :: ?QUEUE:?QUEUE(),
- q4 :: ?QUEUE:?QUEUE(),
- next_seq_id :: seq_id(),
- ram_pending_ack :: gb_trees:tree(),
- disk_pending_ack :: gb_trees:tree(),
- qi_pending_ack :: gb_trees:tree(),
- index_state :: any(),
- msg_store_clients :: 'undefined' | {{any(), binary()},
- {any(), binary()}},
- durable :: boolean(),
- transient_threshold :: non_neg_integer(),
- qi_embed_msgs_below :: non_neg_integer(),
-
- len :: non_neg_integer(),
- bytes :: non_neg_integer(),
- unacked_bytes :: non_neg_integer(),
-
- persistent_count :: non_neg_integer(),
- persistent_bytes :: non_neg_integer(),
-
- target_ram_count :: non_neg_integer() | 'infinity',
- ram_msg_count :: non_neg_integer(),
- ram_msg_count_prev :: non_neg_integer(),
- ram_ack_count_prev :: non_neg_integer(),
- ram_bytes :: non_neg_integer(),
- out_counter :: non_neg_integer(),
- in_counter :: non_neg_integer(),
- rates :: rates(),
- msgs_on_disk :: gb_sets:set(),
- msg_indices_on_disk :: gb_sets:set(),
- unconfirmed :: gb_sets:set(),
- confirmed :: gb_sets:set(),
- ack_out_counter :: non_neg_integer(),
- ack_in_counter :: non_neg_integer(),
- disk_read_count :: non_neg_integer(),
- disk_write_count :: non_neg_integer(),
-
- io_batch_size :: pos_integer(),
- mode :: 'default' | 'lazy',
- memory_reduction_run_count :: non_neg_integer()}.
-
--define(BLANK_DELTA, #delta { start_seq_id = undefined,
- count = 0,
- transient = 0,
- end_seq_id = undefined }).
--define(BLANK_DELTA_PATTERN(Z), #delta { start_seq_id = Z,
- count = 0,
- transient = 0,
- end_seq_id = Z }).
-
--define(MICROS_PER_SECOND, 1000000.0).
-
-%% We're sampling every 5s for RAM duration; a half life that is of
-%% the same order of magnitude is probably about right.
--define(RATE_AVG_HALF_LIFE, 5.0).
-
-%% We will recalculate the #rates{} every time we get asked for our
-%% RAM duration, or every N messages published, whichever is
-%% sooner. We do this since the priority calculations in
-%% rabbit_amqqueue_process need fairly fresh rates.
--define(MSGS_PER_RATE_CALC, 100).
-
-%% we define the garbage collector threshold
-%% it needs to tune the `reduce_memory_use` calls. Thus, the garbage collection.
-%% see: rabbitmq-server-973 and rabbitmq-server-964
--define(DEFAULT_EXPLICIT_GC_RUN_OP_THRESHOLD, 1000).
--define(EXPLICIT_GC_RUN_OP_THRESHOLD(Mode),
- case get(explicit_gc_run_operation_threshold) of
- undefined ->
- Val = explicit_gc_run_operation_threshold_for_mode(Mode),
- put(explicit_gc_run_operation_threshold, Val),
- Val;
- Val -> Val
- end).
-
-explicit_gc_run_operation_threshold_for_mode(Mode) ->
- {Key, Fallback} = case Mode of
- lazy -> {lazy_queue_explicit_gc_run_operation_threshold,
- ?DEFAULT_EXPLICIT_GC_RUN_OP_THRESHOLD};
- _ -> {queue_explicit_gc_run_operation_threshold,
- ?DEFAULT_EXPLICIT_GC_RUN_OP_THRESHOLD}
- end,
- rabbit_misc:get_env(rabbit, Key, Fallback).
-
-%%----------------------------------------------------------------------------
-%% Public API
-%%----------------------------------------------------------------------------
-
-start(VHost, DurableQueues) ->
- {AllTerms, StartFunState} = rabbit_queue_index:start(VHost, DurableQueues),
- %% Group recovery terms by vhost.
- ClientRefs = [Ref || Terms <- AllTerms,
- Terms /= non_clean_shutdown,
- begin
- Ref = proplists:get_value(persistent_ref, Terms),
- Ref =/= undefined
- end],
- start_msg_store(VHost, ClientRefs, StartFunState),
- {ok, AllTerms}.
-
-stop(VHost) ->
- ok = stop_msg_store(VHost),
- ok = rabbit_queue_index:stop(VHost).
-
-start_msg_store(VHost, Refs, StartFunState) when is_list(Refs); Refs == undefined ->
- rabbit_log:info("Starting message stores for vhost '~s'~n", [VHost]),
- do_start_msg_store(VHost, ?TRANSIENT_MSG_STORE, undefined, ?EMPTY_START_FUN_STATE),
- do_start_msg_store(VHost, ?PERSISTENT_MSG_STORE, Refs, StartFunState),
- ok.
-
-do_start_msg_store(VHost, Type, Refs, StartFunState) ->
- case rabbit_vhost_msg_store:start(VHost, Type, Refs, StartFunState) of
- {ok, _} ->
- rabbit_log:info("Started message store of type ~s for vhost '~s'~n", [abbreviated_type(Type), VHost]);
- {error, {no_such_vhost, VHost}} = Err ->
- rabbit_log:error("Failed to start message store of type ~s for vhost '~s': the vhost no longer exists!~n",
- [Type, VHost]),
- exit(Err);
- {error, Error} ->
- rabbit_log:error("Failed to start message store of type ~s for vhost '~s': ~p~n",
- [Type, VHost, Error]),
- exit({error, Error})
- end.
-
-abbreviated_type(?TRANSIENT_MSG_STORE) -> transient;
-abbreviated_type(?PERSISTENT_MSG_STORE) -> persistent.
-
-stop_msg_store(VHost) ->
- rabbit_vhost_msg_store:stop(VHost, ?TRANSIENT_MSG_STORE),
- rabbit_vhost_msg_store:stop(VHost, ?PERSISTENT_MSG_STORE),
- ok.
-
-init(Queue, Recover, Callback) ->
- init(
- Queue, Recover, Callback,
- fun (MsgIds, ActionTaken) ->
- msgs_written_to_disk(Callback, MsgIds, ActionTaken)
- end,
- fun (MsgIds) -> msg_indices_written_to_disk(Callback, MsgIds) end,
- fun (MsgIds) -> msgs_and_indices_written_to_disk(Callback, MsgIds) end).
-
-init(Q, new, AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun) when ?is_amqqueue(Q) ->
- QueueName = amqqueue:get_name(Q),
- IsDurable = amqqueue:is_durable(Q),
- IndexState = rabbit_queue_index:init(QueueName,
- MsgIdxOnDiskFun, MsgAndIdxOnDiskFun),
- VHost = QueueName#resource.virtual_host,
- init(IsDurable, IndexState, 0, 0, [],
- case IsDurable of
- true -> msg_store_client_init(?PERSISTENT_MSG_STORE,
- MsgOnDiskFun, AsyncCallback, VHost);
- false -> undefined
- end,
- msg_store_client_init(?TRANSIENT_MSG_STORE, undefined,
- AsyncCallback, VHost), VHost);
-
-%% We can be recovering a transient queue if it crashed
-init(Q, Terms, AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun) when ?is_amqqueue(Q) ->
- QueueName = amqqueue:get_name(Q),
- IsDurable = amqqueue:is_durable(Q),
- {PRef, RecoveryTerms} = process_recovery_terms(Terms),
- VHost = QueueName#resource.virtual_host,
- {PersistentClient, ContainsCheckFun} =
- case IsDurable of
- true -> C = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef,
- MsgOnDiskFun, AsyncCallback,
- VHost),
- {C, fun (MsgId) when is_binary(MsgId) ->
- rabbit_msg_store:contains(MsgId, C);
- (#basic_message{is_persistent = Persistent}) ->
- Persistent
- end};
- false -> {undefined, fun(_MsgId) -> false end}
- end,
- TransientClient = msg_store_client_init(?TRANSIENT_MSG_STORE,
- undefined, AsyncCallback,
- VHost),
- {DeltaCount, DeltaBytes, IndexState} =
- rabbit_queue_index:recover(
- QueueName, RecoveryTerms,
- rabbit_vhost_msg_store:successfully_recovered_state(
- VHost,
- ?PERSISTENT_MSG_STORE),
- ContainsCheckFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun),
- init(IsDurable, IndexState, DeltaCount, DeltaBytes, RecoveryTerms,
- PersistentClient, TransientClient, VHost).
-
-process_recovery_terms(Terms=non_clean_shutdown) ->
- {rabbit_guid:gen(), Terms};
-process_recovery_terms(Terms) ->
- case proplists:get_value(persistent_ref, Terms) of
- undefined -> {rabbit_guid:gen(), []};
- PRef -> {PRef, Terms}
- end.
-
-terminate(_Reason, State) ->
- State1 = #vqstate { virtual_host = VHost,
- persistent_count = PCount,
- persistent_bytes = PBytes,
- index_state = IndexState,
- msg_store_clients = {MSCStateP, MSCStateT} } =
- purge_pending_ack(true, State),
- PRef = case MSCStateP of
- undefined -> undefined;
- _ -> ok = maybe_client_terminate(MSCStateP),
- rabbit_msg_store:client_ref(MSCStateP)
- end,
- ok = rabbit_msg_store:client_delete_and_terminate(MSCStateT),
- Terms = [{persistent_ref, PRef},
- {persistent_count, PCount},
- {persistent_bytes, PBytes}],
- a(State1#vqstate {
- index_state = rabbit_queue_index:terminate(VHost, Terms, IndexState),
- msg_store_clients = undefined }).
-
-%% the only difference between purge and delete is that delete also
-%% needs to delete everything that's been delivered and not ack'd.
-delete_and_terminate(_Reason, State) ->
- %% Normally when we purge messages we interact with the qi by
- %% issues delivers and acks for every purged message. In this case
- %% we don't need to do that, so we just delete the qi.
- State1 = purge_and_index_reset(State),
- State2 = #vqstate { msg_store_clients = {MSCStateP, MSCStateT} } =
- purge_pending_ack_delete_and_terminate(State1),
- case MSCStateP of
- undefined -> ok;
- _ -> rabbit_msg_store:client_delete_and_terminate(MSCStateP)
- end,
- rabbit_msg_store:client_delete_and_terminate(MSCStateT),
- a(State2 #vqstate { msg_store_clients = undefined }).
-
-delete_crashed(Q) when ?is_amqqueue(Q) ->
- QName = amqqueue:get_name(Q),
- ok = rabbit_queue_index:erase(QName).
-
-purge(State = #vqstate { len = Len }) ->
- case is_pending_ack_empty(State) and is_unconfirmed_empty(State) of
- true ->
- {Len, purge_and_index_reset(State)};
- false ->
- {Len, purge_when_pending_acks(State)}
- end.
-
-purge_acks(State) -> a(purge_pending_ack(false, State)).
-
-publish(Msg, MsgProps, IsDelivered, ChPid, Flow, State) ->
- State1 =
- publish1(Msg, MsgProps, IsDelivered, ChPid, Flow,
- fun maybe_write_to_disk/4,
- State),
- a(maybe_reduce_memory_use(maybe_update_rates(State1))).
-
-batch_publish(Publishes, ChPid, Flow, State) ->
- {ChPid, Flow, State1} =
- lists:foldl(fun batch_publish1/2, {ChPid, Flow, State}, Publishes),
- State2 = ui(State1),
- a(maybe_reduce_memory_use(maybe_update_rates(State2))).
-
-publish_delivered(Msg, MsgProps, ChPid, Flow, State) ->
- {SeqId, State1} =
- publish_delivered1(Msg, MsgProps, ChPid, Flow,
- fun maybe_write_to_disk/4,
- State),
- {SeqId, a(maybe_reduce_memory_use(maybe_update_rates(State1)))}.
-
-batch_publish_delivered(Publishes, ChPid, Flow, State) ->
- {ChPid, Flow, SeqIds, State1} =
- lists:foldl(fun batch_publish_delivered1/2,
- {ChPid, Flow, [], State}, Publishes),
- State2 = ui(State1),
- {lists:reverse(SeqIds), a(maybe_reduce_memory_use(maybe_update_rates(State2)))}.
-
-discard(_MsgId, _ChPid, _Flow, State) -> State.
-
-drain_confirmed(State = #vqstate { confirmed = C }) ->
- case gb_sets:is_empty(C) of
- true -> {[], State}; %% common case
- false -> {gb_sets:to_list(C), State #vqstate {
- confirmed = gb_sets:new() }}
- end.
-
-dropwhile(Pred, State) ->
- {MsgProps, State1} =
- remove_by_predicate(Pred, State),
- {MsgProps, a(State1)}.
-
-fetchwhile(Pred, Fun, Acc, State) ->
- {MsgProps, Acc1, State1} =
- fetch_by_predicate(Pred, Fun, Acc, State),
- {MsgProps, Acc1, a(State1)}.
-
-fetch(AckRequired, State) ->
- case queue_out(State) of
- {empty, State1} ->
- {empty, a(State1)};
- {{value, MsgStatus}, State1} ->
- %% it is possible that the message wasn't read from disk
- %% at this point, so read it in.
- {Msg, State2} = read_msg(MsgStatus, State1),
- {AckTag, State3} = remove(AckRequired, MsgStatus, State2),
- {{Msg, MsgStatus#msg_status.is_delivered, AckTag}, a(State3)}
- end.
-
-drop(AckRequired, State) ->
- case queue_out(State) of
- {empty, State1} ->
- {empty, a(State1)};
- {{value, MsgStatus}, State1} ->
- {AckTag, State2} = remove(AckRequired, MsgStatus, State1),
- {{MsgStatus#msg_status.msg_id, AckTag}, a(State2)}
- end.
-
-%% Duplicated from rabbit_backing_queue
--spec ack([ack()], state()) -> {[rabbit_guid:guid()], state()}.
-
-ack([], State) ->
- {[], State};
-%% optimisation: this head is essentially a partial evaluation of the
-%% general case below, for the single-ack case.
-ack([SeqId], State) ->
- case remove_pending_ack(true, SeqId, State) of
- {none, _} ->
- {[], State};
- {#msg_status { msg_id = MsgId,
- is_persistent = IsPersistent,
- msg_in_store = MsgInStore,
- index_on_disk = IndexOnDisk },
- State1 = #vqstate { index_state = IndexState,
- msg_store_clients = MSCState,
- ack_out_counter = AckOutCount }} ->
- IndexState1 = case IndexOnDisk of
- true -> rabbit_queue_index:ack([SeqId], IndexState);
- false -> IndexState
- end,
- case MsgInStore of
- true -> ok = msg_store_remove(MSCState, IsPersistent, [MsgId]);
- false -> ok
- end,
- {[MsgId],
- a(State1 #vqstate { index_state = IndexState1,
- ack_out_counter = AckOutCount + 1 })}
- end;
-ack(AckTags, State) ->
- {{IndexOnDiskSeqIds, MsgIdsByStore, AllMsgIds},
- State1 = #vqstate { index_state = IndexState,
- msg_store_clients = MSCState,
- ack_out_counter = AckOutCount }} =
- lists:foldl(
- fun (SeqId, {Acc, State2}) ->
- case remove_pending_ack(true, SeqId, State2) of
- {none, _} ->
- {Acc, State2};
- {MsgStatus, State3} ->
- {accumulate_ack(MsgStatus, Acc), State3}
- end
- end, {accumulate_ack_init(), State}, AckTags),
- IndexState1 = rabbit_queue_index:ack(IndexOnDiskSeqIds, IndexState),
- remove_msgs_by_id(MsgIdsByStore, MSCState),
- {lists:reverse(AllMsgIds),
- a(State1 #vqstate { index_state = IndexState1,
- ack_out_counter = AckOutCount + length(AckTags) })}.
-
-requeue(AckTags, #vqstate { mode = default,
- delta = Delta,
- q3 = Q3,
- q4 = Q4,
- in_counter = InCounter,
- len = Len } = State) ->
- {SeqIds, Q4a, MsgIds, State1} = queue_merge(lists:sort(AckTags), Q4, [],
- beta_limit(Q3),
- fun publish_alpha/2, State),
- {SeqIds1, Q3a, MsgIds1, State2} = queue_merge(SeqIds, Q3, MsgIds,
- delta_limit(Delta),
- fun publish_beta/2, State1),
- {Delta1, MsgIds2, State3} = delta_merge(SeqIds1, Delta, MsgIds1,
- State2),
- MsgCount = length(MsgIds2),
- {MsgIds2, a(maybe_reduce_memory_use(
- maybe_update_rates(ui(
- State3 #vqstate { delta = Delta1,
- q3 = Q3a,
- q4 = Q4a,
- in_counter = InCounter + MsgCount,
- len = Len + MsgCount }))))};
-requeue(AckTags, #vqstate { mode = lazy,
- delta = Delta,
- q3 = Q3,
- in_counter = InCounter,
- len = Len } = State) ->
- {SeqIds, Q3a, MsgIds, State1} = queue_merge(lists:sort(AckTags), Q3, [],
- delta_limit(Delta),
- fun publish_beta/2, State),
- {Delta1, MsgIds1, State2} = delta_merge(SeqIds, Delta, MsgIds,
- State1),
- MsgCount = length(MsgIds1),
- {MsgIds1, a(maybe_reduce_memory_use(
- maybe_update_rates(ui(
- State2 #vqstate { delta = Delta1,
- q3 = Q3a,
- in_counter = InCounter + MsgCount,
- len = Len + MsgCount }))))}.
-
-ackfold(MsgFun, Acc, State, AckTags) ->
- {AccN, StateN} =
- lists:foldl(fun(SeqId, {Acc0, State0}) ->
- MsgStatus = lookup_pending_ack(SeqId, State0),
- {Msg, State1} = read_msg(MsgStatus, State0),
- {MsgFun(Msg, SeqId, Acc0), State1}
- end, {Acc, State}, AckTags),
- {AccN, a(StateN)}.
-
-fold(Fun, Acc, State = #vqstate{index_state = IndexState}) ->
- {Its, IndexState1} = lists:foldl(fun inext/2, {[], IndexState},
- [msg_iterator(State),
- disk_ack_iterator(State),
- ram_ack_iterator(State),
- qi_ack_iterator(State)]),
- ifold(Fun, Acc, Its, State#vqstate{index_state = IndexState1}).
-
-len(#vqstate { len = Len }) -> Len.
-
-is_empty(State) -> 0 == len(State).
-
-depth(State) ->
- len(State) + count_pending_acks(State).
-
-set_ram_duration_target(
- DurationTarget, State = #vqstate {
- rates = #rates { in = AvgIngressRate,
- out = AvgEgressRate,
- ack_in = AvgAckIngressRate,
- ack_out = AvgAckEgressRate },
- target_ram_count = TargetRamCount }) ->
- Rate =
- AvgEgressRate + AvgIngressRate + AvgAckEgressRate + AvgAckIngressRate,
- TargetRamCount1 =
- case DurationTarget of
- infinity -> infinity;
- _ -> trunc(DurationTarget * Rate) %% msgs = sec * msgs/sec
- end,
- State1 = State #vqstate { target_ram_count = TargetRamCount1 },
- a(case TargetRamCount1 == infinity orelse
- (TargetRamCount =/= infinity andalso
- TargetRamCount1 >= TargetRamCount) of
- true -> State1;
- false -> reduce_memory_use(State1)
- end).
-
-maybe_update_rates(State = #vqstate{ in_counter = InCount,
- out_counter = OutCount })
- when InCount + OutCount > ?MSGS_PER_RATE_CALC ->
- update_rates(State);
-maybe_update_rates(State) ->
- State.
-
-update_rates(State = #vqstate{ in_counter = InCount,
- out_counter = OutCount,
- ack_in_counter = AckInCount,
- ack_out_counter = AckOutCount,
- rates = #rates{ in = InRate,
- out = OutRate,
- ack_in = AckInRate,
- ack_out = AckOutRate,
- timestamp = TS }}) ->
- Now = erlang:monotonic_time(),
-
- Rates = #rates { in = update_rate(Now, TS, InCount, InRate),
- out = update_rate(Now, TS, OutCount, OutRate),
- ack_in = update_rate(Now, TS, AckInCount, AckInRate),
- ack_out = update_rate(Now, TS, AckOutCount, AckOutRate),
- timestamp = Now },
-
- State#vqstate{ in_counter = 0,
- out_counter = 0,
- ack_in_counter = 0,
- ack_out_counter = 0,
- rates = Rates }.
-
-update_rate(Now, TS, Count, Rate) ->
- Time = erlang:convert_time_unit(Now - TS, native, micro_seconds) /
- ?MICROS_PER_SECOND,
- if
- Time == 0 -> Rate;
- true -> rabbit_misc:moving_average(Time, ?RATE_AVG_HALF_LIFE,
- Count / Time, Rate)
- end.
-
-ram_duration(State) ->
- State1 = #vqstate { rates = #rates { in = AvgIngressRate,
- out = AvgEgressRate,
- ack_in = AvgAckIngressRate,
- ack_out = AvgAckEgressRate },
- ram_msg_count = RamMsgCount,
- ram_msg_count_prev = RamMsgCountPrev,
- ram_pending_ack = RPA,
- qi_pending_ack = QPA,
- ram_ack_count_prev = RamAckCountPrev } =
- update_rates(State),
-
- RamAckCount = gb_trees:size(RPA) + gb_trees:size(QPA),
-
- Duration = %% msgs+acks / (msgs+acks/sec) == sec
- case lists:all(fun (X) -> X < 0.01 end,
- [AvgEgressRate, AvgIngressRate,
- AvgAckEgressRate, AvgAckIngressRate]) of
- true -> infinity;
- false -> (RamMsgCountPrev + RamMsgCount +
- RamAckCount + RamAckCountPrev) /
- (4 * (AvgEgressRate + AvgIngressRate +
- AvgAckEgressRate + AvgAckIngressRate))
- end,
-
- {Duration, State1}.
-
-needs_timeout(#vqstate { index_state = IndexState }) ->
- case rabbit_queue_index:needs_sync(IndexState) of
- confirms -> timed;
- other -> idle;
- false -> false
- end.
-
-timeout(State = #vqstate { index_state = IndexState }) ->
- State #vqstate { index_state = rabbit_queue_index:sync(IndexState) }.
-
-handle_pre_hibernate(State = #vqstate { index_state = IndexState }) ->
- State #vqstate { index_state = rabbit_queue_index:flush(IndexState) }.
-
-handle_info(bump_reduce_memory_use, State = #vqstate{ waiting_bump = true }) ->
- State#vqstate{ waiting_bump = false };
-handle_info(bump_reduce_memory_use, State) ->
- State.
-
-resume(State) -> a(reduce_memory_use(State)).
-
-msg_rates(#vqstate { rates = #rates { in = AvgIngressRate,
- out = AvgEgressRate } }) ->
- {AvgIngressRate, AvgEgressRate}.
-
-info(messages_ready_ram, #vqstate{ram_msg_count = RamMsgCount}) ->
- RamMsgCount;
-info(messages_unacknowledged_ram, #vqstate{ram_pending_ack = RPA,
- qi_pending_ack = QPA}) ->
- gb_trees:size(RPA) + gb_trees:size(QPA);
-info(messages_ram, State) ->
- info(messages_ready_ram, State) + info(messages_unacknowledged_ram, State);
-info(messages_persistent, #vqstate{persistent_count = PersistentCount}) ->
- PersistentCount;
-info(messages_paged_out, #vqstate{delta = #delta{transient = Count}}) ->
- Count;
-info(message_bytes, #vqstate{bytes = Bytes,
- unacked_bytes = UBytes}) ->
- Bytes + UBytes;
-info(message_bytes_ready, #vqstate{bytes = Bytes}) ->
- Bytes;
-info(message_bytes_unacknowledged, #vqstate{unacked_bytes = UBytes}) ->
- UBytes;
-info(message_bytes_ram, #vqstate{ram_bytes = RamBytes}) ->
- RamBytes;
-info(message_bytes_persistent, #vqstate{persistent_bytes = PersistentBytes}) ->
- PersistentBytes;
-info(message_bytes_paged_out, #vqstate{delta_transient_bytes = PagedOutBytes}) ->
- PagedOutBytes;
-info(head_message_timestamp, #vqstate{
- q3 = Q3,
- q4 = Q4,
- ram_pending_ack = RPA,
- qi_pending_ack = QPA}) ->
- head_message_timestamp(Q3, Q4, RPA, QPA);
-info(disk_reads, #vqstate{disk_read_count = Count}) ->
- Count;
-info(disk_writes, #vqstate{disk_write_count = Count}) ->
- Count;
-info(backing_queue_status, #vqstate {
- q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
- mode = Mode,
- len = Len,
- target_ram_count = TargetRamCount,
- next_seq_id = NextSeqId,
- rates = #rates { in = AvgIngressRate,
- out = AvgEgressRate,
- ack_in = AvgAckIngressRate,
- ack_out = AvgAckEgressRate }}) ->
-
- [ {mode , Mode},
- {q1 , ?QUEUE:len(Q1)},
- {q2 , ?QUEUE:len(Q2)},
- {delta , Delta},
- {q3 , ?QUEUE:len(Q3)},
- {q4 , ?QUEUE:len(Q4)},
- {len , Len},
- {target_ram_count , TargetRamCount},
- {next_seq_id , NextSeqId},
- {avg_ingress_rate , AvgIngressRate},
- {avg_egress_rate , AvgEgressRate},
- {avg_ack_ingress_rate, AvgAckIngressRate},
- {avg_ack_egress_rate , AvgAckEgressRate} ];
-info(_, _) ->
- ''.
-
-invoke(?MODULE, Fun, State) -> Fun(?MODULE, State);
-invoke( _, _, State) -> State.
-
-is_duplicate(_Msg, State) -> {false, State}.
-
-set_queue_mode(Mode, State = #vqstate { mode = Mode }) ->
- State;
-set_queue_mode(lazy, State = #vqstate {
- target_ram_count = TargetRamCount }) ->
- %% To become a lazy queue we need to page everything to disk first.
- State1 = convert_to_lazy(State),
- %% restore the original target_ram_count
- a(State1 #vqstate { mode = lazy, target_ram_count = TargetRamCount });
-set_queue_mode(default, State) ->
- %% becoming a default queue means loading messages from disk like
- %% when a queue is recovered.
- a(maybe_deltas_to_betas(State #vqstate { mode = default }));
-set_queue_mode(_, State) ->
- State.
-
-zip_msgs_and_acks(Msgs, AckTags, Accumulator, _State) ->
- lists:foldl(fun ({{#basic_message{ id = Id }, _Props}, AckTag}, Acc) ->
- [{Id, AckTag} | Acc]
- end, Accumulator, lists:zip(Msgs, AckTags)).
-
-convert_to_lazy(State) ->
- State1 = #vqstate { delta = Delta, q3 = Q3, len = Len } =
- set_ram_duration_target(0, State),
- case Delta#delta.count + ?QUEUE:len(Q3) == Len of
- true ->
- State1;
- false ->
- %% When pushing messages to disk, we might have been
- %% blocked by the msg_store, so we need to see if we have
- %% to wait for more credit, and then keep paging messages.
- %%
- %% The amqqueue_process could have taken care of this, but
- %% between the time it receives the bump_credit msg and
- %% calls BQ:resume to keep paging messages to disk, some
- %% other request may arrive to the BQ which at this moment
- %% is not in a proper state for a lazy BQ (unless all
- %% messages have been paged to disk already).
- wait_for_msg_store_credit(),
- convert_to_lazy(resume(State1))
- end.
-
-wait_for_msg_store_credit() ->
- case credit_flow:blocked() of
- true -> receive
- {bump_credit, Msg} ->
- credit_flow:handle_bump_msg(Msg)
- end;
- false -> ok
- end.
-
-%% Get the Timestamp property of the first msg, if present. This is
-%% the one with the oldest timestamp among the heads of the pending
-%% acks and unread queues. We can't check disk_pending_acks as these
-%% are paged out - we assume some will soon be paged in rather than
-%% forcing it to happen. Pending ack msgs are included as they are
-%% regarded as unprocessed until acked, this also prevents the result
-%% apparently oscillating during repeated rejects. Q3 is only checked
-%% when Q4 is empty as any Q4 msg will be earlier.
-head_message_timestamp(Q3, Q4, RPA, QPA) ->
- HeadMsgs = [ HeadMsgStatus#msg_status.msg ||
- HeadMsgStatus <-
- [ get_qs_head([Q4, Q3]),
- get_pa_head(RPA),
- get_pa_head(QPA) ],
- HeadMsgStatus /= undefined,
- HeadMsgStatus#msg_status.msg /= undefined ],
-
- Timestamps =
- [Timestamp || HeadMsg <- HeadMsgs,
- Timestamp <- [rabbit_basic:extract_timestamp(
- HeadMsg#basic_message.content)],
- Timestamp /= undefined
- ],
-
- case Timestamps == [] of
- true -> '';
- false -> lists:min(Timestamps)
- end.
-
-get_qs_head(Qs) ->
- catch lists:foldl(
- fun (Q, Acc) ->
- case get_q_head(Q) of
- undefined -> Acc;
- Val -> throw(Val)
- end
- end, undefined, Qs).
-
-get_q_head(Q) ->
- get_collection_head(Q, fun ?QUEUE:is_empty/1, fun ?QUEUE:peek/1).
-
-get_pa_head(PA) ->
- get_collection_head(PA, fun gb_trees:is_empty/1, fun gb_trees:smallest/1).
-
-get_collection_head(Col, IsEmpty, GetVal) ->
- case IsEmpty(Col) of
- false ->
- {_, MsgStatus} = GetVal(Col),
- MsgStatus;
- true -> undefined
- end.
-
-%%----------------------------------------------------------------------------
-%% Minor helpers
-%%----------------------------------------------------------------------------
-a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
- mode = default,
- len = Len,
- bytes = Bytes,
- unacked_bytes = UnackedBytes,
- persistent_count = PersistentCount,
- persistent_bytes = PersistentBytes,
- ram_msg_count = RamMsgCount,
- ram_bytes = RamBytes}) ->
- E1 = ?QUEUE:is_empty(Q1),
- E2 = ?QUEUE:is_empty(Q2),
- ED = Delta#delta.count == 0,
- E3 = ?QUEUE:is_empty(Q3),
- E4 = ?QUEUE:is_empty(Q4),
- LZ = Len == 0,
-
- %% if q1 has messages then q3 cannot be empty. See publish/6.
- true = E1 or not E3,
- %% if q2 has messages then we have messages in delta (paged to
- %% disk). See push_alphas_to_betas/2.
- true = E2 or not ED,
- %% if delta has messages then q3 cannot be empty. This is enforced
- %% by paging, where min([?SEGMENT_ENTRY_COUNT, len(q3)]) messages
- %% are always kept on RAM.
- true = ED or not E3,
- %% if the queue length is 0, then q3 and q4 must be empty.
- true = LZ == (E3 and E4),
-
- true = Len >= 0,
- true = Bytes >= 0,
- true = UnackedBytes >= 0,
- true = PersistentCount >= 0,
- true = PersistentBytes >= 0,
- true = RamMsgCount >= 0,
- true = RamMsgCount =< Len,
- true = RamBytes >= 0,
- true = RamBytes =< Bytes + UnackedBytes,
-
- State;
-a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
- mode = lazy,
- len = Len,
- bytes = Bytes,
- unacked_bytes = UnackedBytes,
- persistent_count = PersistentCount,
- persistent_bytes = PersistentBytes,
- ram_msg_count = RamMsgCount,
- ram_bytes = RamBytes}) ->
- E1 = ?QUEUE:is_empty(Q1),
- E2 = ?QUEUE:is_empty(Q2),
- ED = Delta#delta.count == 0,
- E3 = ?QUEUE:is_empty(Q3),
- E4 = ?QUEUE:is_empty(Q4),
- LZ = Len == 0,
- L3 = ?QUEUE:len(Q3),
-
- %% q1 must always be empty, since q1 only gets messages during
- %% publish, but for lazy queues messages go straight to delta.
- true = E1,
-
- %% q2 only gets messages from q1 when push_alphas_to_betas is
- %% called for a non empty delta, which won't be the case for a
- %% lazy queue. This means q2 must always be empty.
- true = E2,
-
- %% q4 must always be empty, since q1 only gets messages during
- %% publish, but for lazy queues messages go straight to delta.
- true = E4,
-
- %% if the queue is empty, then delta is empty and q3 is empty.
- true = LZ == (ED and E3),
-
- %% There should be no messages in q1, q2, and q4
- true = Delta#delta.count + L3 == Len,
-
- true = Len >= 0,
- true = Bytes >= 0,
- true = UnackedBytes >= 0,
- true = PersistentCount >= 0,
- true = PersistentBytes >= 0,
- true = RamMsgCount >= 0,
- true = RamMsgCount =< Len,
- true = RamBytes >= 0,
- true = RamBytes =< Bytes + UnackedBytes,
-
- State.
-
-d(Delta = #delta { start_seq_id = Start, count = Count, end_seq_id = End })
- when Start + Count =< End ->
- Delta.
-
-m(MsgStatus = #msg_status { is_persistent = IsPersistent,
- msg_in_store = MsgInStore,
- index_on_disk = IndexOnDisk }) ->
- true = (not IsPersistent) or IndexOnDisk,
- true = msg_in_ram(MsgStatus) or MsgInStore,
- MsgStatus.
-
-one_if(true ) -> 1;
-one_if(false) -> 0.
-
-cons_if(true, E, L) -> [E | L];
-cons_if(false, _E, L) -> L.
-
-gb_sets_maybe_insert(false, _Val, Set) -> Set;
-gb_sets_maybe_insert(true, Val, Set) -> gb_sets:add(Val, Set).
-
-msg_status(IsPersistent, IsDelivered, SeqId,
- Msg = #basic_message {id = MsgId}, MsgProps, IndexMaxSize) ->
- #msg_status{seq_id = SeqId,
- msg_id = MsgId,
- msg = Msg,
- is_persistent = IsPersistent,
- is_delivered = IsDelivered,
- msg_in_store = false,
- index_on_disk = false,
- persist_to = determine_persist_to(Msg, MsgProps, IndexMaxSize),
- msg_props = MsgProps}.
-
-beta_msg_status({Msg = #basic_message{id = MsgId},
- SeqId, MsgProps, IsPersistent, IsDelivered}) ->
- MS0 = beta_msg_status0(SeqId, MsgProps, IsPersistent, IsDelivered),
- MS0#msg_status{msg_id = MsgId,
- msg = Msg,
- persist_to = queue_index,
- msg_in_store = false};
-
-beta_msg_status({MsgId, SeqId, MsgProps, IsPersistent, IsDelivered}) ->
- MS0 = beta_msg_status0(SeqId, MsgProps, IsPersistent, IsDelivered),
- MS0#msg_status{msg_id = MsgId,
- msg = undefined,
- persist_to = msg_store,
- msg_in_store = true}.
-
-beta_msg_status0(SeqId, MsgProps, IsPersistent, IsDelivered) ->
- #msg_status{seq_id = SeqId,
- msg = undefined,
- is_persistent = IsPersistent,
- is_delivered = IsDelivered,
- index_on_disk = true,
- msg_props = MsgProps}.
-
-trim_msg_status(MsgStatus) ->
- case persist_to(MsgStatus) of
- msg_store -> MsgStatus#msg_status{msg = undefined};
- queue_index -> MsgStatus
- end.
-
-with_msg_store_state({MSCStateP, MSCStateT}, true, Fun) ->
- {Result, MSCStateP1} = Fun(MSCStateP),
- {Result, {MSCStateP1, MSCStateT}};
-with_msg_store_state({MSCStateP, MSCStateT}, false, Fun) ->
- {Result, MSCStateT1} = Fun(MSCStateT),
- {Result, {MSCStateP, MSCStateT1}}.
-
-with_immutable_msg_store_state(MSCState, IsPersistent, Fun) ->
- {Res, MSCState} = with_msg_store_state(MSCState, IsPersistent,
- fun (MSCState1) ->
- {Fun(MSCState1), MSCState1}
- end),
- Res.
-
-msg_store_client_init(MsgStore, MsgOnDiskFun, Callback, VHost) ->
- msg_store_client_init(MsgStore, rabbit_guid:gen(), MsgOnDiskFun,
- Callback, VHost).
-
-msg_store_client_init(MsgStore, Ref, MsgOnDiskFun, Callback, VHost) ->
- CloseFDsFun = msg_store_close_fds_fun(MsgStore =:= ?PERSISTENT_MSG_STORE),
- rabbit_vhost_msg_store:client_init(VHost, MsgStore,
- Ref, MsgOnDiskFun,
- fun () ->
- Callback(?MODULE, CloseFDsFun)
- end).
-
-msg_store_write(MSCState, IsPersistent, MsgId, Msg) ->
- with_immutable_msg_store_state(
- MSCState, IsPersistent,
- fun (MSCState1) ->
- rabbit_msg_store:write_flow(MsgId, Msg, MSCState1)
- end).
-
-msg_store_read(MSCState, IsPersistent, MsgId) ->
- with_msg_store_state(
- MSCState, IsPersistent,
- fun (MSCState1) ->
- rabbit_msg_store:read(MsgId, MSCState1)
- end).
-
-msg_store_remove(MSCState, IsPersistent, MsgIds) ->
- with_immutable_msg_store_state(
- MSCState, IsPersistent,
- fun (MCSState1) ->
- rabbit_msg_store:remove(MsgIds, MCSState1)
- end).
-
-msg_store_close_fds(MSCState, IsPersistent) ->
- with_msg_store_state(
- MSCState, IsPersistent,
- fun (MSCState1) -> rabbit_msg_store:close_all_indicated(MSCState1) end).
-
-msg_store_close_fds_fun(IsPersistent) ->
- fun (?MODULE, State = #vqstate { msg_store_clients = MSCState }) ->
- {ok, MSCState1} = msg_store_close_fds(MSCState, IsPersistent),
- State #vqstate { msg_store_clients = MSCState1 }
- end.
-
-maybe_write_delivered(false, _SeqId, IndexState) ->
- IndexState;
-maybe_write_delivered(true, SeqId, IndexState) ->
- rabbit_queue_index:deliver([SeqId], IndexState).
-
-betas_from_index_entries(List, TransientThreshold, DelsAndAcksFun, State) ->
- {Filtered, Delivers, Acks, RamReadyCount, RamBytes, TransientCount, TransientBytes} =
- lists:foldr(
- fun ({_MsgOrId, SeqId, _MsgProps, IsPersistent, IsDelivered} = M,
- {Filtered1, Delivers1, Acks1, RRC, RB, TC, TB} = Acc) ->
- case SeqId < TransientThreshold andalso not IsPersistent of
- true -> {Filtered1,
- cons_if(not IsDelivered, SeqId, Delivers1),
- [SeqId | Acks1], RRC, RB, TC, TB};
- false -> MsgStatus = m(beta_msg_status(M)),
- HaveMsg = msg_in_ram(MsgStatus),
- Size = msg_size(MsgStatus),
- case is_msg_in_pending_acks(SeqId, State) of
- false -> {?QUEUE:in_r(MsgStatus, Filtered1),
- Delivers1, Acks1,
- RRC + one_if(HaveMsg),
- RB + one_if(HaveMsg) * Size,
- TC + one_if(not IsPersistent),
- TB + one_if(not IsPersistent) * Size};
- true -> Acc %% [0]
- end
- end
- end, {?QUEUE:new(), [], [], 0, 0, 0, 0}, List),
- {Filtered, RamReadyCount, RamBytes, DelsAndAcksFun(Delivers, Acks, State),
- TransientCount, TransientBytes}.
-%% [0] We don't increase RamBytes here, even though it pertains to
-%% unacked messages too, since if HaveMsg then the message must have
-%% been stored in the QI, thus the message must have been in
-%% qi_pending_ack, thus it must already have been in RAM.
-
-is_msg_in_pending_acks(SeqId, #vqstate { ram_pending_ack = RPA,
- disk_pending_ack = DPA,
- qi_pending_ack = QPA }) ->
- (gb_trees:is_defined(SeqId, RPA) orelse
- gb_trees:is_defined(SeqId, DPA) orelse
- gb_trees:is_defined(SeqId, QPA)).
-
-expand_delta(SeqId, ?BLANK_DELTA_PATTERN(X), IsPersistent) ->
- d(#delta { start_seq_id = SeqId, count = 1, end_seq_id = SeqId + 1,
- transient = one_if(not IsPersistent)});
-expand_delta(SeqId, #delta { start_seq_id = StartSeqId,
- count = Count,
- transient = Transient } = Delta,
- IsPersistent )
- when SeqId < StartSeqId ->
- d(Delta #delta { start_seq_id = SeqId, count = Count + 1,
- transient = Transient + one_if(not IsPersistent)});
-expand_delta(SeqId, #delta { count = Count,
- end_seq_id = EndSeqId,
- transient = Transient } = Delta,
- IsPersistent)
- when SeqId >= EndSeqId ->
- d(Delta #delta { count = Count + 1, end_seq_id = SeqId + 1,
- transient = Transient + one_if(not IsPersistent)});
-expand_delta(_SeqId, #delta { count = Count,
- transient = Transient } = Delta,
- IsPersistent ) ->
- d(Delta #delta { count = Count + 1,
- transient = Transient + one_if(not IsPersistent) }).
-
-%%----------------------------------------------------------------------------
-%% Internal major helpers for Public API
-%%----------------------------------------------------------------------------
-
-init(IsDurable, IndexState, DeltaCount, DeltaBytes, Terms,
- PersistentClient, TransientClient, VHost) ->
- {LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState),
-
- {DeltaCount1, DeltaBytes1} =
- case Terms of
- non_clean_shutdown -> {DeltaCount, DeltaBytes};
- _ -> {proplists:get_value(persistent_count,
- Terms, DeltaCount),
- proplists:get_value(persistent_bytes,
- Terms, DeltaBytes)}
- end,
- Delta = case DeltaCount1 == 0 andalso DeltaCount /= undefined of
- true -> ?BLANK_DELTA;
- false -> d(#delta { start_seq_id = LowSeqId,
- count = DeltaCount1,
- transient = 0,
- end_seq_id = NextSeqId })
- end,
- Now = erlang:monotonic_time(),
- IoBatchSize = rabbit_misc:get_env(rabbit, msg_store_io_batch_size,
- ?IO_BATCH_SIZE),
-
- {ok, IndexMaxSize} = application:get_env(
- rabbit, queue_index_embed_msgs_below),
- State = #vqstate {
- q1 = ?QUEUE:new(),
- q2 = ?QUEUE:new(),
- delta = Delta,
- q3 = ?QUEUE:new(),
- q4 = ?QUEUE:new(),
- next_seq_id = NextSeqId,
- ram_pending_ack = gb_trees:empty(),
- disk_pending_ack = gb_trees:empty(),
- qi_pending_ack = gb_trees:empty(),
- index_state = IndexState1,
- msg_store_clients = {PersistentClient, TransientClient},
- durable = IsDurable,
- transient_threshold = NextSeqId,
- qi_embed_msgs_below = IndexMaxSize,
-
- len = DeltaCount1,
- persistent_count = DeltaCount1,
- bytes = DeltaBytes1,
- persistent_bytes = DeltaBytes1,
- delta_transient_bytes = 0,
-
- target_ram_count = infinity,
- ram_msg_count = 0,
- ram_msg_count_prev = 0,
- ram_ack_count_prev = 0,
- ram_bytes = 0,
- unacked_bytes = 0,
- out_counter = 0,
- in_counter = 0,
- rates = blank_rates(Now),
- msgs_on_disk = gb_sets:new(),
- msg_indices_on_disk = gb_sets:new(),
- unconfirmed = gb_sets:new(),
- confirmed = gb_sets:new(),
- ack_out_counter = 0,
- ack_in_counter = 0,
- disk_read_count = 0,
- disk_write_count = 0,
-
- io_batch_size = IoBatchSize,
-
- mode = default,
- memory_reduction_run_count = 0,
- virtual_host = VHost},
- a(maybe_deltas_to_betas(State)).
-
-blank_rates(Now) ->
- #rates { in = 0.0,
- out = 0.0,
- ack_in = 0.0,
- ack_out = 0.0,
- timestamp = Now}.
-
-in_r(MsgStatus = #msg_status { msg = undefined },
- State = #vqstate { mode = default, q3 = Q3, q4 = Q4 }) ->
- case ?QUEUE:is_empty(Q4) of
- true -> State #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3) };
- false -> {Msg, State1 = #vqstate { q4 = Q4a }} =
- read_msg(MsgStatus, State),
- MsgStatus1 = MsgStatus#msg_status{msg = Msg},
- stats(ready0, {MsgStatus, MsgStatus1}, 0,
- State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus1, Q4a) })
- end;
-in_r(MsgStatus,
- State = #vqstate { mode = default, q4 = Q4 }) ->
- State #vqstate { q4 = ?QUEUE:in_r(MsgStatus, Q4) };
-%% lazy queues
-in_r(MsgStatus = #msg_status { seq_id = SeqId, is_persistent = IsPersistent },
- State = #vqstate { mode = lazy, q3 = Q3, delta = Delta}) ->
- case ?QUEUE:is_empty(Q3) of
- true ->
- {_MsgStatus1, State1} =
- maybe_write_to_disk(true, true, MsgStatus, State),
- State2 = stats(ready0, {MsgStatus, none}, 1, State1),
- Delta1 = expand_delta(SeqId, Delta, IsPersistent),
- State2 #vqstate{ delta = Delta1};
- false ->
- State #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3) }
- end.
-
-queue_out(State = #vqstate { mode = default, q4 = Q4 }) ->
- case ?QUEUE:out(Q4) of
- {empty, _Q4} ->
- case fetch_from_q3(State) of
- {empty, _State1} = Result -> Result;
- {loaded, {MsgStatus, State1}} -> {{value, MsgStatus}, State1}
- end;
- {{value, MsgStatus}, Q4a} ->
- {{value, MsgStatus}, State #vqstate { q4 = Q4a }}
- end;
-%% lazy queues
-queue_out(State = #vqstate { mode = lazy }) ->
- case fetch_from_q3(State) of
- {empty, _State1} = Result -> Result;
- {loaded, {MsgStatus, State1}} -> {{value, MsgStatus}, State1}
- end.
-
-read_msg(#msg_status{msg = undefined,
- msg_id = MsgId,
- is_persistent = IsPersistent}, State) ->
- read_msg(MsgId, IsPersistent, State);
-read_msg(#msg_status{msg = Msg}, State) ->
- {Msg, State}.
-
-read_msg(MsgId, IsPersistent, State = #vqstate{msg_store_clients = MSCState,
- disk_read_count = Count}) ->
- {{ok, Msg = #basic_message {}}, MSCState1} =
- msg_store_read(MSCState, IsPersistent, MsgId),
- {Msg, State #vqstate {msg_store_clients = MSCState1,
- disk_read_count = Count + 1}}.
-
-stats(Signs, Statuses, DeltaPaged, State) ->
- stats0(expand_signs(Signs), expand_statuses(Statuses), DeltaPaged, State).
-
-expand_signs(ready0) -> {0, 0, true};
-expand_signs(lazy_pub) -> {1, 0, true};
-expand_signs({A, B}) -> {A, B, false}.
-
-expand_statuses({none, A}) -> {false, msg_in_ram(A), A};
-expand_statuses({B, none}) -> {msg_in_ram(B), false, B};
-expand_statuses({lazy, A}) -> {false , false, A};
-expand_statuses({B, A}) -> {msg_in_ram(B), msg_in_ram(A), B}.
-
-%% In this function at least, we are religious: the variable name
-%% contains "Ready" or "Unacked" iff that is what it counts. If
-%% neither is present it counts both.
-stats0({DeltaReady, DeltaUnacked, ReadyMsgPaged},
- {InRamBefore, InRamAfter, MsgStatus}, DeltaPaged,
- State = #vqstate{len = ReadyCount,
- bytes = ReadyBytes,
- ram_msg_count = RamReadyCount,
- persistent_count = PersistentCount,
- unacked_bytes = UnackedBytes,
- ram_bytes = RamBytes,
- delta_transient_bytes = DeltaBytes,
- persistent_bytes = PersistentBytes}) ->
- S = msg_size(MsgStatus),
- DeltaTotal = DeltaReady + DeltaUnacked,
- DeltaRam = case {InRamBefore, InRamAfter} of
- {false, false} -> 0;
- {false, true} -> 1;
- {true, false} -> -1;
- {true, true} -> 0
- end,
- DeltaRamReady = case DeltaReady of
- 1 -> one_if(InRamAfter);
- -1 -> -one_if(InRamBefore);
- 0 when ReadyMsgPaged -> DeltaRam;
- 0 -> 0
- end,
- DeltaPersistent = DeltaTotal * one_if(MsgStatus#msg_status.is_persistent),
- State#vqstate{len = ReadyCount + DeltaReady,
- ram_msg_count = RamReadyCount + DeltaRamReady,
- persistent_count = PersistentCount + DeltaPersistent,
- bytes = ReadyBytes + DeltaReady * S,
- unacked_bytes = UnackedBytes + DeltaUnacked * S,
- ram_bytes = RamBytes + DeltaRam * S,
- persistent_bytes = PersistentBytes + DeltaPersistent * S,
- delta_transient_bytes = DeltaBytes + DeltaPaged * one_if(not MsgStatus#msg_status.is_persistent) * S}.
-
-msg_size(#msg_status{msg_props = #message_properties{size = Size}}) -> Size.
-
-msg_in_ram(#msg_status{msg = Msg}) -> Msg =/= undefined.
-
-%% first param: AckRequired
-remove(true, MsgStatus = #msg_status {
- seq_id = SeqId,
- is_delivered = IsDelivered,
- index_on_disk = IndexOnDisk },
- State = #vqstate {out_counter = OutCount,
- index_state = IndexState}) ->
- %% Mark it delivered if necessary
- IndexState1 = maybe_write_delivered(
- IndexOnDisk andalso not IsDelivered,
- SeqId, IndexState),
-
- State1 = record_pending_ack(
- MsgStatus #msg_status {
- is_delivered = true }, State),
-
- State2 = stats({-1, 1}, {MsgStatus, MsgStatus}, 0, State1),
-
- {SeqId, maybe_update_rates(
- State2 #vqstate {out_counter = OutCount + 1,
- index_state = IndexState1})};
-
-%% This function body has the same behaviour as remove_queue_entries/3
-%% but instead of removing messages based on a ?QUEUE, this removes
-%% just one message, the one referenced by the MsgStatus provided.
-remove(false, MsgStatus = #msg_status {
- seq_id = SeqId,
- msg_id = MsgId,
- is_persistent = IsPersistent,
- is_delivered = IsDelivered,
- msg_in_store = MsgInStore,
- index_on_disk = IndexOnDisk },
- State = #vqstate {out_counter = OutCount,
- index_state = IndexState,
- msg_store_clients = MSCState}) ->
- %% Mark it delivered if necessary
- IndexState1 = maybe_write_delivered(
- IndexOnDisk andalso not IsDelivered,
- SeqId, IndexState),
-
- %% Remove from msg_store and queue index, if necessary
- case MsgInStore of
- true -> ok = msg_store_remove(MSCState, IsPersistent, [MsgId]);
- false -> ok
- end,
-
- IndexState2 =
- case IndexOnDisk of
- true -> rabbit_queue_index:ack([SeqId], IndexState1);
- false -> IndexState1
- end,
-
- State1 = stats({-1, 0}, {MsgStatus, none}, 0, State),
-
- {undefined, maybe_update_rates(
- State1 #vqstate {out_counter = OutCount + 1,
- index_state = IndexState2})}.
-
-%% This function exists as a way to improve dropwhile/2
-%% performance. The idea of having this function is to optimise calls
-%% to rabbit_queue_index by batching delivers and acks, instead of
-%% sending them one by one.
-%%
-%% Instead of removing every message as their are popped from the
-%% queue, it first accumulates them and then removes them by calling
-%% remove_queue_entries/3, since the behaviour of
-%% remove_queue_entries/3 when used with
-%% process_delivers_and_acks_fun(deliver_and_ack) is the same as
-%% calling remove(false, MsgStatus, State).
-%%
-%% remove/3 also updates the out_counter in every call, but here we do
-%% it just once at the end.
-remove_by_predicate(Pred, State = #vqstate {out_counter = OutCount}) ->
- {MsgProps, QAcc, State1} =
- collect_by_predicate(Pred, ?QUEUE:new(), State),
- State2 =
- remove_queue_entries(
- QAcc, process_delivers_and_acks_fun(deliver_and_ack), State1),
- %% maybe_update_rates/1 is called in remove/2 for every
- %% message. Since we update out_counter only once, we call it just
- %% there.
- {MsgProps, maybe_update_rates(
- State2 #vqstate {
- out_counter = OutCount + ?QUEUE:len(QAcc)})}.
-
-%% This function exists as a way to improve fetchwhile/4
-%% performance. The idea of having this function is to optimise calls
-%% to rabbit_queue_index by batching delivers, instead of sending them
-%% one by one.
-%%
-%% Fun is the function passed to fetchwhile/4 that's
-%% applied to every fetched message and used to build the fetchwhile/4
-%% result accumulator FetchAcc.
-fetch_by_predicate(Pred, Fun, FetchAcc,
- State = #vqstate {
- index_state = IndexState,
- out_counter = OutCount}) ->
- {MsgProps, QAcc, State1} =
- collect_by_predicate(Pred, ?QUEUE:new(), State),
-
- {Delivers, FetchAcc1, State2} =
- process_queue_entries(QAcc, Fun, FetchAcc, State1),
-
- IndexState1 = rabbit_queue_index:deliver(Delivers, IndexState),
-
- {MsgProps, FetchAcc1, maybe_update_rates(
- State2 #vqstate {
- index_state = IndexState1,
- out_counter = OutCount + ?QUEUE:len(QAcc)})}.
-
-%% We try to do here the same as what remove(true, State) does but
-%% processing several messages at the same time. The idea is to
-%% optimize rabbit_queue_index:deliver/2 calls by sending a list of
-%% SeqIds instead of one by one, thus process_queue_entries1 will
-%% accumulate the required deliveries, will record_pending_ack for
-%% each message, and will update stats, like remove/2 does.
-%%
-%% For the meaning of Fun and FetchAcc arguments see
-%% fetch_by_predicate/4 above.
-process_queue_entries(Q, Fun, FetchAcc, State) ->
- ?QUEUE:foldl(fun (MsgStatus, Acc) ->
- process_queue_entries1(MsgStatus, Fun, Acc)
- end,
- {[], FetchAcc, State}, Q).
-
-process_queue_entries1(
- #msg_status { seq_id = SeqId, is_delivered = IsDelivered,
- index_on_disk = IndexOnDisk} = MsgStatus,
- Fun,
- {Delivers, FetchAcc, State}) ->
- {Msg, State1} = read_msg(MsgStatus, State),
- State2 = record_pending_ack(
- MsgStatus #msg_status {
- is_delivered = true }, State1),
- {cons_if(IndexOnDisk andalso not IsDelivered, SeqId, Delivers),
- Fun(Msg, SeqId, FetchAcc),
- stats({-1, 1}, {MsgStatus, MsgStatus}, 0, State2)}.
-
-collect_by_predicate(Pred, QAcc, State) ->
- case queue_out(State) of
- {empty, State1} ->
- {undefined, QAcc, State1};
- {{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} ->
- case Pred(MsgProps) of
- true -> collect_by_predicate(Pred, ?QUEUE:in(MsgStatus, QAcc),
- State1);
- false -> {MsgProps, QAcc, in_r(MsgStatus, State1)}
- end
- end.
-
-%%----------------------------------------------------------------------------
-%% Helpers for Public API purge/1 function
-%%----------------------------------------------------------------------------
-
-%% The difference between purge_when_pending_acks/1
-%% vs. purge_and_index_reset/1 is that the first one issues a deliver
-%% and an ack to the queue index for every message that's being
-%% removed, while the later just resets the queue index state.
-purge_when_pending_acks(State) ->
- State1 = purge1(process_delivers_and_acks_fun(deliver_and_ack), State),
- a(State1).
-
-purge_and_index_reset(State) ->
- State1 = purge1(process_delivers_and_acks_fun(none), State),
- a(reset_qi_state(State1)).
-
-%% This function removes messages from each of {q1, q2, q3, q4}.
-%%
-%% With remove_queue_entries/3 q1 and q4 are emptied, while q2 and q3
-%% are specially handled by purge_betas_and_deltas/2.
-%%
-%% purge_betas_and_deltas/2 loads messages from the queue index,
-%% filling up q3 and in some cases moving messages form q2 to q3 while
-%% resetting q2 to an empty queue (see maybe_deltas_to_betas/2). The
-%% messages loaded into q3 are removed by calling
-%% remove_queue_entries/3 until there are no more messages to be read
-%% from the queue index. Messages are read in batches from the queue
-%% index.
-purge1(AfterFun, State = #vqstate { q4 = Q4}) ->
- State1 = remove_queue_entries(Q4, AfterFun, State),
-
- State2 = #vqstate {q1 = Q1} =
- purge_betas_and_deltas(AfterFun, State1#vqstate{q4 = ?QUEUE:new()}),
-
- State3 = remove_queue_entries(Q1, AfterFun, State2),
-
- a(State3#vqstate{q1 = ?QUEUE:new()}).
-
-reset_qi_state(State = #vqstate{index_state = IndexState}) ->
- State#vqstate{index_state =
- rabbit_queue_index:reset_state(IndexState)}.
-
-is_pending_ack_empty(State) ->
- count_pending_acks(State) =:= 0.
-
-is_unconfirmed_empty(#vqstate { unconfirmed = UC }) ->
- gb_sets:is_empty(UC).
-
-count_pending_acks(#vqstate { ram_pending_ack = RPA,
- disk_pending_ack = DPA,
- qi_pending_ack = QPA }) ->
- gb_trees:size(RPA) + gb_trees:size(DPA) + gb_trees:size(QPA).
-
-purge_betas_and_deltas(DelsAndAcksFun, State = #vqstate { mode = Mode }) ->
- State0 = #vqstate { q3 = Q3 } =
- case Mode of
- lazy -> maybe_deltas_to_betas(DelsAndAcksFun, State);
- _ -> State
- end,
-
- case ?QUEUE:is_empty(Q3) of
- true -> State0;
- false -> State1 = remove_queue_entries(Q3, DelsAndAcksFun, State0),
- purge_betas_and_deltas(DelsAndAcksFun,
- maybe_deltas_to_betas(
- DelsAndAcksFun,
- State1#vqstate{q3 = ?QUEUE:new()}))
- end.
-
-remove_queue_entries(Q, DelsAndAcksFun,
- State = #vqstate{msg_store_clients = MSCState}) ->
- {MsgIdsByStore, Delivers, Acks, State1} =
- ?QUEUE:foldl(fun remove_queue_entries1/2,
- {maps:new(), [], [], State}, Q),
- remove_msgs_by_id(MsgIdsByStore, MSCState),
- DelsAndAcksFun(Delivers, Acks, State1).
-
-remove_queue_entries1(
- #msg_status { msg_id = MsgId, seq_id = SeqId, is_delivered = IsDelivered,
- msg_in_store = MsgInStore, index_on_disk = IndexOnDisk,
- is_persistent = IsPersistent} = MsgStatus,
- {MsgIdsByStore, Delivers, Acks, State}) ->
- {case MsgInStore of
- true -> rabbit_misc:maps_cons(IsPersistent, MsgId, MsgIdsByStore);
- false -> MsgIdsByStore
- end,
- cons_if(IndexOnDisk andalso not IsDelivered, SeqId, Delivers),
- cons_if(IndexOnDisk, SeqId, Acks),
- stats({-1, 0}, {MsgStatus, none}, 0, State)}.
-
-process_delivers_and_acks_fun(deliver_and_ack) ->
- fun (Delivers, Acks, State = #vqstate { index_state = IndexState }) ->
- IndexState1 =
- rabbit_queue_index:ack(
- Acks, rabbit_queue_index:deliver(Delivers, IndexState)),
- State #vqstate { index_state = IndexState1 }
- end;
-process_delivers_and_acks_fun(_) ->
- fun (_, _, State) ->
- State
- end.
-
-%%----------------------------------------------------------------------------
-%% Internal gubbins for publishing
-%%----------------------------------------------------------------------------
-
-publish1(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
- MsgProps = #message_properties { needs_confirming = NeedsConfirming },
- IsDelivered, _ChPid, _Flow, PersistFun,
- State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4,
- mode = default,
- qi_embed_msgs_below = IndexMaxSize,
- next_seq_id = SeqId,
- in_counter = InCount,
- durable = IsDurable,
- unconfirmed = UC }) ->
- IsPersistent1 = IsDurable andalso IsPersistent,
- MsgStatus = msg_status(IsPersistent1, IsDelivered, SeqId, Msg, MsgProps, IndexMaxSize),
- {MsgStatus1, State1} = PersistFun(false, false, MsgStatus, State),
- State2 = case ?QUEUE:is_empty(Q3) of
- false -> State1 #vqstate { q1 = ?QUEUE:in(m(MsgStatus1), Q1) };
- true -> State1 #vqstate { q4 = ?QUEUE:in(m(MsgStatus1), Q4) }
- end,
- InCount1 = InCount + 1,
- UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
- stats({1, 0}, {none, MsgStatus1}, 0,
- State2#vqstate{ next_seq_id = SeqId + 1,
- in_counter = InCount1,
- unconfirmed = UC1 });
-publish1(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
- MsgProps = #message_properties { needs_confirming = NeedsConfirming },
- IsDelivered, _ChPid, _Flow, PersistFun,
- State = #vqstate { mode = lazy,
- qi_embed_msgs_below = IndexMaxSize,
- next_seq_id = SeqId,
- in_counter = InCount,
- durable = IsDurable,
- unconfirmed = UC,
- delta = Delta}) ->
- IsPersistent1 = IsDurable andalso IsPersistent,
- MsgStatus = msg_status(IsPersistent1, IsDelivered, SeqId, Msg, MsgProps, IndexMaxSize),
- {MsgStatus1, State1} = PersistFun(true, true, MsgStatus, State),
- Delta1 = expand_delta(SeqId, Delta, IsPersistent),
- UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
- stats(lazy_pub, {lazy, m(MsgStatus1)}, 1,
- State1#vqstate{ delta = Delta1,
- next_seq_id = SeqId + 1,
- in_counter = InCount + 1,
- unconfirmed = UC1}).
-
-batch_publish1({Msg, MsgProps, IsDelivered}, {ChPid, Flow, State}) ->
- {ChPid, Flow, publish1(Msg, MsgProps, IsDelivered, ChPid, Flow,
- fun maybe_prepare_write_to_disk/4, State)}.
-
-publish_delivered1(Msg = #basic_message { is_persistent = IsPersistent,
- id = MsgId },
- MsgProps = #message_properties {
- needs_confirming = NeedsConfirming },
- _ChPid, _Flow, PersistFun,
- State = #vqstate { mode = default,
- qi_embed_msgs_below = IndexMaxSize,
- next_seq_id = SeqId,
- out_counter = OutCount,
- in_counter = InCount,
- durable = IsDurable,
- unconfirmed = UC }) ->
- IsPersistent1 = IsDurable andalso IsPersistent,
- MsgStatus = msg_status(IsPersistent1, true, SeqId, Msg, MsgProps, IndexMaxSize),
- {MsgStatus1, State1} = PersistFun(false, false, MsgStatus, State),
- State2 = record_pending_ack(m(MsgStatus1), State1),
- UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
- State3 = stats({0, 1}, {none, MsgStatus1}, 0,
- State2 #vqstate { next_seq_id = SeqId + 1,
- out_counter = OutCount + 1,
- in_counter = InCount + 1,
- unconfirmed = UC1 }),
- {SeqId, State3};
-publish_delivered1(Msg = #basic_message { is_persistent = IsPersistent,
- id = MsgId },
- MsgProps = #message_properties {
- needs_confirming = NeedsConfirming },
- _ChPid, _Flow, PersistFun,
- State = #vqstate { mode = lazy,
- qi_embed_msgs_below = IndexMaxSize,
- next_seq_id = SeqId,
- out_counter = OutCount,
- in_counter = InCount,
- durable = IsDurable,
- unconfirmed = UC }) ->
- IsPersistent1 = IsDurable andalso IsPersistent,
- MsgStatus = msg_status(IsPersistent1, true, SeqId, Msg, MsgProps, IndexMaxSize),
- {MsgStatus1, State1} = PersistFun(true, true, MsgStatus, State),
- State2 = record_pending_ack(m(MsgStatus1), State1),
- UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
- State3 = stats({0, 1}, {none, MsgStatus1}, 0,
- State2 #vqstate { next_seq_id = SeqId + 1,
- out_counter = OutCount + 1,
- in_counter = InCount + 1,
- unconfirmed = UC1 }),
- {SeqId, State3}.
-
-batch_publish_delivered1({Msg, MsgProps}, {ChPid, Flow, SeqIds, State}) ->
- {SeqId, State1} =
- publish_delivered1(Msg, MsgProps, ChPid, Flow,
- fun maybe_prepare_write_to_disk/4,
- State),
- {ChPid, Flow, [SeqId | SeqIds], State1}.
-
-maybe_write_msg_to_disk(_Force, MsgStatus = #msg_status {
- msg_in_store = true }, State) ->
- {MsgStatus, State};
-maybe_write_msg_to_disk(Force, MsgStatus = #msg_status {
- msg = Msg, msg_id = MsgId,
- is_persistent = IsPersistent },
- State = #vqstate{ msg_store_clients = MSCState,
- disk_write_count = Count})
- when Force orelse IsPersistent ->
- case persist_to(MsgStatus) of
- msg_store -> ok = msg_store_write(MSCState, IsPersistent, MsgId,
- prepare_to_store(Msg)),
- {MsgStatus#msg_status{msg_in_store = true},
- State#vqstate{disk_write_count = Count + 1}};
- queue_index -> {MsgStatus, State}
- end;
-maybe_write_msg_to_disk(_Force, MsgStatus, State) ->
- {MsgStatus, State}.
-
-%% Due to certain optimisations made inside
-%% rabbit_queue_index:pre_publish/7 we need to have two separate
-%% functions for index persistence. This one is only used when paging
-%% during memory pressure. We didn't want to modify
-%% maybe_write_index_to_disk/3 because that function is used in other
-%% places.
-maybe_batch_write_index_to_disk(_Force,
- MsgStatus = #msg_status {
- index_on_disk = true }, State) ->
- {MsgStatus, State};
-maybe_batch_write_index_to_disk(Force,
- MsgStatus = #msg_status {
- msg = Msg,
- msg_id = MsgId,
- seq_id = SeqId,
- is_persistent = IsPersistent,
- is_delivered = IsDelivered,
- msg_props = MsgProps},
- State = #vqstate {
- target_ram_count = TargetRamCount,
- disk_write_count = DiskWriteCount,
- index_state = IndexState})
- when Force orelse IsPersistent ->
- {MsgOrId, DiskWriteCount1} =
- case persist_to(MsgStatus) of
- msg_store -> {MsgId, DiskWriteCount};
- queue_index -> {prepare_to_store(Msg), DiskWriteCount + 1}
- end,
- IndexState1 = rabbit_queue_index:pre_publish(
- MsgOrId, SeqId, MsgProps, IsPersistent, IsDelivered,
- TargetRamCount, IndexState),
- {MsgStatus#msg_status{index_on_disk = true},
- State#vqstate{index_state = IndexState1,
- disk_write_count = DiskWriteCount1}};
-maybe_batch_write_index_to_disk(_Force, MsgStatus, State) ->
- {MsgStatus, State}.
-
-maybe_write_index_to_disk(_Force, MsgStatus = #msg_status {
- index_on_disk = true }, State) ->
- {MsgStatus, State};
-maybe_write_index_to_disk(Force, MsgStatus = #msg_status {
- msg = Msg,
- msg_id = MsgId,
- seq_id = SeqId,
- is_persistent = IsPersistent,
- is_delivered = IsDelivered,
- msg_props = MsgProps},
- State = #vqstate{target_ram_count = TargetRamCount,
- disk_write_count = DiskWriteCount,
- index_state = IndexState})
- when Force orelse IsPersistent ->
- {MsgOrId, DiskWriteCount1} =
- case persist_to(MsgStatus) of
- msg_store -> {MsgId, DiskWriteCount};
- queue_index -> {prepare_to_store(Msg), DiskWriteCount + 1}
- end,
- IndexState1 = rabbit_queue_index:publish(
- MsgOrId, SeqId, MsgProps, IsPersistent, TargetRamCount,
- IndexState),
- IndexState2 = maybe_write_delivered(IsDelivered, SeqId, IndexState1),
- {MsgStatus#msg_status{index_on_disk = true},
- State#vqstate{index_state = IndexState2,
- disk_write_count = DiskWriteCount1}};
-
-maybe_write_index_to_disk(_Force, MsgStatus, State) ->
- {MsgStatus, State}.
-
-maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus, State) ->
- {MsgStatus1, State1} = maybe_write_msg_to_disk(ForceMsg, MsgStatus, State),
- maybe_write_index_to_disk(ForceIndex, MsgStatus1, State1).
-
-maybe_prepare_write_to_disk(ForceMsg, ForceIndex, MsgStatus, State) ->
- {MsgStatus1, State1} = maybe_write_msg_to_disk(ForceMsg, MsgStatus, State),
- maybe_batch_write_index_to_disk(ForceIndex, MsgStatus1, State1).
-
-determine_persist_to(#basic_message{
- content = #content{properties = Props,
- properties_bin = PropsBin}},
- #message_properties{size = BodySize},
- IndexMaxSize) ->
- %% The >= is so that you can set the env to 0 and never persist
- %% to the index.
- %%
- %% We want this to be fast, so we avoid size(term_to_binary())
- %% here, or using the term size estimation from truncate.erl, both
- %% of which are too slow. So instead, if the message body size
- %% goes over the limit then we avoid any other checks.
- %%
- %% If it doesn't we need to decide if the properties will push
- %% it past the limit. If we have the encoded properties (usual
- %% case) we can just check their size. If we don't (message came
- %% via the direct client), we make a guess based on the number of
- %% headers.
- case BodySize >= IndexMaxSize of
- true -> msg_store;
- false -> Est = case is_binary(PropsBin) of
- true -> BodySize + size(PropsBin);
- false -> #'P_basic'{headers = Hs} = Props,
- case Hs of
- undefined -> 0;
- _ -> length(Hs)
- end * ?HEADER_GUESS_SIZE + BodySize
- end,
- case Est >= IndexMaxSize of
- true -> msg_store;
- false -> queue_index
- end
- end.
-
-persist_to(#msg_status{persist_to = To}) -> To.
-
-prepare_to_store(Msg) ->
- Msg#basic_message{
- %% don't persist any recoverable decoded properties
- content = rabbit_binary_parser:clear_decoded_content(
- Msg #basic_message.content)}.
-
-%%----------------------------------------------------------------------------
-%% Internal gubbins for acks
-%%----------------------------------------------------------------------------
-
-record_pending_ack(#msg_status { seq_id = SeqId } = MsgStatus,
- State = #vqstate { ram_pending_ack = RPA,
- disk_pending_ack = DPA,
- qi_pending_ack = QPA,
- ack_in_counter = AckInCount}) ->
- Insert = fun (Tree) -> gb_trees:insert(SeqId, MsgStatus, Tree) end,
- {RPA1, DPA1, QPA1} =
- case {msg_in_ram(MsgStatus), persist_to(MsgStatus)} of
- {false, _} -> {RPA, Insert(DPA), QPA};
- {_, queue_index} -> {RPA, DPA, Insert(QPA)};
- {_, msg_store} -> {Insert(RPA), DPA, QPA}
- end,
- State #vqstate { ram_pending_ack = RPA1,
- disk_pending_ack = DPA1,
- qi_pending_ack = QPA1,
- ack_in_counter = AckInCount + 1}.
-
-lookup_pending_ack(SeqId, #vqstate { ram_pending_ack = RPA,
- disk_pending_ack = DPA,
- qi_pending_ack = QPA}) ->
- case gb_trees:lookup(SeqId, RPA) of
- {value, V} -> V;
- none -> case gb_trees:lookup(SeqId, DPA) of
- {value, V} -> V;
- none -> gb_trees:get(SeqId, QPA)
- end
- end.
-
-%% First parameter = UpdateStats
-remove_pending_ack(true, SeqId, State) ->
- case remove_pending_ack(false, SeqId, State) of
- {none, _} ->
- {none, State};
- {MsgStatus, State1} ->
- {MsgStatus, stats({0, -1}, {MsgStatus, none}, 0, State1)}
- end;
-remove_pending_ack(false, SeqId, State = #vqstate{ram_pending_ack = RPA,
- disk_pending_ack = DPA,
- qi_pending_ack = QPA}) ->
- case gb_trees:lookup(SeqId, RPA) of
- {value, V} -> RPA1 = gb_trees:delete(SeqId, RPA),
- {V, State #vqstate { ram_pending_ack = RPA1 }};
- none -> case gb_trees:lookup(SeqId, DPA) of
- {value, V} ->
- DPA1 = gb_trees:delete(SeqId, DPA),
- {V, State#vqstate{disk_pending_ack = DPA1}};
- none ->
- case gb_trees:lookup(SeqId, QPA) of
- {value, V} ->
- QPA1 = gb_trees:delete(SeqId, QPA),
- {V, State#vqstate{qi_pending_ack = QPA1}};
- none ->
- {none, State}
- end
- end
- end.
-
-purge_pending_ack(KeepPersistent,
- State = #vqstate { index_state = IndexState,
- msg_store_clients = MSCState }) ->
- {IndexOnDiskSeqIds, MsgIdsByStore, State1} = purge_pending_ack1(State),
- case KeepPersistent of
- true -> remove_transient_msgs_by_id(MsgIdsByStore, MSCState),
- State1;
- false -> IndexState1 =
- rabbit_queue_index:ack(IndexOnDiskSeqIds, IndexState),
- remove_msgs_by_id(MsgIdsByStore, MSCState),
- State1 #vqstate { index_state = IndexState1 }
- end.
-
-purge_pending_ack_delete_and_terminate(
- State = #vqstate { index_state = IndexState,
- msg_store_clients = MSCState }) ->
- {_, MsgIdsByStore, State1} = purge_pending_ack1(State),
- IndexState1 = rabbit_queue_index:delete_and_terminate(IndexState),
- remove_msgs_by_id(MsgIdsByStore, MSCState),
- State1 #vqstate { index_state = IndexState1 }.
-
-purge_pending_ack1(State = #vqstate { ram_pending_ack = RPA,
- disk_pending_ack = DPA,
- qi_pending_ack = QPA }) ->
- F = fun (_SeqId, MsgStatus, Acc) -> accumulate_ack(MsgStatus, Acc) end,
- {IndexOnDiskSeqIds, MsgIdsByStore, _AllMsgIds} =
- rabbit_misc:gb_trees_fold(
- F, rabbit_misc:gb_trees_fold(
- F, rabbit_misc:gb_trees_fold(
- F, accumulate_ack_init(), RPA), DPA), QPA),
- State1 = State #vqstate { ram_pending_ack = gb_trees:empty(),
- disk_pending_ack = gb_trees:empty(),
- qi_pending_ack = gb_trees:empty()},
- {IndexOnDiskSeqIds, MsgIdsByStore, State1}.
-
-%% MsgIdsByStore is an map with two keys:
-%%
-%% true: holds a list of Persistent Message Ids.
-%% false: holds a list of Transient Message Ids.
-%%
-%% When we call maps:to_list/1 we get two sets of msg ids, where
-%% IsPersistent is either true for persistent messages or false for
-%% transient ones. The msg_store_remove/3 function takes this boolean
-%% flag to determine from which store the messages should be removed
-%% from.
-remove_msgs_by_id(MsgIdsByStore, MSCState) ->
- [ok = msg_store_remove(MSCState, IsPersistent, MsgIds)
- || {IsPersistent, MsgIds} <- maps:to_list(MsgIdsByStore)].
-
-remove_transient_msgs_by_id(MsgIdsByStore, MSCState) ->
- case maps:find(false, MsgIdsByStore) of
- error -> ok;
- {ok, MsgIds} -> ok = msg_store_remove(MSCState, false, MsgIds)
- end.
-
-accumulate_ack_init() -> {[], maps:new(), []}.
-
-accumulate_ack(#msg_status { seq_id = SeqId,
- msg_id = MsgId,
- is_persistent = IsPersistent,
- msg_in_store = MsgInStore,
- index_on_disk = IndexOnDisk },
- {IndexOnDiskSeqIdsAcc, MsgIdsByStore, AllMsgIds}) ->
- {cons_if(IndexOnDisk, SeqId, IndexOnDiskSeqIdsAcc),
- case MsgInStore of
- true -> rabbit_misc:maps_cons(IsPersistent, MsgId, MsgIdsByStore);
- false -> MsgIdsByStore
- end,
- [MsgId | AllMsgIds]}.
-
-%%----------------------------------------------------------------------------
-%% Internal plumbing for confirms (aka publisher acks)
-%%----------------------------------------------------------------------------
-
-record_confirms(MsgIdSet, State = #vqstate { msgs_on_disk = MOD,
- msg_indices_on_disk = MIOD,
- unconfirmed = UC,
- confirmed = C }) ->
- State #vqstate {
- msgs_on_disk = rabbit_misc:gb_sets_difference(MOD, MsgIdSet),
- msg_indices_on_disk = rabbit_misc:gb_sets_difference(MIOD, MsgIdSet),
- unconfirmed = rabbit_misc:gb_sets_difference(UC, MsgIdSet),
- confirmed = gb_sets:union(C, MsgIdSet) }.
-
-msgs_written_to_disk(Callback, MsgIdSet, ignored) ->
- Callback(?MODULE,
- fun (?MODULE, State) -> record_confirms(MsgIdSet, State) end);
-msgs_written_to_disk(Callback, MsgIdSet, written) ->
- Callback(?MODULE,
- fun (?MODULE, State = #vqstate { msgs_on_disk = MOD,
- msg_indices_on_disk = MIOD,
- unconfirmed = UC }) ->
- Confirmed = gb_sets:intersection(UC, MsgIdSet),
- record_confirms(gb_sets:intersection(MsgIdSet, MIOD),
- State #vqstate {
- msgs_on_disk =
- gb_sets:union(MOD, Confirmed) })
- end).
-
-msg_indices_written_to_disk(Callback, MsgIdSet) ->
- Callback(?MODULE,
- fun (?MODULE, State = #vqstate { msgs_on_disk = MOD,
- msg_indices_on_disk = MIOD,
- unconfirmed = UC }) ->
- Confirmed = gb_sets:intersection(UC, MsgIdSet),
- record_confirms(gb_sets:intersection(MsgIdSet, MOD),
- State #vqstate {
- msg_indices_on_disk =
- gb_sets:union(MIOD, Confirmed) })
- end).
-
-msgs_and_indices_written_to_disk(Callback, MsgIdSet) ->
- Callback(?MODULE,
- fun (?MODULE, State) -> record_confirms(MsgIdSet, State) end).
-
-%%----------------------------------------------------------------------------
-%% Internal plumbing for requeue
-%%----------------------------------------------------------------------------
-
-publish_alpha(#msg_status { msg = undefined } = MsgStatus, State) ->
- {Msg, State1} = read_msg(MsgStatus, State),
- MsgStatus1 = MsgStatus#msg_status { msg = Msg },
- {MsgStatus1, stats({1, -1}, {MsgStatus, MsgStatus1}, 0, State1)};
-publish_alpha(MsgStatus, State) ->
- {MsgStatus, stats({1, -1}, {MsgStatus, MsgStatus}, 0, State)}.
-
-publish_beta(MsgStatus, State) ->
- {MsgStatus1, State1} = maybe_prepare_write_to_disk(true, false, MsgStatus, State),
- MsgStatus2 = m(trim_msg_status(MsgStatus1)),
- {MsgStatus2, stats({1, -1}, {MsgStatus, MsgStatus2}, 0, State1)}.
-
-%% Rebuild queue, inserting sequence ids to maintain ordering
-queue_merge(SeqIds, Q, MsgIds, Limit, PubFun, State) ->
- queue_merge(SeqIds, Q, ?QUEUE:new(), MsgIds,
- Limit, PubFun, State).
-
-queue_merge([SeqId | Rest] = SeqIds, Q, Front, MsgIds,
- Limit, PubFun, State)
- when Limit == undefined orelse SeqId < Limit ->
- case ?QUEUE:out(Q) of
- {{value, #msg_status { seq_id = SeqIdQ } = MsgStatus}, Q1}
- when SeqIdQ < SeqId ->
- %% enqueue from the remaining queue
- queue_merge(SeqIds, Q1, ?QUEUE:in(MsgStatus, Front), MsgIds,
- Limit, PubFun, State);
- {_, _Q1} ->
- %% enqueue from the remaining list of sequence ids
- case msg_from_pending_ack(SeqId, State) of
- {none, _} ->
- queue_merge(Rest, Q, Front, MsgIds, Limit, PubFun, State);
- {MsgStatus, State1} ->
- {#msg_status { msg_id = MsgId } = MsgStatus1, State2} =
- PubFun(MsgStatus, State1),
- queue_merge(Rest, Q, ?QUEUE:in(MsgStatus1, Front), [MsgId | MsgIds],
- Limit, PubFun, State2)
- end
- end;
-queue_merge(SeqIds, Q, Front, MsgIds,
- _Limit, _PubFun, State) ->
- {SeqIds, ?QUEUE:join(Front, Q), MsgIds, State}.
-
-delta_merge([], Delta, MsgIds, State) ->
- {Delta, MsgIds, State};
-delta_merge(SeqIds, Delta, MsgIds, State) ->
- lists:foldl(fun (SeqId, {Delta0, MsgIds0, State0} = Acc) ->
- case msg_from_pending_ack(SeqId, State0) of
- {none, _} ->
- Acc;
- {#msg_status { msg_id = MsgId,
- is_persistent = IsPersistent } = MsgStatus, State1} ->
- {_MsgStatus, State2} =
- maybe_prepare_write_to_disk(true, true, MsgStatus, State1),
- {expand_delta(SeqId, Delta0, IsPersistent), [MsgId | MsgIds0],
- stats({1, -1}, {MsgStatus, none}, 1, State2)}
- end
- end, {Delta, MsgIds, State}, SeqIds).
-
-%% Mostly opposite of record_pending_ack/2
-msg_from_pending_ack(SeqId, State) ->
- case remove_pending_ack(false, SeqId, State) of
- {none, _} ->
- {none, State};
- {#msg_status { msg_props = MsgProps } = MsgStatus, State1} ->
- {MsgStatus #msg_status {
- msg_props = MsgProps #message_properties { needs_confirming = false } },
- State1}
- end.
-
-beta_limit(Q) ->
- case ?QUEUE:peek(Q) of
- {value, #msg_status { seq_id = SeqId }} -> SeqId;
- empty -> undefined
- end.
-
-delta_limit(?BLANK_DELTA_PATTERN(_X)) -> undefined;
-delta_limit(#delta { start_seq_id = StartSeqId }) -> StartSeqId.
-
-%%----------------------------------------------------------------------------
-%% Iterator
-%%----------------------------------------------------------------------------
-
-ram_ack_iterator(State) ->
- {ack, gb_trees:iterator(State#vqstate.ram_pending_ack)}.
-
-disk_ack_iterator(State) ->
- {ack, gb_trees:iterator(State#vqstate.disk_pending_ack)}.
-
-qi_ack_iterator(State) ->
- {ack, gb_trees:iterator(State#vqstate.qi_pending_ack)}.
-
-msg_iterator(State) -> istate(start, State).
-
-istate(start, State) -> {q4, State#vqstate.q4, State};
-istate(q4, State) -> {q3, State#vqstate.q3, State};
-istate(q3, State) -> {delta, State#vqstate.delta, State};
-istate(delta, State) -> {q2, State#vqstate.q2, State};
-istate(q2, State) -> {q1, State#vqstate.q1, State};
-istate(q1, _State) -> done.
-
-next({ack, It}, IndexState) ->
- case gb_trees:next(It) of
- none -> {empty, IndexState};
- {_SeqId, MsgStatus, It1} -> Next = {ack, It1},
- {value, MsgStatus, true, Next, IndexState}
- end;
-next(done, IndexState) -> {empty, IndexState};
-next({delta, #delta{start_seq_id = SeqId,
- end_seq_id = SeqId}, State}, IndexState) ->
- next(istate(delta, State), IndexState);
-next({delta, #delta{start_seq_id = SeqId,
- end_seq_id = SeqIdEnd} = Delta, State}, IndexState) ->
- SeqIdB = rabbit_queue_index:next_segment_boundary(SeqId),
- SeqId1 = lists:min([SeqIdB, SeqIdEnd]),
- {List, IndexState1} = rabbit_queue_index:read(SeqId, SeqId1, IndexState),
- next({delta, Delta#delta{start_seq_id = SeqId1}, List, State}, IndexState1);
-next({delta, Delta, [], State}, IndexState) ->
- next({delta, Delta, State}, IndexState);
-next({delta, Delta, [{_, SeqId, _, _, _} = M | Rest], State}, IndexState) ->
- case is_msg_in_pending_acks(SeqId, State) of
- false -> Next = {delta, Delta, Rest, State},
- {value, beta_msg_status(M), false, Next, IndexState};
- true -> next({delta, Delta, Rest, State}, IndexState)
- end;
-next({Key, Q, State}, IndexState) ->
- case ?QUEUE:out(Q) of
- {empty, _Q} -> next(istate(Key, State), IndexState);
- {{value, MsgStatus}, QN} -> Next = {Key, QN, State},
- {value, MsgStatus, false, Next, IndexState}
- end.
-
-inext(It, {Its, IndexState}) ->
- case next(It, IndexState) of
- {empty, IndexState1} ->
- {Its, IndexState1};
- {value, MsgStatus1, Unacked, It1, IndexState1} ->
- {[{MsgStatus1, Unacked, It1} | Its], IndexState1}
- end.
-
-ifold(_Fun, Acc, [], State0) ->
- {Acc, State0};
-ifold(Fun, Acc, Its0, State0) ->
- [{MsgStatus, Unacked, It} | Rest] =
- lists:sort(fun ({#msg_status{seq_id = SeqId1}, _, _},
- {#msg_status{seq_id = SeqId2}, _, _}) ->
- SeqId1 =< SeqId2
- end, Its0),
- {Msg, State1} = read_msg(MsgStatus, State0),
- case Fun(Msg, MsgStatus#msg_status.msg_props, Unacked, Acc) of
- {stop, Acc1} ->
- {Acc1, State1};
- {cont, Acc1} ->
- IndexState0 = State1#vqstate.index_state,
- {Its1, IndexState1} = inext(It, {Rest, IndexState0}),
- State2 = State1#vqstate{index_state = IndexState1},
- ifold(Fun, Acc1, Its1, State2)
- end.
-
-%%----------------------------------------------------------------------------
-%% Phase changes
-%%----------------------------------------------------------------------------
-
-maybe_reduce_memory_use(State = #vqstate {memory_reduction_run_count = MRedRunCount,
- mode = Mode}) ->
- case MRedRunCount >= ?EXPLICIT_GC_RUN_OP_THRESHOLD(Mode) of
- true -> State1 = reduce_memory_use(State),
- State1#vqstate{memory_reduction_run_count = 0};
- false -> State#vqstate{memory_reduction_run_count = MRedRunCount + 1}
- end.
-
-reduce_memory_use(State = #vqstate { target_ram_count = infinity }) ->
- State;
-reduce_memory_use(State = #vqstate {
- mode = default,
- ram_pending_ack = RPA,
- ram_msg_count = RamMsgCount,
- target_ram_count = TargetRamCount,
- io_batch_size = IoBatchSize,
- rates = #rates { in = AvgIngress,
- out = AvgEgress,
- ack_in = AvgAckIngress,
- ack_out = AvgAckEgress } }) ->
- {CreditDiscBound, _} =rabbit_misc:get_env(rabbit,
- msg_store_credit_disc_bound,
- ?CREDIT_DISC_BOUND),
- {NeedResumeA2B, State1} = {_, #vqstate { q2 = Q2, q3 = Q3 }} =
- case chunk_size(RamMsgCount + gb_trees:size(RPA), TargetRamCount) of
- 0 -> {false, State};
- %% Reduce memory of pending acks and alphas. The order is
- %% determined based on which is growing faster. Whichever
- %% comes second may very well get a quota of 0 if the
- %% first manages to push out the max number of messages.
- A2BChunk ->
- %% In case there are few messages to be sent to a message store
- %% and many messages to be embedded to the queue index,
- %% we should limit the number of messages to be flushed
- %% to avoid blocking the process.
- A2BChunkActual = case A2BChunk > CreditDiscBound * 2 of
- true -> CreditDiscBound * 2;
- false -> A2BChunk
- end,
- Funs = case ((AvgAckIngress - AvgAckEgress) >
- (AvgIngress - AvgEgress)) of
- true -> [fun limit_ram_acks/2,
- fun push_alphas_to_betas/2];
- false -> [fun push_alphas_to_betas/2,
- fun limit_ram_acks/2]
- end,
- {Quota, State2} = lists:foldl(fun (ReduceFun, {QuotaN, StateN}) ->
- ReduceFun(QuotaN, StateN)
- end, {A2BChunkActual, State}, Funs),
- {(Quota == 0) andalso (A2BChunk > A2BChunkActual), State2}
- end,
- Permitted = permitted_beta_count(State1),
- {NeedResumeB2D, State3} =
- %% If there are more messages with their queue position held in RAM,
- %% a.k.a. betas, in Q2 & Q3 than IoBatchSize,
- %% write their queue position to disk, a.k.a. push_betas_to_deltas
- case chunk_size(?QUEUE:len(Q2) + ?QUEUE:len(Q3),
- Permitted) of
- B2DChunk when B2DChunk >= IoBatchSize ->
- %% Same as for alphas to betas. Limit a number of messages
- %% to be flushed to disk at once to avoid blocking the process.
- B2DChunkActual = case B2DChunk > CreditDiscBound * 2 of
- true -> CreditDiscBound * 2;
- false -> B2DChunk
- end,
- StateBD = push_betas_to_deltas(B2DChunkActual, State1),
- {B2DChunk > B2DChunkActual, StateBD};
- _ ->
- {false, State1}
- end,
- %% We can be blocked by the credit flow, or limited by a batch size,
- %% or finished with flushing.
- %% If blocked by the credit flow - the credit grant will resume processing,
- %% if limited by a batch - the batch continuation message should be sent.
- %% The continuation message will be prioritised over publishes,
- %% but not consumptions, so the queue can make progess.
- Blocked = credit_flow:blocked(),
- case {Blocked, NeedResumeA2B orelse NeedResumeB2D} of
- %% Credit bump will continue paging
- {true, _} -> State3;
- %% Finished with paging
- {false, false} -> State3;
- %% Planning next batch
- {false, true} ->
- %% We don't want to use self-credit-flow, because it's harder to
- %% reason about. So the process sends a (prioritised) message to
- %% itself and sets a waiting_bump value to keep the message box clean
- maybe_bump_reduce_memory_use(State3)
- end;
-%% When using lazy queues, there are no alphas, so we don't need to
-%% call push_alphas_to_betas/2.
-reduce_memory_use(State = #vqstate {
- mode = lazy,
- ram_pending_ack = RPA,
- ram_msg_count = RamMsgCount,
- target_ram_count = TargetRamCount }) ->
- State1 = #vqstate { q3 = Q3 } =
- case chunk_size(RamMsgCount + gb_trees:size(RPA), TargetRamCount) of
- 0 -> State;
- S1 -> {_, State2} = limit_ram_acks(S1, State),
- State2
- end,
-
- State3 =
- case chunk_size(?QUEUE:len(Q3),
- permitted_beta_count(State1)) of
- 0 ->
- State1;
- S2 ->
- push_betas_to_deltas(S2, State1)
- end,
- garbage_collect(),
- State3.
-
-maybe_bump_reduce_memory_use(State = #vqstate{ waiting_bump = true }) ->
- State;
-maybe_bump_reduce_memory_use(State) ->
- self() ! bump_reduce_memory_use,
- State#vqstate{ waiting_bump = true }.
-
-limit_ram_acks(0, State) ->
- {0, ui(State)};
-limit_ram_acks(Quota, State = #vqstate { ram_pending_ack = RPA,
- disk_pending_ack = DPA }) ->
- case gb_trees:is_empty(RPA) of
- true ->
- {Quota, ui(State)};
- false ->
- {SeqId, MsgStatus, RPA1} = gb_trees:take_largest(RPA),
- {MsgStatus1, State1} =
- maybe_prepare_write_to_disk(true, false, MsgStatus, State),
- MsgStatus2 = m(trim_msg_status(MsgStatus1)),
- DPA1 = gb_trees:insert(SeqId, MsgStatus2, DPA),
- limit_ram_acks(Quota - 1,
- stats({0, 0}, {MsgStatus, MsgStatus2}, 0,
- State1 #vqstate { ram_pending_ack = RPA1,
- disk_pending_ack = DPA1 }))
- end.
-
-permitted_beta_count(#vqstate { len = 0 }) ->
- infinity;
-permitted_beta_count(#vqstate { mode = lazy,
- target_ram_count = TargetRamCount}) ->
- TargetRamCount;
-permitted_beta_count(#vqstate { target_ram_count = 0, q3 = Q3 }) ->
- lists:min([?QUEUE:len(Q3), rabbit_queue_index:next_segment_boundary(0)]);
-permitted_beta_count(#vqstate { q1 = Q1,
- q4 = Q4,
- target_ram_count = TargetRamCount,
- len = Len }) ->
- BetaDelta = Len - ?QUEUE:len(Q1) - ?QUEUE:len(Q4),
- lists:max([rabbit_queue_index:next_segment_boundary(0),
- BetaDelta - ((BetaDelta * BetaDelta) div
- (BetaDelta + TargetRamCount))]).
-
-chunk_size(Current, Permitted)
- when Permitted =:= infinity orelse Permitted >= Current ->
- 0;
-chunk_size(Current, Permitted) ->
- Current - Permitted.
-
-fetch_from_q3(State = #vqstate { mode = default,
- q1 = Q1,
- q2 = Q2,
- delta = #delta { count = DeltaCount },
- q3 = Q3,
- q4 = Q4 }) ->
- case ?QUEUE:out(Q3) of
- {empty, _Q3} ->
- {empty, State};
- {{value, MsgStatus}, Q3a} ->
- State1 = State #vqstate { q3 = Q3a },
- State2 = case {?QUEUE:is_empty(Q3a), 0 == DeltaCount} of
- {true, true} ->
- %% q3 is now empty, it wasn't before;
- %% delta is still empty. So q2 must be
- %% empty, and we know q4 is empty
- %% otherwise we wouldn't be loading from
- %% q3. As such, we can just set q4 to Q1.
- true = ?QUEUE:is_empty(Q2), %% ASSERTION
- true = ?QUEUE:is_empty(Q4), %% ASSERTION
- State1 #vqstate { q1 = ?QUEUE:new(), q4 = Q1 };
- {true, false} ->
- maybe_deltas_to_betas(State1);
- {false, _} ->
- %% q3 still isn't empty, we've not
- %% touched delta, so the invariants
- %% between q1, q2, delta and q3 are
- %% maintained
- State1
- end,
- {loaded, {MsgStatus, State2}}
- end;
-%% lazy queues
-fetch_from_q3(State = #vqstate { mode = lazy,
- delta = #delta { count = DeltaCount },
- q3 = Q3 }) ->
- case ?QUEUE:out(Q3) of
- {empty, _Q3} when DeltaCount =:= 0 ->
- {empty, State};
- {empty, _Q3} ->
- fetch_from_q3(maybe_deltas_to_betas(State));
- {{value, MsgStatus}, Q3a} ->
- State1 = State #vqstate { q3 = Q3a },
- {loaded, {MsgStatus, State1}}
- end.
-
-maybe_deltas_to_betas(State) ->
- AfterFun = process_delivers_and_acks_fun(deliver_and_ack),
- maybe_deltas_to_betas(AfterFun, State).
-
-maybe_deltas_to_betas(_DelsAndAcksFun,
- State = #vqstate {delta = ?BLANK_DELTA_PATTERN(X) }) ->
- State;
-maybe_deltas_to_betas(DelsAndAcksFun,
- State = #vqstate {
- q2 = Q2,
- delta = Delta,
- q3 = Q3,
- index_state = IndexState,
- ram_msg_count = RamMsgCount,
- ram_bytes = RamBytes,
- disk_read_count = DiskReadCount,
- delta_transient_bytes = DeltaTransientBytes,
- transient_threshold = TransientThreshold }) ->
- #delta { start_seq_id = DeltaSeqId,
- count = DeltaCount,
- transient = Transient,
- end_seq_id = DeltaSeqIdEnd } = Delta,
- DeltaSeqId1 =
- lists:min([rabbit_queue_index:next_segment_boundary(DeltaSeqId),
- DeltaSeqIdEnd]),
- {List, IndexState1} = rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1,
- IndexState),
- {Q3a, RamCountsInc, RamBytesInc, State1, TransientCount, TransientBytes} =
- betas_from_index_entries(List, TransientThreshold,
- DelsAndAcksFun,
- State #vqstate { index_state = IndexState1 }),
- State2 = State1 #vqstate { ram_msg_count = RamMsgCount + RamCountsInc,
- ram_bytes = RamBytes + RamBytesInc,
- disk_read_count = DiskReadCount + RamCountsInc },
- case ?QUEUE:len(Q3a) of
- 0 ->
- %% we ignored every message in the segment due to it being
- %% transient and below the threshold
- maybe_deltas_to_betas(
- DelsAndAcksFun,
- State2 #vqstate {
- delta = d(Delta #delta { start_seq_id = DeltaSeqId1 })});
- Q3aLen ->
- Q3b = ?QUEUE:join(Q3, Q3a),
- case DeltaCount - Q3aLen of
- 0 ->
- %% delta is now empty, but it wasn't before, so
- %% can now join q2 onto q3
- State2 #vqstate { q2 = ?QUEUE:new(),
- delta = ?BLANK_DELTA,
- q3 = ?QUEUE:join(Q3b, Q2),
- delta_transient_bytes = 0};
- N when N > 0 ->
- Delta1 = d(#delta { start_seq_id = DeltaSeqId1,
- count = N,
- transient = Transient - TransientCount,
- end_seq_id = DeltaSeqIdEnd }),
- State2 #vqstate { delta = Delta1,
- q3 = Q3b,
- delta_transient_bytes = DeltaTransientBytes - TransientBytes }
- end
- end.
-
-push_alphas_to_betas(Quota, State) ->
- {Quota1, State1} =
- push_alphas_to_betas(
- fun ?QUEUE:out/1,
- fun (MsgStatus, Q1a,
- State0 = #vqstate { q3 = Q3, delta = #delta { count = 0,
- transient = 0 } }) ->
- State0 #vqstate { q1 = Q1a, q3 = ?QUEUE:in(MsgStatus, Q3) };
- (MsgStatus, Q1a, State0 = #vqstate { q2 = Q2 }) ->
- State0 #vqstate { q1 = Q1a, q2 = ?QUEUE:in(MsgStatus, Q2) }
- end, Quota, State #vqstate.q1, State),
- {Quota2, State2} =
- push_alphas_to_betas(
- fun ?QUEUE:out_r/1,
- fun (MsgStatus, Q4a, State0 = #vqstate { q3 = Q3 }) ->
- State0 #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3), q4 = Q4a }
- end, Quota1, State1 #vqstate.q4, State1),
- {Quota2, State2}.
-
-push_alphas_to_betas(_Generator, _Consumer, Quota, _Q,
- State = #vqstate { ram_msg_count = RamMsgCount,
- target_ram_count = TargetRamCount })
- when Quota =:= 0 orelse
- TargetRamCount =:= infinity orelse
- TargetRamCount >= RamMsgCount ->
- {Quota, ui(State)};
-push_alphas_to_betas(Generator, Consumer, Quota, Q, State) ->
- %% We consume credits from the message_store whenever we need to
- %% persist a message to disk. See:
- %% rabbit_variable_queue:msg_store_write/4. So perhaps the
- %% msg_store is trying to throttle down our queue.
- case credit_flow:blocked() of
- true -> {Quota, ui(State)};
- false -> case Generator(Q) of
- {empty, _Q} ->
- {Quota, ui(State)};
- {{value, MsgStatus}, Qa} ->
- {MsgStatus1, State1} =
- maybe_prepare_write_to_disk(true, false, MsgStatus,
- State),
- MsgStatus2 = m(trim_msg_status(MsgStatus1)),
- State2 = stats(
- ready0, {MsgStatus, MsgStatus2}, 0, State1),
- State3 = Consumer(MsgStatus2, Qa, State2),
- push_alphas_to_betas(Generator, Consumer, Quota - 1,
- Qa, State3)
- end
- end.
-
-push_betas_to_deltas(Quota, State = #vqstate { mode = default,
- q2 = Q2,
- delta = Delta,
- q3 = Q3}) ->
- PushState = {Quota, Delta, State},
- {Q3a, PushState1} = push_betas_to_deltas(
- fun ?QUEUE:out_r/1,
- fun rabbit_queue_index:next_segment_boundary/1,
- Q3, PushState),
- {Q2a, PushState2} = push_betas_to_deltas(
- fun ?QUEUE:out/1,
- fun (Q2MinSeqId) -> Q2MinSeqId end,
- Q2, PushState1),
- {_, Delta1, State1} = PushState2,
- State1 #vqstate { q2 = Q2a,
- delta = Delta1,
- q3 = Q3a };
-%% In the case of lazy queues we want to page as many messages as
-%% possible from q3.
-push_betas_to_deltas(Quota, State = #vqstate { mode = lazy,
- delta = Delta,
- q3 = Q3}) ->
- PushState = {Quota, Delta, State},
- {Q3a, PushState1} = push_betas_to_deltas(
- fun ?QUEUE:out_r/1,
- fun (Q2MinSeqId) -> Q2MinSeqId end,
- Q3, PushState),
- {_, Delta1, State1} = PushState1,
- State1 #vqstate { delta = Delta1,
- q3 = Q3a }.
-
-
-push_betas_to_deltas(Generator, LimitFun, Q, PushState) ->
- case ?QUEUE:is_empty(Q) of
- true ->
- {Q, PushState};
- false ->
- {value, #msg_status { seq_id = MinSeqId }} = ?QUEUE:peek(Q),
- {value, #msg_status { seq_id = MaxSeqId }} = ?QUEUE:peek_r(Q),
- Limit = LimitFun(MinSeqId),
- case MaxSeqId < Limit of
- true -> {Q, PushState};
- false -> push_betas_to_deltas1(Generator, Limit, Q, PushState)
- end
- end.
-
-push_betas_to_deltas1(_Generator, _Limit, Q, {0, Delta, State}) ->
- {Q, {0, Delta, ui(State)}};
-push_betas_to_deltas1(Generator, Limit, Q, {Quota, Delta, State}) ->
- case Generator(Q) of
- {empty, _Q} ->
- {Q, {Quota, Delta, ui(State)}};
- {{value, #msg_status { seq_id = SeqId }}, _Qa}
- when SeqId < Limit ->
- {Q, {Quota, Delta, ui(State)}};
- {{value, MsgStatus = #msg_status { seq_id = SeqId }}, Qa} ->
- {#msg_status { index_on_disk = true,
- is_persistent = IsPersistent }, State1} =
- maybe_batch_write_index_to_disk(true, MsgStatus, State),
- State2 = stats(ready0, {MsgStatus, none}, 1, State1),
- Delta1 = expand_delta(SeqId, Delta, IsPersistent),
- push_betas_to_deltas1(Generator, Limit, Qa,
- {Quota - 1, Delta1, State2})
- end.
-
-%% Flushes queue index batch caches and updates queue index state.
-ui(#vqstate{index_state = IndexState,
- target_ram_count = TargetRamCount} = State) ->
- IndexState1 = rabbit_queue_index:flush_pre_publish_cache(
- TargetRamCount, IndexState),
- State#vqstate{index_state = IndexState1}.
-
-%%----------------------------------------------------------------------------
-%% Upgrading
-%%----------------------------------------------------------------------------
-
--spec multiple_routing_keys() -> 'ok'.
-
-multiple_routing_keys() ->
- transform_storage(
- fun ({basic_message, ExchangeName, Routing_Key, Content,
- MsgId, Persistent}) ->
- {ok, {basic_message, ExchangeName, [Routing_Key], Content,
- MsgId, Persistent}};
- (_) -> {error, corrupt_message}
- end),
- ok.
-
-
-%% Assumes message store is not running
-transform_storage(TransformFun) ->
- transform_store(?PERSISTENT_MSG_STORE, TransformFun),
- transform_store(?TRANSIENT_MSG_STORE, TransformFun).
-
-transform_store(Store, TransformFun) ->
- rabbit_msg_store:force_recovery(rabbit_mnesia:dir(), Store),
- rabbit_msg_store:transform_dir(rabbit_mnesia:dir(), Store, TransformFun).
-
-move_messages_to_vhost_store() ->
- case list_persistent_queues() of
- [] ->
- log_upgrade("No durable queues found."
- " Skipping message store migration"),
- ok;
- Queues ->
- move_messages_to_vhost_store(Queues)
- end,
- ok = delete_old_store(),
- ok = rabbit_queue_index:cleanup_global_recovery_terms().
-
-move_messages_to_vhost_store(Queues) ->
- log_upgrade("Moving messages to per-vhost message store"),
- %% Move the queue index for each persistent queue to the new store
- lists:foreach(
- fun(Queue) ->
- QueueName = amqqueue:get_name(Queue),
- rabbit_queue_index:move_to_per_vhost_stores(QueueName)
- end,
- Queues),
- %% Legacy (global) msg_store may require recovery.
- %% This upgrade step should only be started
- %% if we are upgrading from a pre-3.7.0 version.
- {QueuesWithTerms, RecoveryRefs, StartFunState} = read_old_recovery_terms(Queues),
-
- OldStore = run_old_persistent_store(RecoveryRefs, StartFunState),
-
- VHosts = rabbit_vhost:list_names(),
-
- %% New store should not be recovered.
- NewMsgStore = start_new_store(VHosts),
- %% Recovery terms should be started for all vhosts for new store.
- [ok = rabbit_recovery_terms:open_table(VHost) || VHost <- VHosts],
-
- MigrationBatchSize = application:get_env(rabbit, queue_migration_batch_size,
- ?QUEUE_MIGRATION_BATCH_SIZE),
- in_batches(MigrationBatchSize,
- {rabbit_variable_queue, migrate_queue, [OldStore, NewMsgStore]},
- QueuesWithTerms,
- "message_store upgrades: Migrating batch ~p of ~p queues. Out of total ~p ~n",
- "message_store upgrades: Batch ~p of ~p queues migrated ~n. ~p total left"),
-
- log_upgrade("Message store migration finished"),
- ok = rabbit_sup:stop_child(OldStore),
- [ok= rabbit_recovery_terms:close_table(VHost) || VHost <- VHosts],
- ok = stop_new_store(NewMsgStore).
-
-in_batches(Size, MFA, List, MessageStart, MessageEnd) ->
- in_batches(Size, 1, MFA, List, MessageStart, MessageEnd).
-
-in_batches(_, _, _, [], _, _) -> ok;
-in_batches(Size, BatchNum, MFA, List, MessageStart, MessageEnd) ->
- Length = length(List),
- {Batch, Tail} = case Size > Length of
- true -> {List, []};
- false -> lists:split(Size, List)
- end,
- ProcessedLength = (BatchNum - 1) * Size,
- rabbit_log:info(MessageStart, [BatchNum, Size, ProcessedLength + Length]),
- {M, F, A} = MFA,
- Keys = [ rpc:async_call(node(), M, F, [El | A]) || El <- Batch ],
- lists:foreach(fun(Key) ->
- case rpc:yield(Key) of
- {badrpc, Err} -> throw(Err);
- _ -> ok
- end
- end,
- Keys),
- rabbit_log:info(MessageEnd, [BatchNum, Size, length(Tail)]),
- in_batches(Size, BatchNum + 1, MFA, Tail, MessageStart, MessageEnd).
-
-migrate_queue({QueueName = #resource{virtual_host = VHost, name = Name},
- RecoveryTerm},
- OldStore, NewStore) ->
- log_upgrade_verbose(
- "Migrating messages in queue ~s in vhost ~s to per-vhost message store~n",
- [Name, VHost]),
- OldStoreClient = get_global_store_client(OldStore),
- NewStoreClient = get_per_vhost_store_client(QueueName, NewStore),
- %% WARNING: During scan_queue_segments queue index state is being recovered
- %% and terminated. This can cause side effects!
- rabbit_queue_index:scan_queue_segments(
- %% We migrate only persistent messages which are found in message store
- %% and are not acked yet
- fun (_SeqId, MsgId, _MsgProps, true, _IsDelivered, no_ack, OldC)
- when is_binary(MsgId) ->
- migrate_message(MsgId, OldC, NewStoreClient);
- (_SeqId, _MsgId, _MsgProps,
- _IsPersistent, _IsDelivered, _IsAcked, OldC) ->
- OldC
- end,
- OldStoreClient,
- QueueName),
- rabbit_msg_store:client_terminate(OldStoreClient),
- rabbit_msg_store:client_terminate(NewStoreClient),
- NewClientRef = rabbit_msg_store:client_ref(NewStoreClient),
- case RecoveryTerm of
- non_clean_shutdown -> ok;
- Term when is_list(Term) ->
- NewRecoveryTerm = lists:keyreplace(persistent_ref, 1, RecoveryTerm,
- {persistent_ref, NewClientRef}),
- rabbit_queue_index:update_recovery_term(QueueName, NewRecoveryTerm)
- end,
- log_upgrade_verbose("Finished migrating queue ~s in vhost ~s", [Name, VHost]),
- {QueueName, NewClientRef}.
-
-migrate_message(MsgId, OldC, NewC) ->
- case rabbit_msg_store:read(MsgId, OldC) of
- {{ok, Msg}, OldC1} ->
- ok = rabbit_msg_store:write(MsgId, Msg, NewC),
- OldC1;
- _ -> OldC
- end.
-
-get_per_vhost_store_client(#resource{virtual_host = VHost}, NewStore) ->
- {VHost, StorePid} = lists:keyfind(VHost, 1, NewStore),
- rabbit_msg_store:client_init(StorePid, rabbit_guid:gen(),
- fun(_,_) -> ok end, fun() -> ok end).
-
-get_global_store_client(OldStore) ->
- rabbit_msg_store:client_init(OldStore,
- rabbit_guid:gen(),
- fun(_,_) -> ok end,
- fun() -> ok end).
-
-list_persistent_queues() ->
- Node = node(),
- mnesia:async_dirty(
- fun () ->
- qlc:e(qlc:q([Q || Q <- mnesia:table(rabbit_durable_queue),
- ?amqqueue_is_classic(Q),
- amqqueue:qnode(Q) == Node,
- mnesia:read(rabbit_queue, amqqueue:get_name(Q), read) =:= []]))
- end).
-
-read_old_recovery_terms([]) ->
- {[], [], ?EMPTY_START_FUN_STATE};
-read_old_recovery_terms(Queues) ->
- QueueNames = [amqqueue:get_name(Q) || Q <- Queues],
- {AllTerms, StartFunState} = rabbit_queue_index:read_global_recovery_terms(QueueNames),
- Refs = [Ref || Terms <- AllTerms,
- Terms /= non_clean_shutdown,
- begin
- Ref = proplists:get_value(persistent_ref, Terms),
- Ref =/= undefined
- end],
- {lists:zip(QueueNames, AllTerms), Refs, StartFunState}.
-
-run_old_persistent_store(Refs, StartFunState) ->
- OldStoreName = ?PERSISTENT_MSG_STORE,
- ok = rabbit_sup:start_child(OldStoreName, rabbit_msg_store, start_global_store_link,
- [OldStoreName, rabbit_mnesia:dir(),
- Refs, StartFunState]),
- OldStoreName.
-
-start_new_store(VHosts) ->
- %% Ensure vhost supervisor is started, so we can add vhosts to it.
- lists:map(fun(VHost) ->
- VHostDir = rabbit_vhost:msg_store_dir_path(VHost),
- {ok, Pid} = rabbit_msg_store:start_link(?PERSISTENT_MSG_STORE,
- VHostDir,
- undefined,
- ?EMPTY_START_FUN_STATE),
- {VHost, Pid}
- end,
- VHosts).
-
-stop_new_store(NewStore) ->
- lists:foreach(fun({_VHost, StorePid}) ->
- unlink(StorePid),
- exit(StorePid, shutdown)
- end,
- NewStore),
- ok.
-
-delete_old_store() ->
- log_upgrade("Removing the old message store data"),
- rabbit_file:recursive_delete(
- [filename:join([rabbit_mnesia:dir(), ?PERSISTENT_MSG_STORE])]),
- %% Delete old transient store as well
- rabbit_file:recursive_delete(
- [filename:join([rabbit_mnesia:dir(), ?TRANSIENT_MSG_STORE])]),
- ok.
-
-log_upgrade(Msg) ->
- log_upgrade(Msg, []).
-
-log_upgrade(Msg, Args) ->
- rabbit_log:info("message_store upgrades: " ++ Msg, Args).
-
-log_upgrade_verbose(Msg) ->
- log_upgrade_verbose(Msg, []).
-
-log_upgrade_verbose(Msg, Args) ->
- rabbit_log_upgrade:info(Msg, Args).
-
-maybe_client_terminate(MSCStateP) ->
- %% Queue might have been asked to stop by the supervisor, it needs a clean
- %% shutdown in order for the supervising strategy to work - if it reaches max
- %% restarts might bring the vhost down.
- try
- rabbit_msg_store:client_terminate(MSCStateP)
- catch
- _:_ ->
- ok
- end.