summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_queue_index.erl49
-rw-r--r--src/rabbit_variable_queue.erl21
2 files changed, 32 insertions, 38 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index bd959e553a..a9b46a1482 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -23,7 +23,7 @@
read/3, next_segment_boundary/1, bounds/1, start/2, stop/1]).
-export([add_queue_ttl/0, avoid_zeroes/0, store_msg_size/0, store_msg/0]).
--export([scan_queue_segments/3]).
+-export([persistent_non_acked_message_ids/1]).
%% Migrates from global to per-vhost message stores
-export([move_to_per_vhost_stores/1,
@@ -32,6 +32,7 @@
cleanup_global_recovery_terms/0]).
-define(CLEAN_FILENAME, "clean.dot").
+-define(POOL_NAME, ?MODULE).
%%----------------------------------------------------------------------------
@@ -706,10 +707,16 @@ queue_index_walker({start, DurableQueues}) when is_list(DurableQueues) ->
[begin
ok = gatherer:fork(Gatherer),
ok = worker_pool:submit_async(
- fun () -> link(Gatherer),
- ok = queue_index_walker_reader(QueueName, Gatherer),
- unlink(Gatherer),
- ok
+ fun () ->
+ link(Gatherer),
+ MsgIds = persistent_non_acked_message_ids(QueueName),
+ lists:foreach(
+ fun (MsgId) ->
+ gatherer:sync_in(Gatherer, {MsgId, 1})
+ end, MsgIds),
+ ok = gatherer:finish(Gatherer),
+ unlink(Gatherer),
+ ok
end)
end || QueueName <- DurableQueues],
queue_index_walker({next, Gatherer});
@@ -723,29 +730,23 @@ queue_index_walker({next, Gatherer}) when is_pid(Gatherer) ->
{MsgId, Count, {next, Gatherer}}
end.
-queue_index_walker_reader(QueueName, Gatherer) ->
- ok = scan_queue_segments(
- fun (_SeqId, MsgId, _MsgProps, true, _IsDelivered, no_ack, ok)
- when is_binary(MsgId) ->
- gatherer:sync_in(Gatherer, {MsgId, 1});
- (_SeqId, _MsgId, _MsgProps, _IsPersistent, _IsDelivered,
- _IsAcked, Acc) ->
- Acc
- end, ok, QueueName),
- ok = gatherer:finish(Gatherer).
+segment_persistent_message_ids(Segment) ->
+ segment_entries_foldr(
+ fun (_, {{MsgId, _, true}, _, no_ack}, Acc)
+ when is_binary(MsgId) ->
+ [MsgId|Acc];
+ (_, _, Acc) ->
+ Acc
+ end, [], Segment).
-scan_queue_segments(Fun, Acc, QueueName) ->
+persistent_non_acked_message_ids(QueueName) ->
State = #qistate { segments = Segments, dir = Dir } =
recover_journal(blank_state(QueueName)),
Result = lists:foldr(
- fun (Seg, AccN) ->
- segment_entries_foldr(
- fun (RelSeq, {{MsgOrId, MsgProps, IsPersistent},
- IsDelivered, IsAcked}, AccM) ->
- Fun(reconstruct_seq_id(Seg, RelSeq), MsgOrId, MsgProps,
- IsPersistent, IsDelivered, IsAcked, AccM)
- end, AccN, segment_find_or_new(Seg, Dir, Segments))
- end, Acc, all_segment_nums(State)),
+ fun (SegmentNum, Acc) ->
+ Segment = segment_find_or_new(SegmentNum, Dir, Segments),
+ segment_persistent_message_ids(Segment) ++ Acc
+ end, [], all_segment_nums(State)),
{_SegmentCounts, _State} = terminate(State),
Result.
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 971c11a2a7..00222d1c0b 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -2893,20 +2893,13 @@ migrate_queue({QueueName = #resource{virtual_host = VHost, name = Name},
[Name, VHost]),
OldStoreClient = get_global_store_client(OldStore),
NewStoreClient = get_per_vhost_store_client(QueueName, NewStore),
- %% WARNING: During scan_queue_segments queue index state is being recovered
- %% and terminated. This can cause side effects!
- rabbit_queue_index:scan_queue_segments(
- %% We migrate only persistent messages which are found in message store
- %% and are not acked yet
- fun (_SeqId, MsgId, _MsgProps, true, _IsDelivered, no_ack, OldC)
- when is_binary(MsgId) ->
- migrate_message(MsgId, OldC, NewStoreClient);
- (_SeqId, _MsgId, _MsgProps,
- _IsPersistent, _IsDelivered, _IsAcked, OldC) ->
- OldC
- end,
- OldStoreClient,
- QueueName),
+ %% WARNING: During persistent_non_acked_messages queue index state
+ %% is being recovered and terminated. This can cause side effects!
+ lists:foldl(
+ fun (MsgId, OldC) ->
+ migrate_message(MsgId, OldC, NewStoreClient)
+ end, OldStoreClient,
+ rabbit_queue_index:persistent_non_acked_message_ids(QueueName)),
rabbit_msg_store:client_terminate(OldStoreClient),
rabbit_msg_store:client_terminate(NewStoreClient),
NewClientRef = rabbit_msg_store:client_ref(NewStoreClient),