diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/bpqueue.erl | 6 | ||||
| -rw-r--r-- | src/gatherer.erl | 14 | ||||
| -rw-r--r-- | src/rabbit_msg_store.erl | 44 | ||||
| -rw-r--r-- | src/rabbit_queue_index.erl | 24 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 165 |
5 files changed, 136 insertions, 117 deletions
diff --git a/src/bpqueue.erl b/src/bpqueue.erl index 7acc969754..9cd0f23021 100644 --- a/src/bpqueue.erl +++ b/src/bpqueue.erl @@ -111,7 +111,8 @@ in_q(Prefix, Queue, BPQ = {0, Q}) -> N -> {N, queue:in({Prefix, Queue}, Q)} end; in_q(Prefix, Queue, BPQ) -> - in_q1({fun queue:in/2, fun queue:out_r/1, fun queue:join/2}, + in_q1({fun queue:in/2, fun queue:out_r/1, + fun queue:join/2}, Prefix, Queue, BPQ). in_q_r(Prefix, Queue, BPQ = {0, _Q}) -> @@ -232,7 +233,8 @@ to_list1({Prefix, InnerQ}) -> map_fold_filter_l(_PFilter, _Fun, Init, BPQ = {0, _Q}) -> {BPQ, Init}; map_fold_filter_l(PFilter, Fun, Init, {N, Q}) -> - map_fold_filter1({fun queue:out/1, fun queue:in/2, fun in_q/3, fun join/2}, + map_fold_filter1({fun queue:out/1, fun queue:in/2, + fun in_q/3, fun join/2}, N, PFilter, Fun, Init, Q, new()). map_fold_filter_r(_PFilter, _Fun, Init, BPQ = {0, _Q}) -> diff --git a/src/gatherer.erl b/src/gatherer.erl index 8c44388c40..d5b35e9669 100644 --- a/src/gatherer.erl +++ b/src/gatherer.erl @@ -98,9 +98,9 @@ handle_call({finished, Token}, _From, false -> {reply, ok, State1, hibernate} end; -handle_call(fetch, From, State = - #gstate { blocking = Blocking, results = Results, - waiting_on = Tokens }) -> +handle_call(fetch, From, + State = #gstate { waiting_on = Tokens, results = Results, + blocking = Blocking }) -> case queue:out(Results) of {empty, _Results} -> case sets:size(Tokens) of @@ -117,8 +117,8 @@ handle_call(fetch, From, State = handle_call(Msg, _From, State) -> {stop, {unexpected_call, Msg}, State}. -handle_cast({produce, Result}, State = #gstate { blocking = Blocking, - results = Results }) -> +handle_cast({produce, Result}, + State = #gstate { blocking = Blocking, results = Results }) -> {noreply, case queue:out(Blocking) of {empty, _Blocking} -> State #gstate { results = queue:in(Result, Results) }; @@ -137,6 +137,6 @@ code_change(_OldVsn, State, _Extra) -> {ok, State}. terminate(_Reason, State = #gstate { blocking = Blocking } ) -> - [gen_server2:reply(Blocked, finished) - || Blocked <- queue:to_list(Blocking) ], + [gen_server2:reply(Blocked, finished) || + Blocked <- queue:to_list(Blocking)], State. diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 7e09f7fa39..6bff9ae6b2 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -128,7 +128,8 @@ -spec(remove/2 :: (server(), [guid()]) -> 'ok'). -spec(release/2 :: (server(), [guid()]) -> 'ok'). -spec(sync/3 :: (server(), [guid()], fun (() -> any())) -> 'ok'). --spec(gc_done/4 :: (server(), non_neg_integer(), file_num(), file_num()) -> 'ok'). +-spec(gc_done/4 :: (server(), non_neg_integer(), file_num(), file_num()) -> + 'ok'). -spec(set_maximum_since_use/2 :: (server(), non_neg_integer()) -> 'ok'). -spec(client_init/2 :: (server(), binary()) -> client_msstate()). -spec(client_terminate/1 :: (client_msstate()) -> 'ok'). @@ -866,7 +867,8 @@ read_message1(From, #msg_location { guid = Guid, ref_count = RefCount, true -> add_to_pending_gc_completion({read, Guid, From}, State); false -> - {Msg, State1} = read_from_disk(MsgLoc, State, DedupCacheEts), + {Msg, State1} = read_from_disk(MsgLoc, State, + DedupCacheEts), gen_server2:reply(From, {ok, Msg}), State1 end @@ -1136,7 +1138,8 @@ insert_into_cache(DedupCacheEts, Guid, Msg) -> %% index %%---------------------------------------------------------------------------- -index_lookup(Key, #client_msstate { index_module = Index, index_state = State }) -> +index_lookup(Key, #client_msstate { index_module = Index, + index_state = State }) -> Index:lookup(Key, State); index_lookup(Key, #msstate { index_module = Index, index_state = State }) -> @@ -1148,8 +1151,8 @@ index_insert(Obj, #msstate { index_module = Index, index_state = State }) -> index_update(Obj, #msstate { index_module = Index, index_state = State }) -> Index:update(Obj, State). -index_update_fields(Key, Updates, - #msstate { index_module = Index, index_state = State }) -> +index_update_fields(Key, Updates, #msstate { index_module = Index, + index_state = State }) -> Index:update_fields(Key, Updates, State). index_delete(Key, #msstate { index_module = Index, index_state = State }) -> @@ -1324,9 +1327,10 @@ build_index(true, _Files, State = file_size = FileSize, file = File }, {_Offset, State1 = #msstate { sum_valid_data = SumValid, sum_file_size = SumFileSize }}) -> - {FileSize, State1 #msstate { sum_valid_data = SumValid + ValidTotalSize, - sum_file_size = SumFileSize + FileSize, - current_file = File }} + {FileSize, State1 #msstate { + sum_valid_data = SumValid + ValidTotalSize, + sum_file_size = SumFileSize + FileSize, + current_file = File }} end, {0, State}, FileSummaryEts); build_index(false, Files, State) -> {ok, Pid} = gatherer:start_link(), @@ -1361,8 +1365,9 @@ build_index(Gatherer, Left, [], build_index(Gatherer, Left, [File|Files], State) -> Child = make_ref(), ok = gatherer:wait_on(Gatherer, Child), - ok = worker_pool:submit_async({?MODULE, build_index_worker, - [Gatherer, Child, State, Left, File, Files]}), + ok = worker_pool:submit_async( + {?MODULE, build_index_worker, + [Gatherer, Child, State, Left, File, Files]}), build_index(Gatherer, File, Files, State). build_index_worker( @@ -1409,12 +1414,13 @@ build_index_worker( %% garbage collection / compaction / aggregation -- internal %%---------------------------------------------------------------------------- -maybe_roll_to_new_file(Offset, - State = #msstate { dir = Dir, - current_file_handle = CurHdl, - current_file = CurFile, - file_summary_ets = FileSummaryEts, - cur_file_cache_ets = CurFileCacheEts }) +maybe_roll_to_new_file( + Offset, + State = #msstate { dir = Dir, + current_file_handle = CurHdl, + current_file = CurFile, + file_summary_ets = FileSummaryEts, + cur_file_cache_ets = CurFileCacheEts }) when Offset >= ?FILE_SIZE_LIMIT -> State1 = internal_sync(State), ok = file_handle_cache:close(CurHdl), @@ -1631,7 +1637,8 @@ combine_files(#file_summary { file = Source, ok = file_handle_cache:sync(DestinationHdl), ok = file_handle_cache:delete(TmpHdl) end, - {SourceWorkList, SourceValid} = find_unremoved_messages_in_file(Source, State), + {SourceWorkList, SourceValid} = + find_unremoved_messages_in_file(Source, State), ok = copy_messages(SourceWorkList, DestinationValid, ExpectedSize, SourceHdl, DestinationHdl, Destination, State), %% tidy up @@ -1700,7 +1707,8 @@ copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl, {ok, BlockStart1} = file_handle_cache:position(SourceHdl, BlockStart1), {ok, BSize1} = - file_handle_cache:copy(SourceHdl, DestinationHdl, BSize1), + file_handle_cache:copy(SourceHdl, DestinationHdl, + BSize1), ok = file_handle_cache:sync(DestinationHdl) end; {FinalOffsetZ, _BlockStart1, _BlockEnd1} -> diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index d6ef0cb8b9..8d22d36af6 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -195,7 +195,8 @@ }). -spec(init/3 :: (queue_name(), boolean(), fun ((guid()) -> boolean())) -> - {'undefined' | non_neg_integer(), binary(), binary(), [any()], qistate()}). + {'undefined' | + non_neg_integer(), binary(), binary(), [any()], qistate()}). -spec(terminate/2 :: ([any()], qistate()) -> qistate()). -spec(terminate_and_erase/1 :: (qistate()) -> qistate()). -spec(write_published/4 :: (guid(), seq_id(), boolean(), qistate()) @@ -265,15 +266,14 @@ init(Name, MsgStoreRecovered, ContainsCheckFun) -> {SegEntries, PubCount, AckCount, Segment1} = load_segment(false, Segment), Segment2 = - #segment { pubs = PubCount1, acks = AckCount1 } = + #segment { pubs = PubCount1, acks = AckCount1 } = array:sparse_foldl( - fun (RelSeq, {{Guid, _IsPersistent}, Del, no_ack}, + fun (RelSeq, {{Guid, _IsPersistent}, Del, + no_ack}, Segment3) -> - Segment4 = - maybe_add_to_journal( - ContainsCheckFun(Guid), - CleanShutdown, Del, RelSeq, Segment3), - Segment4 + maybe_add_to_journal( + ContainsCheckFun(Guid), + CleanShutdown, Del, RelSeq, Segment3) end, Segment1 #segment { pubs = PubCount, acks = AckCount }, SegEntries), @@ -485,9 +485,11 @@ queue_index_walker(DurableQueues) when is_list(DurableQueues) -> queue_index_walker({[], Gatherer}) -> case gatherer:fetch(Gatherer) of - finished -> rabbit_misc:unlink_and_capture_exit(Gatherer), - finished; - {value, {Guid, Count}} -> {Guid, Count, {[], Gatherer}} + finished -> + rabbit_misc:unlink_and_capture_exit(Gatherer), + finished; + {value, {Guid, Count}} -> + {Guid, Count, {[], Gatherer}} end; queue_index_walker({[QueueName | QueueNames], Gatherer}) -> Child = make_ref(), diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 96f5401ab4..ba493e02a2 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -208,33 +208,37 @@ end_seq_id :: non_neg_integer() }). -type(state() :: #vqstate { - q1 :: queue(), - q2 :: bpqueue(), - delta :: delta(), - q3 :: bpqueue(), - q4 :: queue(), - duration_target :: non_neg_integer(), - target_ram_msg_count :: non_neg_integer(), - ram_msg_count :: non_neg_integer(), - ram_msg_count_prev :: non_neg_integer(), - ram_index_count :: non_neg_integer(), - index_state :: any(), - next_seq_id :: seq_id(), - out_counter :: non_neg_integer(), - in_counter :: non_neg_integer(), - egress_rate :: {{integer(), integer(), integer()}, non_neg_integer()}, - avg_egress_rate :: float(), - ingress_rate :: {{integer(), integer(), integer()}, non_neg_integer()}, - avg_ingress_rate :: float(), - rate_timestamp :: {integer(), integer(), integer()}, - len :: non_neg_integer(), - on_sync :: {[[ack()]], [[guid()]], [fun (() -> any())]}, - msg_store_clients :: 'undefined' | {{any(), binary()}, {any(), binary()}}, - persistent_store :: pid() | atom(), - persistent_count :: non_neg_integer(), - transient_threshold :: non_neg_integer(), - pending_ack :: dict() - }). + q1 :: queue(), + q2 :: bpqueue(), + delta :: delta(), + q3 :: bpqueue(), + q4 :: queue(), + duration_target :: non_neg_integer(), + target_ram_msg_count :: non_neg_integer(), + ram_msg_count :: non_neg_integer(), + ram_msg_count_prev :: non_neg_integer(), + ram_index_count :: non_neg_integer(), + index_state :: any(), + next_seq_id :: seq_id(), + out_counter :: non_neg_integer(), + in_counter :: non_neg_integer(), + egress_rate :: {{integer(), integer(), integer()}, + non_neg_integer()}, + avg_egress_rate :: float(), + ingress_rate :: {{integer(), integer(), integer()}, + non_neg_integer()}, + avg_ingress_rate :: float(), + rate_timestamp :: {integer(), integer(), integer()}, + len :: non_neg_integer(), + on_sync :: {[[ack()]], [[guid()]], + [fun (() -> any())]}, + msg_store_clients :: 'undefined' | {{any(), binary()}, + {any(), binary()}}, + persistent_store :: pid() | atom(), + persistent_count :: non_neg_integer(), + transient_threshold :: non_neg_integer(), + pending_ack :: dict() + }). -include("rabbit_backing_queue_spec.hrl"). @@ -286,34 +290,37 @@ init(QueueName, IsDurable, _Recover) -> end_seq_id = NextSeqId } end, Now = now(), - State = - #vqstate { q1 = queue:new(), q2 = bpqueue:new(), - delta = Delta, - q3 = bpqueue:new(), q4 = queue:new(), - duration_target = undefined, - target_ram_msg_count = undefined, - ram_msg_count = 0, - ram_msg_count_prev = 0, - ram_index_count = 0, - index_state = IndexState1, - next_seq_id = NextSeqId, - out_counter = 0, - in_counter = 0, - egress_rate = {Now, 0}, - avg_egress_rate = 0, - ingress_rate = {Now, DeltaCount1}, - avg_ingress_rate = 0, - rate_timestamp = Now, - len = DeltaCount1, - on_sync = {[], [], []}, - msg_store_clients = { - {rabbit_msg_store:client_init(PersistentStore, PRef), PRef}, - {rabbit_msg_store:client_init(?TRANSIENT_MSG_STORE, TRef), TRef}}, - persistent_store = PersistentStore, - persistent_count = DeltaCount1, - transient_threshold = NextSeqId, - pending_ack = dict:new() - }, + PersistentClient = rabbit_msg_store:client_init(PersistentStore, PRef), + TransientClient = rabbit_msg_store:client_init(?TRANSIENT_MSG_STORE, TRef), + State = #vqstate { + q1 = queue:new(), + q2 = bpqueue:new(), + delta = Delta, + q3 = bpqueue:new(), + q4 = queue:new(), + duration_target = undefined, + target_ram_msg_count = undefined, + ram_msg_count = 0, + ram_msg_count_prev = 0, + ram_index_count = 0, + index_state = IndexState1, + next_seq_id = NextSeqId, + out_counter = 0, + in_counter = 0, + egress_rate = {Now, 0}, + avg_egress_rate = 0, + ingress_rate = {Now, DeltaCount1}, + avg_ingress_rate = 0, + rate_timestamp = Now, + len = DeltaCount1, + on_sync = {[], [], []}, + msg_store_clients = {{PersistentClient, PRef}, + {TransientClient, TRef}}, + persistent_store = PersistentStore, + persistent_count = DeltaCount1, + transient_threshold = NextSeqId, + pending_ack = dict:new() + }, maybe_deltas_to_betas(State). terminate(State) -> @@ -594,7 +601,8 @@ requeue(AckTags, State = #vqstate { persistent_store = PersistentStore }) -> msg_on_disk = false, is_persistent = false, msg = Msg }} -> - {_SeqId, StateN2} = publish(Msg, true, false, StateN1), + {_SeqId, StateN2} = + publish(Msg, true, false, StateN1), {SeqIdsAcc, Dict, StateN2}; {ok, {IsPersistent, Guid}} -> {{ok, Msg = #basic_message{}}, MSCStateN1} = @@ -889,28 +897,25 @@ should_force_index_to_disk(State = msg_store_callback(PersistentGuids, IsTransientPubs, Pubs, AckTags, Fun) -> Self = self(), - fun() -> - spawn( - fun() -> - ok = rabbit_misc:with_exit_handler( - fun() -> rabbit_msg_store:remove( - ?PERSISTENT_MSG_STORE, - PersistentGuids) - end, - fun() -> rabbit_amqqueue:maybe_run_queue_via_backing_queue( - Self, fun (StateN) -> - tx_commit_post_msg_store( - IsTransientPubs, Pubs, - AckTags, Fun, StateN) - end) - end) - end) + Fun = fun () -> rabbit_amqqueue:maybe_run_queue_via_backing_queue( + Self, fun (StateN) -> tx_commit_post_msg_store( + IsTransientPubs, Pubs, + AckTags, Fun, StateN) + end) + end, + fun () -> spawn(fun () -> ok = rabbit_misc:with_exit_handler( + fun () -> rabbit_msg_store:remove( + ?PERSISTENT_MSG_STORE, + PersistentGuids) + end, + Fun) + end) end. tx_commit_post_msg_store(IsTransientPubs, Pubs, AckTags, Fun, State = - #vqstate { on_sync = OnSync = {SAcks, SPubs, SFuns}, - persistent_store = PersistentStore, - pending_ack = PA }) -> + #vqstate { on_sync = OnSync = {SAcks, SPubs, SFuns}, + persistent_store = PersistentStore, + pending_ack = PA }) -> %% If we are a non-durable queue, or (no persisent pubs, and no %% persistent acks) then we can skip the queue_index loop. case PersistentStore == ?TRANSIENT_MSG_STORE orelse @@ -1038,7 +1043,8 @@ remove_queue_entries1( {PersistentStore, CountN + 1, GuidsByStore1, SeqIdsAcc1, IndexStateN1}. fetch_from_q3_or_delta(State = #vqstate { - q1 = Q1, q2 = Q2, delta = #delta { count = DeltaCount }, + q1 = Q1, q2 = Q2, + delta = #delta { count = DeltaCount }, q3 = Q3, q4 = Q4, ram_msg_count = RamMsgCount, ram_index_count = RamIndexCount, msg_store_clients = MSCState, @@ -1419,9 +1425,10 @@ maybe_push_q4_to_betas(State = #vqstate { q4 = Q4 }) -> q4 = Q4a } end, Q4, State). -maybe_push_alphas_to_betas(_Generator, _Consumer, _Q, State = - #vqstate { ram_msg_count = RamMsgCount, - target_ram_msg_count = TargetRamMsgCount }) +maybe_push_alphas_to_betas( + _Generator, _Consumer, _Q, + State = #vqstate { ram_msg_count = RamMsgCount, + target_ram_msg_count = TargetRamMsgCount }) when TargetRamMsgCount == undefined orelse TargetRamMsgCount >= RamMsgCount -> State; maybe_push_alphas_to_betas( |
