author    | dcorbacho <dparracorbacho@piotal.io> | 2020-11-18 14:27:41 +0000
committer | dcorbacho <dparracorbacho@piotal.io> | 2020-11-18 14:27:41 +0000
commit    | f23a51261d9502ec39df0f8db47ba6b22aa7659f (patch)
tree      | 53dcdf46e7dc2c14e81ee960bce8793879b488d3 /src/rabbit_variable_queue.erl
parent    | afa2c2bf6c7e0e9b63f4fb53dc931c70388e1c82 (diff)
parent    | 9f6d64ec4a4b1eeac24d7846c5c64fd96798d892 (diff)
Merge remote-tracking branch 'origin/master' into stream-timestamp-offset
Diffstat (limited to 'src/rabbit_variable_queue.erl')
-rw-r--r-- | src/rabbit_variable_queue.erl | 3015
1 file changed, 0 insertions, 3015 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl deleted file mode 100644 index cf6fa4a189..0000000000 --- a/src/rabbit_variable_queue.erl +++ /dev/null @@ -1,3015 +0,0 @@ -%% This Source Code Form is subject to the terms of the Mozilla Public -%% License, v. 2.0. If a copy of the MPL was not distributed with this -%% file, You can obtain one at https://mozilla.org/MPL/2.0/. -%% -%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. -%% - --module(rabbit_variable_queue). - --export([init/3, terminate/2, delete_and_terminate/2, delete_crashed/1, - purge/1, purge_acks/1, - publish/6, publish_delivered/5, - batch_publish/4, batch_publish_delivered/4, - discard/4, drain_confirmed/1, - dropwhile/2, fetchwhile/4, fetch/2, drop/2, ack/2, requeue/2, - ackfold/4, fold/3, len/1, is_empty/1, depth/1, - set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, - handle_pre_hibernate/1, resume/1, msg_rates/1, - info/2, invoke/3, is_duplicate/2, set_queue_mode/2, - zip_msgs_and_acks/4, multiple_routing_keys/0, handle_info/2]). - --export([start/2, stop/1]). - -%% exported for testing only --export([start_msg_store/3, stop_msg_store/1, init/6]). - --export([move_messages_to_vhost_store/0]). - --export([migrate_queue/3, migrate_message/3, get_per_vhost_store_client/2, - get_global_store_client/1, log_upgrade_verbose/1, - log_upgrade_verbose/2]). - --include_lib("stdlib/include/qlc.hrl"). - --define(QUEUE_MIGRATION_BATCH_SIZE, 100). --define(EMPTY_START_FUN_STATE, {fun (ok) -> finished end, ok}). - -%%---------------------------------------------------------------------------- -%% Messages, and their position in the queue, can be in memory or on -%% disk, or both. Persistent messages will have both message and -%% position pushed to disk as soon as they arrive; transient messages -%% can be written to disk (and thus both types can be evicted from -%% memory) under memory pressure. The question of whether a message is -%% in RAM and whether it is persistent are orthogonal. -%% -%% Messages are persisted using the queue index and the message -%% store. Normally the queue index holds the position of the message -%% *within this queue* along with a couple of small bits of metadata, -%% while the message store holds the message itself (including headers -%% and other properties). -%% -%% However, as an optimisation, small messages can be embedded -%% directly in the queue index and bypass the message store -%% altogether. -%% -%% Definitions: -%% -%% alpha: this is a message where both the message itself, and its -%% position within the queue are held in RAM -%% -%% beta: this is a message where the message itself is only held on -%% disk (if persisted to the message store) but its position -%% within the queue is held in RAM. -%% -%% gamma: this is a message where the message itself is only held on -%% disk, but its position is both in RAM and on disk. -%% -%% delta: this is a collection of messages, represented by a single -%% term, where the messages and their position are only held on -%% disk. -%% -%% Note that for persistent messages, the message and its position -%% within the queue are always held on disk, *in addition* to being in -%% one of the above classifications. -%% -%% Also note that within this code, the term gamma seldom -%% appears. It's frequently the case that gammas are defined by betas -%% who have had their queue position recorded on disk. 
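For illustration, a minimal sketch (hypothetical function, not part of the module) of how these four classifications map onto where a message and its queue position live:

%% MsgInRam: payload held in RAM; PosInRam / PosOnDisk: where the queue
%% position is held. Persistent messages may be alphas while also having
%% both parts on disk, hence the wildcard in the first clause.
classify(true,  true,  _PosOnDisk) -> alpha;
classify(false, true,  false)      -> beta;
classify(false, true,  true)       -> gamma;
classify(false, false, true)       -> delta.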
-%% -%% In general, messages move q1 -> q2 -> delta -> q3 -> q4, though -%% many of these steps are frequently skipped. q1 and q4 only hold -%% alphas, q2 and q3 hold both betas and gammas. When a message -%% arrives, its classification is determined. It is then added to the -%% rightmost appropriate queue. -%% -%% If a new message is determined to be a beta or gamma, q1 is -%% empty. If a new message is determined to be a delta, q1 and q2 are -%% empty (and actually q4 too). -%% -%% When removing messages from a queue, if q4 is empty then q3 is read -%% directly. If q3 becomes empty then the next segment's worth of -%% messages from delta are read into q3, reducing the size of -%% delta. If the queue is non empty, either q4 or q3 contain -%% entries. It is never permitted for delta to hold all the messages -%% in the queue. -%% -%% The duration indicated to us by the memory_monitor is used to -%% calculate, given our current ingress and egress rates, how many -%% messages we should hold in RAM (i.e. as alphas). We track the -%% ingress and egress rates for both messages and pending acks and -%% rates for both are considered when calculating the number of -%% messages to hold in RAM. When we need to push alphas to betas or -%% betas to gammas, we favour writing out messages that are further -%% from the head of the queue. This minimises writes to disk, as the -%% messages closer to the tail of the queue stay in the queue for -%% longer, thus do not need to be replaced as quickly by sending other -%% messages to disk. -%% -%% Whilst messages are pushed to disk and forgotten from RAM as soon -%% as requested by a new setting of the queue RAM duration, the -%% inverse is not true: we only load messages back into RAM as -%% demanded as the queue is read from. Thus only publishes to the -%% queue will take up available spare capacity. -%% -%% When we report our duration to the memory monitor, we calculate -%% average ingress and egress rates over the last two samples, and -%% then calculate our duration based on the sum of the ingress and -%% egress rates. More than two samples could be used, but it's a -%% balance between responding quickly enough to changes in -%% producers/consumers versus ignoring temporary blips. The problem -%% with temporary blips is that with just a few queues, they can have -%% substantial impact on the calculation of the average duration and -%% hence cause unnecessary I/O. Another alternative is to increase the -%% amqqueue_process:RAM_DURATION_UPDATE_PERIOD to beyond 5 -%% seconds. However, that then runs the risk of being too slow to -%% inform the memory monitor of changes. Thus a 5 second interval, -%% plus a rolling average over the last two samples seems to work -%% well in practice. -%% -%% The sum of the ingress and egress rates is used because the egress -%% rate alone is not sufficient. Adding in the ingress rate means that -%% queues which are being flooded by messages are given more memory, -%% resulting in them being able to process the messages faster (by -%% doing less I/O, or at least deferring it) and thus helping keep -%% their mailboxes empty and thus the queue as a whole is more -%% responsive. If such a queue also has fast but previously idle -%% consumers, the consumer can then start to be driven as fast as it -%% can go, whereas if only egress rate was being used, the incoming -%% messages may have to be written to disk and then read back in, -%% resulting in the hard disk being a bottleneck in driving the -%% consumers. 
Generally, we want to give Rabbit every chance of -%% getting rid of messages as fast as possible and remaining -%% responsive, and using only the egress rate impacts that goal. -%% -%% Once the queue has more alphas than the target_ram_count, the -%% surplus must be converted to betas, if not gammas, if not rolled -%% into delta. The conditions under which these transitions occur -%% reflect the conflicting goals of minimising RAM cost per msg, and -%% minimising CPU cost per msg. Once the msg has become a beta, its -%% payload is no longer in RAM, thus a read from the msg_store must -%% occur before the msg can be delivered, but the RAM cost of a beta -%% is the same as a gamma, so converting a beta to gamma will not free -%% up any further RAM. To reduce the RAM cost further, the gamma must -%% be rolled into delta. Whilst recovering a beta or a gamma to an -%% alpha requires only one disk read (from the msg_store), recovering -%% a msg from within delta will require two reads (queue_index and -%% then msg_store). But delta has a near-0 per-msg RAM cost. So the -%% conflict is between using delta more, which will free up more -%% memory, but require additional CPU and disk ops, versus using delta -%% less and gammas and betas more, which will cost more memory, but -%% require fewer disk ops and less CPU overhead. -%% -%% In the case of a persistent msg published to a durable queue, the -%% msg is immediately written to the msg_store and queue_index. If -%% then additionally converted from an alpha, it'll immediately go to -%% a gamma (as it's already in queue_index), and cannot exist as a -%% beta. Thus a durable queue with a mixture of persistent and -%% transient msgs in it which has more messages than permitted by the -%% target_ram_count may contain an interspersed mixture of betas and -%% gammas in q2 and q3. -%% -%% There is then a ratio that controls how many betas and gammas there -%% can be. This is based on the target_ram_count and thus expresses -%% the fact that as the number of permitted alphas in the queue falls, -%% so should the number of betas and gammas fall (i.e. delta -%% grows). If q2 and q3 contain more than the permitted number of -%% betas and gammas, then the surplus are forcibly converted to gammas -%% (as necessary) and then rolled into delta. The ratio is that -%% delta/(betas+gammas+delta) equals -%% (betas+gammas+delta)/(target_ram_count+betas+gammas+delta). I.e. as -%% the target_ram_count shrinks to 0, so must betas and gammas. -%% -%% The conversion of betas to deltas is done if there are at least -%% ?IO_BATCH_SIZE betas in q2 & q3. This value should not be too small, -%% otherwise the frequent operations on the queues of q2 and q3 will not be -%% effectively amortised (switching the direction of queue access defeats -%% amortisation). Note that there is a natural upper bound due to credit_flow -%% limits on the alpha to beta conversion. -%% -%% The conversion from alphas to betas is chunked due to the -%% credit_flow limits of the msg_store. This further smooths the -%% effects of changes to the target_ram_count and ensures the queue -%% remains responsive even when there is a large amount of IO work to -%% do. The 'resume' callback is utilised to ensure that conversions -%% are done as promptly as possible whilst ensuring the queue remains -%% responsive. -%% -%% In the queue we keep track of both messages that are pending -%% delivery and messages that are pending acks. 
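Rearranging the beta/gamma ratio described above gives the permitted number of betas and gammas for a given delta size and target_ram_count. A small illustrative helper (hypothetical name; not necessarily how the module itself computes the limit):

%% D/S = S/(T + S) with S = Betas + Gammas + Delta solves to
%% S = (D + sqrt(D*D + 4*D*T)) / 2, so the permitted betas + gammas is S - D.
permitted_betas_and_gammas(_Delta, infinity) -> infinity;
permitted_betas_and_gammas(Delta, TargetRamCount) ->
    S = (Delta + math:sqrt(Delta * Delta + 4 * Delta * TargetRamCount)) / 2,
    trunc(S) - Delta.
%% E.g. Delta = 1000 and TargetRamCount = 3000 permit about 1302 betas+gammas;
%% as TargetRamCount shrinks to 0 the permitted number shrinks to 0.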
In the event of a -%% queue purge, we only need to load qi segments if the queue has -%% elements in deltas (i.e. it came under significant memory -%% pressure). In the event of a queue deletion, in addition to the -%% preceding, by keeping track of pending acks in RAM, we do not need -%% to search through qi segments looking for messages that are yet to -%% be acknowledged. -%% -%% Pending acks are recorded in memory by storing the message itself. -%% If the message has been sent to disk, we do not store the message -%% content. During memory reduction, pending acks containing message -%% content have that content removed and the corresponding messages -%% are pushed out to disk. -%% -%% Messages from pending acks are returned to q4, q3 and delta during -%% requeue, based on the limits of seq_id contained in each. Requeued -%% messages retain their original seq_id, maintaining order -%% when requeued. -%% -%% The order in which alphas are pushed to betas and pending acks -%% are pushed to disk is determined dynamically. We always prefer to -%% push messages for the source (alphas or acks) that is growing the -%% fastest (with growth measured as avg. ingress - avg. egress). -%% -%% Notes on Clean Shutdown -%% (This documents behaviour in variable_queue, queue_index and -%% msg_store.) -%% -%% In order to try to achieve as fast a start-up as possible, if a -%% clean shutdown occurs, we try to save out state to disk to reduce -%% work on startup. In the msg_store this takes the form of the -%% index_module's state, plus the file_summary ets table, and client -%% refs. In the VQ, this takes the form of the count of persistent -%% messages in the queue and references into the msg_stores. The -%% queue_index adds to these terms the details of its segments and -%% stores the terms in the queue directory. -%% -%% Two message stores are used. One is created for persistent messages -%% to durable queues that must survive restarts, and the other is used -%% for all other messages that just happen to need to be written to -%% disk. On start up we can therefore nuke the transient message -%% store, and be sure that the messages in the persistent store are -%% all that we need. -%% -%% The references to the msg_stores are there so that the msg_store -%% knows to only trust its saved state if all of the queues it was -%% previously talking to come up cleanly. Likewise, the queues -%% themselves (esp queue_index) skips work in init if all the queues -%% and msg_store were shutdown cleanly. This gives both good speed -%% improvements and also robustness so that if anything possibly went -%% wrong in shutdown (or there was subsequent manual tampering), all -%% messages and queues that can be recovered are recovered, safely. -%% -%% To delete transient messages lazily, the variable_queue, on -%% startup, stores the next_seq_id reported by the queue_index as the -%% transient_threshold. From that point on, whenever it's reading a -%% message off disk via the queue_index, if the seq_id is below this -%% threshold and the message is transient then it drops the message -%% (the message itself won't exist on disk because it would have been -%% stored in the transient msg_store which would have had its saved -%% state nuked on startup). This avoids the expensive operation of -%% scanning the entire queue on startup in order to delete transient -%% messages that were only pushed to disk to save memory. 
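A minimal restatement of the transient_threshold check described above; the same test appears later in betas_from_index_entries/4 (the helper name here is hypothetical):

%% Index entries below the threshold that are transient belong to a previous
%% incarnation of the queue: their payloads lived in the transient msg_store,
%% whose saved state was nuked on startup, so they are simply dropped.
drop_old_transient(SeqId, IsPersistent, TransientThreshold) ->
    SeqId < TransientThreshold andalso not IsPersistent.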
-%% -%%---------------------------------------------------------------------------- - --behaviour(rabbit_backing_queue). - --record(vqstate, - { q1, - q2, - delta, - q3, - q4, - next_seq_id, - ram_pending_ack, %% msgs using store, still in RAM - disk_pending_ack, %% msgs in store, paged out - qi_pending_ack, %% msgs using qi, *can't* be paged out - index_state, - msg_store_clients, - durable, - transient_threshold, - qi_embed_msgs_below, - - len, %% w/o unacked - bytes, %% w/o unacked - unacked_bytes, - persistent_count, %% w unacked - persistent_bytes, %% w unacked - delta_transient_bytes, %% - - target_ram_count, - ram_msg_count, %% w/o unacked - ram_msg_count_prev, - ram_ack_count_prev, - ram_bytes, %% w unacked - out_counter, - in_counter, - rates, - msgs_on_disk, - msg_indices_on_disk, - unconfirmed, - confirmed, - ack_out_counter, - ack_in_counter, - %% Unlike the other counters these two do not feed into - %% #rates{} and get reset - disk_read_count, - disk_write_count, - - io_batch_size, - - %% default queue or lazy queue - mode, - %% number of reduce_memory_usage executions, once it - %% reaches a threshold the queue will manually trigger a runtime GC - %% see: maybe_execute_gc/1 - memory_reduction_run_count, - %% Queue data is grouped by VHost. We need to store it - %% to work with queue index. - virtual_host, - waiting_bump = false - }). - --record(rates, { in, out, ack_in, ack_out, timestamp }). - --record(msg_status, - { seq_id, - msg_id, - msg, - is_persistent, - is_delivered, - msg_in_store, - index_on_disk, - persist_to, - msg_props - }). - --record(delta, - { start_seq_id, %% start_seq_id is inclusive - count, - transient, - end_seq_id %% end_seq_id is exclusive - }). - --define(HEADER_GUESS_SIZE, 100). %% see determine_persist_to/2 --define(PERSISTENT_MSG_STORE, msg_store_persistent). --define(TRANSIENT_MSG_STORE, msg_store_transient). - --define(QUEUE, lqueue). - --include_lib("rabbit_common/include/rabbit.hrl"). --include_lib("rabbit_common/include/rabbit_framing.hrl"). --include("amqqueue.hrl"). - -%%---------------------------------------------------------------------------- - --rabbit_upgrade({multiple_routing_keys, local, []}). --rabbit_upgrade({move_messages_to_vhost_store, message_store, []}). - --type seq_id() :: non_neg_integer(). - --type rates() :: #rates { in :: float(), - out :: float(), - ack_in :: float(), - ack_out :: float(), - timestamp :: rabbit_types:timestamp()}. - --type delta() :: #delta { start_seq_id :: non_neg_integer(), - count :: non_neg_integer(), - end_seq_id :: non_neg_integer() }. - -%% The compiler (rightfully) complains that ack() and state() are -%% unused. For this reason we duplicate a -spec from -%% rabbit_backing_queue with the only intent being to remove -%% warnings. The problem here is that we can't parameterise the BQ -%% behaviour by these two types as we would like to. We still leave -%% these here for documentation purposes. --type ack() :: seq_id(). 
--type state() :: #vqstate { - q1 :: ?QUEUE:?QUEUE(), - q2 :: ?QUEUE:?QUEUE(), - delta :: delta(), - q3 :: ?QUEUE:?QUEUE(), - q4 :: ?QUEUE:?QUEUE(), - next_seq_id :: seq_id(), - ram_pending_ack :: gb_trees:tree(), - disk_pending_ack :: gb_trees:tree(), - qi_pending_ack :: gb_trees:tree(), - index_state :: any(), - msg_store_clients :: 'undefined' | {{any(), binary()}, - {any(), binary()}}, - durable :: boolean(), - transient_threshold :: non_neg_integer(), - qi_embed_msgs_below :: non_neg_integer(), - - len :: non_neg_integer(), - bytes :: non_neg_integer(), - unacked_bytes :: non_neg_integer(), - - persistent_count :: non_neg_integer(), - persistent_bytes :: non_neg_integer(), - - target_ram_count :: non_neg_integer() | 'infinity', - ram_msg_count :: non_neg_integer(), - ram_msg_count_prev :: non_neg_integer(), - ram_ack_count_prev :: non_neg_integer(), - ram_bytes :: non_neg_integer(), - out_counter :: non_neg_integer(), - in_counter :: non_neg_integer(), - rates :: rates(), - msgs_on_disk :: gb_sets:set(), - msg_indices_on_disk :: gb_sets:set(), - unconfirmed :: gb_sets:set(), - confirmed :: gb_sets:set(), - ack_out_counter :: non_neg_integer(), - ack_in_counter :: non_neg_integer(), - disk_read_count :: non_neg_integer(), - disk_write_count :: non_neg_integer(), - - io_batch_size :: pos_integer(), - mode :: 'default' | 'lazy', - memory_reduction_run_count :: non_neg_integer()}. - --define(BLANK_DELTA, #delta { start_seq_id = undefined, - count = 0, - transient = 0, - end_seq_id = undefined }). --define(BLANK_DELTA_PATTERN(Z), #delta { start_seq_id = Z, - count = 0, - transient = 0, - end_seq_id = Z }). - --define(MICROS_PER_SECOND, 1000000.0). - -%% We're sampling every 5s for RAM duration; a half life that is of -%% the same order of magnitude is probably about right. --define(RATE_AVG_HALF_LIFE, 5.0). - -%% We will recalculate the #rates{} every time we get asked for our -%% RAM duration, or every N messages published, whichever is -%% sooner. We do this since the priority calculations in -%% rabbit_amqqueue_process need fairly fresh rates. --define(MSGS_PER_RATE_CALC, 100). - -%% we define the garbage collector threshold -%% it needs to tune the `reduce_memory_use` calls. Thus, the garbage collection. -%% see: rabbitmq-server-973 and rabbitmq-server-964 --define(DEFAULT_EXPLICIT_GC_RUN_OP_THRESHOLD, 1000). --define(EXPLICIT_GC_RUN_OP_THRESHOLD(Mode), - case get(explicit_gc_run_operation_threshold) of - undefined -> - Val = explicit_gc_run_operation_threshold_for_mode(Mode), - put(explicit_gc_run_operation_threshold, Val), - Val; - Val -> Val - end). - -explicit_gc_run_operation_threshold_for_mode(Mode) -> - {Key, Fallback} = case Mode of - lazy -> {lazy_queue_explicit_gc_run_operation_threshold, - ?DEFAULT_EXPLICIT_GC_RUN_OP_THRESHOLD}; - _ -> {queue_explicit_gc_run_operation_threshold, - ?DEFAULT_EXPLICIT_GC_RUN_OP_THRESHOLD} - end, - rabbit_misc:get_env(rabbit, Key, Fallback). - -%%---------------------------------------------------------------------------- -%% Public API -%%---------------------------------------------------------------------------- - -start(VHost, DurableQueues) -> - {AllTerms, StartFunState} = rabbit_queue_index:start(VHost, DurableQueues), - %% Group recovery terms by vhost. - ClientRefs = [Ref || Terms <- AllTerms, - Terms /= non_clean_shutdown, - begin - Ref = proplists:get_value(persistent_ref, Terms), - Ref =/= undefined - end], - start_msg_store(VHost, ClientRefs, StartFunState), - {ok, AllTerms}. 
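For reference, an illustrative advanced.config fragment setting the GC-run thresholds read by explicit_gc_run_operation_threshold_for_mode/1 above (1000 is already the built-in default):

[
 {rabbit, [
     {queue_explicit_gc_run_operation_threshold,      1000},
     {lazy_queue_explicit_gc_run_operation_threshold, 1000}
 ]}
].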
- -stop(VHost) -> - ok = stop_msg_store(VHost), - ok = rabbit_queue_index:stop(VHost). - -start_msg_store(VHost, Refs, StartFunState) when is_list(Refs); Refs == undefined -> - rabbit_log:info("Starting message stores for vhost '~s'~n", [VHost]), - do_start_msg_store(VHost, ?TRANSIENT_MSG_STORE, undefined, ?EMPTY_START_FUN_STATE), - do_start_msg_store(VHost, ?PERSISTENT_MSG_STORE, Refs, StartFunState), - ok. - -do_start_msg_store(VHost, Type, Refs, StartFunState) -> - case rabbit_vhost_msg_store:start(VHost, Type, Refs, StartFunState) of - {ok, _} -> - rabbit_log:info("Started message store of type ~s for vhost '~s'~n", [abbreviated_type(Type), VHost]); - {error, {no_such_vhost, VHost}} = Err -> - rabbit_log:error("Failed to start message store of type ~s for vhost '~s': the vhost no longer exists!~n", - [Type, VHost]), - exit(Err); - {error, Error} -> - rabbit_log:error("Failed to start message store of type ~s for vhost '~s': ~p~n", - [Type, VHost, Error]), - exit({error, Error}) - end. - -abbreviated_type(?TRANSIENT_MSG_STORE) -> transient; -abbreviated_type(?PERSISTENT_MSG_STORE) -> persistent. - -stop_msg_store(VHost) -> - rabbit_vhost_msg_store:stop(VHost, ?TRANSIENT_MSG_STORE), - rabbit_vhost_msg_store:stop(VHost, ?PERSISTENT_MSG_STORE), - ok. - -init(Queue, Recover, Callback) -> - init( - Queue, Recover, Callback, - fun (MsgIds, ActionTaken) -> - msgs_written_to_disk(Callback, MsgIds, ActionTaken) - end, - fun (MsgIds) -> msg_indices_written_to_disk(Callback, MsgIds) end, - fun (MsgIds) -> msgs_and_indices_written_to_disk(Callback, MsgIds) end). - -init(Q, new, AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun) when ?is_amqqueue(Q) -> - QueueName = amqqueue:get_name(Q), - IsDurable = amqqueue:is_durable(Q), - IndexState = rabbit_queue_index:init(QueueName, - MsgIdxOnDiskFun, MsgAndIdxOnDiskFun), - VHost = QueueName#resource.virtual_host, - init(IsDurable, IndexState, 0, 0, [], - case IsDurable of - true -> msg_store_client_init(?PERSISTENT_MSG_STORE, - MsgOnDiskFun, AsyncCallback, VHost); - false -> undefined - end, - msg_store_client_init(?TRANSIENT_MSG_STORE, undefined, - AsyncCallback, VHost), VHost); - -%% We can be recovering a transient queue if it crashed -init(Q, Terms, AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun) when ?is_amqqueue(Q) -> - QueueName = amqqueue:get_name(Q), - IsDurable = amqqueue:is_durable(Q), - {PRef, RecoveryTerms} = process_recovery_terms(Terms), - VHost = QueueName#resource.virtual_host, - {PersistentClient, ContainsCheckFun} = - case IsDurable of - true -> C = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef, - MsgOnDiskFun, AsyncCallback, - VHost), - {C, fun (MsgId) when is_binary(MsgId) -> - rabbit_msg_store:contains(MsgId, C); - (#basic_message{is_persistent = Persistent}) -> - Persistent - end}; - false -> {undefined, fun(_MsgId) -> false end} - end, - TransientClient = msg_store_client_init(?TRANSIENT_MSG_STORE, - undefined, AsyncCallback, - VHost), - {DeltaCount, DeltaBytes, IndexState} = - rabbit_queue_index:recover( - QueueName, RecoveryTerms, - rabbit_vhost_msg_store:successfully_recovered_state( - VHost, - ?PERSISTENT_MSG_STORE), - ContainsCheckFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun), - init(IsDurable, IndexState, DeltaCount, DeltaBytes, RecoveryTerms, - PersistentClient, TransientClient, VHost). 
- -process_recovery_terms(Terms=non_clean_shutdown) -> - {rabbit_guid:gen(), Terms}; -process_recovery_terms(Terms) -> - case proplists:get_value(persistent_ref, Terms) of - undefined -> {rabbit_guid:gen(), []}; - PRef -> {PRef, Terms} - end. - -terminate(_Reason, State) -> - State1 = #vqstate { virtual_host = VHost, - persistent_count = PCount, - persistent_bytes = PBytes, - index_state = IndexState, - msg_store_clients = {MSCStateP, MSCStateT} } = - purge_pending_ack(true, State), - PRef = case MSCStateP of - undefined -> undefined; - _ -> ok = maybe_client_terminate(MSCStateP), - rabbit_msg_store:client_ref(MSCStateP) - end, - ok = rabbit_msg_store:client_delete_and_terminate(MSCStateT), - Terms = [{persistent_ref, PRef}, - {persistent_count, PCount}, - {persistent_bytes, PBytes}], - a(State1#vqstate { - index_state = rabbit_queue_index:terminate(VHost, Terms, IndexState), - msg_store_clients = undefined }). - -%% the only difference between purge and delete is that delete also -%% needs to delete everything that's been delivered and not ack'd. -delete_and_terminate(_Reason, State) -> - %% Normally when we purge messages we interact with the qi by - %% issues delivers and acks for every purged message. In this case - %% we don't need to do that, so we just delete the qi. - State1 = purge_and_index_reset(State), - State2 = #vqstate { msg_store_clients = {MSCStateP, MSCStateT} } = - purge_pending_ack_delete_and_terminate(State1), - case MSCStateP of - undefined -> ok; - _ -> rabbit_msg_store:client_delete_and_terminate(MSCStateP) - end, - rabbit_msg_store:client_delete_and_terminate(MSCStateT), - a(State2 #vqstate { msg_store_clients = undefined }). - -delete_crashed(Q) when ?is_amqqueue(Q) -> - QName = amqqueue:get_name(Q), - ok = rabbit_queue_index:erase(QName). - -purge(State = #vqstate { len = Len }) -> - case is_pending_ack_empty(State) and is_unconfirmed_empty(State) of - true -> - {Len, purge_and_index_reset(State)}; - false -> - {Len, purge_when_pending_acks(State)} - end. - -purge_acks(State) -> a(purge_pending_ack(false, State)). - -publish(Msg, MsgProps, IsDelivered, ChPid, Flow, State) -> - State1 = - publish1(Msg, MsgProps, IsDelivered, ChPid, Flow, - fun maybe_write_to_disk/4, - State), - a(maybe_reduce_memory_use(maybe_update_rates(State1))). - -batch_publish(Publishes, ChPid, Flow, State) -> - {ChPid, Flow, State1} = - lists:foldl(fun batch_publish1/2, {ChPid, Flow, State}, Publishes), - State2 = ui(State1), - a(maybe_reduce_memory_use(maybe_update_rates(State2))). - -publish_delivered(Msg, MsgProps, ChPid, Flow, State) -> - {SeqId, State1} = - publish_delivered1(Msg, MsgProps, ChPid, Flow, - fun maybe_write_to_disk/4, - State), - {SeqId, a(maybe_reduce_memory_use(maybe_update_rates(State1)))}. - -batch_publish_delivered(Publishes, ChPid, Flow, State) -> - {ChPid, Flow, SeqIds, State1} = - lists:foldl(fun batch_publish_delivered1/2, - {ChPid, Flow, [], State}, Publishes), - State2 = ui(State1), - {lists:reverse(SeqIds), a(maybe_reduce_memory_use(maybe_update_rates(State2)))}. - -discard(_MsgId, _ChPid, _Flow, State) -> State. - -drain_confirmed(State = #vqstate { confirmed = C }) -> - case gb_sets:is_empty(C) of - true -> {[], State}; %% common case - false -> {gb_sets:to_list(C), State #vqstate { - confirmed = gb_sets:new() }} - end. - -dropwhile(Pred, State) -> - {MsgProps, State1} = - remove_by_predicate(Pred, State), - {MsgProps, a(State1)}. 
- -fetchwhile(Pred, Fun, Acc, State) -> - {MsgProps, Acc1, State1} = - fetch_by_predicate(Pred, Fun, Acc, State), - {MsgProps, Acc1, a(State1)}. - -fetch(AckRequired, State) -> - case queue_out(State) of - {empty, State1} -> - {empty, a(State1)}; - {{value, MsgStatus}, State1} -> - %% it is possible that the message wasn't read from disk - %% at this point, so read it in. - {Msg, State2} = read_msg(MsgStatus, State1), - {AckTag, State3} = remove(AckRequired, MsgStatus, State2), - {{Msg, MsgStatus#msg_status.is_delivered, AckTag}, a(State3)} - end. - -drop(AckRequired, State) -> - case queue_out(State) of - {empty, State1} -> - {empty, a(State1)}; - {{value, MsgStatus}, State1} -> - {AckTag, State2} = remove(AckRequired, MsgStatus, State1), - {{MsgStatus#msg_status.msg_id, AckTag}, a(State2)} - end. - -%% Duplicated from rabbit_backing_queue --spec ack([ack()], state()) -> {[rabbit_guid:guid()], state()}. - -ack([], State) -> - {[], State}; -%% optimisation: this head is essentially a partial evaluation of the -%% general case below, for the single-ack case. -ack([SeqId], State) -> - case remove_pending_ack(true, SeqId, State) of - {none, _} -> - {[], State}; - {#msg_status { msg_id = MsgId, - is_persistent = IsPersistent, - msg_in_store = MsgInStore, - index_on_disk = IndexOnDisk }, - State1 = #vqstate { index_state = IndexState, - msg_store_clients = MSCState, - ack_out_counter = AckOutCount }} -> - IndexState1 = case IndexOnDisk of - true -> rabbit_queue_index:ack([SeqId], IndexState); - false -> IndexState - end, - case MsgInStore of - true -> ok = msg_store_remove(MSCState, IsPersistent, [MsgId]); - false -> ok - end, - {[MsgId], - a(State1 #vqstate { index_state = IndexState1, - ack_out_counter = AckOutCount + 1 })} - end; -ack(AckTags, State) -> - {{IndexOnDiskSeqIds, MsgIdsByStore, AllMsgIds}, - State1 = #vqstate { index_state = IndexState, - msg_store_clients = MSCState, - ack_out_counter = AckOutCount }} = - lists:foldl( - fun (SeqId, {Acc, State2}) -> - case remove_pending_ack(true, SeqId, State2) of - {none, _} -> - {Acc, State2}; - {MsgStatus, State3} -> - {accumulate_ack(MsgStatus, Acc), State3} - end - end, {accumulate_ack_init(), State}, AckTags), - IndexState1 = rabbit_queue_index:ack(IndexOnDiskSeqIds, IndexState), - remove_msgs_by_id(MsgIdsByStore, MSCState), - {lists:reverse(AllMsgIds), - a(State1 #vqstate { index_state = IndexState1, - ack_out_counter = AckOutCount + length(AckTags) })}. 
- -requeue(AckTags, #vqstate { mode = default, - delta = Delta, - q3 = Q3, - q4 = Q4, - in_counter = InCounter, - len = Len } = State) -> - {SeqIds, Q4a, MsgIds, State1} = queue_merge(lists:sort(AckTags), Q4, [], - beta_limit(Q3), - fun publish_alpha/2, State), - {SeqIds1, Q3a, MsgIds1, State2} = queue_merge(SeqIds, Q3, MsgIds, - delta_limit(Delta), - fun publish_beta/2, State1), - {Delta1, MsgIds2, State3} = delta_merge(SeqIds1, Delta, MsgIds1, - State2), - MsgCount = length(MsgIds2), - {MsgIds2, a(maybe_reduce_memory_use( - maybe_update_rates(ui( - State3 #vqstate { delta = Delta1, - q3 = Q3a, - q4 = Q4a, - in_counter = InCounter + MsgCount, - len = Len + MsgCount }))))}; -requeue(AckTags, #vqstate { mode = lazy, - delta = Delta, - q3 = Q3, - in_counter = InCounter, - len = Len } = State) -> - {SeqIds, Q3a, MsgIds, State1} = queue_merge(lists:sort(AckTags), Q3, [], - delta_limit(Delta), - fun publish_beta/2, State), - {Delta1, MsgIds1, State2} = delta_merge(SeqIds, Delta, MsgIds, - State1), - MsgCount = length(MsgIds1), - {MsgIds1, a(maybe_reduce_memory_use( - maybe_update_rates(ui( - State2 #vqstate { delta = Delta1, - q3 = Q3a, - in_counter = InCounter + MsgCount, - len = Len + MsgCount }))))}. - -ackfold(MsgFun, Acc, State, AckTags) -> - {AccN, StateN} = - lists:foldl(fun(SeqId, {Acc0, State0}) -> - MsgStatus = lookup_pending_ack(SeqId, State0), - {Msg, State1} = read_msg(MsgStatus, State0), - {MsgFun(Msg, SeqId, Acc0), State1} - end, {Acc, State}, AckTags), - {AccN, a(StateN)}. - -fold(Fun, Acc, State = #vqstate{index_state = IndexState}) -> - {Its, IndexState1} = lists:foldl(fun inext/2, {[], IndexState}, - [msg_iterator(State), - disk_ack_iterator(State), - ram_ack_iterator(State), - qi_ack_iterator(State)]), - ifold(Fun, Acc, Its, State#vqstate{index_state = IndexState1}). - -len(#vqstate { len = Len }) -> Len. - -is_empty(State) -> 0 == len(State). - -depth(State) -> - len(State) + count_pending_acks(State). - -set_ram_duration_target( - DurationTarget, State = #vqstate { - rates = #rates { in = AvgIngressRate, - out = AvgEgressRate, - ack_in = AvgAckIngressRate, - ack_out = AvgAckEgressRate }, - target_ram_count = TargetRamCount }) -> - Rate = - AvgEgressRate + AvgIngressRate + AvgAckEgressRate + AvgAckIngressRate, - TargetRamCount1 = - case DurationTarget of - infinity -> infinity; - _ -> trunc(DurationTarget * Rate) %% msgs = sec * msgs/sec - end, - State1 = State #vqstate { target_ram_count = TargetRamCount1 }, - a(case TargetRamCount1 == infinity orelse - (TargetRamCount =/= infinity andalso - TargetRamCount1 >= TargetRamCount) of - true -> State1; - false -> reduce_memory_use(State1) - end). - -maybe_update_rates(State = #vqstate{ in_counter = InCount, - out_counter = OutCount }) - when InCount + OutCount > ?MSGS_PER_RATE_CALC -> - update_rates(State); -maybe_update_rates(State) -> - State. - -update_rates(State = #vqstate{ in_counter = InCount, - out_counter = OutCount, - ack_in_counter = AckInCount, - ack_out_counter = AckOutCount, - rates = #rates{ in = InRate, - out = OutRate, - ack_in = AckInRate, - ack_out = AckOutRate, - timestamp = TS }}) -> - Now = erlang:monotonic_time(), - - Rates = #rates { in = update_rate(Now, TS, InCount, InRate), - out = update_rate(Now, TS, OutCount, OutRate), - ack_in = update_rate(Now, TS, AckInCount, AckInRate), - ack_out = update_rate(Now, TS, AckOutCount, AckOutRate), - timestamp = Now }, - - State#vqstate{ in_counter = 0, - out_counter = 0, - ack_in_counter = 0, - ack_out_counter = 0, - rates = Rates }. 
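A worked example of the target calculation in set_ram_duration_target/2 above, under assumed rates:

%% With AvgIngressRate = 150.0, AvgEgressRate = 200.0, AvgAckIngressRate = 25.0
%% and AvgAckEgressRate = 25.0 msgs/sec, a DurationTarget of 2.5 seconds gives
%% trunc(2.5 * (150.0 + 200.0 + 25.0 + 25.0)) = 1000 messages to hold in RAM;
%% a DurationTarget of 'infinity' leaves target_ram_count unbounded and skips
%% reduce_memory_use/1.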
- -update_rate(Now, TS, Count, Rate) -> - Time = erlang:convert_time_unit(Now - TS, native, micro_seconds) / - ?MICROS_PER_SECOND, - if - Time == 0 -> Rate; - true -> rabbit_misc:moving_average(Time, ?RATE_AVG_HALF_LIFE, - Count / Time, Rate) - end. - -ram_duration(State) -> - State1 = #vqstate { rates = #rates { in = AvgIngressRate, - out = AvgEgressRate, - ack_in = AvgAckIngressRate, - ack_out = AvgAckEgressRate }, - ram_msg_count = RamMsgCount, - ram_msg_count_prev = RamMsgCountPrev, - ram_pending_ack = RPA, - qi_pending_ack = QPA, - ram_ack_count_prev = RamAckCountPrev } = - update_rates(State), - - RamAckCount = gb_trees:size(RPA) + gb_trees:size(QPA), - - Duration = %% msgs+acks / (msgs+acks/sec) == sec - case lists:all(fun (X) -> X < 0.01 end, - [AvgEgressRate, AvgIngressRate, - AvgAckEgressRate, AvgAckIngressRate]) of - true -> infinity; - false -> (RamMsgCountPrev + RamMsgCount + - RamAckCount + RamAckCountPrev) / - (4 * (AvgEgressRate + AvgIngressRate + - AvgAckEgressRate + AvgAckIngressRate)) - end, - - {Duration, State1}. - -needs_timeout(#vqstate { index_state = IndexState }) -> - case rabbit_queue_index:needs_sync(IndexState) of - confirms -> timed; - other -> idle; - false -> false - end. - -timeout(State = #vqstate { index_state = IndexState }) -> - State #vqstate { index_state = rabbit_queue_index:sync(IndexState) }. - -handle_pre_hibernate(State = #vqstate { index_state = IndexState }) -> - State #vqstate { index_state = rabbit_queue_index:flush(IndexState) }. - -handle_info(bump_reduce_memory_use, State = #vqstate{ waiting_bump = true }) -> - State#vqstate{ waiting_bump = false }; -handle_info(bump_reduce_memory_use, State) -> - State. - -resume(State) -> a(reduce_memory_use(State)). - -msg_rates(#vqstate { rates = #rates { in = AvgIngressRate, - out = AvgEgressRate } }) -> - {AvgIngressRate, AvgEgressRate}. 
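And a matching worked example for ram_duration/1 above, with assumed counts:

%% With RamMsgCountPrev = 1100, RamMsgCount = 900, RamAckCountPrev = 100,
%% RamAckCount = 100 and the four rates summing to 400.0 msgs/sec, the
%% reported duration is (1100 + 900 + 100 + 100) / (4 * 400.0) = 1.375
%% seconds; if every rate is below 0.01 the duration is 'infinity'.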
- -info(messages_ready_ram, #vqstate{ram_msg_count = RamMsgCount}) -> - RamMsgCount; -info(messages_unacknowledged_ram, #vqstate{ram_pending_ack = RPA, - qi_pending_ack = QPA}) -> - gb_trees:size(RPA) + gb_trees:size(QPA); -info(messages_ram, State) -> - info(messages_ready_ram, State) + info(messages_unacknowledged_ram, State); -info(messages_persistent, #vqstate{persistent_count = PersistentCount}) -> - PersistentCount; -info(messages_paged_out, #vqstate{delta = #delta{transient = Count}}) -> - Count; -info(message_bytes, #vqstate{bytes = Bytes, - unacked_bytes = UBytes}) -> - Bytes + UBytes; -info(message_bytes_ready, #vqstate{bytes = Bytes}) -> - Bytes; -info(message_bytes_unacknowledged, #vqstate{unacked_bytes = UBytes}) -> - UBytes; -info(message_bytes_ram, #vqstate{ram_bytes = RamBytes}) -> - RamBytes; -info(message_bytes_persistent, #vqstate{persistent_bytes = PersistentBytes}) -> - PersistentBytes; -info(message_bytes_paged_out, #vqstate{delta_transient_bytes = PagedOutBytes}) -> - PagedOutBytes; -info(head_message_timestamp, #vqstate{ - q3 = Q3, - q4 = Q4, - ram_pending_ack = RPA, - qi_pending_ack = QPA}) -> - head_message_timestamp(Q3, Q4, RPA, QPA); -info(disk_reads, #vqstate{disk_read_count = Count}) -> - Count; -info(disk_writes, #vqstate{disk_write_count = Count}) -> - Count; -info(backing_queue_status, #vqstate { - q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, - mode = Mode, - len = Len, - target_ram_count = TargetRamCount, - next_seq_id = NextSeqId, - rates = #rates { in = AvgIngressRate, - out = AvgEgressRate, - ack_in = AvgAckIngressRate, - ack_out = AvgAckEgressRate }}) -> - - [ {mode , Mode}, - {q1 , ?QUEUE:len(Q1)}, - {q2 , ?QUEUE:len(Q2)}, - {delta , Delta}, - {q3 , ?QUEUE:len(Q3)}, - {q4 , ?QUEUE:len(Q4)}, - {len , Len}, - {target_ram_count , TargetRamCount}, - {next_seq_id , NextSeqId}, - {avg_ingress_rate , AvgIngressRate}, - {avg_egress_rate , AvgEgressRate}, - {avg_ack_ingress_rate, AvgAckIngressRate}, - {avg_ack_egress_rate , AvgAckEgressRate} ]; -info(_, _) -> - ''. - -invoke(?MODULE, Fun, State) -> Fun(?MODULE, State); -invoke( _, _, State) -> State. - -is_duplicate(_Msg, State) -> {false, State}. - -set_queue_mode(Mode, State = #vqstate { mode = Mode }) -> - State; -set_queue_mode(lazy, State = #vqstate { - target_ram_count = TargetRamCount }) -> - %% To become a lazy queue we need to page everything to disk first. - State1 = convert_to_lazy(State), - %% restore the original target_ram_count - a(State1 #vqstate { mode = lazy, target_ram_count = TargetRamCount }); -set_queue_mode(default, State) -> - %% becoming a default queue means loading messages from disk like - %% when a queue is recovered. - a(maybe_deltas_to_betas(State #vqstate { mode = default })); -set_queue_mode(_, State) -> - State. - -zip_msgs_and_acks(Msgs, AckTags, Accumulator, _State) -> - lists:foldl(fun ({{#basic_message{ id = Id }, _Props}, AckTag}, Acc) -> - [{Id, AckTag} | Acc] - end, Accumulator, lists:zip(Msgs, AckTags)). - -convert_to_lazy(State) -> - State1 = #vqstate { delta = Delta, q3 = Q3, len = Len } = - set_ram_duration_target(0, State), - case Delta#delta.count + ?QUEUE:len(Q3) == Len of - true -> - State1; - false -> - %% When pushing messages to disk, we might have been - %% blocked by the msg_store, so we need to see if we have - %% to wait for more credit, and then keep paging messages. 
- %% - %% The amqqueue_process could have taken care of this, but - %% between the time it receives the bump_credit msg and - %% calls BQ:resume to keep paging messages to disk, some - %% other request may arrive to the BQ which at this moment - %% is not in a proper state for a lazy BQ (unless all - %% messages have been paged to disk already). - wait_for_msg_store_credit(), - convert_to_lazy(resume(State1)) - end. - -wait_for_msg_store_credit() -> - case credit_flow:blocked() of - true -> receive - {bump_credit, Msg} -> - credit_flow:handle_bump_msg(Msg) - end; - false -> ok - end. - -%% Get the Timestamp property of the first msg, if present. This is -%% the one with the oldest timestamp among the heads of the pending -%% acks and unread queues. We can't check disk_pending_acks as these -%% are paged out - we assume some will soon be paged in rather than -%% forcing it to happen. Pending ack msgs are included as they are -%% regarded as unprocessed until acked, this also prevents the result -%% apparently oscillating during repeated rejects. Q3 is only checked -%% when Q4 is empty as any Q4 msg will be earlier. -head_message_timestamp(Q3, Q4, RPA, QPA) -> - HeadMsgs = [ HeadMsgStatus#msg_status.msg || - HeadMsgStatus <- - [ get_qs_head([Q4, Q3]), - get_pa_head(RPA), - get_pa_head(QPA) ], - HeadMsgStatus /= undefined, - HeadMsgStatus#msg_status.msg /= undefined ], - - Timestamps = - [Timestamp || HeadMsg <- HeadMsgs, - Timestamp <- [rabbit_basic:extract_timestamp( - HeadMsg#basic_message.content)], - Timestamp /= undefined - ], - - case Timestamps == [] of - true -> ''; - false -> lists:min(Timestamps) - end. - -get_qs_head(Qs) -> - catch lists:foldl( - fun (Q, Acc) -> - case get_q_head(Q) of - undefined -> Acc; - Val -> throw(Val) - end - end, undefined, Qs). - -get_q_head(Q) -> - get_collection_head(Q, fun ?QUEUE:is_empty/1, fun ?QUEUE:peek/1). - -get_pa_head(PA) -> - get_collection_head(PA, fun gb_trees:is_empty/1, fun gb_trees:smallest/1). - -get_collection_head(Col, IsEmpty, GetVal) -> - case IsEmpty(Col) of - false -> - {_, MsgStatus} = GetVal(Col), - MsgStatus; - true -> undefined - end. - -%%---------------------------------------------------------------------------- -%% Minor helpers -%%---------------------------------------------------------------------------- -a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, - mode = default, - len = Len, - bytes = Bytes, - unacked_bytes = UnackedBytes, - persistent_count = PersistentCount, - persistent_bytes = PersistentBytes, - ram_msg_count = RamMsgCount, - ram_bytes = RamBytes}) -> - E1 = ?QUEUE:is_empty(Q1), - E2 = ?QUEUE:is_empty(Q2), - ED = Delta#delta.count == 0, - E3 = ?QUEUE:is_empty(Q3), - E4 = ?QUEUE:is_empty(Q4), - LZ = Len == 0, - - %% if q1 has messages then q3 cannot be empty. See publish/6. - true = E1 or not E3, - %% if q2 has messages then we have messages in delta (paged to - %% disk). See push_alphas_to_betas/2. - true = E2 or not ED, - %% if delta has messages then q3 cannot be empty. This is enforced - %% by paging, where min([?SEGMENT_ENTRY_COUNT, len(q3)]) messages - %% are always kept on RAM. - true = ED or not E3, - %% if the queue length is 0, then q3 and q4 must be empty. 
- true = LZ == (E3 and E4), - - true = Len >= 0, - true = Bytes >= 0, - true = UnackedBytes >= 0, - true = PersistentCount >= 0, - true = PersistentBytes >= 0, - true = RamMsgCount >= 0, - true = RamMsgCount =< Len, - true = RamBytes >= 0, - true = RamBytes =< Bytes + UnackedBytes, - - State; -a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, - mode = lazy, - len = Len, - bytes = Bytes, - unacked_bytes = UnackedBytes, - persistent_count = PersistentCount, - persistent_bytes = PersistentBytes, - ram_msg_count = RamMsgCount, - ram_bytes = RamBytes}) -> - E1 = ?QUEUE:is_empty(Q1), - E2 = ?QUEUE:is_empty(Q2), - ED = Delta#delta.count == 0, - E3 = ?QUEUE:is_empty(Q3), - E4 = ?QUEUE:is_empty(Q4), - LZ = Len == 0, - L3 = ?QUEUE:len(Q3), - - %% q1 must always be empty, since q1 only gets messages during - %% publish, but for lazy queues messages go straight to delta. - true = E1, - - %% q2 only gets messages from q1 when push_alphas_to_betas is - %% called for a non empty delta, which won't be the case for a - %% lazy queue. This means q2 must always be empty. - true = E2, - - %% q4 must always be empty, since q1 only gets messages during - %% publish, but for lazy queues messages go straight to delta. - true = E4, - - %% if the queue is empty, then delta is empty and q3 is empty. - true = LZ == (ED and E3), - - %% There should be no messages in q1, q2, and q4 - true = Delta#delta.count + L3 == Len, - - true = Len >= 0, - true = Bytes >= 0, - true = UnackedBytes >= 0, - true = PersistentCount >= 0, - true = PersistentBytes >= 0, - true = RamMsgCount >= 0, - true = RamMsgCount =< Len, - true = RamBytes >= 0, - true = RamBytes =< Bytes + UnackedBytes, - - State. - -d(Delta = #delta { start_seq_id = Start, count = Count, end_seq_id = End }) - when Start + Count =< End -> - Delta. - -m(MsgStatus = #msg_status { is_persistent = IsPersistent, - msg_in_store = MsgInStore, - index_on_disk = IndexOnDisk }) -> - true = (not IsPersistent) or IndexOnDisk, - true = msg_in_ram(MsgStatus) or MsgInStore, - MsgStatus. - -one_if(true ) -> 1; -one_if(false) -> 0. - -cons_if(true, E, L) -> [E | L]; -cons_if(false, _E, L) -> L. - -gb_sets_maybe_insert(false, _Val, Set) -> Set; -gb_sets_maybe_insert(true, Val, Set) -> gb_sets:add(Val, Set). - -msg_status(IsPersistent, IsDelivered, SeqId, - Msg = #basic_message {id = MsgId}, MsgProps, IndexMaxSize) -> - #msg_status{seq_id = SeqId, - msg_id = MsgId, - msg = Msg, - is_persistent = IsPersistent, - is_delivered = IsDelivered, - msg_in_store = false, - index_on_disk = false, - persist_to = determine_persist_to(Msg, MsgProps, IndexMaxSize), - msg_props = MsgProps}. - -beta_msg_status({Msg = #basic_message{id = MsgId}, - SeqId, MsgProps, IsPersistent, IsDelivered}) -> - MS0 = beta_msg_status0(SeqId, MsgProps, IsPersistent, IsDelivered), - MS0#msg_status{msg_id = MsgId, - msg = Msg, - persist_to = queue_index, - msg_in_store = false}; - -beta_msg_status({MsgId, SeqId, MsgProps, IsPersistent, IsDelivered}) -> - MS0 = beta_msg_status0(SeqId, MsgProps, IsPersistent, IsDelivered), - MS0#msg_status{msg_id = MsgId, - msg = undefined, - persist_to = msg_store, - msg_in_store = true}. - -beta_msg_status0(SeqId, MsgProps, IsPersistent, IsDelivered) -> - #msg_status{seq_id = SeqId, - msg = undefined, - is_persistent = IsPersistent, - is_delivered = IsDelivered, - index_on_disk = true, - msg_props = MsgProps}. - -trim_msg_status(MsgStatus) -> - case persist_to(MsgStatus) of - msg_store -> MsgStatus#msg_status{msg = undefined}; - queue_index -> MsgStatus - end. 
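A rough sketch, under stated assumptions, of the embed-or-store decision that determine_persist_to makes for msg_status/6 and trim_msg_status/1 above (simplified; the real function estimates the size from the message content and properties):

%% Messages whose estimated size (plus ?HEADER_GUESS_SIZE for not-yet-written
%% headers) stays below the configured queue_index_embed_msgs_below limit are
%% embedded in the queue index; larger ones go to the message store.
persist_to_sketch(EstimatedSize, IndexMaxSize) ->
    case EstimatedSize + ?HEADER_GUESS_SIZE < IndexMaxSize of
        true  -> queue_index;
        false -> msg_store
    end.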
- -with_msg_store_state({MSCStateP, MSCStateT}, true, Fun) -> - {Result, MSCStateP1} = Fun(MSCStateP), - {Result, {MSCStateP1, MSCStateT}}; -with_msg_store_state({MSCStateP, MSCStateT}, false, Fun) -> - {Result, MSCStateT1} = Fun(MSCStateT), - {Result, {MSCStateP, MSCStateT1}}. - -with_immutable_msg_store_state(MSCState, IsPersistent, Fun) -> - {Res, MSCState} = with_msg_store_state(MSCState, IsPersistent, - fun (MSCState1) -> - {Fun(MSCState1), MSCState1} - end), - Res. - -msg_store_client_init(MsgStore, MsgOnDiskFun, Callback, VHost) -> - msg_store_client_init(MsgStore, rabbit_guid:gen(), MsgOnDiskFun, - Callback, VHost). - -msg_store_client_init(MsgStore, Ref, MsgOnDiskFun, Callback, VHost) -> - CloseFDsFun = msg_store_close_fds_fun(MsgStore =:= ?PERSISTENT_MSG_STORE), - rabbit_vhost_msg_store:client_init(VHost, MsgStore, - Ref, MsgOnDiskFun, - fun () -> - Callback(?MODULE, CloseFDsFun) - end). - -msg_store_write(MSCState, IsPersistent, MsgId, Msg) -> - with_immutable_msg_store_state( - MSCState, IsPersistent, - fun (MSCState1) -> - rabbit_msg_store:write_flow(MsgId, Msg, MSCState1) - end). - -msg_store_read(MSCState, IsPersistent, MsgId) -> - with_msg_store_state( - MSCState, IsPersistent, - fun (MSCState1) -> - rabbit_msg_store:read(MsgId, MSCState1) - end). - -msg_store_remove(MSCState, IsPersistent, MsgIds) -> - with_immutable_msg_store_state( - MSCState, IsPersistent, - fun (MCSState1) -> - rabbit_msg_store:remove(MsgIds, MCSState1) - end). - -msg_store_close_fds(MSCState, IsPersistent) -> - with_msg_store_state( - MSCState, IsPersistent, - fun (MSCState1) -> rabbit_msg_store:close_all_indicated(MSCState1) end). - -msg_store_close_fds_fun(IsPersistent) -> - fun (?MODULE, State = #vqstate { msg_store_clients = MSCState }) -> - {ok, MSCState1} = msg_store_close_fds(MSCState, IsPersistent), - State #vqstate { msg_store_clients = MSCState1 } - end. - -maybe_write_delivered(false, _SeqId, IndexState) -> - IndexState; -maybe_write_delivered(true, SeqId, IndexState) -> - rabbit_queue_index:deliver([SeqId], IndexState). - -betas_from_index_entries(List, TransientThreshold, DelsAndAcksFun, State) -> - {Filtered, Delivers, Acks, RamReadyCount, RamBytes, TransientCount, TransientBytes} = - lists:foldr( - fun ({_MsgOrId, SeqId, _MsgProps, IsPersistent, IsDelivered} = M, - {Filtered1, Delivers1, Acks1, RRC, RB, TC, TB} = Acc) -> - case SeqId < TransientThreshold andalso not IsPersistent of - true -> {Filtered1, - cons_if(not IsDelivered, SeqId, Delivers1), - [SeqId | Acks1], RRC, RB, TC, TB}; - false -> MsgStatus = m(beta_msg_status(M)), - HaveMsg = msg_in_ram(MsgStatus), - Size = msg_size(MsgStatus), - case is_msg_in_pending_acks(SeqId, State) of - false -> {?QUEUE:in_r(MsgStatus, Filtered1), - Delivers1, Acks1, - RRC + one_if(HaveMsg), - RB + one_if(HaveMsg) * Size, - TC + one_if(not IsPersistent), - TB + one_if(not IsPersistent) * Size}; - true -> Acc %% [0] - end - end - end, {?QUEUE:new(), [], [], 0, 0, 0, 0}, List), - {Filtered, RamReadyCount, RamBytes, DelsAndAcksFun(Delivers, Acks, State), - TransientCount, TransientBytes}. -%% [0] We don't increase RamBytes here, even though it pertains to -%% unacked messages too, since if HaveMsg then the message must have -%% been stored in the QI, thus the message must have been in -%% qi_pending_ack, thus it must already have been in RAM. 
- -is_msg_in_pending_acks(SeqId, #vqstate { ram_pending_ack = RPA, - disk_pending_ack = DPA, - qi_pending_ack = QPA }) -> - (gb_trees:is_defined(SeqId, RPA) orelse - gb_trees:is_defined(SeqId, DPA) orelse - gb_trees:is_defined(SeqId, QPA)). - -expand_delta(SeqId, ?BLANK_DELTA_PATTERN(X), IsPersistent) -> - d(#delta { start_seq_id = SeqId, count = 1, end_seq_id = SeqId + 1, - transient = one_if(not IsPersistent)}); -expand_delta(SeqId, #delta { start_seq_id = StartSeqId, - count = Count, - transient = Transient } = Delta, - IsPersistent ) - when SeqId < StartSeqId -> - d(Delta #delta { start_seq_id = SeqId, count = Count + 1, - transient = Transient + one_if(not IsPersistent)}); -expand_delta(SeqId, #delta { count = Count, - end_seq_id = EndSeqId, - transient = Transient } = Delta, - IsPersistent) - when SeqId >= EndSeqId -> - d(Delta #delta { count = Count + 1, end_seq_id = SeqId + 1, - transient = Transient + one_if(not IsPersistent)}); -expand_delta(_SeqId, #delta { count = Count, - transient = Transient } = Delta, - IsPersistent ) -> - d(Delta #delta { count = Count + 1, - transient = Transient + one_if(not IsPersistent) }). - -%%---------------------------------------------------------------------------- -%% Internal major helpers for Public API -%%---------------------------------------------------------------------------- - -init(IsDurable, IndexState, DeltaCount, DeltaBytes, Terms, - PersistentClient, TransientClient, VHost) -> - {LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState), - - {DeltaCount1, DeltaBytes1} = - case Terms of - non_clean_shutdown -> {DeltaCount, DeltaBytes}; - _ -> {proplists:get_value(persistent_count, - Terms, DeltaCount), - proplists:get_value(persistent_bytes, - Terms, DeltaBytes)} - end, - Delta = case DeltaCount1 == 0 andalso DeltaCount /= undefined of - true -> ?BLANK_DELTA; - false -> d(#delta { start_seq_id = LowSeqId, - count = DeltaCount1, - transient = 0, - end_seq_id = NextSeqId }) - end, - Now = erlang:monotonic_time(), - IoBatchSize = rabbit_misc:get_env(rabbit, msg_store_io_batch_size, - ?IO_BATCH_SIZE), - - {ok, IndexMaxSize} = application:get_env( - rabbit, queue_index_embed_msgs_below), - State = #vqstate { - q1 = ?QUEUE:new(), - q2 = ?QUEUE:new(), - delta = Delta, - q3 = ?QUEUE:new(), - q4 = ?QUEUE:new(), - next_seq_id = NextSeqId, - ram_pending_ack = gb_trees:empty(), - disk_pending_ack = gb_trees:empty(), - qi_pending_ack = gb_trees:empty(), - index_state = IndexState1, - msg_store_clients = {PersistentClient, TransientClient}, - durable = IsDurable, - transient_threshold = NextSeqId, - qi_embed_msgs_below = IndexMaxSize, - - len = DeltaCount1, - persistent_count = DeltaCount1, - bytes = DeltaBytes1, - persistent_bytes = DeltaBytes1, - delta_transient_bytes = 0, - - target_ram_count = infinity, - ram_msg_count = 0, - ram_msg_count_prev = 0, - ram_ack_count_prev = 0, - ram_bytes = 0, - unacked_bytes = 0, - out_counter = 0, - in_counter = 0, - rates = blank_rates(Now), - msgs_on_disk = gb_sets:new(), - msg_indices_on_disk = gb_sets:new(), - unconfirmed = gb_sets:new(), - confirmed = gb_sets:new(), - ack_out_counter = 0, - ack_in_counter = 0, - disk_read_count = 0, - disk_write_count = 0, - - io_batch_size = IoBatchSize, - - mode = default, - memory_reduction_run_count = 0, - virtual_host = VHost}, - a(maybe_deltas_to_betas(State)). - -blank_rates(Now) -> - #rates { in = 0.0, - out = 0.0, - ack_in = 0.0, - ack_out = 0.0, - timestamp = Now}. 
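A usage illustration for expand_delta/3 above (example_delta/0 is a hypothetical helper; the seq_ids are arbitrary and all transient):

example_delta() ->
    lists:foldl(fun (SeqId, Delta) -> expand_delta(SeqId, Delta, false) end,
                ?BLANK_DELTA, [10, 7, 12]).
%% => #delta{start_seq_id = 7, count = 3, transient = 3, end_seq_id = 13}
%% start_seq_id stays inclusive and end_seq_id exclusive as the bounds grow.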
- -in_r(MsgStatus = #msg_status { msg = undefined }, - State = #vqstate { mode = default, q3 = Q3, q4 = Q4 }) -> - case ?QUEUE:is_empty(Q4) of - true -> State #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3) }; - false -> {Msg, State1 = #vqstate { q4 = Q4a }} = - read_msg(MsgStatus, State), - MsgStatus1 = MsgStatus#msg_status{msg = Msg}, - stats(ready0, {MsgStatus, MsgStatus1}, 0, - State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus1, Q4a) }) - end; -in_r(MsgStatus, - State = #vqstate { mode = default, q4 = Q4 }) -> - State #vqstate { q4 = ?QUEUE:in_r(MsgStatus, Q4) }; -%% lazy queues -in_r(MsgStatus = #msg_status { seq_id = SeqId, is_persistent = IsPersistent }, - State = #vqstate { mode = lazy, q3 = Q3, delta = Delta}) -> - case ?QUEUE:is_empty(Q3) of - true -> - {_MsgStatus1, State1} = - maybe_write_to_disk(true, true, MsgStatus, State), - State2 = stats(ready0, {MsgStatus, none}, 1, State1), - Delta1 = expand_delta(SeqId, Delta, IsPersistent), - State2 #vqstate{ delta = Delta1}; - false -> - State #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3) } - end. - -queue_out(State = #vqstate { mode = default, q4 = Q4 }) -> - case ?QUEUE:out(Q4) of - {empty, _Q4} -> - case fetch_from_q3(State) of - {empty, _State1} = Result -> Result; - {loaded, {MsgStatus, State1}} -> {{value, MsgStatus}, State1} - end; - {{value, MsgStatus}, Q4a} -> - {{value, MsgStatus}, State #vqstate { q4 = Q4a }} - end; -%% lazy queues -queue_out(State = #vqstate { mode = lazy }) -> - case fetch_from_q3(State) of - {empty, _State1} = Result -> Result; - {loaded, {MsgStatus, State1}} -> {{value, MsgStatus}, State1} - end. - -read_msg(#msg_status{msg = undefined, - msg_id = MsgId, - is_persistent = IsPersistent}, State) -> - read_msg(MsgId, IsPersistent, State); -read_msg(#msg_status{msg = Msg}, State) -> - {Msg, State}. - -read_msg(MsgId, IsPersistent, State = #vqstate{msg_store_clients = MSCState, - disk_read_count = Count}) -> - {{ok, Msg = #basic_message {}}, MSCState1} = - msg_store_read(MSCState, IsPersistent, MsgId), - {Msg, State #vqstate {msg_store_clients = MSCState1, - disk_read_count = Count + 1}}. - -stats(Signs, Statuses, DeltaPaged, State) -> - stats0(expand_signs(Signs), expand_statuses(Statuses), DeltaPaged, State). - -expand_signs(ready0) -> {0, 0, true}; -expand_signs(lazy_pub) -> {1, 0, true}; -expand_signs({A, B}) -> {A, B, false}. - -expand_statuses({none, A}) -> {false, msg_in_ram(A), A}; -expand_statuses({B, none}) -> {msg_in_ram(B), false, B}; -expand_statuses({lazy, A}) -> {false , false, A}; -expand_statuses({B, A}) -> {msg_in_ram(B), msg_in_ram(A), B}. - -%% In this function at least, we are religious: the variable name -%% contains "Ready" or "Unacked" iff that is what it counts. If -%% neither is present it counts both. 
-stats0({DeltaReady, DeltaUnacked, ReadyMsgPaged}, - {InRamBefore, InRamAfter, MsgStatus}, DeltaPaged, - State = #vqstate{len = ReadyCount, - bytes = ReadyBytes, - ram_msg_count = RamReadyCount, - persistent_count = PersistentCount, - unacked_bytes = UnackedBytes, - ram_bytes = RamBytes, - delta_transient_bytes = DeltaBytes, - persistent_bytes = PersistentBytes}) -> - S = msg_size(MsgStatus), - DeltaTotal = DeltaReady + DeltaUnacked, - DeltaRam = case {InRamBefore, InRamAfter} of - {false, false} -> 0; - {false, true} -> 1; - {true, false} -> -1; - {true, true} -> 0 - end, - DeltaRamReady = case DeltaReady of - 1 -> one_if(InRamAfter); - -1 -> -one_if(InRamBefore); - 0 when ReadyMsgPaged -> DeltaRam; - 0 -> 0 - end, - DeltaPersistent = DeltaTotal * one_if(MsgStatus#msg_status.is_persistent), - State#vqstate{len = ReadyCount + DeltaReady, - ram_msg_count = RamReadyCount + DeltaRamReady, - persistent_count = PersistentCount + DeltaPersistent, - bytes = ReadyBytes + DeltaReady * S, - unacked_bytes = UnackedBytes + DeltaUnacked * S, - ram_bytes = RamBytes + DeltaRam * S, - persistent_bytes = PersistentBytes + DeltaPersistent * S, - delta_transient_bytes = DeltaBytes + DeltaPaged * one_if(not MsgStatus#msg_status.is_persistent) * S}. - -msg_size(#msg_status{msg_props = #message_properties{size = Size}}) -> Size. - -msg_in_ram(#msg_status{msg = Msg}) -> Msg =/= undefined. - -%% first param: AckRequired -remove(true, MsgStatus = #msg_status { - seq_id = SeqId, - is_delivered = IsDelivered, - index_on_disk = IndexOnDisk }, - State = #vqstate {out_counter = OutCount, - index_state = IndexState}) -> - %% Mark it delivered if necessary - IndexState1 = maybe_write_delivered( - IndexOnDisk andalso not IsDelivered, - SeqId, IndexState), - - State1 = record_pending_ack( - MsgStatus #msg_status { - is_delivered = true }, State), - - State2 = stats({-1, 1}, {MsgStatus, MsgStatus}, 0, State1), - - {SeqId, maybe_update_rates( - State2 #vqstate {out_counter = OutCount + 1, - index_state = IndexState1})}; - -%% This function body has the same behaviour as remove_queue_entries/3 -%% but instead of removing messages based on a ?QUEUE, this removes -%% just one message, the one referenced by the MsgStatus provided. -remove(false, MsgStatus = #msg_status { - seq_id = SeqId, - msg_id = MsgId, - is_persistent = IsPersistent, - is_delivered = IsDelivered, - msg_in_store = MsgInStore, - index_on_disk = IndexOnDisk }, - State = #vqstate {out_counter = OutCount, - index_state = IndexState, - msg_store_clients = MSCState}) -> - %% Mark it delivered if necessary - IndexState1 = maybe_write_delivered( - IndexOnDisk andalso not IsDelivered, - SeqId, IndexState), - - %% Remove from msg_store and queue index, if necessary - case MsgInStore of - true -> ok = msg_store_remove(MSCState, IsPersistent, [MsgId]); - false -> ok - end, - - IndexState2 = - case IndexOnDisk of - true -> rabbit_queue_index:ack([SeqId], IndexState1); - false -> IndexState1 - end, - - State1 = stats({-1, 0}, {MsgStatus, none}, 0, State), - - {undefined, maybe_update_rates( - State1 #vqstate {out_counter = OutCount + 1, - index_state = IndexState2})}. - -%% This function exists as a way to improve dropwhile/2 -%% performance. The idea of having this function is to optimise calls -%% to rabbit_queue_index by batching delivers and acks, instead of -%% sending them one by one. 
-%% -%% Instead of removing every message as they are popped from the -%% queue, it first accumulates them and then removes them by calling -%% remove_queue_entries/3, since the behaviour of -%% remove_queue_entries/3 when used with -%% process_delivers_and_acks_fun(deliver_and_ack) is the same as -%% calling remove(false, MsgStatus, State). -%% -%% remove/3 also updates the out_counter in every call, but here we do -%% it just once at the end. -remove_by_predicate(Pred, State = #vqstate {out_counter = OutCount}) -> - {MsgProps, QAcc, State1} = - collect_by_predicate(Pred, ?QUEUE:new(), State), - State2 = - remove_queue_entries( - QAcc, process_delivers_and_acks_fun(deliver_and_ack), State1), - %% maybe_update_rates/1 is called in remove/3 for every - %% message. Since we update out_counter only once, we call it just - %% there. - {MsgProps, maybe_update_rates( - State2 #vqstate { - out_counter = OutCount + ?QUEUE:len(QAcc)})}. - -%% This function exists as a way to improve fetchwhile/4 -%% performance. The idea of having this function is to optimise calls -%% to rabbit_queue_index by batching delivers, instead of sending them -%% one by one. -%% -%% Fun is the function passed to fetchwhile/4 that's -%% applied to every fetched message and used to build the fetchwhile/4 -%% result accumulator FetchAcc. -fetch_by_predicate(Pred, Fun, FetchAcc, - State = #vqstate { - index_state = IndexState, - out_counter = OutCount}) -> - {MsgProps, QAcc, State1} = - collect_by_predicate(Pred, ?QUEUE:new(), State), - - {Delivers, FetchAcc1, State2} = - process_queue_entries(QAcc, Fun, FetchAcc, State1), - - IndexState1 = rabbit_queue_index:deliver(Delivers, IndexState), - - {MsgProps, FetchAcc1, maybe_update_rates( - State2 #vqstate { - index_state = IndexState1, - out_counter = OutCount + ?QUEUE:len(QAcc)})}. - -%% We try to do here the same as what remove(true, MsgStatus, State) does but -%% processing several messages at the same time. The idea is to -%% optimize rabbit_queue_index:deliver/2 calls by sending a list of -%% SeqIds instead of one by one, thus process_queue_entries1 will -%% accumulate the required deliveries, will record_pending_ack for -%% each message, and will update stats, like remove/3 does. -%% -%% For the meaning of Fun and FetchAcc arguments see -%% fetch_by_predicate/4 above. -process_queue_entries(Q, Fun, FetchAcc, State) -> - ?QUEUE:foldl(fun (MsgStatus, Acc) -> - process_queue_entries1(MsgStatus, Fun, Acc) - end, - {[], FetchAcc, State}, Q). - -process_queue_entries1( - #msg_status { seq_id = SeqId, is_delivered = IsDelivered, - index_on_disk = IndexOnDisk} = MsgStatus, - Fun, - {Delivers, FetchAcc, State}) -> - {Msg, State1} = read_msg(MsgStatus, State), - State2 = record_pending_ack( - MsgStatus #msg_status { - is_delivered = true }, State1), - {cons_if(IndexOnDisk andalso not IsDelivered, SeqId, Delivers), - Fun(Msg, SeqId, FetchAcc), - stats({-1, 1}, {MsgStatus, MsgStatus}, 0, State2)}. - -collect_by_predicate(Pred, QAcc, State) -> - case queue_out(State) of - {empty, State1} -> - {undefined, QAcc, State1}; - {{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} -> - case Pred(MsgProps) of - true -> collect_by_predicate(Pred, ?QUEUE:in(MsgStatus, QAcc), - State1); - false -> {MsgProps, QAcc, in_r(MsgStatus, State1)} - end - end.
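Editor's note: the batching idea above can be illustrated in isolation. The sketch below is not part of the original module; the module name is made up and AckFun merely stands in for the batched rabbit_queue_index:ack/2 call. It drops the prefix of messages matching a predicate and acknowledges all of their sequence ids in one call instead of one call per message.

-module(vq_batch_sketch).
-export([remove_while/3]).

%% Pred decides whether a message should be dropped; AckFun receives the
%% whole list of dropped sequence ids at once, mimicking the batched index
%% update described above.
remove_while(Pred, AckFun, Msgs) ->
    {Dropped, Rest} = lists:splitwith(Pred, Msgs),
    ok = AckFun([SeqId || {SeqId, _Props} <- Dropped]),
    {length(Dropped), Rest}.

%% Example:
%% vq_batch_sketch:remove_while(
%%     fun ({_, expired}) -> true; (_) -> false end,
%%     fun (SeqIds) -> io:format("ack ~p~n", [SeqIds]), ok end,
%%     [{1, expired}, {2, expired}, {3, live}]).
%% prints "ack [1,2]" once and returns {2, [{3, live}]}.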
- -%%---------------------------------------------------------------------------- -%% Helpers for Public API purge/1 function -%%---------------------------------------------------------------------------- - -%% The difference between purge_when_pending_acks/1 -%% and purge_and_index_reset/1 is that the first one issues a deliver -%% and an ack to the queue index for every message that's being -%% removed, while the latter just resets the queue index state. -purge_when_pending_acks(State) -> - State1 = purge1(process_delivers_and_acks_fun(deliver_and_ack), State), - a(State1). - -purge_and_index_reset(State) -> - State1 = purge1(process_delivers_and_acks_fun(none), State), - a(reset_qi_state(State1)). - -%% This function removes messages from each of {q1, q2, q3, q4}. -%% -%% With remove_queue_entries/3 q1 and q4 are emptied, while q2 and q3 -%% are specially handled by purge_betas_and_deltas/2. -%% -%% purge_betas_and_deltas/2 loads messages from the queue index, -%% filling up q3 and in some cases moving messages from q2 to q3 while -%% resetting q2 to an empty queue (see maybe_deltas_to_betas/2). The -%% messages loaded into q3 are removed by calling -%% remove_queue_entries/3 until there are no more messages to be read -%% from the queue index. Messages are read in batches from the queue -%% index. -purge1(AfterFun, State = #vqstate { q4 = Q4}) -> - State1 = remove_queue_entries(Q4, AfterFun, State), - - State2 = #vqstate {q1 = Q1} = - purge_betas_and_deltas(AfterFun, State1#vqstate{q4 = ?QUEUE:new()}), - - State3 = remove_queue_entries(Q1, AfterFun, State2), - - a(State3#vqstate{q1 = ?QUEUE:new()}). - -reset_qi_state(State = #vqstate{index_state = IndexState}) -> - State#vqstate{index_state = - rabbit_queue_index:reset_state(IndexState)}. - -is_pending_ack_empty(State) -> - count_pending_acks(State) =:= 0. - -is_unconfirmed_empty(#vqstate { unconfirmed = UC }) -> - gb_sets:is_empty(UC). - -count_pending_acks(#vqstate { ram_pending_ack = RPA, - disk_pending_ack = DPA, - qi_pending_ack = QPA }) -> - gb_trees:size(RPA) + gb_trees:size(DPA) + gb_trees:size(QPA). - -purge_betas_and_deltas(DelsAndAcksFun, State = #vqstate { mode = Mode }) -> - State0 = #vqstate { q3 = Q3 } = - case Mode of - lazy -> maybe_deltas_to_betas(DelsAndAcksFun, State); - _ -> State - end, - - case ?QUEUE:is_empty(Q3) of - true -> State0; - false -> State1 = remove_queue_entries(Q3, DelsAndAcksFun, State0), - purge_betas_and_deltas(DelsAndAcksFun, - maybe_deltas_to_betas( - DelsAndAcksFun, - State1#vqstate{q3 = ?QUEUE:new()})) - end. - -remove_queue_entries(Q, DelsAndAcksFun, - State = #vqstate{msg_store_clients = MSCState}) -> - {MsgIdsByStore, Delivers, Acks, State1} = - ?QUEUE:foldl(fun remove_queue_entries1/2, - {maps:new(), [], [], State}, Q), - remove_msgs_by_id(MsgIdsByStore, MSCState), - DelsAndAcksFun(Delivers, Acks, State1). - -remove_queue_entries1( - #msg_status { msg_id = MsgId, seq_id = SeqId, is_delivered = IsDelivered, - msg_in_store = MsgInStore, index_on_disk = IndexOnDisk, - is_persistent = IsPersistent} = MsgStatus, - {MsgIdsByStore, Delivers, Acks, State}) -> - {case MsgInStore of - true -> rabbit_misc:maps_cons(IsPersistent, MsgId, MsgIdsByStore); - false -> MsgIdsByStore - end, - cons_if(IndexOnDisk andalso not IsDelivered, SeqId, Delivers), - cons_if(IndexOnDisk, SeqId, Acks), - stats({-1, 0}, {MsgStatus, none}, 0, State)}.
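Editor's note: purge_betas_and_deltas/2 above is essentially a drain loop: load the next batch of betas from the index, remove it, repeat until nothing is left. A minimal standalone sketch of that control flow (hypothetical module; LoadFun and DeleteFun stand in for the index read and for remove_queue_entries/3 respectively):

-module(vq_purge_sketch).
-export([drain/2]).

%% LoadFun/0 returns the next batch (a list), or [] when the index is empty;
%% DeleteFun/1 removes a batch. The loop ends only when a load yields nothing.
drain(LoadFun, DeleteFun) ->
    case LoadFun() of
        []    -> ok;
        Batch -> ok = DeleteFun(Batch),
                 drain(LoadFun, DeleteFun)
    end.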
- -process_delivers_and_acks_fun(deliver_and_ack) -> - fun (Delivers, Acks, State = #vqstate { index_state = IndexState }) -> - IndexState1 = - rabbit_queue_index:ack( - Acks, rabbit_queue_index:deliver(Delivers, IndexState)), - State #vqstate { index_state = IndexState1 } - end; -process_delivers_and_acks_fun(_) -> - fun (_, _, State) -> - State - end. - -%%---------------------------------------------------------------------------- -%% Internal gubbins for publishing -%%---------------------------------------------------------------------------- - -publish1(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, - MsgProps = #message_properties { needs_confirming = NeedsConfirming }, - IsDelivered, _ChPid, _Flow, PersistFun, - State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4, - mode = default, - qi_embed_msgs_below = IndexMaxSize, - next_seq_id = SeqId, - in_counter = InCount, - durable = IsDurable, - unconfirmed = UC }) -> - IsPersistent1 = IsDurable andalso IsPersistent, - MsgStatus = msg_status(IsPersistent1, IsDelivered, SeqId, Msg, MsgProps, IndexMaxSize), - {MsgStatus1, State1} = PersistFun(false, false, MsgStatus, State), - State2 = case ?QUEUE:is_empty(Q3) of - false -> State1 #vqstate { q1 = ?QUEUE:in(m(MsgStatus1), Q1) }; - true -> State1 #vqstate { q4 = ?QUEUE:in(m(MsgStatus1), Q4) } - end, - InCount1 = InCount + 1, - UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC), - stats({1, 0}, {none, MsgStatus1}, 0, - State2#vqstate{ next_seq_id = SeqId + 1, - in_counter = InCount1, - unconfirmed = UC1 }); -publish1(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, - MsgProps = #message_properties { needs_confirming = NeedsConfirming }, - IsDelivered, _ChPid, _Flow, PersistFun, - State = #vqstate { mode = lazy, - qi_embed_msgs_below = IndexMaxSize, - next_seq_id = SeqId, - in_counter = InCount, - durable = IsDurable, - unconfirmed = UC, - delta = Delta}) -> - IsPersistent1 = IsDurable andalso IsPersistent, - MsgStatus = msg_status(IsPersistent1, IsDelivered, SeqId, Msg, MsgProps, IndexMaxSize), - {MsgStatus1, State1} = PersistFun(true, true, MsgStatus, State), - Delta1 = expand_delta(SeqId, Delta, IsPersistent), - UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC), - stats(lazy_pub, {lazy, m(MsgStatus1)}, 1, - State1#vqstate{ delta = Delta1, - next_seq_id = SeqId + 1, - in_counter = InCount + 1, - unconfirmed = UC1}). - -batch_publish1({Msg, MsgProps, IsDelivered}, {ChPid, Flow, State}) -> - {ChPid, Flow, publish1(Msg, MsgProps, IsDelivered, ChPid, Flow, - fun maybe_prepare_write_to_disk/4, State)}. 
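Editor's note: for default-mode queues, publish1/7 above places a new message on q4 only when q3 is empty (nothing older is waiting to be paged back in); otherwise it goes to q1. A stripped-down sketch of just that placement rule, using the stdlib queue module and a plain map instead of #vqstate (hypothetical module, illustrative names):

-module(vq_publish_sketch).
-export([place/2]).

%% If q3 is empty the new message may be consumed next, so it can go straight
%% to q4; otherwise it queues up behind older messages in q1.
place(Msg, #{q1 := Q1, q3 := Q3, q4 := Q4} = State) ->
    case queue:is_empty(Q3) of
        true  -> State#{q4 := queue:in(Msg, Q4)};
        false -> State#{q1 := queue:in(Msg, Q1)}
    end.

%% Example:
%% Empty = #{q1 => queue:new(), q3 => queue:new(), q4 => queue:new()},
%% vq_publish_sketch:place(msg_a, Empty).   %% msg_a lands in q4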
- -publish_delivered1(Msg = #basic_message { is_persistent = IsPersistent, - id = MsgId }, - MsgProps = #message_properties { - needs_confirming = NeedsConfirming }, - _ChPid, _Flow, PersistFun, - State = #vqstate { mode = default, - qi_embed_msgs_below = IndexMaxSize, - next_seq_id = SeqId, - out_counter = OutCount, - in_counter = InCount, - durable = IsDurable, - unconfirmed = UC }) -> - IsPersistent1 = IsDurable andalso IsPersistent, - MsgStatus = msg_status(IsPersistent1, true, SeqId, Msg, MsgProps, IndexMaxSize), - {MsgStatus1, State1} = PersistFun(false, false, MsgStatus, State), - State2 = record_pending_ack(m(MsgStatus1), State1), - UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC), - State3 = stats({0, 1}, {none, MsgStatus1}, 0, - State2 #vqstate { next_seq_id = SeqId + 1, - out_counter = OutCount + 1, - in_counter = InCount + 1, - unconfirmed = UC1 }), - {SeqId, State3}; -publish_delivered1(Msg = #basic_message { is_persistent = IsPersistent, - id = MsgId }, - MsgProps = #message_properties { - needs_confirming = NeedsConfirming }, - _ChPid, _Flow, PersistFun, - State = #vqstate { mode = lazy, - qi_embed_msgs_below = IndexMaxSize, - next_seq_id = SeqId, - out_counter = OutCount, - in_counter = InCount, - durable = IsDurable, - unconfirmed = UC }) -> - IsPersistent1 = IsDurable andalso IsPersistent, - MsgStatus = msg_status(IsPersistent1, true, SeqId, Msg, MsgProps, IndexMaxSize), - {MsgStatus1, State1} = PersistFun(true, true, MsgStatus, State), - State2 = record_pending_ack(m(MsgStatus1), State1), - UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC), - State3 = stats({0, 1}, {none, MsgStatus1}, 0, - State2 #vqstate { next_seq_id = SeqId + 1, - out_counter = OutCount + 1, - in_counter = InCount + 1, - unconfirmed = UC1 }), - {SeqId, State3}. - -batch_publish_delivered1({Msg, MsgProps}, {ChPid, Flow, SeqIds, State}) -> - {SeqId, State1} = - publish_delivered1(Msg, MsgProps, ChPid, Flow, - fun maybe_prepare_write_to_disk/4, - State), - {ChPid, Flow, [SeqId | SeqIds], State1}. - -maybe_write_msg_to_disk(_Force, MsgStatus = #msg_status { - msg_in_store = true }, State) -> - {MsgStatus, State}; -maybe_write_msg_to_disk(Force, MsgStatus = #msg_status { - msg = Msg, msg_id = MsgId, - is_persistent = IsPersistent }, - State = #vqstate{ msg_store_clients = MSCState, - disk_write_count = Count}) - when Force orelse IsPersistent -> - case persist_to(MsgStatus) of - msg_store -> ok = msg_store_write(MSCState, IsPersistent, MsgId, - prepare_to_store(Msg)), - {MsgStatus#msg_status{msg_in_store = true}, - State#vqstate{disk_write_count = Count + 1}}; - queue_index -> {MsgStatus, State} - end; -maybe_write_msg_to_disk(_Force, MsgStatus, State) -> - {MsgStatus, State}. - -%% Due to certain optimisations made inside -%% rabbit_queue_index:pre_publish/7 we need to have two separate -%% functions for index persistence. This one is only used when paging -%% during memory pressure. We didn't want to modify -%% maybe_write_index_to_disk/3 because that function is used in other -%% places. 
-maybe_batch_write_index_to_disk(_Force, - MsgStatus = #msg_status { - index_on_disk = true }, State) -> - {MsgStatus, State}; -maybe_batch_write_index_to_disk(Force, - MsgStatus = #msg_status { - msg = Msg, - msg_id = MsgId, - seq_id = SeqId, - is_persistent = IsPersistent, - is_delivered = IsDelivered, - msg_props = MsgProps}, - State = #vqstate { - target_ram_count = TargetRamCount, - disk_write_count = DiskWriteCount, - index_state = IndexState}) - when Force orelse IsPersistent -> - {MsgOrId, DiskWriteCount1} = - case persist_to(MsgStatus) of - msg_store -> {MsgId, DiskWriteCount}; - queue_index -> {prepare_to_store(Msg), DiskWriteCount + 1} - end, - IndexState1 = rabbit_queue_index:pre_publish( - MsgOrId, SeqId, MsgProps, IsPersistent, IsDelivered, - TargetRamCount, IndexState), - {MsgStatus#msg_status{index_on_disk = true}, - State#vqstate{index_state = IndexState1, - disk_write_count = DiskWriteCount1}}; -maybe_batch_write_index_to_disk(_Force, MsgStatus, State) -> - {MsgStatus, State}. - -maybe_write_index_to_disk(_Force, MsgStatus = #msg_status { - index_on_disk = true }, State) -> - {MsgStatus, State}; -maybe_write_index_to_disk(Force, MsgStatus = #msg_status { - msg = Msg, - msg_id = MsgId, - seq_id = SeqId, - is_persistent = IsPersistent, - is_delivered = IsDelivered, - msg_props = MsgProps}, - State = #vqstate{target_ram_count = TargetRamCount, - disk_write_count = DiskWriteCount, - index_state = IndexState}) - when Force orelse IsPersistent -> - {MsgOrId, DiskWriteCount1} = - case persist_to(MsgStatus) of - msg_store -> {MsgId, DiskWriteCount}; - queue_index -> {prepare_to_store(Msg), DiskWriteCount + 1} - end, - IndexState1 = rabbit_queue_index:publish( - MsgOrId, SeqId, MsgProps, IsPersistent, TargetRamCount, - IndexState), - IndexState2 = maybe_write_delivered(IsDelivered, SeqId, IndexState1), - {MsgStatus#msg_status{index_on_disk = true}, - State#vqstate{index_state = IndexState2, - disk_write_count = DiskWriteCount1}}; - -maybe_write_index_to_disk(_Force, MsgStatus, State) -> - {MsgStatus, State}. - -maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus, State) -> - {MsgStatus1, State1} = maybe_write_msg_to_disk(ForceMsg, MsgStatus, State), - maybe_write_index_to_disk(ForceIndex, MsgStatus1, State1). - -maybe_prepare_write_to_disk(ForceMsg, ForceIndex, MsgStatus, State) -> - {MsgStatus1, State1} = maybe_write_msg_to_disk(ForceMsg, MsgStatus, State), - maybe_batch_write_index_to_disk(ForceIndex, MsgStatus1, State1). - -determine_persist_to(#basic_message{ - content = #content{properties = Props, - properties_bin = PropsBin}}, - #message_properties{size = BodySize}, - IndexMaxSize) -> - %% The >= is so that you can set the env to 0 and never persist - %% to the index. - %% - %% We want this to be fast, so we avoid size(term_to_binary()) - %% here, or using the term size estimation from truncate.erl, both - %% of which are too slow. So instead, if the message body size - %% goes over the limit then we avoid any other checks. - %% - %% If it doesn't we need to decide if the properties will push - %% it past the limit. If we have the encoded properties (usual - %% case) we can just check their size. If we don't (message came - %% via the direct client), we make a guess based on the number of - %% headers. 
- case BodySize >= IndexMaxSize of - true -> msg_store; - false -> Est = case is_binary(PropsBin) of - true -> BodySize + size(PropsBin); - false -> #'P_basic'{headers = Hs} = Props, - case Hs of - undefined -> 0; - _ -> length(Hs) - end * ?HEADER_GUESS_SIZE + BodySize - end, - case Est >= IndexMaxSize of - true -> msg_store; - false -> queue_index - end - end. - -persist_to(#msg_status{persist_to = To}) -> To. - -prepare_to_store(Msg) -> - Msg#basic_message{ - %% don't persist any recoverable decoded properties - content = rabbit_binary_parser:clear_decoded_content( - Msg #basic_message.content)}. - -%%---------------------------------------------------------------------------- -%% Internal gubbins for acks -%%---------------------------------------------------------------------------- - -record_pending_ack(#msg_status { seq_id = SeqId } = MsgStatus, - State = #vqstate { ram_pending_ack = RPA, - disk_pending_ack = DPA, - qi_pending_ack = QPA, - ack_in_counter = AckInCount}) -> - Insert = fun (Tree) -> gb_trees:insert(SeqId, MsgStatus, Tree) end, - {RPA1, DPA1, QPA1} = - case {msg_in_ram(MsgStatus), persist_to(MsgStatus)} of - {false, _} -> {RPA, Insert(DPA), QPA}; - {_, queue_index} -> {RPA, DPA, Insert(QPA)}; - {_, msg_store} -> {Insert(RPA), DPA, QPA} - end, - State #vqstate { ram_pending_ack = RPA1, - disk_pending_ack = DPA1, - qi_pending_ack = QPA1, - ack_in_counter = AckInCount + 1}. - -lookup_pending_ack(SeqId, #vqstate { ram_pending_ack = RPA, - disk_pending_ack = DPA, - qi_pending_ack = QPA}) -> - case gb_trees:lookup(SeqId, RPA) of - {value, V} -> V; - none -> case gb_trees:lookup(SeqId, DPA) of - {value, V} -> V; - none -> gb_trees:get(SeqId, QPA) - end - end. - -%% First parameter = UpdateStats -remove_pending_ack(true, SeqId, State) -> - case remove_pending_ack(false, SeqId, State) of - {none, _} -> - {none, State}; - {MsgStatus, State1} -> - {MsgStatus, stats({0, -1}, {MsgStatus, none}, 0, State1)} - end; -remove_pending_ack(false, SeqId, State = #vqstate{ram_pending_ack = RPA, - disk_pending_ack = DPA, - qi_pending_ack = QPA}) -> - case gb_trees:lookup(SeqId, RPA) of - {value, V} -> RPA1 = gb_trees:delete(SeqId, RPA), - {V, State #vqstate { ram_pending_ack = RPA1 }}; - none -> case gb_trees:lookup(SeqId, DPA) of - {value, V} -> - DPA1 = gb_trees:delete(SeqId, DPA), - {V, State#vqstate{disk_pending_ack = DPA1}}; - none -> - case gb_trees:lookup(SeqId, QPA) of - {value, V} -> - QPA1 = gb_trees:delete(SeqId, QPA), - {V, State#vqstate{qi_pending_ack = QPA1}}; - none -> - {none, State} - end - end - end. - -purge_pending_ack(KeepPersistent, - State = #vqstate { index_state = IndexState, - msg_store_clients = MSCState }) -> - {IndexOnDiskSeqIds, MsgIdsByStore, State1} = purge_pending_ack1(State), - case KeepPersistent of - true -> remove_transient_msgs_by_id(MsgIdsByStore, MSCState), - State1; - false -> IndexState1 = - rabbit_queue_index:ack(IndexOnDiskSeqIds, IndexState), - remove_msgs_by_id(MsgIdsByStore, MSCState), - State1 #vqstate { index_state = IndexState1 } - end. - -purge_pending_ack_delete_and_terminate( - State = #vqstate { index_state = IndexState, - msg_store_clients = MSCState }) -> - {_, MsgIdsByStore, State1} = purge_pending_ack1(State), - IndexState1 = rabbit_queue_index:delete_and_terminate(IndexState), - remove_msgs_by_id(MsgIdsByStore, MSCState), - State1 #vqstate { index_state = IndexState1 }. 
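Editor's note: the size heuristic in determine_persist_to/3 above can be made concrete with a small standalone sketch (hypothetical module; HeaderGuessSize stands in for the module's ?HEADER_GUESS_SIZE constant, whose actual value is defined elsewhere in this file):

-module(vq_persist_sketch).
-export([where_to_persist/4]).

%% BodySize and the encoded-properties size are in bytes. When the encoded
%% properties are not available, the size is estimated from the header count.
where_to_persist(BodySize, _Props, _HeaderGuessSize, IndexMaxSize)
  when BodySize >= IndexMaxSize ->
    msg_store;                                   %% body alone is already too big
where_to_persist(BodySize, {props_bin, PropsBinSize}, _HeaderGuessSize, IndexMaxSize) ->
    decide(BodySize + PropsBinSize, IndexMaxSize);
where_to_persist(BodySize, {headers, NumHeaders}, HeaderGuessSize, IndexMaxSize) ->
    decide(BodySize + NumHeaders * HeaderGuessSize, IndexMaxSize).

decide(Est, IndexMaxSize) when Est >= IndexMaxSize -> msg_store;
decide(_Est, _IndexMaxSize)                        -> queue_index.

%% With qi_embed_msgs_below set to 4096, a 100-byte body with 50 bytes of
%% encoded properties gives an estimate of 150, so the message is embedded:
%% vq_persist_sketch:where_to_persist(100, {props_bin, 50}, 38, 4096). => queue_index
%% (38 is only an example per-header guess, not necessarily the real constant.)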
- -purge_pending_ack1(State = #vqstate { ram_pending_ack = RPA, - disk_pending_ack = DPA, - qi_pending_ack = QPA }) -> - F = fun (_SeqId, MsgStatus, Acc) -> accumulate_ack(MsgStatus, Acc) end, - {IndexOnDiskSeqIds, MsgIdsByStore, _AllMsgIds} = - rabbit_misc:gb_trees_fold( - F, rabbit_misc:gb_trees_fold( - F, rabbit_misc:gb_trees_fold( - F, accumulate_ack_init(), RPA), DPA), QPA), - State1 = State #vqstate { ram_pending_ack = gb_trees:empty(), - disk_pending_ack = gb_trees:empty(), - qi_pending_ack = gb_trees:empty()}, - {IndexOnDiskSeqIds, MsgIdsByStore, State1}. - -%% MsgIdsByStore is a map with two keys: -%% -%% true: holds a list of Persistent Message Ids. -%% false: holds a list of Transient Message Ids. -%% -%% When we call maps:to_list/1 we get two sets of msg ids, where -%% IsPersistent is either true for persistent messages or false for -%% transient ones. The msg_store_remove/3 function takes this boolean -%% flag to determine from which store the messages should be removed. -remove_msgs_by_id(MsgIdsByStore, MSCState) -> - [ok = msg_store_remove(MSCState, IsPersistent, MsgIds) - || {IsPersistent, MsgIds} <- maps:to_list(MsgIdsByStore)]. - -remove_transient_msgs_by_id(MsgIdsByStore, MSCState) -> - case maps:find(false, MsgIdsByStore) of - error -> ok; - {ok, MsgIds} -> ok = msg_store_remove(MSCState, false, MsgIds) - end. - -accumulate_ack_init() -> {[], maps:new(), []}. - -accumulate_ack(#msg_status { seq_id = SeqId, - msg_id = MsgId, - is_persistent = IsPersistent, - msg_in_store = MsgInStore, - index_on_disk = IndexOnDisk }, - {IndexOnDiskSeqIdsAcc, MsgIdsByStore, AllMsgIds}) -> - {cons_if(IndexOnDisk, SeqId, IndexOnDiskSeqIdsAcc), - case MsgInStore of - true -> rabbit_misc:maps_cons(IsPersistent, MsgId, MsgIdsByStore); - false -> MsgIdsByStore - end, - [MsgId | AllMsgIds]}. - -%%---------------------------------------------------------------------------- -%% Internal plumbing for confirms (aka publisher acks) -%%---------------------------------------------------------------------------- - -record_confirms(MsgIdSet, State = #vqstate { msgs_on_disk = MOD, - msg_indices_on_disk = MIOD, - unconfirmed = UC, - confirmed = C }) -> - State #vqstate { - msgs_on_disk = rabbit_misc:gb_sets_difference(MOD, MsgIdSet), - msg_indices_on_disk = rabbit_misc:gb_sets_difference(MIOD, MsgIdSet), - unconfirmed = rabbit_misc:gb_sets_difference(UC, MsgIdSet), - confirmed = gb_sets:union(C, MsgIdSet) }. - -msgs_written_to_disk(Callback, MsgIdSet, ignored) -> - Callback(?MODULE, - fun (?MODULE, State) -> record_confirms(MsgIdSet, State) end); -msgs_written_to_disk(Callback, MsgIdSet, written) -> - Callback(?MODULE, - fun (?MODULE, State = #vqstate { msgs_on_disk = MOD, - msg_indices_on_disk = MIOD, - unconfirmed = UC }) -> - Confirmed = gb_sets:intersection(UC, MsgIdSet), - record_confirms(gb_sets:intersection(MsgIdSet, MIOD), - State #vqstate { - msgs_on_disk = - gb_sets:union(MOD, Confirmed) }) - end). - -msg_indices_written_to_disk(Callback, MsgIdSet) -> - Callback(?MODULE, - fun (?MODULE, State = #vqstate { msgs_on_disk = MOD, - msg_indices_on_disk = MIOD, - unconfirmed = UC }) -> - Confirmed = gb_sets:intersection(UC, MsgIdSet), - record_confirms(gb_sets:intersection(MsgIdSet, MOD), - State #vqstate { - msg_indices_on_disk = - gb_sets:union(MIOD, Confirmed) }) - end). - -msgs_and_indices_written_to_disk(Callback, MsgIdSet) -> - Callback(?MODULE, - fun (?MODULE, State) -> record_confirms(MsgIdSet, State) end).
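Editor's note: the persistent/transient split described above can be reproduced with nothing but stdlib maps. The sketch below (hypothetical module) builds the same true/false-keyed accumulator shape that rabbit_misc:maps_cons/3 is used for here:

-module(vq_bystore_sketch).
-export([group_by_store/1]).

%% Input: a list of {MsgId, IsPersistent} pairs.
%% Output: a map with at most two keys, true and false, each holding the
%% message ids destined for the persistent or transient store respectively.
group_by_store(Msgs) ->
    lists:foldl(
      fun ({MsgId, IsPersistent}, Acc) ->
              maps:update_with(IsPersistent,
                               fun (Ids) -> [MsgId | Ids] end,
                               [MsgId],
                               Acc)
      end, #{}, Msgs).

%% vq_bystore_sketch:group_by_store([{a, true}, {b, false}, {c, true}]).
%% => #{true => [c, a], false => [b]}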
- -%%---------------------------------------------------------------------------- -%% Internal plumbing for requeue -%%---------------------------------------------------------------------------- - -publish_alpha(#msg_status { msg = undefined } = MsgStatus, State) -> - {Msg, State1} = read_msg(MsgStatus, State), - MsgStatus1 = MsgStatus#msg_status { msg = Msg }, - {MsgStatus1, stats({1, -1}, {MsgStatus, MsgStatus1}, 0, State1)}; -publish_alpha(MsgStatus, State) -> - {MsgStatus, stats({1, -1}, {MsgStatus, MsgStatus}, 0, State)}. - -publish_beta(MsgStatus, State) -> - {MsgStatus1, State1} = maybe_prepare_write_to_disk(true, false, MsgStatus, State), - MsgStatus2 = m(trim_msg_status(MsgStatus1)), - {MsgStatus2, stats({1, -1}, {MsgStatus, MsgStatus2}, 0, State1)}. - -%% Rebuild queue, inserting sequence ids to maintain ordering -queue_merge(SeqIds, Q, MsgIds, Limit, PubFun, State) -> - queue_merge(SeqIds, Q, ?QUEUE:new(), MsgIds, - Limit, PubFun, State). - -queue_merge([SeqId | Rest] = SeqIds, Q, Front, MsgIds, - Limit, PubFun, State) - when Limit == undefined orelse SeqId < Limit -> - case ?QUEUE:out(Q) of - {{value, #msg_status { seq_id = SeqIdQ } = MsgStatus}, Q1} - when SeqIdQ < SeqId -> - %% enqueue from the remaining queue - queue_merge(SeqIds, Q1, ?QUEUE:in(MsgStatus, Front), MsgIds, - Limit, PubFun, State); - {_, _Q1} -> - %% enqueue from the remaining list of sequence ids - case msg_from_pending_ack(SeqId, State) of - {none, _} -> - queue_merge(Rest, Q, Front, MsgIds, Limit, PubFun, State); - {MsgStatus, State1} -> - {#msg_status { msg_id = MsgId } = MsgStatus1, State2} = - PubFun(MsgStatus, State1), - queue_merge(Rest, Q, ?QUEUE:in(MsgStatus1, Front), [MsgId | MsgIds], - Limit, PubFun, State2) - end - end; -queue_merge(SeqIds, Q, Front, MsgIds, - _Limit, _PubFun, State) -> - {SeqIds, ?QUEUE:join(Front, Q), MsgIds, State}. - -delta_merge([], Delta, MsgIds, State) -> - {Delta, MsgIds, State}; -delta_merge(SeqIds, Delta, MsgIds, State) -> - lists:foldl(fun (SeqId, {Delta0, MsgIds0, State0} = Acc) -> - case msg_from_pending_ack(SeqId, State0) of - {none, _} -> - Acc; - {#msg_status { msg_id = MsgId, - is_persistent = IsPersistent } = MsgStatus, State1} -> - {_MsgStatus, State2} = - maybe_prepare_write_to_disk(true, true, MsgStatus, State1), - {expand_delta(SeqId, Delta0, IsPersistent), [MsgId | MsgIds0], - stats({1, -1}, {MsgStatus, none}, 1, State2)} - end - end, {Delta, MsgIds, State}, SeqIds). - -%% Mostly opposite of record_pending_ack/2 -msg_from_pending_ack(SeqId, State) -> - case remove_pending_ack(false, SeqId, State) of - {none, _} -> - {none, State}; - {#msg_status { msg_props = MsgProps } = MsgStatus, State1} -> - {MsgStatus #msg_status { - msg_props = MsgProps #message_properties { needs_confirming = false } }, - State1} - end. - -beta_limit(Q) -> - case ?QUEUE:peek(Q) of - {value, #msg_status { seq_id = SeqId }} -> SeqId; - empty -> undefined - end. - -delta_limit(?BLANK_DELTA_PATTERN(_X)) -> undefined; -delta_limit(#delta { start_seq_id = StartSeqId }) -> StartSeqId. - -%%---------------------------------------------------------------------------- -%% Iterator -%%---------------------------------------------------------------------------- - -ram_ack_iterator(State) -> - {ack, gb_trees:iterator(State#vqstate.ram_pending_ack)}. - -disk_ack_iterator(State) -> - {ack, gb_trees:iterator(State#vqstate.disk_pending_ack)}. - -qi_ack_iterator(State) -> - {ack, gb_trees:iterator(State#vqstate.qi_pending_ack)}. - -msg_iterator(State) -> istate(start, State). 
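Editor's note: at its core the requeue path above merges two seq-id-ordered sequences, the messages still in the queue and the messages being requeued from the pending-ack state. A reduced standalone sketch of that merge over plain lists (hypothetical module):

-module(vq_requeue_sketch).
-export([merge_by_seq_id/2]).

%% Both arguments are [{SeqId, Msg}] lists sorted by SeqId; the result keeps
%% global seq-id order, which is what queue_merge/6 maintains for the queue.
merge_by_seq_id(Requeued, Queue) ->
    lists:merge(fun ({S1, _}, {S2, _}) -> S1 =< S2 end, Requeued, Queue).

%% vq_requeue_sketch:merge_by_seq_id([{2, b}, {5, e}], [{1, a}, {3, c}]).
%% => [{1, a}, {2, b}, {3, c}, {5, e}]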
- -istate(start, State) -> {q4, State#vqstate.q4, State}; -istate(q4, State) -> {q3, State#vqstate.q3, State}; -istate(q3, State) -> {delta, State#vqstate.delta, State}; -istate(delta, State) -> {q2, State#vqstate.q2, State}; -istate(q2, State) -> {q1, State#vqstate.q1, State}; -istate(q1, _State) -> done. - -next({ack, It}, IndexState) -> - case gb_trees:next(It) of - none -> {empty, IndexState}; - {_SeqId, MsgStatus, It1} -> Next = {ack, It1}, - {value, MsgStatus, true, Next, IndexState} - end; -next(done, IndexState) -> {empty, IndexState}; -next({delta, #delta{start_seq_id = SeqId, - end_seq_id = SeqId}, State}, IndexState) -> - next(istate(delta, State), IndexState); -next({delta, #delta{start_seq_id = SeqId, - end_seq_id = SeqIdEnd} = Delta, State}, IndexState) -> - SeqIdB = rabbit_queue_index:next_segment_boundary(SeqId), - SeqId1 = lists:min([SeqIdB, SeqIdEnd]), - {List, IndexState1} = rabbit_queue_index:read(SeqId, SeqId1, IndexState), - next({delta, Delta#delta{start_seq_id = SeqId1}, List, State}, IndexState1); -next({delta, Delta, [], State}, IndexState) -> - next({delta, Delta, State}, IndexState); -next({delta, Delta, [{_, SeqId, _, _, _} = M | Rest], State}, IndexState) -> - case is_msg_in_pending_acks(SeqId, State) of - false -> Next = {delta, Delta, Rest, State}, - {value, beta_msg_status(M), false, Next, IndexState}; - true -> next({delta, Delta, Rest, State}, IndexState) - end; -next({Key, Q, State}, IndexState) -> - case ?QUEUE:out(Q) of - {empty, _Q} -> next(istate(Key, State), IndexState); - {{value, MsgStatus}, QN} -> Next = {Key, QN, State}, - {value, MsgStatus, false, Next, IndexState} - end. - -inext(It, {Its, IndexState}) -> - case next(It, IndexState) of - {empty, IndexState1} -> - {Its, IndexState1}; - {value, MsgStatus1, Unacked, It1, IndexState1} -> - {[{MsgStatus1, Unacked, It1} | Its], IndexState1} - end. - -ifold(_Fun, Acc, [], State0) -> - {Acc, State0}; -ifold(Fun, Acc, Its0, State0) -> - [{MsgStatus, Unacked, It} | Rest] = - lists:sort(fun ({#msg_status{seq_id = SeqId1}, _, _}, - {#msg_status{seq_id = SeqId2}, _, _}) -> - SeqId1 =< SeqId2 - end, Its0), - {Msg, State1} = read_msg(MsgStatus, State0), - case Fun(Msg, MsgStatus#msg_status.msg_props, Unacked, Acc) of - {stop, Acc1} -> - {Acc1, State1}; - {cont, Acc1} -> - IndexState0 = State1#vqstate.index_state, - {Its1, IndexState1} = inext(It, {Rest, IndexState0}), - State2 = State1#vqstate{index_state = IndexState1}, - ifold(Fun, Acc1, Its1, State2) - end. - -%%---------------------------------------------------------------------------- -%% Phase changes -%%---------------------------------------------------------------------------- - -maybe_reduce_memory_use(State = #vqstate {memory_reduction_run_count = MRedRunCount, - mode = Mode}) -> - case MRedRunCount >= ?EXPLICIT_GC_RUN_OP_THRESHOLD(Mode) of - true -> State1 = reduce_memory_use(State), - State1#vqstate{memory_reduction_run_count = 0}; - false -> State#vqstate{memory_reduction_run_count = MRedRunCount + 1} - end. 
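Editor's note: ifold/4 above keeps the fold in global queue order by always visiting the iterator whose head message has the lowest seq id. That selection step, isolated into a standalone sketch (hypothetical module; each element carries the head's seq id, the value, and the iterator's continuation):

-module(vq_ifold_sketch).
-export([next_lowest/1]).

%% Heads is a non-empty list of {SeqId, Msg, Continuation}; returns the entry
%% that should be consumed next together with the remaining heads.
next_lowest(Heads) ->
    [Next | Rest] = lists:sort(fun ({S1, _, _}, {S2, _, _}) -> S1 =< S2 end,
                               Heads),
    {Next, Rest}.

%% vq_ifold_sketch:next_lowest([{7, c, it1}, {3, a, it2}, {5, b, it3}]).
%% => {{3, a, it2}, [{5, b, it3}, {7, c, it1}]}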
- -reduce_memory_use(State = #vqstate { target_ram_count = infinity }) -> - State; -reduce_memory_use(State = #vqstate { - mode = default, - ram_pending_ack = RPA, - ram_msg_count = RamMsgCount, - target_ram_count = TargetRamCount, - io_batch_size = IoBatchSize, - rates = #rates { in = AvgIngress, - out = AvgEgress, - ack_in = AvgAckIngress, - ack_out = AvgAckEgress } }) -> - {CreditDiscBound, _} = rabbit_misc:get_env(rabbit, - msg_store_credit_disc_bound, - ?CREDIT_DISC_BOUND), - {NeedResumeA2B, State1} = {_, #vqstate { q2 = Q2, q3 = Q3 }} = - case chunk_size(RamMsgCount + gb_trees:size(RPA), TargetRamCount) of - 0 -> {false, State}; - %% Reduce memory of pending acks and alphas. The order is - %% determined based on which is growing faster. Whichever - %% comes second may very well get a quota of 0 if the - %% first manages to push out the max number of messages. - A2BChunk -> - %% In case there are few messages to be sent to a message store - %% and many messages to be embedded to the queue index, - %% we should limit the number of messages to be flushed - %% to avoid blocking the process. - A2BChunkActual = case A2BChunk > CreditDiscBound * 2 of - true -> CreditDiscBound * 2; - false -> A2BChunk - end, - Funs = case ((AvgAckIngress - AvgAckEgress) > - (AvgIngress - AvgEgress)) of - true -> [fun limit_ram_acks/2, - fun push_alphas_to_betas/2]; - false -> [fun push_alphas_to_betas/2, - fun limit_ram_acks/2] - end, - {Quota, State2} = lists:foldl(fun (ReduceFun, {QuotaN, StateN}) -> - ReduceFun(QuotaN, StateN) - end, {A2BChunkActual, State}, Funs), - {(Quota == 0) andalso (A2BChunk > A2BChunkActual), State2} - end, - Permitted = permitted_beta_count(State1), - {NeedResumeB2D, State3} = - %% If there are more messages with their queue position held in RAM, - %% a.k.a. betas, in Q2 & Q3 than IoBatchSize, - %% write their queue position to disk, a.k.a. push_betas_to_deltas - case chunk_size(?QUEUE:len(Q2) + ?QUEUE:len(Q3), - Permitted) of - B2DChunk when B2DChunk >= IoBatchSize -> - %% Same as for alphas to betas. Limit the number of messages - %% to be flushed to disk at once to avoid blocking the process. - B2DChunkActual = case B2DChunk > CreditDiscBound * 2 of - true -> CreditDiscBound * 2; - false -> B2DChunk - end, - StateBD = push_betas_to_deltas(B2DChunkActual, State1), - {B2DChunk > B2DChunkActual, StateBD}; - _ -> - {false, State1} - end, - %% We can be blocked by the credit flow, or limited by a batch size, - %% or finished with flushing. - %% If blocked by the credit flow - the credit grant will resume processing, - %% if limited by a batch - the batch continuation message should be sent. - %% The continuation message will be prioritised over publishes, - %% but not consumptions, so the queue can make progress. - Blocked = credit_flow:blocked(), - case {Blocked, NeedResumeA2B orelse NeedResumeB2D} of - %% Credit bump will continue paging - {true, _} -> State3; - %% Finished with paging - {false, false} -> State3; - %% Planning next batch - {false, true} -> - %% We don't want to use self-credit-flow, because it's harder to - %% reason about. So the process sends a (prioritised) message to - %% itself and sets a waiting_bump value to keep the message box clean - maybe_bump_reduce_memory_use(State3) - end; -%% When using lazy queues, there are no alphas, so we don't need to -%% call push_alphas_to_betas/2.
-reduce_memory_use(State = #vqstate { - mode = lazy, - ram_pending_ack = RPA, - ram_msg_count = RamMsgCount, - target_ram_count = TargetRamCount }) -> - State1 = #vqstate { q3 = Q3 } = - case chunk_size(RamMsgCount + gb_trees:size(RPA), TargetRamCount) of - 0 -> State; - S1 -> {_, State2} = limit_ram_acks(S1, State), - State2 - end, - - State3 = - case chunk_size(?QUEUE:len(Q3), - permitted_beta_count(State1)) of - 0 -> - State1; - S2 -> - push_betas_to_deltas(S2, State1) - end, - garbage_collect(), - State3. - -maybe_bump_reduce_memory_use(State = #vqstate{ waiting_bump = true }) -> - State; -maybe_bump_reduce_memory_use(State) -> - self() ! bump_reduce_memory_use, - State#vqstate{ waiting_bump = true }. - -limit_ram_acks(0, State) -> - {0, ui(State)}; -limit_ram_acks(Quota, State = #vqstate { ram_pending_ack = RPA, - disk_pending_ack = DPA }) -> - case gb_trees:is_empty(RPA) of - true -> - {Quota, ui(State)}; - false -> - {SeqId, MsgStatus, RPA1} = gb_trees:take_largest(RPA), - {MsgStatus1, State1} = - maybe_prepare_write_to_disk(true, false, MsgStatus, State), - MsgStatus2 = m(trim_msg_status(MsgStatus1)), - DPA1 = gb_trees:insert(SeqId, MsgStatus2, DPA), - limit_ram_acks(Quota - 1, - stats({0, 0}, {MsgStatus, MsgStatus2}, 0, - State1 #vqstate { ram_pending_ack = RPA1, - disk_pending_ack = DPA1 })) - end. - -permitted_beta_count(#vqstate { len = 0 }) -> - infinity; -permitted_beta_count(#vqstate { mode = lazy, - target_ram_count = TargetRamCount}) -> - TargetRamCount; -permitted_beta_count(#vqstate { target_ram_count = 0, q3 = Q3 }) -> - lists:min([?QUEUE:len(Q3), rabbit_queue_index:next_segment_boundary(0)]); -permitted_beta_count(#vqstate { q1 = Q1, - q4 = Q4, - target_ram_count = TargetRamCount, - len = Len }) -> - BetaDelta = Len - ?QUEUE:len(Q1) - ?QUEUE:len(Q4), - lists:max([rabbit_queue_index:next_segment_boundary(0), - BetaDelta - ((BetaDelta * BetaDelta) div - (BetaDelta + TargetRamCount))]). - -chunk_size(Current, Permitted) - when Permitted =:= infinity orelse Permitted >= Current -> - 0; -chunk_size(Current, Permitted) -> - Current - Permitted. - -fetch_from_q3(State = #vqstate { mode = default, - q1 = Q1, - q2 = Q2, - delta = #delta { count = DeltaCount }, - q3 = Q3, - q4 = Q4 }) -> - case ?QUEUE:out(Q3) of - {empty, _Q3} -> - {empty, State}; - {{value, MsgStatus}, Q3a} -> - State1 = State #vqstate { q3 = Q3a }, - State2 = case {?QUEUE:is_empty(Q3a), 0 == DeltaCount} of - {true, true} -> - %% q3 is now empty, it wasn't before; - %% delta is still empty. So q2 must be - %% empty, and we know q4 is empty - %% otherwise we wouldn't be loading from - %% q3. As such, we can just set q4 to Q1. - true = ?QUEUE:is_empty(Q2), %% ASSERTION - true = ?QUEUE:is_empty(Q4), %% ASSERTION - State1 #vqstate { q1 = ?QUEUE:new(), q4 = Q1 }; - {true, false} -> - maybe_deltas_to_betas(State1); - {false, _} -> - %% q3 still isn't empty, we've not - %% touched delta, so the invariants - %% between q1, q2, delta and q3 are - %% maintained - State1 - end, - {loaded, {MsgStatus, State2}} - end; -%% lazy queues -fetch_from_q3(State = #vqstate { mode = lazy, - delta = #delta { count = DeltaCount }, - q3 = Q3 }) -> - case ?QUEUE:out(Q3) of - {empty, _Q3} when DeltaCount =:= 0 -> - {empty, State}; - {empty, _Q3} -> - fetch_from_q3(maybe_deltas_to_betas(State)); - {{value, MsgStatus}, Q3a} -> - State1 = State #vqstate { q3 = Q3a }, - {loaded, {MsgStatus, State1}} - end. - -maybe_deltas_to_betas(State) -> - AfterFun = process_delivers_and_acks_fun(deliver_and_ack), - maybe_deltas_to_betas(AfterFun, State). 
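Editor's note: chunk_size/2 above simply answers "how far over budget are we?", and permitted_beta_count/1 sets that budget for betas. A short standalone sketch with concrete numbers (hypothetical module; the quadratic term mirrors the general clause of permitted_beta_count/1, which in the real code is additionally clamped by the index segment size):

-module(vq_paging_sketch).
-export([chunk_size/2, permitted_betas/2]).

%% Zero means nothing needs paging; otherwise the excess over the target.
chunk_size(_Current, infinity) -> 0;
chunk_size(Current, Permitted) when Permitted >= Current -> 0;
chunk_size(Current, Permitted) -> Current - Permitted.

%% Keep fewer betas in RAM as the beta/delta backlog grows relative to the
%% RAM target.
permitted_betas(BetaDelta, TargetRamCount) ->
    BetaDelta - (BetaDelta * BetaDelta) div (BetaDelta + TargetRamCount).

%% vq_paging_sketch:chunk_size(12000, 10000).       => 2000 messages to page out
%% vq_paging_sketch:permitted_betas(100000, 20000). => 16667 betas allowed in RAM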
- -maybe_deltas_to_betas(_DelsAndAcksFun, - State = #vqstate {delta = ?BLANK_DELTA_PATTERN(X) }) -> - State; -maybe_deltas_to_betas(DelsAndAcksFun, - State = #vqstate { - q2 = Q2, - delta = Delta, - q3 = Q3, - index_state = IndexState, - ram_msg_count = RamMsgCount, - ram_bytes = RamBytes, - disk_read_count = DiskReadCount, - delta_transient_bytes = DeltaTransientBytes, - transient_threshold = TransientThreshold }) -> - #delta { start_seq_id = DeltaSeqId, - count = DeltaCount, - transient = Transient, - end_seq_id = DeltaSeqIdEnd } = Delta, - DeltaSeqId1 = - lists:min([rabbit_queue_index:next_segment_boundary(DeltaSeqId), - DeltaSeqIdEnd]), - {List, IndexState1} = rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1, - IndexState), - {Q3a, RamCountsInc, RamBytesInc, State1, TransientCount, TransientBytes} = - betas_from_index_entries(List, TransientThreshold, - DelsAndAcksFun, - State #vqstate { index_state = IndexState1 }), - State2 = State1 #vqstate { ram_msg_count = RamMsgCount + RamCountsInc, - ram_bytes = RamBytes + RamBytesInc, - disk_read_count = DiskReadCount + RamCountsInc }, - case ?QUEUE:len(Q3a) of - 0 -> - %% we ignored every message in the segment due to it being - %% transient and below the threshold - maybe_deltas_to_betas( - DelsAndAcksFun, - State2 #vqstate { - delta = d(Delta #delta { start_seq_id = DeltaSeqId1 })}); - Q3aLen -> - Q3b = ?QUEUE:join(Q3, Q3a), - case DeltaCount - Q3aLen of - 0 -> - %% delta is now empty, but it wasn't before, so - %% can now join q2 onto q3 - State2 #vqstate { q2 = ?QUEUE:new(), - delta = ?BLANK_DELTA, - q3 = ?QUEUE:join(Q3b, Q2), - delta_transient_bytes = 0}; - N when N > 0 -> - Delta1 = d(#delta { start_seq_id = DeltaSeqId1, - count = N, - transient = Transient - TransientCount, - end_seq_id = DeltaSeqIdEnd }), - State2 #vqstate { delta = Delta1, - q3 = Q3b, - delta_transient_bytes = DeltaTransientBytes - TransientBytes } - end - end. - -push_alphas_to_betas(Quota, State) -> - {Quota1, State1} = - push_alphas_to_betas( - fun ?QUEUE:out/1, - fun (MsgStatus, Q1a, - State0 = #vqstate { q3 = Q3, delta = #delta { count = 0, - transient = 0 } }) -> - State0 #vqstate { q1 = Q1a, q3 = ?QUEUE:in(MsgStatus, Q3) }; - (MsgStatus, Q1a, State0 = #vqstate { q2 = Q2 }) -> - State0 #vqstate { q1 = Q1a, q2 = ?QUEUE:in(MsgStatus, Q2) } - end, Quota, State #vqstate.q1, State), - {Quota2, State2} = - push_alphas_to_betas( - fun ?QUEUE:out_r/1, - fun (MsgStatus, Q4a, State0 = #vqstate { q3 = Q3 }) -> - State0 #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3), q4 = Q4a } - end, Quota1, State1 #vqstate.q4, State1), - {Quota2, State2}. - -push_alphas_to_betas(_Generator, _Consumer, Quota, _Q, - State = #vqstate { ram_msg_count = RamMsgCount, - target_ram_count = TargetRamCount }) - when Quota =:= 0 orelse - TargetRamCount =:= infinity orelse - TargetRamCount >= RamMsgCount -> - {Quota, ui(State)}; -push_alphas_to_betas(Generator, Consumer, Quota, Q, State) -> - %% We consume credits from the message_store whenever we need to - %% persist a message to disk. See: - %% rabbit_variable_queue:msg_store_write/4. So perhaps the - %% msg_store is trying to throttle down our queue. 
- case credit_flow:blocked() of - true -> {Quota, ui(State)}; - false -> case Generator(Q) of - {empty, _Q} -> - {Quota, ui(State)}; - {{value, MsgStatus}, Qa} -> - {MsgStatus1, State1} = - maybe_prepare_write_to_disk(true, false, MsgStatus, - State), - MsgStatus2 = m(trim_msg_status(MsgStatus1)), - State2 = stats( - ready0, {MsgStatus, MsgStatus2}, 0, State1), - State3 = Consumer(MsgStatus2, Qa, State2), - push_alphas_to_betas(Generator, Consumer, Quota - 1, - Qa, State3) - end - end. - -push_betas_to_deltas(Quota, State = #vqstate { mode = default, - q2 = Q2, - delta = Delta, - q3 = Q3}) -> - PushState = {Quota, Delta, State}, - {Q3a, PushState1} = push_betas_to_deltas( - fun ?QUEUE:out_r/1, - fun rabbit_queue_index:next_segment_boundary/1, - Q3, PushState), - {Q2a, PushState2} = push_betas_to_deltas( - fun ?QUEUE:out/1, - fun (Q2MinSeqId) -> Q2MinSeqId end, - Q2, PushState1), - {_, Delta1, State1} = PushState2, - State1 #vqstate { q2 = Q2a, - delta = Delta1, - q3 = Q3a }; -%% In the case of lazy queues we want to page as many messages as -%% possible from q3. -push_betas_to_deltas(Quota, State = #vqstate { mode = lazy, - delta = Delta, - q3 = Q3}) -> - PushState = {Quota, Delta, State}, - {Q3a, PushState1} = push_betas_to_deltas( - fun ?QUEUE:out_r/1, - fun (Q2MinSeqId) -> Q2MinSeqId end, - Q3, PushState), - {_, Delta1, State1} = PushState1, - State1 #vqstate { delta = Delta1, - q3 = Q3a }. - - -push_betas_to_deltas(Generator, LimitFun, Q, PushState) -> - case ?QUEUE:is_empty(Q) of - true -> - {Q, PushState}; - false -> - {value, #msg_status { seq_id = MinSeqId }} = ?QUEUE:peek(Q), - {value, #msg_status { seq_id = MaxSeqId }} = ?QUEUE:peek_r(Q), - Limit = LimitFun(MinSeqId), - case MaxSeqId < Limit of - true -> {Q, PushState}; - false -> push_betas_to_deltas1(Generator, Limit, Q, PushState) - end - end. - -push_betas_to_deltas1(_Generator, _Limit, Q, {0, Delta, State}) -> - {Q, {0, Delta, ui(State)}}; -push_betas_to_deltas1(Generator, Limit, Q, {Quota, Delta, State}) -> - case Generator(Q) of - {empty, _Q} -> - {Q, {Quota, Delta, ui(State)}}; - {{value, #msg_status { seq_id = SeqId }}, _Qa} - when SeqId < Limit -> - {Q, {Quota, Delta, ui(State)}}; - {{value, MsgStatus = #msg_status { seq_id = SeqId }}, Qa} -> - {#msg_status { index_on_disk = true, - is_persistent = IsPersistent }, State1} = - maybe_batch_write_index_to_disk(true, MsgStatus, State), - State2 = stats(ready0, {MsgStatus, none}, 1, State1), - Delta1 = expand_delta(SeqId, Delta, IsPersistent), - push_betas_to_deltas1(Generator, Limit, Qa, - {Quota - 1, Delta1, State2}) - end. - -%% Flushes queue index batch caches and updates queue index state. -ui(#vqstate{index_state = IndexState, - target_ram_count = TargetRamCount} = State) -> - IndexState1 = rabbit_queue_index:flush_pre_publish_cache( - TargetRamCount, IndexState), - State#vqstate{index_state = IndexState1}. - -%%---------------------------------------------------------------------------- -%% Upgrading -%%---------------------------------------------------------------------------- - --spec multiple_routing_keys() -> 'ok'. - -multiple_routing_keys() -> - transform_storage( - fun ({basic_message, ExchangeName, Routing_Key, Content, - MsgId, Persistent}) -> - {ok, {basic_message, ExchangeName, [Routing_Key], Content, - MsgId, Persistent}}; - (_) -> {error, corrupt_message} - end), - ok. 
- - -%% Assumes message store is not running -transform_storage(TransformFun) -> - transform_store(?PERSISTENT_MSG_STORE, TransformFun), - transform_store(?TRANSIENT_MSG_STORE, TransformFun). - -transform_store(Store, TransformFun) -> - rabbit_msg_store:force_recovery(rabbit_mnesia:dir(), Store), - rabbit_msg_store:transform_dir(rabbit_mnesia:dir(), Store, TransformFun). - -move_messages_to_vhost_store() -> - case list_persistent_queues() of - [] -> - log_upgrade("No durable queues found." - " Skipping message store migration"), - ok; - Queues -> - move_messages_to_vhost_store(Queues) - end, - ok = delete_old_store(), - ok = rabbit_queue_index:cleanup_global_recovery_terms(). - -move_messages_to_vhost_store(Queues) -> - log_upgrade("Moving messages to per-vhost message store"), - %% Move the queue index for each persistent queue to the new store - lists:foreach( - fun(Queue) -> - QueueName = amqqueue:get_name(Queue), - rabbit_queue_index:move_to_per_vhost_stores(QueueName) - end, - Queues), - %% Legacy (global) msg_store may require recovery. - %% This upgrade step should only be started - %% if we are upgrading from a pre-3.7.0 version. - {QueuesWithTerms, RecoveryRefs, StartFunState} = read_old_recovery_terms(Queues), - - OldStore = run_old_persistent_store(RecoveryRefs, StartFunState), - - VHosts = rabbit_vhost:list_names(), - - %% New store should not be recovered. - NewMsgStore = start_new_store(VHosts), - %% Recovery terms should be started for all vhosts for new store. - [ok = rabbit_recovery_terms:open_table(VHost) || VHost <- VHosts], - - MigrationBatchSize = application:get_env(rabbit, queue_migration_batch_size, - ?QUEUE_MIGRATION_BATCH_SIZE), - in_batches(MigrationBatchSize, - {rabbit_variable_queue, migrate_queue, [OldStore, NewMsgStore]}, - QueuesWithTerms, - "message_store upgrades: Migrating batch ~p of ~p queues. Out of total ~p ~n", - "message_store upgrades: Batch ~p of ~p queues migrated ~n. ~p total left"), - - log_upgrade("Message store migration finished"), - ok = rabbit_sup:stop_child(OldStore), - [ok= rabbit_recovery_terms:close_table(VHost) || VHost <- VHosts], - ok = stop_new_store(NewMsgStore). - -in_batches(Size, MFA, List, MessageStart, MessageEnd) -> - in_batches(Size, 1, MFA, List, MessageStart, MessageEnd). - -in_batches(_, _, _, [], _, _) -> ok; -in_batches(Size, BatchNum, MFA, List, MessageStart, MessageEnd) -> - Length = length(List), - {Batch, Tail} = case Size > Length of - true -> {List, []}; - false -> lists:split(Size, List) - end, - ProcessedLength = (BatchNum - 1) * Size, - rabbit_log:info(MessageStart, [BatchNum, Size, ProcessedLength + Length]), - {M, F, A} = MFA, - Keys = [ rpc:async_call(node(), M, F, [El | A]) || El <- Batch ], - lists:foreach(fun(Key) -> - case rpc:yield(Key) of - {badrpc, Err} -> throw(Err); - _ -> ok - end - end, - Keys), - rabbit_log:info(MessageEnd, [BatchNum, Size, length(Tail)]), - in_batches(Size, BatchNum + 1, MFA, Tail, MessageStart, MessageEnd). - -migrate_queue({QueueName = #resource{virtual_host = VHost, name = Name}, - RecoveryTerm}, - OldStore, NewStore) -> - log_upgrade_verbose( - "Migrating messages in queue ~s in vhost ~s to per-vhost message store~n", - [Name, VHost]), - OldStoreClient = get_global_store_client(OldStore), - NewStoreClient = get_per_vhost_store_client(QueueName, NewStore), - %% WARNING: During scan_queue_segments queue index state is being recovered - %% and terminated. This can cause side effects! 
- rabbit_queue_index:scan_queue_segments( - %% We migrate only persistent messages which are found in message store - %% and are not acked yet - fun (_SeqId, MsgId, _MsgProps, true, _IsDelivered, no_ack, OldC) - when is_binary(MsgId) -> - migrate_message(MsgId, OldC, NewStoreClient); - (_SeqId, _MsgId, _MsgProps, - _IsPersistent, _IsDelivered, _IsAcked, OldC) -> - OldC - end, - OldStoreClient, - QueueName), - rabbit_msg_store:client_terminate(OldStoreClient), - rabbit_msg_store:client_terminate(NewStoreClient), - NewClientRef = rabbit_msg_store:client_ref(NewStoreClient), - case RecoveryTerm of - non_clean_shutdown -> ok; - Term when is_list(Term) -> - NewRecoveryTerm = lists:keyreplace(persistent_ref, 1, RecoveryTerm, - {persistent_ref, NewClientRef}), - rabbit_queue_index:update_recovery_term(QueueName, NewRecoveryTerm) - end, - log_upgrade_verbose("Finished migrating queue ~s in vhost ~s", [Name, VHost]), - {QueueName, NewClientRef}. - -migrate_message(MsgId, OldC, NewC) -> - case rabbit_msg_store:read(MsgId, OldC) of - {{ok, Msg}, OldC1} -> - ok = rabbit_msg_store:write(MsgId, Msg, NewC), - OldC1; - _ -> OldC - end. - -get_per_vhost_store_client(#resource{virtual_host = VHost}, NewStore) -> - {VHost, StorePid} = lists:keyfind(VHost, 1, NewStore), - rabbit_msg_store:client_init(StorePid, rabbit_guid:gen(), - fun(_,_) -> ok end, fun() -> ok end). - -get_global_store_client(OldStore) -> - rabbit_msg_store:client_init(OldStore, - rabbit_guid:gen(), - fun(_,_) -> ok end, - fun() -> ok end). - -list_persistent_queues() -> - Node = node(), - mnesia:async_dirty( - fun () -> - qlc:e(qlc:q([Q || Q <- mnesia:table(rabbit_durable_queue), - ?amqqueue_is_classic(Q), - amqqueue:qnode(Q) == Node, - mnesia:read(rabbit_queue, amqqueue:get_name(Q), read) =:= []])) - end). - -read_old_recovery_terms([]) -> - {[], [], ?EMPTY_START_FUN_STATE}; -read_old_recovery_terms(Queues) -> - QueueNames = [amqqueue:get_name(Q) || Q <- Queues], - {AllTerms, StartFunState} = rabbit_queue_index:read_global_recovery_terms(QueueNames), - Refs = [Ref || Terms <- AllTerms, - Terms /= non_clean_shutdown, - begin - Ref = proplists:get_value(persistent_ref, Terms), - Ref =/= undefined - end], - {lists:zip(QueueNames, AllTerms), Refs, StartFunState}. - -run_old_persistent_store(Refs, StartFunState) -> - OldStoreName = ?PERSISTENT_MSG_STORE, - ok = rabbit_sup:start_child(OldStoreName, rabbit_msg_store, start_global_store_link, - [OldStoreName, rabbit_mnesia:dir(), - Refs, StartFunState]), - OldStoreName. - -start_new_store(VHosts) -> - %% Ensure vhost supervisor is started, so we can add vhosts to it. - lists:map(fun(VHost) -> - VHostDir = rabbit_vhost:msg_store_dir_path(VHost), - {ok, Pid} = rabbit_msg_store:start_link(?PERSISTENT_MSG_STORE, - VHostDir, - undefined, - ?EMPTY_START_FUN_STATE), - {VHost, Pid} - end, - VHosts). - -stop_new_store(NewStore) -> - lists:foreach(fun({_VHost, StorePid}) -> - unlink(StorePid), - exit(StorePid, shutdown) - end, - NewStore), - ok. - -delete_old_store() -> - log_upgrade("Removing the old message store data"), - rabbit_file:recursive_delete( - [filename:join([rabbit_mnesia:dir(), ?PERSISTENT_MSG_STORE])]), - %% Delete old transient store as well - rabbit_file:recursive_delete( - [filename:join([rabbit_mnesia:dir(), ?TRANSIENT_MSG_STORE])]), - ok. - -log_upgrade(Msg) -> - log_upgrade(Msg, []). - -log_upgrade(Msg, Args) -> - rabbit_log:info("message_store upgrades: " ++ Msg, Args). - -log_upgrade_verbose(Msg) -> - log_upgrade_verbose(Msg, []). 
- -log_upgrade_verbose(Msg, Args) -> - rabbit_log_upgrade:info(Msg, Args). - -maybe_client_terminate(MSCStateP) -> - %% The queue might have been asked to stop by the supervisor; it needs a clean - %% shutdown in order for the supervision strategy to work - if it reaches the - %% maximum number of restarts it might bring the vhost down. - try - rabbit_msg_store:client_terminate(MSCStateP) - catch - _:_ -> - ok - end.