Diffstat (limited to 'deps/rabbit/src/rabbit_queue_index.erl')
-rw-r--r--  deps/rabbit/src/rabbit_queue_index.erl  1521
1 files changed, 1521 insertions, 0 deletions
diff --git a/deps/rabbit/src/rabbit_queue_index.erl b/deps/rabbit/src/rabbit_queue_index.erl
new file mode 100644
index 0000000000..faab4380b5
--- /dev/null
+++ b/deps/rabbit/src/rabbit_queue_index.erl
@@ -0,0 +1,1521 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(rabbit_queue_index).
+
+-export([erase/1, init/3, reset_state/1, recover/6,
+ terminate/3, delete_and_terminate/1,
+ pre_publish/7, flush_pre_publish_cache/2,
+ publish/6, deliver/2, ack/2, sync/1, needs_sync/1, flush/1,
+ read/3, next_segment_boundary/1, bounds/1, start/2, stop/1]).
+
+-export([add_queue_ttl/0, avoid_zeroes/0, store_msg_size/0, store_msg/0]).
+-export([scan_queue_segments/3, scan_queue_segments/4]).
+
+%% Migrates from global to per-vhost message stores
+-export([move_to_per_vhost_stores/1,
+ update_recovery_term/2,
+ read_global_recovery_terms/1,
+ cleanup_global_recovery_terms/0]).
+
+-define(CLEAN_FILENAME, "clean.dot").
+
+%%----------------------------------------------------------------------------
+
+%% The queue index is responsible for recording the order of messages
+%% within a queue on disk. As such it contains records of messages
+%% being published, delivered and acknowledged. The publish record
+%% includes the sequence ID, message ID and a small quantity of
+%% metadata about the message; the delivery and acknowledgement
+%% records just contain the sequence ID. A publish record may also
+%% contain the complete message if provided to publish/6; this allows
+%% the message store to be avoided altogether for small messages. In
+%% either case the publish record is stored in memory in the same
+%% serialised format it will take on disk.
+%%
+%% Because the queue can decide at any point to send a queue entry to
+%% disk, you cannot rely on publishes appearing in order. The only
+%% thing you can rely on is a message being published, then delivered,
+%% then ack'd.
+%%
+%% In order to be able to clean up ack'd messages, we write to segment
+%% files. These files have a fixed number of entries: ?SEGMENT_ENTRY_COUNT
+%% publishes, delivers and acknowledgements. They are numbered, and so
+%% it is known that the 0th segment contains messages 0 ->
+%% ?SEGMENT_ENTRY_COUNT - 1, the 1st segment contains messages
+%% ?SEGMENT_ENTRY_COUNT -> 2*?SEGMENT_ENTRY_COUNT - 1 and so on. As
+%% such, in the segment files, we only refer to message sequence ids
+%% by the LSBs as SeqId rem ?SEGMENT_ENTRY_COUNT. This gives them a
+%% fixed size.
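+%%
+%% For illustration (see seq_id_to_seg_and_rel_seq_id/1 and
+%% reconstruct_seq_id/2 below), with ?SEGMENT_ENTRY_COUNT = 16384:
+%%
+%%   seq_id_to_seg_and_rel_seq_id(16385) =:= {1, 1}
+%%   reconstruct_seq_id(1, 1)            =:= 16385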
+%%
+%% However, transient messages which are not sent to disk at any point
+%% will cause gaps to appear in segment files. Therefore, we delete a
+%% segment file whenever the number of publishes == number of acks
+%% (note that although it is not fully enforced, it is assumed that a
+%% message will never be ack'd before it is delivered, thus this test
+%% also implies == number of delivers). In practice, this does not
+%% cause disk churn in the pathological case because of the journal
+%% and caching (see below).
+%%
+%% Because publishes, delivers and acks can occur scattered all over
+%% the index, we wish to avoid lots of seeking. Therefore we have a
+%% fixed-size journal to which all actions are appended. When the number of
+%% entries in this journal reaches max_journal_entries, the journal
+%% entries are scattered out to their relevant files, and the journal
+%% is truncated to zero size. Note that entries in the journal must
+%% carry the full sequence id, thus the format of entries in the
+%% journal is different to that in the segments.
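+%%
+%% For example, an ack for SeqId 16385 is appended to the journal as
+%% <<?ACK_JPREFIX:?JPREFIX_BITS, 16385:?SEQ_BITS>> (8 bytes, carrying
+%% the full sequence id), whereas the corresponding entry in segment
+%% file 1 only records the 14-bit relative sequence id 1.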
+%%
+%% The journal is also kept fully in memory, pre-segmented: the state
+%% contains a mapping from segment numbers to state-per-segment (this
+%% state is held for all segments which have been "seen": thus a
+%% segment which has been read but has no pending entries in the
+%% journal is still held in this mapping. Also note that a map is
+%% used for this mapping rather than an array, because an array would
+%% always hold entries from 0). Actions are stored directly in this
+%% state. Thus at the point of flushing the journal, firstly no
+%% reading from disk is necessary, but secondly if the known number of
+%% acks and publishes in a segment are equal, given the known state of
+%% the segment file combined with the journal, no writing needs to be
+%% done to the segment file either (in fact it is deleted if it exists
+%% at all). This is safe given that the set of acks is a subset of the
+%% set of publishes. When it is necessary to sync messages, it is
+%% sufficient to fsync on the journal: when entries are distributed
+%% from the journal to segment files, those segments appended to are
+%% fsync'd prior to the journal being truncated.
+%%
+%% This module is also responsible for scanning the queue index files
+%% and seeding the message store on start up.
+%%
+%% Note that in general, the representation of a message's state as
+%% the tuple: {('no_pub'|{IsPersistent, Bin, MsgBin}),
+%% ('del'|'no_del'), ('ack'|'no_ack')} is richer than strictly
+%% necessary for most operations. However, for startup, and to ensure
+%% the safe and correct combination of journal entries with entries
+%% read from the segment on disk, this richer representation vastly
+%% simplifies and clarifies the code.
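+%%
+%% As a small worked example of that representation: a publish first
+%% yields {{IsPersistent, Bin, MsgBin}, no_del, no_ack}, a subsequent
+%% delivery turns it into {{IsPersistent, Bin, MsgBin}, del, no_ack},
+%% and an ack on a delivered publish removes the entry entirely (see
+%% action_to_entry/3).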
+%%
+%% For notes on Clean Shutdown and startup, see documentation in
+%% rabbit_variable_queue.
+%%
+%%----------------------------------------------------------------------------
+
+%% ---- Journal details ----
+
+-define(JOURNAL_FILENAME, "journal.jif").
+-define(QUEUE_NAME_STUB_FILE, ".queue_name").
+
+-define(PUB_PERSIST_JPREFIX, 2#00).
+-define(PUB_TRANS_JPREFIX, 2#01).
+-define(DEL_JPREFIX, 2#10).
+-define(ACK_JPREFIX, 2#11).
+-define(JPREFIX_BITS, 2).
+-define(SEQ_BYTES, 8).
+-define(SEQ_BITS, ((?SEQ_BYTES * 8) - ?JPREFIX_BITS)).
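+
+%% Taken together, a deliver or ack record in the journal is laid out
+%% on disk as
+%%   <<JPrefix:?JPREFIX_BITS, SeqId:?SEQ_BITS>>
+%% (where JPrefix is ?DEL_JPREFIX or ?ACK_JPREFIX), and a publish
+%% record as
+%%   <<JPrefix:?JPREFIX_BITS, SeqId:?SEQ_BITS,
+%%     PubRecordBody:?PUB_RECORD_BODY_BYTES/binary,
+%%     MsgSize:?EMBEDDED_SIZE_BITS, MsgBin:MsgSize/binary>>
+%% (where JPrefix is ?PUB_PERSIST_JPREFIX or ?PUB_TRANS_JPREFIX); see
+%% parse_journal_entries/2.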
+
+%% ---- Segment details ----
+
+-define(SEGMENT_EXTENSION, ".idx").
+
+%% TODO: The segment size could be made configurable, but deriving all the
+%% other values is quite hairy and quite possibly noticeably less
+%% efficient, depending on how clever the compiler is when it comes to
+%% binary generation/matching with constant vs variable lengths.
+
+-define(REL_SEQ_BITS, 14).
+%% calculated as trunc(math:pow(2, ?REL_SEQ_BITS)).
+-define(SEGMENT_ENTRY_COUNT, 16384).
+
+%% seq only is binary 01 followed by 14 bits of rel seq id
+%% (range: 0 - 16383)
+-define(REL_SEQ_ONLY_PREFIX, 01).
+-define(REL_SEQ_ONLY_PREFIX_BITS, 2).
+-define(REL_SEQ_ONLY_RECORD_BYTES, 2).
+
+%% publish record is binary 1 followed by a bit for is_persistent,
+%% then 14 bits of rel seq id, 128 bits of md5sum msg id, 64 bits for
+%% message expiry, 32 bits of size, and then a 32 bit embedded message
+%% size followed by that many bytes of embedded message (the embedded
+%% size is 0 when the message lives in the message store).
+-define(PUB_PREFIX, 1).
+-define(PUB_PREFIX_BITS, 1).
+
+-define(EXPIRY_BYTES, 8).
+-define(EXPIRY_BITS, (?EXPIRY_BYTES * 8)).
+-define(NO_EXPIRY, 0).
+
+-define(MSG_ID_BYTES, 16). %% md5sum is 128 bit or 16 bytes
+-define(MSG_ID_BITS, (?MSG_ID_BYTES * 8)).
+
+%% This is the size of the message body content, for stats
+-define(SIZE_BYTES, 4).
+-define(SIZE_BITS, (?SIZE_BYTES * 8)).
+
+%% This is the size of the message record embedded in the queue
+%% index. If 0, the message can be found in the message store.
+-define(EMBEDDED_SIZE_BYTES, 4).
+-define(EMBEDDED_SIZE_BITS, (?EMBEDDED_SIZE_BYTES * 8)).
+
+%% 16 bytes for md5sum + 8 for expiry + 4 for size
+-define(PUB_RECORD_BODY_BYTES, (?MSG_ID_BYTES + ?EXPIRY_BYTES + ?SIZE_BYTES)).
+%% + 4 for the embedded message size
+-define(PUB_RECORD_SIZE_BYTES, (?PUB_RECORD_BODY_BYTES + ?EMBEDDED_SIZE_BYTES)).
+
+%% 2 bytes for the pub prefix, is_persistent bit and 14 bit rel seq id
+-define(PUB_RECORD_PREFIX_BYTES, 2).
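+
+%% Putting the above together, a complete publish entry in a segment
+%% file is laid out as
+%%   <<?PUB_PREFIX:?PUB_PREFIX_BITS, IsPersistent:1,
+%%     RelSeq:?REL_SEQ_BITS, MsgId:?MSG_ID_BITS, Expiry:?EXPIRY_BITS,
+%%     Size:?SIZE_BITS, MsgSize:?EMBEDDED_SIZE_BITS,
+%%     MsgBin:MsgSize/binary>>
+%% (see entry_to_segment/3 and parse_segment_publish_entry/5).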
+
+%% ---- misc ----
+
+-define(PUB, {_, _, _}). %% {IsPersistent, Bin, MsgBin}
+
+-define(READ_MODE, [binary, raw, read]).
+-define(WRITE_MODE, [write | ?READ_MODE]).
+
+%%----------------------------------------------------------------------------
+
+-record(qistate, {
+ %% queue directory where segment and journal files are stored
+ dir,
+ %% map of #segment records
+ segments,
+ %% journal file handle obtained from/used by file_handle_cache
+ journal_handle,
+ %% how many not yet flushed entries are there
+ dirty_count,
+ %% this many not yet flushed journal entries will force a flush
+ max_journal_entries,
+ %% callback function invoked when a message is "handled"
+ %% by the index and potentially can be confirmed to the publisher
+ on_sync,
+ on_sync_msg,
+ %% set of IDs of unconfirmed [to publishers] messages
+ unconfirmed,
+ unconfirmed_msg,
+ %% optimisation
+ pre_publish_cache,
+ %% optimisation
+ delivered_cache,
+ %% queue name resource record
+ queue_name}).
+
+-record(segment, {
+ %% segment ID (an integer)
+ num,
+ %% segment file path (see also ?SEGMENT_EXTENSION)
+ path,
+ %% index operation log entries in this segment
+ journal_entries,
+ entries_to_segment,
+ %% counter of unacknowledged messages
+ unacked
+}).
+
+-include("rabbit.hrl").
+
+%%----------------------------------------------------------------------------
+
+-rabbit_upgrade({add_queue_ttl, local, []}).
+-rabbit_upgrade({avoid_zeroes, local, [add_queue_ttl]}).
+-rabbit_upgrade({store_msg_size, local, [avoid_zeroes]}).
+-rabbit_upgrade({store_msg, local, [store_msg_size]}).
+
+-type hdl() :: ('undefined' | any()).
+-type segment() :: ('undefined' |
+ #segment { num :: non_neg_integer(),
+ path :: file:filename(),
+ journal_entries :: array:array(),
+ entries_to_segment :: array:array(),
+ unacked :: non_neg_integer()
+ }).
+-type seq_id() :: integer().
+-type seg_map() :: {map(), [segment()]}.
+-type on_sync_fun() :: fun ((gb_sets:set()) -> ok).
+-type qistate() :: #qistate { dir :: file:filename(),
+ segments :: 'undefined' | seg_map(),
+ journal_handle :: hdl(),
+ dirty_count :: integer(),
+ max_journal_entries :: non_neg_integer(),
+ on_sync :: on_sync_fun(),
+ on_sync_msg :: on_sync_fun(),
+ unconfirmed :: gb_sets:set(),
+ unconfirmed_msg :: gb_sets:set(),
+ pre_publish_cache :: list(),
+ delivered_cache :: list()
+ }.
+-type contains_predicate() :: fun ((rabbit_types:msg_id()) -> boolean()).
+-type walker(A) :: fun ((A) -> 'finished' |
+ {rabbit_types:msg_id(), non_neg_integer(), A}).
+-type shutdown_terms() :: [term()] | 'non_clean_shutdown'.
+
+%%----------------------------------------------------------------------------
+%% public API
+%%----------------------------------------------------------------------------
+
+-spec erase(rabbit_amqqueue:name()) -> 'ok'.
+
+erase(#resource{ virtual_host = VHost } = Name) ->
+ VHostDir = rabbit_vhost:msg_store_dir_path(VHost),
+ #qistate { dir = Dir } = blank_state(VHostDir, Name),
+ erase_index_dir(Dir).
+
+%% used during variable queue purge when there are no pending acks
+
+-spec reset_state(qistate()) -> qistate().
+
+reset_state(#qistate{ queue_name = Name,
+ dir = Dir,
+ on_sync = OnSyncFun,
+ on_sync_msg = OnSyncMsgFun,
+ journal_handle = JournalHdl }) ->
+ ok = case JournalHdl of
+ undefined -> ok;
+ _ -> file_handle_cache:close(JournalHdl)
+ end,
+ ok = erase_index_dir(Dir),
+ blank_state_name_dir_funs(Name, Dir, OnSyncFun, OnSyncMsgFun).
+
+-spec init(rabbit_amqqueue:name(),
+ on_sync_fun(), on_sync_fun()) -> qistate().
+
+init(#resource{ virtual_host = VHost } = Name, OnSyncFun, OnSyncMsgFun) ->
+ VHostDir = rabbit_vhost:msg_store_dir_path(VHost),
+ State = #qistate { dir = Dir } = blank_state(VHostDir, Name),
+ false = rabbit_file:is_file(Dir), %% is_file == is file or dir
+ State#qistate{on_sync = OnSyncFun,
+ on_sync_msg = OnSyncMsgFun}.
+
+-spec recover(rabbit_amqqueue:name(), shutdown_terms(), boolean(),
+ contains_predicate(),
+ on_sync_fun(), on_sync_fun()) ->
+ {'undefined' | non_neg_integer(),
+ 'undefined' | non_neg_integer(), qistate()}.
+
+recover(#resource{ virtual_host = VHost } = Name, Terms, MsgStoreRecovered,
+ ContainsCheckFun, OnSyncFun, OnSyncMsgFun) ->
+ VHostDir = rabbit_vhost:msg_store_dir_path(VHost),
+ State = blank_state(VHostDir, Name),
+ State1 = State #qistate{on_sync = OnSyncFun,
+ on_sync_msg = OnSyncMsgFun},
+ CleanShutdown = Terms /= non_clean_shutdown,
+ case CleanShutdown andalso MsgStoreRecovered of
+ true -> RecoveredCounts = proplists:get_value(segments, Terms, []),
+ init_clean(RecoveredCounts, State1);
+ false -> init_dirty(CleanShutdown, ContainsCheckFun, State1)
+ end.
+
+-spec terminate(rabbit_types:vhost(), [any()], qistate()) -> qistate().
+
+terminate(VHost, Terms, State = #qistate { dir = Dir }) ->
+ {SegmentCounts, State1} = terminate(State),
+ rabbit_recovery_terms:store(VHost, filename:basename(Dir),
+ [{segments, SegmentCounts} | Terms]),
+ State1.
+
+-spec delete_and_terminate(qistate()) -> qistate().
+
+delete_and_terminate(State) ->
+ {_SegmentCounts, State1 = #qistate { dir = Dir }} = terminate(State),
+ ok = rabbit_file:recursive_delete([Dir]),
+ State1.
+
+pre_publish(MsgOrId, SeqId, MsgProps, IsPersistent, IsDelivered, JournalSizeHint,
+ State = #qistate{pre_publish_cache = PPC,
+ delivered_cache = DC}) ->
+ State1 = maybe_needs_confirming(MsgProps, MsgOrId, State),
+
+ {Bin, MsgBin} = create_pub_record_body(MsgOrId, MsgProps),
+
+ PPC1 =
+ [[<<(case IsPersistent of
+ true -> ?PUB_PERSIST_JPREFIX;
+ false -> ?PUB_TRANS_JPREFIX
+ end):?JPREFIX_BITS,
+ SeqId:?SEQ_BITS, Bin/binary,
+ (size(MsgBin)):?EMBEDDED_SIZE_BITS>>, MsgBin] | PPC],
+
+ DC1 =
+ case IsDelivered of
+ true ->
+ [SeqId | DC];
+ false ->
+ DC
+ end,
+
+ State2 = add_to_journal(SeqId, {IsPersistent, Bin, MsgBin}, State1),
+ maybe_flush_pre_publish_cache(
+ JournalSizeHint,
+ State2#qistate{pre_publish_cache = PPC1,
+ delivered_cache = DC1}).
+
+%% pre_publish_cache always has at least as many elements as
+%% delivered_cache, so we only check the former in the guard.
+maybe_flush_pre_publish_cache(JournalSizeHint,
+ #qistate{pre_publish_cache = PPC} = State)
+ when length(PPC) >= ?SEGMENT_ENTRY_COUNT ->
+ flush_pre_publish_cache(JournalSizeHint, State);
+maybe_flush_pre_publish_cache(_JournalSizeHint, State) ->
+ State.
+
+flush_pre_publish_cache(JournalSizeHint, State) ->
+ State1 = flush_pre_publish_cache(State),
+ State2 = flush_delivered_cache(State1),
+ maybe_flush_journal(JournalSizeHint, State2).
+
+flush_pre_publish_cache(#qistate{pre_publish_cache = []} = State) ->
+ State;
+flush_pre_publish_cache(State = #qistate{pre_publish_cache = PPC}) ->
+ {JournalHdl, State1} = get_journal_handle(State),
+ file_handle_cache_stats:update(queue_index_journal_write),
+ ok = file_handle_cache:append(JournalHdl, lists:reverse(PPC)),
+ State1#qistate{pre_publish_cache = []}.
+
+flush_delivered_cache(#qistate{delivered_cache = []} = State) ->
+ State;
+flush_delivered_cache(State = #qistate{delivered_cache = DC}) ->
+ State1 = deliver(lists:reverse(DC), State),
+ State1#qistate{delivered_cache = []}.
+
+-spec publish(rabbit_types:msg_id(), seq_id(),
+ rabbit_types:message_properties(), boolean(),
+ non_neg_integer(), qistate()) -> qistate().
+
+publish(MsgOrId, SeqId, MsgProps, IsPersistent, JournalSizeHint, State) ->
+ {JournalHdl, State1} =
+ get_journal_handle(
+ maybe_needs_confirming(MsgProps, MsgOrId, State)),
+ file_handle_cache_stats:update(queue_index_journal_write),
+ {Bin, MsgBin} = create_pub_record_body(MsgOrId, MsgProps),
+ ok = file_handle_cache:append(
+ JournalHdl, [<<(case IsPersistent of
+ true -> ?PUB_PERSIST_JPREFIX;
+ false -> ?PUB_TRANS_JPREFIX
+ end):?JPREFIX_BITS,
+ SeqId:?SEQ_BITS, Bin/binary,
+ (size(MsgBin)):?EMBEDDED_SIZE_BITS>>, MsgBin]),
+ maybe_flush_journal(
+ JournalSizeHint,
+ add_to_journal(SeqId, {IsPersistent, Bin, MsgBin}, State1)).
+
+maybe_needs_confirming(MsgProps, MsgOrId,
+ State = #qistate{unconfirmed = UC,
+ unconfirmed_msg = UCM}) ->
+ MsgId = case MsgOrId of
+ #basic_message{id = Id} -> Id;
+ Id when is_binary(Id) -> Id
+ end,
+ ?MSG_ID_BYTES = size(MsgId),
+ case {MsgProps#message_properties.needs_confirming, MsgOrId} of
+ {true, MsgId} -> UC1 = gb_sets:add_element(MsgId, UC),
+ State#qistate{unconfirmed = UC1};
+ {true, _} -> UCM1 = gb_sets:add_element(MsgId, UCM),
+ State#qistate{unconfirmed_msg = UCM1};
+ {false, _} -> State
+ end.
+
+-spec deliver([seq_id()], qistate()) -> qistate().
+
+deliver(SeqIds, State) ->
+ deliver_or_ack(del, SeqIds, State).
+
+-spec ack([seq_id()], qistate()) -> qistate().
+
+ack(SeqIds, State) ->
+ deliver_or_ack(ack, SeqIds, State).
+
+%% This is called when there are outstanding confirms or when the
+%% queue is idle and the journal needs syncing (see needs_sync/1).
+
+-spec sync(qistate()) -> qistate().
+
+sync(State = #qistate { journal_handle = undefined }) ->
+ State;
+sync(State = #qistate { journal_handle = JournalHdl }) ->
+ ok = file_handle_cache:sync(JournalHdl),
+ notify_sync(State).
+
+-spec needs_sync(qistate()) -> 'confirms' | 'other' | 'false'.
+
+needs_sync(#qistate{journal_handle = undefined}) ->
+ false;
+needs_sync(#qistate{journal_handle = JournalHdl,
+ unconfirmed = UC,
+ unconfirmed_msg = UCM}) ->
+ case gb_sets:is_empty(UC) andalso gb_sets:is_empty(UCM) of
+ true -> case file_handle_cache:needs_sync(JournalHdl) of
+ true -> other;
+ false -> false
+ end;
+ false -> confirms
+ end.
+
+-spec flush(qistate()) -> qistate().
+
+flush(State = #qistate { dirty_count = 0 }) -> State;
+flush(State) -> flush_journal(State).
+
+-spec read(seq_id(), seq_id(), qistate()) ->
+ {[{rabbit_types:msg_id(), seq_id(),
+ rabbit_types:message_properties(),
+ boolean(), boolean()}], qistate()}.
+
+read(StartEnd, StartEnd, State) ->
+ {[], State};
+read(Start, End, State = #qistate { segments = Segments,
+ dir = Dir }) when Start =< End ->
+ %% Start is inclusive, End is exclusive.
+ LowerB = {StartSeg, _StartRelSeq} = seq_id_to_seg_and_rel_seq_id(Start),
+ UpperB = {EndSeg, _EndRelSeq} = seq_id_to_seg_and_rel_seq_id(End - 1),
+ {Messages, Segments1} =
+ lists:foldr(fun (Seg, Acc) ->
+ read_bounded_segment(Seg, LowerB, UpperB, Acc, Dir)
+ end, {[], Segments}, lists:seq(StartSeg, EndSeg)),
+ {Messages, State #qistate { segments = Segments1 }}.
+
+-spec next_segment_boundary(seq_id()) -> seq_id().
+
+next_segment_boundary(SeqId) ->
+ {Seg, _RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId),
+ reconstruct_seq_id(Seg + 1, 0).
+
+-spec bounds(qistate()) ->
+ {non_neg_integer(), non_neg_integer(), qistate()}.
+
+bounds(State = #qistate { segments = Segments }) ->
+ %% This is not particularly efficient, but only gets invoked on
+ %% queue initialisation.
+ SegNums = lists:sort(segment_nums(Segments)),
+ %% Don't bother trying to figure out the lowest seq_id, merely the
+ %% seq_id of the start of the lowest segment. That seq_id may not
+ %% actually exist, but that's fine. The important thing is that
+ %% the segment exists and the seq_id reported is on a segment
+ %% boundary.
+ %%
+ %% We also don't really care about the max seq_id. Just start the
+ %% next segment: it makes life much easier.
+ %%
+ %% SegNums is sorted, ascending.
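+    %%
+    %% For example, if the only segment files present are 0.idx and
+    %% 3.idx, SegNums is [0, 3], so we return LowSeqId =
+    %% reconstruct_seq_id(0, 0) = 0 and NextSeqId =
+    %% reconstruct_seq_id(4, 0) = 4 * ?SEGMENT_ENTRY_COUNT = 65536.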
+ {LowSeqId, NextSeqId} =
+ case SegNums of
+ [] -> {0, 0};
+ [MinSeg|_] -> {reconstruct_seq_id(MinSeg, 0),
+ reconstruct_seq_id(1 + lists:last(SegNums), 0)}
+ end,
+ {LowSeqId, NextSeqId, State}.
+
+-spec start(rabbit_types:vhost(), [rabbit_amqqueue:name()]) -> {[[any()]], {walker(A), A}}.
+
+start(VHost, DurableQueueNames) ->
+ ok = rabbit_recovery_terms:start(VHost),
+ {DurableTerms, DurableDirectories} =
+ lists:foldl(
+ fun(QName, {RecoveryTerms, ValidDirectories}) ->
+ DirName = queue_name_to_dir_name(QName),
+ RecoveryInfo = case rabbit_recovery_terms:read(VHost, DirName) of
+ {error, _} -> non_clean_shutdown;
+ {ok, Terms} -> Terms
+ end,
+ {[RecoveryInfo | RecoveryTerms],
+ sets:add_element(DirName, ValidDirectories)}
+ end, {[], sets:new()}, DurableQueueNames),
+ %% Any queue directory we've not been asked to recover is considered garbage
+ rabbit_file:recursive_delete(
+ [DirName ||
+ DirName <- all_queue_directory_names(VHost),
+ not sets:is_element(filename:basename(DirName), DurableDirectories)]),
+ rabbit_recovery_terms:clear(VHost),
+
+ %% The backing queue interface requires that the queue recovery terms
+ %% which come back from start/1 are in the same order as DurableQueueNames
+ OrderedTerms = lists:reverse(DurableTerms),
+ {OrderedTerms, {fun queue_index_walker/1, {start, DurableQueueNames}}}.
+
+
+stop(VHost) -> rabbit_recovery_terms:stop(VHost).
+
+all_queue_directory_names(VHost) ->
+ filelib:wildcard(filename:join([rabbit_vhost:msg_store_dir_path(VHost),
+ "queues", "*"])).
+
+all_queue_directory_names() ->
+ filelib:wildcard(filename:join([rabbit_vhost:msg_store_dir_wildcard(),
+ "queues", "*"])).
+
+%%----------------------------------------------------------------------------
+%% startup and shutdown
+%%----------------------------------------------------------------------------
+
+erase_index_dir(Dir) ->
+ case rabbit_file:is_dir(Dir) of
+ true -> rabbit_file:recursive_delete([Dir]);
+ false -> ok
+ end.
+
+blank_state(VHostDir, QueueName) ->
+ Dir = queue_dir(VHostDir, QueueName),
+ blank_state_name_dir_funs(QueueName,
+ Dir,
+ fun (_) -> ok end,
+ fun (_) -> ok end).
+
+queue_dir(VHostDir, QueueName) ->
+ %% Queue directory is
+ %% {node_database_dir}/msg_stores/vhosts/{vhost}/queues/{queue}
+ QueueDir = queue_name_to_dir_name(QueueName),
+ filename:join([VHostDir, "queues", QueueDir]).
+
+queue_name_to_dir_name(#resource { kind = queue,
+ virtual_host = VHost,
+ name = QName }) ->
+ <<Num:128>> = erlang:md5(<<"queue", VHost/binary, QName/binary>>),
+ rabbit_misc:format("~.36B", [Num]).
+
+queue_name_to_dir_name_legacy(Name = #resource { kind = queue }) ->
+ <<Num:128>> = erlang:md5(term_to_binary_compat:term_to_binary_1(Name)),
+ rabbit_misc:format("~.36B", [Num]).
+
+queues_base_dir() ->
+ rabbit_mnesia:dir().
+
+blank_state_name_dir_funs(Name, Dir, OnSyncFun, OnSyncMsgFun) ->
+ {ok, MaxJournal} =
+ application:get_env(rabbit, queue_index_max_journal_entries),
+ #qistate { dir = Dir,
+ segments = segments_new(),
+ journal_handle = undefined,
+ dirty_count = 0,
+ max_journal_entries = MaxJournal,
+ on_sync = OnSyncFun,
+ on_sync_msg = OnSyncMsgFun,
+ unconfirmed = gb_sets:new(),
+ unconfirmed_msg = gb_sets:new(),
+ pre_publish_cache = [],
+ delivered_cache = [],
+ queue_name = Name }.
+
+init_clean(RecoveredCounts, State) ->
+ %% Load the journal. Since this is a clean recovery this (almost)
+ %% gets us back to where we were on shutdown.
+ State1 = #qistate { dir = Dir, segments = Segments } = load_journal(State),
+ %% The journal loading only creates records for segments touched
+ %% by the journal, and the counts are based on the journal entries
+ %% only. We need *complete* counts for *all* segments. By an
+ %% amazing coincidence we stored that information on shutdown.
+ Segments1 =
+ lists:foldl(
+ fun ({Seg, UnackedCount}, SegmentsN) ->
+ Segment = segment_find_or_new(Seg, Dir, SegmentsN),
+ segment_store(Segment #segment { unacked = UnackedCount },
+ SegmentsN)
+ end, Segments, RecoveredCounts),
+ %% the counts above include transient messages, which would be the
+ %% wrong thing to return
+ {undefined, undefined, State1 # qistate { segments = Segments1 }}.
+
+init_dirty(CleanShutdown, ContainsCheckFun, State) ->
+ %% Recover the journal completely. This will also load segments
+ %% which have entries in the journal and remove duplicates. The
+ %% counts will correctly reflect the combination of the segment
+ %% and the journal.
+ State1 = #qistate { dir = Dir, segments = Segments } =
+ recover_journal(State),
+ {Segments1, Count, Bytes, DirtyCount} =
+ %% Load each segment in turn and filter out messages that are
+ %% not in the msg_store, by adding acks to the journal. These
+ %% acks only go to the RAM journal as it doesn't matter if we
+ %% lose them. Also mark delivered if not clean shutdown. Also
+ %% find the number of unacked messages. Also accumulate the
+ %% dirty count here, so we can call maybe_flush_journal below
+ %% and avoid unnecessary file system operations.
+ lists:foldl(
+ fun (Seg, {Segments2, CountAcc, BytesAcc, DirtyCount}) ->
+ {{Segment = #segment { unacked = UnackedCount }, Dirty},
+ UnackedBytes} =
+ recover_segment(ContainsCheckFun, CleanShutdown,
+ segment_find_or_new(Seg, Dir, Segments2),
+ State1#qistate.max_journal_entries),
+ {segment_store(Segment, Segments2),
+ CountAcc + UnackedCount,
+ BytesAcc + UnackedBytes, DirtyCount + Dirty}
+ end, {Segments, 0, 0, 0}, all_segment_nums(State1)),
+ State2 = maybe_flush_journal(State1 #qistate { segments = Segments1,
+ dirty_count = DirtyCount }),
+ {Count, Bytes, State2}.
+
+terminate(State = #qistate { journal_handle = JournalHdl,
+ segments = Segments }) ->
+ ok = case JournalHdl of
+ undefined -> ok;
+ _ -> file_handle_cache:close(JournalHdl)
+ end,
+ SegmentCounts =
+ segment_fold(
+ fun (#segment { num = Seg, unacked = UnackedCount }, Acc) ->
+ [{Seg, UnackedCount} | Acc]
+ end, [], Segments),
+ {SegmentCounts, State #qistate { journal_handle = undefined,
+ segments = undefined }}.
+
+recover_segment(ContainsCheckFun, CleanShutdown,
+ Segment = #segment { journal_entries = JEntries }, MaxJournal) ->
+ {SegEntries, UnackedCount} = load_segment(false, Segment),
+ {SegEntries1, UnackedCountDelta} =
+ segment_plus_journal(SegEntries, JEntries),
+ array:sparse_foldl(
+ fun (RelSeq, {{IsPersistent, Bin, MsgBin}, Del, no_ack},
+ {SegmentAndDirtyCount, Bytes}) ->
+ {MsgOrId, MsgProps} = parse_pub_record_body(Bin, MsgBin),
+ {recover_message(ContainsCheckFun(MsgOrId), CleanShutdown,
+ Del, RelSeq, SegmentAndDirtyCount, MaxJournal),
+ Bytes + case IsPersistent of
+ true -> MsgProps#message_properties.size;
+ false -> 0
+ end}
+ end,
+ {{Segment #segment { unacked = UnackedCount + UnackedCountDelta }, 0}, 0},
+ SegEntries1).
+
+recover_message( true, true, _Del, _RelSeq, SegmentAndDirtyCount, _MaxJournal) ->
+ SegmentAndDirtyCount;
+recover_message( true, false, del, _RelSeq, SegmentAndDirtyCount, _MaxJournal) ->
+ SegmentAndDirtyCount;
+recover_message( true, false, no_del, RelSeq, {Segment, _DirtyCount}, MaxJournal) ->
+    %% force a flush of the segment
+ {add_to_journal(RelSeq, del, Segment), MaxJournal + 1};
+recover_message(false, _, del, RelSeq, {Segment, DirtyCount}, _MaxJournal) ->
+ {add_to_journal(RelSeq, ack, Segment), DirtyCount + 1};
+recover_message(false, _, no_del, RelSeq, {Segment, DirtyCount}, _MaxJournal) ->
+ {add_to_journal(RelSeq, ack,
+ add_to_journal(RelSeq, del, Segment)),
+ DirtyCount + 2}.
+
+%%----------------------------------------------------------------------------
+%% msg store startup delta function
+%%----------------------------------------------------------------------------
+
+queue_index_walker({start, DurableQueues}) when is_list(DurableQueues) ->
+ {ok, Gatherer} = gatherer:start_link(),
+ [begin
+ ok = gatherer:fork(Gatherer),
+ ok = worker_pool:submit_async(
+ fun () -> link(Gatherer),
+ ok = queue_index_walker_reader(QueueName, Gatherer),
+ unlink(Gatherer),
+ ok
+ end)
+ end || QueueName <- DurableQueues],
+ queue_index_walker({next, Gatherer});
+
+queue_index_walker({next, Gatherer}) when is_pid(Gatherer) ->
+ case gatherer:out(Gatherer) of
+ empty ->
+ ok = gatherer:stop(Gatherer),
+ finished;
+ {value, {MsgId, Count}} ->
+ {MsgId, Count, {next, Gatherer}}
+ end.
+
+queue_index_walker_reader(QueueName, Gatherer) ->
+ ok = scan_queue_segments(
+ fun (_SeqId, MsgId, _MsgProps, true, _IsDelivered, no_ack, ok)
+ when is_binary(MsgId) ->
+ gatherer:sync_in(Gatherer, {MsgId, 1});
+ (_SeqId, _MsgId, _MsgProps, _IsPersistent, _IsDelivered,
+ _IsAcked, Acc) ->
+ Acc
+ end, ok, QueueName),
+ ok = gatherer:finish(Gatherer).
+
+scan_queue_segments(Fun, Acc, #resource{ virtual_host = VHost } = QueueName) ->
+ VHostDir = rabbit_vhost:msg_store_dir_path(VHost),
+ scan_queue_segments(Fun, Acc, VHostDir, QueueName).
+
+scan_queue_segments(Fun, Acc, VHostDir, QueueName) ->
+ State = #qistate { segments = Segments, dir = Dir } =
+ recover_journal(blank_state(VHostDir, QueueName)),
+ Result = lists:foldr(
+ fun (Seg, AccN) ->
+ segment_entries_foldr(
+ fun (RelSeq, {{MsgOrId, MsgProps, IsPersistent},
+ IsDelivered, IsAcked}, AccM) ->
+ Fun(reconstruct_seq_id(Seg, RelSeq), MsgOrId, MsgProps,
+ IsPersistent, IsDelivered, IsAcked, AccM)
+ end, AccN, segment_find_or_new(Seg, Dir, Segments))
+ end, Acc, all_segment_nums(State)),
+ {_SegmentCounts, _State} = terminate(State),
+ Result.
+
+%%----------------------------------------------------------------------------
+%% expiry/binary manipulation
+%%----------------------------------------------------------------------------
+
+create_pub_record_body(MsgOrId, #message_properties { expiry = Expiry,
+ size = Size }) ->
+ ExpiryBin = expiry_to_binary(Expiry),
+ case MsgOrId of
+ MsgId when is_binary(MsgId) ->
+ {<<MsgId/binary, ExpiryBin/binary, Size:?SIZE_BITS>>, <<>>};
+ #basic_message{id = MsgId} ->
+ MsgBin = term_to_binary(MsgOrId),
+ {<<MsgId/binary, ExpiryBin/binary, Size:?SIZE_BITS>>, MsgBin}
+ end.
+
+expiry_to_binary(undefined) -> <<?NO_EXPIRY:?EXPIRY_BITS>>;
+expiry_to_binary(Expiry) -> <<Expiry:?EXPIRY_BITS>>.
+
+parse_pub_record_body(<<MsgIdNum:?MSG_ID_BITS, Expiry:?EXPIRY_BITS,
+ Size:?SIZE_BITS>>, MsgBin) ->
+    %% workaround for binary data fragmentation. See
+ %% rabbit_msg_file:read_next/2
+ <<MsgId:?MSG_ID_BYTES/binary>> = <<MsgIdNum:?MSG_ID_BITS>>,
+ Props = #message_properties{expiry = case Expiry of
+ ?NO_EXPIRY -> undefined;
+ X -> X
+ end,
+ size = Size},
+ case MsgBin of
+ <<>> -> {MsgId, Props};
+ _ -> Msg = #basic_message{id = MsgId} = binary_to_term(MsgBin),
+ {Msg, Props}
+ end.
+
+%%----------------------------------------------------------------------------
+%% journal manipulation
+%%----------------------------------------------------------------------------
+
+add_to_journal(SeqId, Action, State = #qistate { dirty_count = DCount,
+ segments = Segments,
+ dir = Dir }) ->
+ {Seg, RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId),
+ Segment = segment_find_or_new(Seg, Dir, Segments),
+ Segment1 = add_to_journal(RelSeq, Action, Segment),
+ State #qistate { dirty_count = DCount + 1,
+ segments = segment_store(Segment1, Segments) };
+
+add_to_journal(RelSeq, Action,
+ Segment = #segment { journal_entries = JEntries,
+ entries_to_segment = EToSeg,
+ unacked = UnackedCount }) ->
+
+ {Fun, Entry} = action_to_entry(RelSeq, Action, JEntries),
+
+ {JEntries1, EToSeg1} =
+ case Fun of
+ set ->
+ {array:set(RelSeq, Entry, JEntries),
+ array:set(RelSeq, entry_to_segment(RelSeq, Entry, []),
+ EToSeg)};
+ reset ->
+ {array:reset(RelSeq, JEntries),
+ array:reset(RelSeq, EToSeg)}
+ end,
+
+ Segment #segment {
+ journal_entries = JEntries1,
+ entries_to_segment = EToSeg1,
+ unacked = UnackedCount + case Action of
+ ?PUB -> +1;
+ del -> 0;
+ ack -> -1
+ end}.
+
+action_to_entry(RelSeq, Action, JEntries) ->
+ case array:get(RelSeq, JEntries) of
+ undefined ->
+ {set,
+ case Action of
+ ?PUB -> {Action, no_del, no_ack};
+ del -> {no_pub, del, no_ack};
+ ack -> {no_pub, no_del, ack}
+ end};
+ ({Pub, no_del, no_ack}) when Action == del ->
+ {set, {Pub, del, no_ack}};
+ ({no_pub, del, no_ack}) when Action == ack ->
+ {set, {no_pub, del, ack}};
+ ({?PUB, del, no_ack}) when Action == ack ->
+ {reset, none}
+ end.
+
+maybe_flush_journal(State) ->
+ maybe_flush_journal(infinity, State).
+
+maybe_flush_journal(Hint, State = #qistate { dirty_count = DCount,
+ max_journal_entries = MaxJournal })
+ when DCount > MaxJournal orelse (Hint =/= infinity andalso DCount > Hint) ->
+ flush_journal(State);
+maybe_flush_journal(_Hint, State) ->
+ State.
+
+flush_journal(State = #qistate { segments = Segments }) ->
+ Segments1 =
+ segment_fold(
+ fun (#segment { unacked = 0, path = Path }, SegmentsN) ->
+ case rabbit_file:is_file(Path) of
+ true -> ok = rabbit_file:delete(Path);
+ false -> ok
+ end,
+ SegmentsN;
+ (#segment {} = Segment, SegmentsN) ->
+ segment_store(append_journal_to_segment(Segment), SegmentsN)
+ end, segments_new(), Segments),
+ {JournalHdl, State1} =
+ get_journal_handle(State #qistate { segments = Segments1 }),
+ ok = file_handle_cache:clear(JournalHdl),
+ notify_sync(State1 #qistate { dirty_count = 0 }).
+
+append_journal_to_segment(#segment { journal_entries = JEntries,
+ entries_to_segment = EToSeg,
+ path = Path } = Segment) ->
+ case array:sparse_size(JEntries) of
+ 0 -> Segment;
+ _ ->
+ file_handle_cache_stats:update(queue_index_write),
+
+ {ok, Hdl} = file_handle_cache:open_with_absolute_path(
+ Path, ?WRITE_MODE,
+ [{write_buffer, infinity}]),
+            %% the file_handle_cache also does a list reverse, so this
+            %% might not be required here, but since this code
+            %% previously used a sparse_foldr, a lists:reverse/1 seems
+            %% to be the correct thing to do for now.
+ file_handle_cache:append(Hdl, lists:reverse(array:to_list(EToSeg))),
+ ok = file_handle_cache:close(Hdl),
+ Segment #segment { journal_entries = array_new(),
+ entries_to_segment = array_new([]) }
+ end.
+
+get_journal_handle(State = #qistate { journal_handle = undefined,
+ dir = Dir,
+ queue_name = Name }) ->
+ Path = filename:join(Dir, ?JOURNAL_FILENAME),
+ ok = rabbit_file:ensure_dir(Path),
+ ok = ensure_queue_name_stub_file(Dir, Name),
+ {ok, Hdl} = file_handle_cache:open_with_absolute_path(
+ Path, ?WRITE_MODE, [{write_buffer, infinity}]),
+ {Hdl, State #qistate { journal_handle = Hdl }};
+get_journal_handle(State = #qistate { journal_handle = Hdl }) ->
+ {Hdl, State}.
+
+%% Loading Journal. This isn't idempotent and will mess up the counts
+%% if you call it more than once on the same state. Assumes the counts
+%% are 0 to start with.
+load_journal(State = #qistate { dir = Dir }) ->
+ Path = filename:join(Dir, ?JOURNAL_FILENAME),
+ case rabbit_file:is_file(Path) of
+ true -> {JournalHdl, State1} = get_journal_handle(State),
+ Size = rabbit_file:file_size(Path),
+ {ok, 0} = file_handle_cache:position(JournalHdl, 0),
+ {ok, JournalBin} = file_handle_cache:read(JournalHdl, Size),
+ parse_journal_entries(JournalBin, State1);
+ false -> State
+ end.
+
+%% ditto
+recover_journal(State) ->
+ State1 = #qistate { segments = Segments } = load_journal(State),
+ Segments1 =
+ segment_map(
+ fun (Segment = #segment { journal_entries = JEntries,
+ entries_to_segment = EToSeg,
+ unacked = UnackedCountInJournal }) ->
+ %% We want to keep ack'd entries in so that we can
+ %% remove them if duplicates are in the journal. The
+ %% counts here are purely from the segment itself.
+ {SegEntries, UnackedCountInSeg} = load_segment(true, Segment),
+ {JEntries1, EToSeg1, UnackedCountDuplicates} =
+ journal_minus_segment(JEntries, EToSeg, SegEntries),
+ Segment #segment { journal_entries = JEntries1,
+ entries_to_segment = EToSeg1,
+ unacked = (UnackedCountInJournal +
+ UnackedCountInSeg -
+ UnackedCountDuplicates) }
+ end, Segments),
+ State1 #qistate { segments = Segments1 }.
+
+parse_journal_entries(<<?DEL_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS,
+ Rest/binary>>, State) ->
+ parse_journal_entries(Rest, add_to_journal(SeqId, del, State));
+
+parse_journal_entries(<<?ACK_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS,
+ Rest/binary>>, State) ->
+ parse_journal_entries(Rest, add_to_journal(SeqId, ack, State));
+parse_journal_entries(<<0:?JPREFIX_BITS, 0:?SEQ_BITS,
+ 0:?PUB_RECORD_SIZE_BYTES/unit:8, _/binary>>, State) ->
+    %% A journal entry composed only of zeroes was probably
+    %% produced during a dirty shutdown, so stop reading
+ State;
+parse_journal_entries(<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS,
+ Bin:?PUB_RECORD_BODY_BYTES/binary,
+ MsgSize:?EMBEDDED_SIZE_BITS, MsgBin:MsgSize/binary,
+ Rest/binary>>, State) ->
+ IsPersistent = case Prefix of
+ ?PUB_PERSIST_JPREFIX -> true;
+ ?PUB_TRANS_JPREFIX -> false
+ end,
+ parse_journal_entries(
+ Rest, add_to_journal(SeqId, {IsPersistent, Bin, MsgBin}, State));
+parse_journal_entries(_ErrOrEoF, State) ->
+ State.
+
+deliver_or_ack(_Kind, [], State) ->
+ State;
+deliver_or_ack(Kind, SeqIds, State) ->
+ JPrefix = case Kind of ack -> ?ACK_JPREFIX; del -> ?DEL_JPREFIX end,
+ {JournalHdl, State1} = get_journal_handle(State),
+ file_handle_cache_stats:update(queue_index_journal_write),
+ ok = file_handle_cache:append(
+ JournalHdl,
+ [<<JPrefix:?JPREFIX_BITS, SeqId:?SEQ_BITS>> || SeqId <- SeqIds]),
+ maybe_flush_journal(lists:foldl(fun (SeqId, StateN) ->
+ add_to_journal(SeqId, Kind, StateN)
+ end, State1, SeqIds)).
+
+notify_sync(State = #qistate{unconfirmed = UC,
+ unconfirmed_msg = UCM,
+ on_sync = OnSyncFun,
+ on_sync_msg = OnSyncMsgFun}) ->
+ State1 = case gb_sets:is_empty(UC) of
+ true -> State;
+ false -> OnSyncFun(UC),
+ State#qistate{unconfirmed = gb_sets:new()}
+ end,
+ case gb_sets:is_empty(UCM) of
+ true -> State1;
+ false -> OnSyncMsgFun(UCM),
+ State1#qistate{unconfirmed_msg = gb_sets:new()}
+ end.
+
+%%----------------------------------------------------------------------------
+%% segment manipulation
+%%----------------------------------------------------------------------------
+
+seq_id_to_seg_and_rel_seq_id(SeqId) ->
+ { SeqId div ?SEGMENT_ENTRY_COUNT, SeqId rem ?SEGMENT_ENTRY_COUNT }.
+
+reconstruct_seq_id(Seg, RelSeq) ->
+ (Seg * ?SEGMENT_ENTRY_COUNT) + RelSeq.
+
+all_segment_nums(#qistate { dir = Dir, segments = Segments }) ->
+ lists:sort(
+ sets:to_list(
+ lists:foldl(
+ fun (SegName, Set) ->
+ sets:add_element(
+ list_to_integer(
+ lists:takewhile(fun (C) -> $0 =< C andalso C =< $9 end,
+ SegName)), Set)
+ end, sets:from_list(segment_nums(Segments)),
+ rabbit_file:wildcard(".*\\" ++ ?SEGMENT_EXTENSION, Dir)))).
+
+segment_find_or_new(Seg, Dir, Segments) ->
+ case segment_find(Seg, Segments) of
+ {ok, Segment} -> Segment;
+ error -> SegName = integer_to_list(Seg) ++ ?SEGMENT_EXTENSION,
+ Path = filename:join(Dir, SegName),
+ #segment { num = Seg,
+ path = Path,
+ journal_entries = array_new(),
+ entries_to_segment = array_new([]),
+ unacked = 0 }
+ end.
+
+segment_find(Seg, {_Segments, [Segment = #segment { num = Seg } |_]}) ->
+ {ok, Segment}; %% 1 or (2, matches head)
+segment_find(Seg, {_Segments, [_, Segment = #segment { num = Seg }]}) ->
+ {ok, Segment}; %% 2, matches tail
+segment_find(Seg, {Segments, _}) -> %% no match
+ maps:find(Seg, Segments).
+
+segment_store(Segment = #segment { num = Seg }, %% 1 or (2, matches head)
+ {Segments, [#segment { num = Seg } | Tail]}) ->
+ {Segments, [Segment | Tail]};
+segment_store(Segment = #segment { num = Seg }, %% 2, matches tail
+ {Segments, [SegmentA, #segment { num = Seg }]}) ->
+ {Segments, [Segment, SegmentA]};
+segment_store(Segment = #segment { num = Seg }, {Segments, []}) ->
+ {maps:remove(Seg, Segments), [Segment]};
+segment_store(Segment = #segment { num = Seg }, {Segments, [SegmentA]}) ->
+ {maps:remove(Seg, Segments), [Segment, SegmentA]};
+segment_store(Segment = #segment { num = Seg },
+ {Segments, [SegmentA, SegmentB]}) ->
+ {maps:put(SegmentB#segment.num, SegmentB, maps:remove(Seg, Segments)),
+ [Segment, SegmentA]}.
+
+segment_fold(Fun, Acc, {Segments, CachedSegments}) ->
+ maps:fold(fun (_Seg, Segment, Acc1) -> Fun(Segment, Acc1) end,
+ lists:foldl(Fun, Acc, CachedSegments), Segments).
+
+segment_map(Fun, {Segments, CachedSegments}) ->
+ {maps:map(fun (_Seg, Segment) -> Fun(Segment) end, Segments),
+ lists:map(Fun, CachedSegments)}.
+
+segment_nums({Segments, CachedSegments}) ->
+ lists:map(fun (#segment { num = Num }) -> Num end, CachedSegments) ++
+ maps:keys(Segments).
+
+segments_new() ->
+ {#{}, []}.
+
+entry_to_segment(_RelSeq, {?PUB, del, ack}, Initial) ->
+ Initial;
+entry_to_segment(RelSeq, {Pub, Del, Ack}, Initial) ->
+ %% NB: we are assembling the segment in reverse order here, so
+ %% del/ack comes first.
+ Buf1 = case {Del, Ack} of
+ {no_del, no_ack} ->
+ Initial;
+ _ ->
+ Binary = <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
+ RelSeq:?REL_SEQ_BITS>>,
+ case {Del, Ack} of
+ {del, ack} -> [[Binary, Binary] | Initial];
+ _ -> [Binary | Initial]
+ end
+ end,
+ case Pub of
+ no_pub ->
+ Buf1;
+ {IsPersistent, Bin, MsgBin} ->
+ [[<<?PUB_PREFIX:?PUB_PREFIX_BITS,
+ (bool_to_int(IsPersistent)):1,
+ RelSeq:?REL_SEQ_BITS, Bin/binary,
+ (size(MsgBin)):?EMBEDDED_SIZE_BITS>>, MsgBin] | Buf1]
+ end.
+
+read_bounded_segment(Seg, {StartSeg, StartRelSeq}, {EndSeg, EndRelSeq},
+ {Messages, Segments}, Dir) ->
+ Segment = segment_find_or_new(Seg, Dir, Segments),
+ {segment_entries_foldr(
+ fun (RelSeq, {{MsgOrId, MsgProps, IsPersistent}, IsDelivered, no_ack},
+ Acc)
+ when (Seg > StartSeg orelse StartRelSeq =< RelSeq) andalso
+ (Seg < EndSeg orelse EndRelSeq >= RelSeq) ->
+ [{MsgOrId, reconstruct_seq_id(StartSeg, RelSeq), MsgProps,
+ IsPersistent, IsDelivered == del} | Acc];
+ (_RelSeq, _Value, Acc) ->
+ Acc
+ end, Messages, Segment),
+ segment_store(Segment, Segments)}.
+
+segment_entries_foldr(Fun, Init,
+ Segment = #segment { journal_entries = JEntries }) ->
+ {SegEntries, _UnackedCount} = load_segment(false, Segment),
+ {SegEntries1, _UnackedCountD} = segment_plus_journal(SegEntries, JEntries),
+ array:sparse_foldr(
+ fun (RelSeq, {{IsPersistent, Bin, MsgBin}, Del, Ack}, Acc) ->
+ {MsgOrId, MsgProps} = parse_pub_record_body(Bin, MsgBin),
+ Fun(RelSeq, {{MsgOrId, MsgProps, IsPersistent}, Del, Ack}, Acc)
+ end, Init, SegEntries1).
+
+%% Loading segments
+%%
+%% Does not do any combining with the journal at all.
+load_segment(KeepAcked, #segment { path = Path }) ->
+ Empty = {array_new(), 0},
+ case rabbit_file:is_file(Path) of
+ false -> Empty;
+ true -> Size = rabbit_file:file_size(Path),
+ file_handle_cache_stats:update(queue_index_read),
+ {ok, Hdl} = file_handle_cache:open_with_absolute_path(
+ Path, ?READ_MODE, []),
+ {ok, 0} = file_handle_cache:position(Hdl, bof),
+ {ok, SegBin} = file_handle_cache:read(Hdl, Size),
+ ok = file_handle_cache:close(Hdl),
+ Res = parse_segment_entries(SegBin, KeepAcked, Empty),
+ Res
+ end.
+
+parse_segment_entries(<<?PUB_PREFIX:?PUB_PREFIX_BITS,
+ IsPersistNum:1, RelSeq:?REL_SEQ_BITS, Rest/binary>>,
+ KeepAcked, Acc) ->
+ parse_segment_publish_entry(
+ Rest, 1 == IsPersistNum, RelSeq, KeepAcked, Acc);
+parse_segment_entries(<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
+ RelSeq:?REL_SEQ_BITS, Rest/binary>>, KeepAcked, Acc) ->
+ parse_segment_entries(
+ Rest, KeepAcked, add_segment_relseq_entry(KeepAcked, RelSeq, Acc));
+parse_segment_entries(<<>>, _KeepAcked, Acc) ->
+ Acc.
+
+parse_segment_publish_entry(<<Bin:?PUB_RECORD_BODY_BYTES/binary,
+ MsgSize:?EMBEDDED_SIZE_BITS,
+ MsgBin:MsgSize/binary, Rest/binary>>,
+ IsPersistent, RelSeq, KeepAcked,
+ {SegEntries, Unacked}) ->
+ Obj = {{IsPersistent, Bin, MsgBin}, no_del, no_ack},
+ SegEntries1 = array:set(RelSeq, Obj, SegEntries),
+ parse_segment_entries(Rest, KeepAcked, {SegEntries1, Unacked + 1});
+parse_segment_publish_entry(Rest, _IsPersistent, _RelSeq, KeepAcked, Acc) ->
+ parse_segment_entries(Rest, KeepAcked, Acc).
+
+add_segment_relseq_entry(KeepAcked, RelSeq, {SegEntries, Unacked}) ->
+ case array:get(RelSeq, SegEntries) of
+ {Pub, no_del, no_ack} ->
+ {array:set(RelSeq, {Pub, del, no_ack}, SegEntries), Unacked};
+ {Pub, del, no_ack} when KeepAcked ->
+ {array:set(RelSeq, {Pub, del, ack}, SegEntries), Unacked - 1};
+ {_Pub, del, no_ack} ->
+ {array:reset(RelSeq, SegEntries), Unacked - 1}
+ end.
+
+array_new() ->
+ array_new(undefined).
+
+array_new(Default) ->
+ array:new([{default, Default}, fixed, {size, ?SEGMENT_ENTRY_COUNT}]).
+
+bool_to_int(true ) -> 1;
+bool_to_int(false) -> 0.
+
+%%----------------------------------------------------------------------------
+%% journal & segment combination
+%%----------------------------------------------------------------------------
+
+%% Combine what we have just read from a segment file with what we're
+%% holding for that segment in memory. There must be no duplicates.
+segment_plus_journal(SegEntries, JEntries) ->
+ array:sparse_foldl(
+ fun (RelSeq, JObj, {SegEntriesOut, AdditionalUnacked}) ->
+ SegEntry = array:get(RelSeq, SegEntriesOut),
+ {Obj, AdditionalUnackedDelta} =
+ segment_plus_journal1(SegEntry, JObj),
+ {case Obj of
+ undefined -> array:reset(RelSeq, SegEntriesOut);
+ _ -> array:set(RelSeq, Obj, SegEntriesOut)
+ end,
+ AdditionalUnacked + AdditionalUnackedDelta}
+ end, {SegEntries, 0}, JEntries).
+
+%% Here, the result is a tuple with the first element containing the
+%% item which we may be adding to (for items only in the journal),
+%% modifying in (bits in both), or, when returning 'undefined',
+%% erasing from (ack in journal, not segment) the segment array. The
+%% other element of the tuple is the delta for AdditionalUnacked.
+segment_plus_journal1(undefined, {?PUB, no_del, no_ack} = Obj) ->
+ {Obj, 1};
+segment_plus_journal1(undefined, {?PUB, del, no_ack} = Obj) ->
+ {Obj, 1};
+segment_plus_journal1(undefined, {?PUB, del, ack}) ->
+ {undefined, 0};
+
+segment_plus_journal1({?PUB = Pub, no_del, no_ack}, {no_pub, del, no_ack}) ->
+ {{Pub, del, no_ack}, 0};
+segment_plus_journal1({?PUB, no_del, no_ack}, {no_pub, del, ack}) ->
+ {undefined, -1};
+segment_plus_journal1({?PUB, del, no_ack}, {no_pub, no_del, ack}) ->
+ {undefined, -1}.
+
+%% Remove from the journal entries for a segment, items that are
+%% duplicates of entries found in the segment itself. Used on start up
+%% to clean up the journal.
+%%
+%% We need to update the entries_to_segment since they are just a
+%% cache of what's on the journal.
+journal_minus_segment(JEntries, EToSeg, SegEntries) ->
+ array:sparse_foldl(
+ fun (RelSeq, JObj, {JEntriesOut, EToSegOut, UnackedRemoved}) ->
+ SegEntry = array:get(RelSeq, SegEntries),
+ {Obj, UnackedRemovedDelta} =
+ journal_minus_segment1(JObj, SegEntry),
+ {JEntriesOut1, EToSegOut1} =
+ case Obj of
+ keep ->
+ {JEntriesOut, EToSegOut};
+ undefined ->
+ {array:reset(RelSeq, JEntriesOut),
+ array:reset(RelSeq, EToSegOut)};
+ _ ->
+ {array:set(RelSeq, Obj, JEntriesOut),
+ array:set(RelSeq, entry_to_segment(RelSeq, Obj, []),
+ EToSegOut)}
+ end,
+ {JEntriesOut1, EToSegOut1, UnackedRemoved + UnackedRemovedDelta}
+ end, {JEntries, EToSeg, 0}, JEntries).
+
+%% Here, the result is a tuple whose first element is either 'keep'
+%% (leave the existing journal entry as it is), 'undefined' (remove
+%% the entry from the journal array) or a replacement entry to store
+%% in the journal array. The other element of the tuple is the delta
+%% for UnackedRemoved.
+
+%% Both the same. Must be at least the publish
+journal_minus_segment1({?PUB, _Del, no_ack} = Obj, Obj) ->
+ {undefined, 1};
+journal_minus_segment1({?PUB, _Del, ack} = Obj, Obj) ->
+ {undefined, 0};
+
+%% Just publish in journal
+journal_minus_segment1({?PUB, no_del, no_ack}, undefined) ->
+ {keep, 0};
+
+%% Publish and deliver in journal
+journal_minus_segment1({?PUB, del, no_ack}, undefined) ->
+ {keep, 0};
+journal_minus_segment1({?PUB = Pub, del, no_ack}, {Pub, no_del, no_ack}) ->
+ {{no_pub, del, no_ack}, 1};
+
+%% Publish, deliver and ack in journal
+journal_minus_segment1({?PUB, del, ack}, undefined) ->
+ {keep, 0};
+journal_minus_segment1({?PUB = Pub, del, ack}, {Pub, no_del, no_ack}) ->
+ {{no_pub, del, ack}, 1};
+journal_minus_segment1({?PUB = Pub, del, ack}, {Pub, del, no_ack}) ->
+ {{no_pub, no_del, ack}, 1};
+
+%% Just deliver in journal
+journal_minus_segment1({no_pub, del, no_ack}, {?PUB, no_del, no_ack}) ->
+ {keep, 0};
+journal_minus_segment1({no_pub, del, no_ack}, {?PUB, del, no_ack}) ->
+ {undefined, 0};
+
+%% Just ack in journal
+journal_minus_segment1({no_pub, no_del, ack}, {?PUB, del, no_ack}) ->
+ {keep, 0};
+journal_minus_segment1({no_pub, no_del, ack}, {?PUB, del, ack}) ->
+ {undefined, -1};
+
+%% Deliver and ack in journal
+journal_minus_segment1({no_pub, del, ack}, {?PUB, no_del, no_ack}) ->
+ {keep, 0};
+journal_minus_segment1({no_pub, del, ack}, {?PUB, del, no_ack}) ->
+ {{no_pub, no_del, ack}, 0};
+journal_minus_segment1({no_pub, del, ack}, {?PUB, del, ack}) ->
+ {undefined, -1};
+
+%% Missing segment. If flush_journal/1 is interrupted after deleting
+%% the segment but before truncating the journal we can get these
+%% cases: a delivery and an acknowledgement in the journal, or just an
+%% acknowledgement in the journal, but with no segment. In both cases
+%% we have really forgotten the message; so ignore what's in the
+%% journal.
+journal_minus_segment1({no_pub, no_del, ack}, undefined) ->
+ {undefined, 0};
+journal_minus_segment1({no_pub, del, ack}, undefined) ->
+ {undefined, 0}.
+
+%%----------------------------------------------------------------------------
+%% upgrade
+%%----------------------------------------------------------------------------
+
+-spec add_queue_ttl() -> 'ok'.
+
+add_queue_ttl() ->
+ foreach_queue_index({fun add_queue_ttl_journal/1,
+ fun add_queue_ttl_segment/1}).
+
+add_queue_ttl_journal(<<?DEL_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS,
+ Rest/binary>>) ->
+ {<<?DEL_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Rest};
+add_queue_ttl_journal(<<?ACK_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS,
+ Rest/binary>>) ->
+ {<<?ACK_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Rest};
+add_queue_ttl_journal(<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS,
+ MsgId:?MSG_ID_BYTES/binary, Rest/binary>>) ->
+ {[<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS>>, MsgId,
+ expiry_to_binary(undefined)], Rest};
+add_queue_ttl_journal(_) ->
+ stop.
+
+add_queue_ttl_segment(<<?PUB_PREFIX:?PUB_PREFIX_BITS, IsPersistentNum:1,
+ RelSeq:?REL_SEQ_BITS, MsgId:?MSG_ID_BYTES/binary,
+ Rest/binary>>) ->
+ {[<<?PUB_PREFIX:?PUB_PREFIX_BITS, IsPersistentNum:1, RelSeq:?REL_SEQ_BITS>>,
+ MsgId, expiry_to_binary(undefined)], Rest};
+add_queue_ttl_segment(<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
+ RelSeq:?REL_SEQ_BITS, Rest/binary>>) ->
+ {<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, RelSeq:?REL_SEQ_BITS>>,
+ Rest};
+add_queue_ttl_segment(_) ->
+ stop.
+
+avoid_zeroes() ->
+ foreach_queue_index({none, fun avoid_zeroes_segment/1}).
+
+avoid_zeroes_segment(<<?PUB_PREFIX:?PUB_PREFIX_BITS, IsPersistentNum:1,
+ RelSeq:?REL_SEQ_BITS, MsgId:?MSG_ID_BITS,
+ Expiry:?EXPIRY_BITS, Rest/binary>>) ->
+ {<<?PUB_PREFIX:?PUB_PREFIX_BITS, IsPersistentNum:1, RelSeq:?REL_SEQ_BITS,
+ MsgId:?MSG_ID_BITS, Expiry:?EXPIRY_BITS>>, Rest};
+avoid_zeroes_segment(<<0:?REL_SEQ_ONLY_PREFIX_BITS,
+ RelSeq:?REL_SEQ_BITS, Rest/binary>>) ->
+ {<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, RelSeq:?REL_SEQ_BITS>>,
+ Rest};
+avoid_zeroes_segment(_) ->
+ stop.
+
+%% At upgrade time we just define every message's size as 0 - that
+%% will save us a load of faff with the message store, and means we
+%% can actually use the clean recovery terms in VQ. It does mean we
+%% don't count message bodies from before the migration, but we can
+%% live with that.
+store_msg_size() ->
+ foreach_queue_index({fun store_msg_size_journal/1,
+ fun store_msg_size_segment/1}).
+
+store_msg_size_journal(<<?DEL_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS,
+ Rest/binary>>) ->
+ {<<?DEL_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Rest};
+store_msg_size_journal(<<?ACK_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS,
+ Rest/binary>>) ->
+ {<<?ACK_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Rest};
+store_msg_size_journal(<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS,
+ MsgId:?MSG_ID_BITS, Expiry:?EXPIRY_BITS,
+ Rest/binary>>) ->
+ {<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS, MsgId:?MSG_ID_BITS,
+ Expiry:?EXPIRY_BITS, 0:?SIZE_BITS>>, Rest};
+store_msg_size_journal(_) ->
+ stop.
+
+store_msg_size_segment(<<?PUB_PREFIX:?PUB_PREFIX_BITS, IsPersistentNum:1,
+ RelSeq:?REL_SEQ_BITS, MsgId:?MSG_ID_BITS,
+ Expiry:?EXPIRY_BITS, Rest/binary>>) ->
+ {<<?PUB_PREFIX:?PUB_PREFIX_BITS, IsPersistentNum:1, RelSeq:?REL_SEQ_BITS,
+ MsgId:?MSG_ID_BITS, Expiry:?EXPIRY_BITS, 0:?SIZE_BITS>>, Rest};
+store_msg_size_segment(<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
+ RelSeq:?REL_SEQ_BITS, Rest/binary>>) ->
+ {<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, RelSeq:?REL_SEQ_BITS>>,
+ Rest};
+store_msg_size_segment(_) ->
+ stop.
+
+store_msg() ->
+ foreach_queue_index({fun store_msg_journal/1,
+ fun store_msg_segment/1}).
+
+store_msg_journal(<<?DEL_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS,
+ Rest/binary>>) ->
+ {<<?DEL_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Rest};
+store_msg_journal(<<?ACK_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS,
+ Rest/binary>>) ->
+ {<<?ACK_JPREFIX:?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Rest};
+store_msg_journal(<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS,
+ MsgId:?MSG_ID_BITS, Expiry:?EXPIRY_BITS, Size:?SIZE_BITS,
+ Rest/binary>>) ->
+ {<<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS, MsgId:?MSG_ID_BITS,
+ Expiry:?EXPIRY_BITS, Size:?SIZE_BITS,
+ 0:?EMBEDDED_SIZE_BITS>>, Rest};
+store_msg_journal(_) ->
+ stop.
+
+store_msg_segment(<<?PUB_PREFIX:?PUB_PREFIX_BITS, IsPersistentNum:1,
+ RelSeq:?REL_SEQ_BITS, MsgId:?MSG_ID_BITS,
+ Expiry:?EXPIRY_BITS, Size:?SIZE_BITS, Rest/binary>>) ->
+ {<<?PUB_PREFIX:?PUB_PREFIX_BITS, IsPersistentNum:1, RelSeq:?REL_SEQ_BITS,
+ MsgId:?MSG_ID_BITS, Expiry:?EXPIRY_BITS, Size:?SIZE_BITS,
+ 0:?EMBEDDED_SIZE_BITS>>, Rest};
+store_msg_segment(<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
+ RelSeq:?REL_SEQ_BITS, Rest/binary>>) ->
+ {<<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, RelSeq:?REL_SEQ_BITS>>,
+ Rest};
+store_msg_segment(_) ->
+ stop.
+
+
+
+%%----------------------------------------------------------------------------
+%% Migration functions
+%%----------------------------------------------------------------------------
+
+foreach_queue_index(Funs) ->
+ QueueDirNames = all_queue_directory_names(),
+ {ok, Gatherer} = gatherer:start_link(),
+ [begin
+ ok = gatherer:fork(Gatherer),
+ ok = worker_pool:submit_async(
+ fun () ->
+ transform_queue(QueueDirName, Gatherer, Funs)
+ end)
+ end || QueueDirName <- QueueDirNames],
+ empty = gatherer:out(Gatherer),
+ ok = gatherer:stop(Gatherer).
+
+transform_queue(Dir, Gatherer, {JournalFun, SegmentFun}) ->
+ ok = transform_file(filename:join(Dir, ?JOURNAL_FILENAME), JournalFun),
+ [ok = transform_file(filename:join(Dir, Seg), SegmentFun)
+ || Seg <- rabbit_file:wildcard(".*\\" ++ ?SEGMENT_EXTENSION, Dir)],
+ ok = gatherer:finish(Gatherer).
+
+transform_file(_Path, none) ->
+ ok;
+transform_file(Path, Fun) when is_function(Fun) ->
+ PathTmp = Path ++ ".upgrade",
+ case rabbit_file:file_size(Path) of
+ 0 -> ok;
+ Size -> {ok, PathTmpHdl} =
+ file_handle_cache:open_with_absolute_path(
+ PathTmp, ?WRITE_MODE,
+ [{write_buffer, infinity}]),
+
+ {ok, PathHdl} = file_handle_cache:open_with_absolute_path(
+ Path, ?READ_MODE, [{read_buffer, Size}]),
+ {ok, Content} = file_handle_cache:read(PathHdl, Size),
+ ok = file_handle_cache:close(PathHdl),
+
+ ok = drive_transform_fun(Fun, PathTmpHdl, Content),
+
+ ok = file_handle_cache:close(PathTmpHdl),
+ ok = rabbit_file:rename(PathTmp, Path)
+ end.
+
+drive_transform_fun(Fun, Hdl, Contents) ->
+ case Fun(Contents) of
+ stop -> ok;
+ {Output, Contents1} -> ok = file_handle_cache:append(Hdl, Output),
+ drive_transform_fun(Fun, Hdl, Contents1)
+ end.
+
+move_to_per_vhost_stores(#resource{virtual_host = VHost} = QueueName) ->
+ OldQueueDir = filename:join([queues_base_dir(), "queues",
+ queue_name_to_dir_name_legacy(QueueName)]),
+ VHostDir = rabbit_vhost:msg_store_dir_path(VHost),
+ NewQueueDir = queue_dir(VHostDir, QueueName),
+ rabbit_log_upgrade:info("About to migrate queue directory '~s' to '~s'",
+ [OldQueueDir, NewQueueDir]),
+ case rabbit_file:is_dir(OldQueueDir) of
+ true ->
+ ok = rabbit_file:ensure_dir(NewQueueDir),
+ ok = rabbit_file:rename(OldQueueDir, NewQueueDir),
+ ok = ensure_queue_name_stub_file(NewQueueDir, QueueName);
+ false ->
+ Msg = "Queue index directory '~s' not found for ~s~n",
+ Args = [OldQueueDir, rabbit_misc:rs(QueueName)],
+ rabbit_log_upgrade:error(Msg, Args),
+ rabbit_log:error(Msg, Args)
+ end,
+ ok.
+
+ensure_queue_name_stub_file(Dir, #resource{virtual_host = VHost, name = QName}) ->
+ QueueNameFile = filename:join(Dir, ?QUEUE_NAME_STUB_FILE),
+ file:write_file(QueueNameFile, <<"VHOST: ", VHost/binary, "\n",
+ "QUEUE: ", QName/binary, "\n">>).
+
+read_global_recovery_terms(DurableQueueNames) ->
+ ok = rabbit_recovery_terms:open_global_table(),
+
+ DurableTerms =
+ lists:foldl(
+ fun(QName, RecoveryTerms) ->
+ DirName = queue_name_to_dir_name_legacy(QName),
+ RecoveryInfo = case rabbit_recovery_terms:read_global(DirName) of
+ {error, _} -> non_clean_shutdown;
+ {ok, Terms} -> Terms
+ end,
+ [RecoveryInfo | RecoveryTerms]
+ end, [], DurableQueueNames),
+
+ ok = rabbit_recovery_terms:close_global_table(),
+ %% The backing queue interface requires that the queue recovery terms
+ %% which come back from start/1 are in the same order as DurableQueueNames
+ OrderedTerms = lists:reverse(DurableTerms),
+ {OrderedTerms, {fun queue_index_walker/1, {start, DurableQueueNames}}}.
+
+cleanup_global_recovery_terms() ->
+ rabbit_file:recursive_delete([filename:join([queues_base_dir(), "queues"])]),
+ rabbit_recovery_terms:delete_global_table(),
+ ok.
+
+
+update_recovery_term(#resource{virtual_host = VHost} = QueueName, Term) ->
+ Key = queue_name_to_dir_name(QueueName),
+ rabbit_recovery_terms:store(VHost, Key, Term).