diff options
-rw-r--r-- | src/rabbit_queue_index.erl | 49 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 21 |
2 files changed, 32 insertions, 38 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index bd959e553a..a9b46a1482 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -23,7 +23,7 @@ read/3, next_segment_boundary/1, bounds/1, start/2, stop/1]). -export([add_queue_ttl/0, avoid_zeroes/0, store_msg_size/0, store_msg/0]). --export([scan_queue_segments/3]). +-export([persistent_non_acked_message_ids/1]). %% Migrates from global to per-vhost message stores -export([move_to_per_vhost_stores/1, @@ -32,6 +32,7 @@ cleanup_global_recovery_terms/0]). -define(CLEAN_FILENAME, "clean.dot"). +-define(POOL_NAME, ?MODULE). %%---------------------------------------------------------------------------- @@ -706,10 +707,16 @@ queue_index_walker({start, DurableQueues}) when is_list(DurableQueues) -> [begin ok = gatherer:fork(Gatherer), ok = worker_pool:submit_async( - fun () -> link(Gatherer), - ok = queue_index_walker_reader(QueueName, Gatherer), - unlink(Gatherer), - ok + fun () -> + link(Gatherer), + MsgIds = persistent_non_acked_message_ids(QueueName), + lists:foreach( + fun (MsgId) -> + gatherer:sync_in(Gatherer, {MsgId, 1}) + end, MsgIds), + ok = gatherer:finish(Gatherer), + unlink(Gatherer), + ok end) end || QueueName <- DurableQueues], queue_index_walker({next, Gatherer}); @@ -723,29 +730,23 @@ queue_index_walker({next, Gatherer}) when is_pid(Gatherer) -> {MsgId, Count, {next, Gatherer}} end. -queue_index_walker_reader(QueueName, Gatherer) -> - ok = scan_queue_segments( - fun (_SeqId, MsgId, _MsgProps, true, _IsDelivered, no_ack, ok) - when is_binary(MsgId) -> - gatherer:sync_in(Gatherer, {MsgId, 1}); - (_SeqId, _MsgId, _MsgProps, _IsPersistent, _IsDelivered, - _IsAcked, Acc) -> - Acc - end, ok, QueueName), - ok = gatherer:finish(Gatherer). +segment_persistent_message_ids(Segment) -> + segment_entries_foldr( + fun (_, {{MsgId, _, true}, _, no_ack}, Acc) + when is_binary(MsgId) -> + [MsgId|Acc]; + (_, _, Acc) -> + Acc + end, [], Segment). -scan_queue_segments(Fun, Acc, QueueName) -> +persistent_non_acked_message_ids(QueueName) -> State = #qistate { segments = Segments, dir = Dir } = recover_journal(blank_state(QueueName)), Result = lists:foldr( - fun (Seg, AccN) -> - segment_entries_foldr( - fun (RelSeq, {{MsgOrId, MsgProps, IsPersistent}, - IsDelivered, IsAcked}, AccM) -> - Fun(reconstruct_seq_id(Seg, RelSeq), MsgOrId, MsgProps, - IsPersistent, IsDelivered, IsAcked, AccM) - end, AccN, segment_find_or_new(Seg, Dir, Segments)) - end, Acc, all_segment_nums(State)), + fun (SegmentNum, Acc) -> + Segment = segment_find_or_new(SegmentNum, Dir, Segments), + segment_persistent_message_ids(Segment) ++ Acc + end, [], all_segment_nums(State)), {_SegmentCounts, _State} = terminate(State), Result. diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 971c11a2a7..00222d1c0b 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -2893,20 +2893,13 @@ migrate_queue({QueueName = #resource{virtual_host = VHost, name = Name}, [Name, VHost]), OldStoreClient = get_global_store_client(OldStore), NewStoreClient = get_per_vhost_store_client(QueueName, NewStore), - %% WARNING: During scan_queue_segments queue index state is being recovered - %% and terminated. This can cause side effects! - rabbit_queue_index:scan_queue_segments( - %% We migrate only persistent messages which are found in message store - %% and are not acked yet - fun (_SeqId, MsgId, _MsgProps, true, _IsDelivered, no_ack, OldC) - when is_binary(MsgId) -> - migrate_message(MsgId, OldC, NewStoreClient); - (_SeqId, _MsgId, _MsgProps, - _IsPersistent, _IsDelivered, _IsAcked, OldC) -> - OldC - end, - OldStoreClient, - QueueName), + %% WARNING: During persistent_non_acked_messages queue index state + %% is being recovered and terminated. This can cause side effects! + lists:foldl( + fun (MsgId, OldC) -> + migrate_message(MsgId, OldC, NewStoreClient) + end, OldStoreClient, + rabbit_queue_index:persistent_non_acked_message_ids(QueueName)), rabbit_msg_store:client_terminate(OldStoreClient), rabbit_msg_store:client_terminate(NewStoreClient), NewClientRef = rabbit_msg_store:client_ref(NewStoreClient), |