summary | refs | log | tree | commit | diff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--  src/bpqueue.erl              6
-rw-r--r--  src/gatherer.erl            14
-rw-r--r--  src/rabbit_msg_store.erl    44
-rw-r--r--  src/rabbit_queue_index.erl  24
-rw-r--r--  src/rabbit_variable_queue.erl  165
5 files changed, 136 insertions, 117 deletions
diff --git a/src/bpqueue.erl b/src/bpqueue.erl
index 7acc969754..9cd0f23021 100644
--- a/src/bpqueue.erl
+++ b/src/bpqueue.erl
@@ -111,7 +111,8 @@ in_q(Prefix, Queue, BPQ = {0, Q}) ->
N -> {N, queue:in({Prefix, Queue}, Q)}
end;
in_q(Prefix, Queue, BPQ) ->
- in_q1({fun queue:in/2, fun queue:out_r/1, fun queue:join/2},
+ in_q1({fun queue:in/2, fun queue:out_r/1,
+ fun queue:join/2},
Prefix, Queue, BPQ).
in_q_r(Prefix, Queue, BPQ = {0, _Q}) ->
@@ -232,7 +233,8 @@ to_list1({Prefix, InnerQ}) ->
map_fold_filter_l(_PFilter, _Fun, Init, BPQ = {0, _Q}) ->
{BPQ, Init};
map_fold_filter_l(PFilter, Fun, Init, {N, Q}) ->
- map_fold_filter1({fun queue:out/1, fun queue:in/2, fun in_q/3, fun join/2},
+ map_fold_filter1({fun queue:out/1, fun queue:in/2,
+ fun in_q/3, fun join/2},
N, PFilter, Fun, Init, Q, new()).
map_fold_filter_r(_PFilter, _Fun, Init, BPQ = {0, _Q}) ->
diff --git a/src/gatherer.erl b/src/gatherer.erl
index 8c44388c40..d5b35e9669 100644
--- a/src/gatherer.erl
+++ b/src/gatherer.erl
@@ -98,9 +98,9 @@ handle_call({finished, Token}, _From,
false -> {reply, ok, State1, hibernate}
end;
-handle_call(fetch, From, State =
- #gstate { blocking = Blocking, results = Results,
- waiting_on = Tokens }) ->
+handle_call(fetch, From,
+ State = #gstate { waiting_on = Tokens, results = Results,
+ blocking = Blocking }) ->
case queue:out(Results) of
{empty, _Results} ->
case sets:size(Tokens) of
@@ -117,8 +117,8 @@ handle_call(fetch, From, State =
handle_call(Msg, _From, State) ->
{stop, {unexpected_call, Msg}, State}.
-handle_cast({produce, Result}, State = #gstate { blocking = Blocking,
- results = Results }) ->
+handle_cast({produce, Result},
+ State = #gstate { blocking = Blocking, results = Results }) ->
{noreply, case queue:out(Blocking) of
{empty, _Blocking} ->
State #gstate { results = queue:in(Result, Results) };
@@ -137,6 +137,6 @@ code_change(_OldVsn, State, _Extra) ->
{ok, State}.
terminate(_Reason, State = #gstate { blocking = Blocking } ) ->
- [gen_server2:reply(Blocked, finished)
- || Blocked <- queue:to_list(Blocking) ],
+ [gen_server2:reply(Blocked, finished) ||
+ Blocked <- queue:to_list(Blocking)],
State.
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 7e09f7fa39..6bff9ae6b2 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -128,7 +128,8 @@
-spec(remove/2 :: (server(), [guid()]) -> 'ok').
-spec(release/2 :: (server(), [guid()]) -> 'ok').
-spec(sync/3 :: (server(), [guid()], fun (() -> any())) -> 'ok').
--spec(gc_done/4 :: (server(), non_neg_integer(), file_num(), file_num()) -> 'ok').
+-spec(gc_done/4 :: (server(), non_neg_integer(), file_num(), file_num()) ->
+ 'ok').
-spec(set_maximum_since_use/2 :: (server(), non_neg_integer()) -> 'ok').
-spec(client_init/2 :: (server(), binary()) -> client_msstate()).
-spec(client_terminate/1 :: (client_msstate()) -> 'ok').
@@ -866,7 +867,8 @@ read_message1(From, #msg_location { guid = Guid, ref_count = RefCount,
true ->
add_to_pending_gc_completion({read, Guid, From}, State);
false ->
- {Msg, State1} = read_from_disk(MsgLoc, State, DedupCacheEts),
+ {Msg, State1} = read_from_disk(MsgLoc, State,
+ DedupCacheEts),
gen_server2:reply(From, {ok, Msg}),
State1
end
@@ -1136,7 +1138,8 @@ insert_into_cache(DedupCacheEts, Guid, Msg) ->
%% index
%%----------------------------------------------------------------------------
-index_lookup(Key, #client_msstate { index_module = Index, index_state = State }) ->
+index_lookup(Key, #client_msstate { index_module = Index,
+ index_state = State }) ->
Index:lookup(Key, State);
index_lookup(Key, #msstate { index_module = Index, index_state = State }) ->
@@ -1148,8 +1151,8 @@ index_insert(Obj, #msstate { index_module = Index, index_state = State }) ->
index_update(Obj, #msstate { index_module = Index, index_state = State }) ->
Index:update(Obj, State).
-index_update_fields(Key, Updates,
- #msstate { index_module = Index, index_state = State }) ->
+index_update_fields(Key, Updates, #msstate { index_module = Index,
+ index_state = State }) ->
Index:update_fields(Key, Updates, State).
index_delete(Key, #msstate { index_module = Index, index_state = State }) ->
@@ -1324,9 +1327,10 @@ build_index(true, _Files, State =
file_size = FileSize, file = File },
{_Offset, State1 = #msstate { sum_valid_data = SumValid,
sum_file_size = SumFileSize }}) ->
- {FileSize, State1 #msstate { sum_valid_data = SumValid + ValidTotalSize,
- sum_file_size = SumFileSize + FileSize,
- current_file = File }}
+ {FileSize, State1 #msstate {
+ sum_valid_data = SumValid + ValidTotalSize,
+ sum_file_size = SumFileSize + FileSize,
+ current_file = File }}
end, {0, State}, FileSummaryEts);
build_index(false, Files, State) ->
{ok, Pid} = gatherer:start_link(),
@@ -1361,8 +1365,9 @@ build_index(Gatherer, Left, [],
build_index(Gatherer, Left, [File|Files], State) ->
Child = make_ref(),
ok = gatherer:wait_on(Gatherer, Child),
- ok = worker_pool:submit_async({?MODULE, build_index_worker,
- [Gatherer, Child, State, Left, File, Files]}),
+ ok = worker_pool:submit_async(
+ {?MODULE, build_index_worker,
+ [Gatherer, Child, State, Left, File, Files]}),
build_index(Gatherer, File, Files, State).
build_index_worker(
@@ -1409,12 +1414,13 @@ build_index_worker(
%% garbage collection / compaction / aggregation -- internal
%%----------------------------------------------------------------------------
-maybe_roll_to_new_file(Offset,
- State = #msstate { dir = Dir,
- current_file_handle = CurHdl,
- current_file = CurFile,
- file_summary_ets = FileSummaryEts,
- cur_file_cache_ets = CurFileCacheEts })
+maybe_roll_to_new_file(
+ Offset,
+ State = #msstate { dir = Dir,
+ current_file_handle = CurHdl,
+ current_file = CurFile,
+ file_summary_ets = FileSummaryEts,
+ cur_file_cache_ets = CurFileCacheEts })
when Offset >= ?FILE_SIZE_LIMIT ->
State1 = internal_sync(State),
ok = file_handle_cache:close(CurHdl),
@@ -1631,7 +1637,8 @@ combine_files(#file_summary { file = Source,
ok = file_handle_cache:sync(DestinationHdl),
ok = file_handle_cache:delete(TmpHdl)
end,
- {SourceWorkList, SourceValid} = find_unremoved_messages_in_file(Source, State),
+ {SourceWorkList, SourceValid} =
+ find_unremoved_messages_in_file(Source, State),
ok = copy_messages(SourceWorkList, DestinationValid, ExpectedSize,
SourceHdl, DestinationHdl, Destination, State),
%% tidy up
@@ -1700,7 +1707,8 @@ copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl,
{ok, BlockStart1} =
file_handle_cache:position(SourceHdl, BlockStart1),
{ok, BSize1} =
- file_handle_cache:copy(SourceHdl, DestinationHdl, BSize1),
+ file_handle_cache:copy(SourceHdl, DestinationHdl,
+ BSize1),
ok = file_handle_cache:sync(DestinationHdl)
end;
{FinalOffsetZ, _BlockStart1, _BlockEnd1} ->
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index d6ef0cb8b9..8d22d36af6 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -195,7 +195,8 @@
}).
-spec(init/3 :: (queue_name(), boolean(), fun ((guid()) -> boolean())) ->
- {'undefined' | non_neg_integer(), binary(), binary(), [any()], qistate()}).
+ {'undefined' |
+ non_neg_integer(), binary(), binary(), [any()], qistate()}).
-spec(terminate/2 :: ([any()], qistate()) -> qistate()).
-spec(terminate_and_erase/1 :: (qistate()) -> qistate()).
-spec(write_published/4 :: (guid(), seq_id(), boolean(), qistate())
@@ -265,15 +266,14 @@ init(Name, MsgStoreRecovered, ContainsCheckFun) ->
{SegEntries, PubCount, AckCount, Segment1} =
load_segment(false, Segment),
Segment2 =
- #segment { pubs = PubCount1, acks = AckCount1 } =
+ #segment { pubs = PubCount1, acks = AckCount1 } =
array:sparse_foldl(
- fun (RelSeq, {{Guid, _IsPersistent}, Del, no_ack},
+ fun (RelSeq, {{Guid, _IsPersistent}, Del,
+ no_ack},
Segment3) ->
- Segment4 =
- maybe_add_to_journal(
- ContainsCheckFun(Guid),
- CleanShutdown, Del, RelSeq, Segment3),
- Segment4
+ maybe_add_to_journal(
+ ContainsCheckFun(Guid),
+ CleanShutdown, Del, RelSeq, Segment3)
end, Segment1 #segment { pubs = PubCount,
acks = AckCount },
SegEntries),
@@ -485,9 +485,11 @@ queue_index_walker(DurableQueues) when is_list(DurableQueues) ->
queue_index_walker({[], Gatherer}) ->
case gatherer:fetch(Gatherer) of
- finished -> rabbit_misc:unlink_and_capture_exit(Gatherer),
- finished;
- {value, {Guid, Count}} -> {Guid, Count, {[], Gatherer}}
+ finished ->
+ rabbit_misc:unlink_and_capture_exit(Gatherer),
+ finished;
+ {value, {Guid, Count}} ->
+ {Guid, Count, {[], Gatherer}}
end;
queue_index_walker({[QueueName | QueueNames], Gatherer}) ->
Child = make_ref(),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 96f5401ab4..ba493e02a2 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -208,33 +208,37 @@
end_seq_id :: non_neg_integer() }).
-type(state() :: #vqstate {
- q1 :: queue(),
- q2 :: bpqueue(),
- delta :: delta(),
- q3 :: bpqueue(),
- q4 :: queue(),
- duration_target :: non_neg_integer(),
- target_ram_msg_count :: non_neg_integer(),
- ram_msg_count :: non_neg_integer(),
- ram_msg_count_prev :: non_neg_integer(),
- ram_index_count :: non_neg_integer(),
- index_state :: any(),
- next_seq_id :: seq_id(),
- out_counter :: non_neg_integer(),
- in_counter :: non_neg_integer(),
- egress_rate :: {{integer(), integer(), integer()}, non_neg_integer()},
- avg_egress_rate :: float(),
- ingress_rate :: {{integer(), integer(), integer()}, non_neg_integer()},
- avg_ingress_rate :: float(),
- rate_timestamp :: {integer(), integer(), integer()},
- len :: non_neg_integer(),
- on_sync :: {[[ack()]], [[guid()]], [fun (() -> any())]},
- msg_store_clients :: 'undefined' | {{any(), binary()}, {any(), binary()}},
- persistent_store :: pid() | atom(),
- persistent_count :: non_neg_integer(),
- transient_threshold :: non_neg_integer(),
- pending_ack :: dict()
- }).
+ q1 :: queue(),
+ q2 :: bpqueue(),
+ delta :: delta(),
+ q3 :: bpqueue(),
+ q4 :: queue(),
+ duration_target :: non_neg_integer(),
+ target_ram_msg_count :: non_neg_integer(),
+ ram_msg_count :: non_neg_integer(),
+ ram_msg_count_prev :: non_neg_integer(),
+ ram_index_count :: non_neg_integer(),
+ index_state :: any(),
+ next_seq_id :: seq_id(),
+ out_counter :: non_neg_integer(),
+ in_counter :: non_neg_integer(),
+ egress_rate :: {{integer(), integer(), integer()},
+ non_neg_integer()},
+ avg_egress_rate :: float(),
+ ingress_rate :: {{integer(), integer(), integer()},
+ non_neg_integer()},
+ avg_ingress_rate :: float(),
+ rate_timestamp :: {integer(), integer(), integer()},
+ len :: non_neg_integer(),
+ on_sync :: {[[ack()]], [[guid()]],
+ [fun (() -> any())]},
+ msg_store_clients :: 'undefined' | {{any(), binary()},
+ {any(), binary()}},
+ persistent_store :: pid() | atom(),
+ persistent_count :: non_neg_integer(),
+ transient_threshold :: non_neg_integer(),
+ pending_ack :: dict()
+ }).
-include("rabbit_backing_queue_spec.hrl").
@@ -286,34 +290,37 @@ init(QueueName, IsDurable, _Recover) ->
end_seq_id = NextSeqId }
end,
Now = now(),
- State =
- #vqstate { q1 = queue:new(), q2 = bpqueue:new(),
- delta = Delta,
- q3 = bpqueue:new(), q4 = queue:new(),
- duration_target = undefined,
- target_ram_msg_count = undefined,
- ram_msg_count = 0,
- ram_msg_count_prev = 0,
- ram_index_count = 0,
- index_state = IndexState1,
- next_seq_id = NextSeqId,
- out_counter = 0,
- in_counter = 0,
- egress_rate = {Now, 0},
- avg_egress_rate = 0,
- ingress_rate = {Now, DeltaCount1},
- avg_ingress_rate = 0,
- rate_timestamp = Now,
- len = DeltaCount1,
- on_sync = {[], [], []},
- msg_store_clients = {
- {rabbit_msg_store:client_init(PersistentStore, PRef), PRef},
- {rabbit_msg_store:client_init(?TRANSIENT_MSG_STORE, TRef), TRef}},
- persistent_store = PersistentStore,
- persistent_count = DeltaCount1,
- transient_threshold = NextSeqId,
- pending_ack = dict:new()
- },
+ PersistentClient = rabbit_msg_store:client_init(PersistentStore, PRef),
+ TransientClient = rabbit_msg_store:client_init(?TRANSIENT_MSG_STORE, TRef),
+ State = #vqstate {
+ q1 = queue:new(),
+ q2 = bpqueue:new(),
+ delta = Delta,
+ q3 = bpqueue:new(),
+ q4 = queue:new(),
+ duration_target = undefined,
+ target_ram_msg_count = undefined,
+ ram_msg_count = 0,
+ ram_msg_count_prev = 0,
+ ram_index_count = 0,
+ index_state = IndexState1,
+ next_seq_id = NextSeqId,
+ out_counter = 0,
+ in_counter = 0,
+ egress_rate = {Now, 0},
+ avg_egress_rate = 0,
+ ingress_rate = {Now, DeltaCount1},
+ avg_ingress_rate = 0,
+ rate_timestamp = Now,
+ len = DeltaCount1,
+ on_sync = {[], [], []},
+ msg_store_clients = {{PersistentClient, PRef},
+ {TransientClient, TRef}},
+ persistent_store = PersistentStore,
+ persistent_count = DeltaCount1,
+ transient_threshold = NextSeqId,
+ pending_ack = dict:new()
+ },
maybe_deltas_to_betas(State).
terminate(State) ->
@@ -594,7 +601,8 @@ requeue(AckTags, State = #vqstate { persistent_store = PersistentStore }) ->
msg_on_disk = false,
is_persistent = false,
msg = Msg }} ->
- {_SeqId, StateN2} = publish(Msg, true, false, StateN1),
+ {_SeqId, StateN2} =
+ publish(Msg, true, false, StateN1),
{SeqIdsAcc, Dict, StateN2};
{ok, {IsPersistent, Guid}} ->
{{ok, Msg = #basic_message{}}, MSCStateN1} =
@@ -889,28 +897,25 @@ should_force_index_to_disk(State =
msg_store_callback(PersistentGuids, IsTransientPubs, Pubs, AckTags, Fun) ->
Self = self(),
- fun() ->
- spawn(
- fun() ->
- ok = rabbit_misc:with_exit_handler(
- fun() -> rabbit_msg_store:remove(
- ?PERSISTENT_MSG_STORE,
- PersistentGuids)
- end,
- fun() -> rabbit_amqqueue:maybe_run_queue_via_backing_queue(
- Self, fun (StateN) ->
- tx_commit_post_msg_store(
- IsTransientPubs, Pubs,
- AckTags, Fun, StateN)
- end)
- end)
- end)
+ Fun = fun () -> rabbit_amqqueue:maybe_run_queue_via_backing_queue(
+ Self, fun (StateN) -> tx_commit_post_msg_store(
+ IsTransientPubs, Pubs,
+ AckTags, Fun, StateN)
+ end)
+ end,
+ fun () -> spawn(fun () -> ok = rabbit_misc:with_exit_handler(
+ fun () -> rabbit_msg_store:remove(
+ ?PERSISTENT_MSG_STORE,
+ PersistentGuids)
+ end,
+ Fun)
+ end)
end.
tx_commit_post_msg_store(IsTransientPubs, Pubs, AckTags, Fun, State =
- #vqstate { on_sync = OnSync = {SAcks, SPubs, SFuns},
- persistent_store = PersistentStore,
- pending_ack = PA }) ->
+ #vqstate { on_sync = OnSync = {SAcks, SPubs, SFuns},
+ persistent_store = PersistentStore,
+ pending_ack = PA }) ->
%% If we are a non-durable queue, or (no persistent pubs, and no
%% persistent acks) then we can skip the queue_index loop.
case PersistentStore == ?TRANSIENT_MSG_STORE orelse
@@ -1038,7 +1043,8 @@ remove_queue_entries1(
{PersistentStore, CountN + 1, GuidsByStore1, SeqIdsAcc1, IndexStateN1}.
fetch_from_q3_or_delta(State = #vqstate {
- q1 = Q1, q2 = Q2, delta = #delta { count = DeltaCount },
+ q1 = Q1, q2 = Q2,
+ delta = #delta { count = DeltaCount },
q3 = Q3, q4 = Q4, ram_msg_count = RamMsgCount,
ram_index_count = RamIndexCount,
msg_store_clients = MSCState,
@@ -1419,9 +1425,10 @@ maybe_push_q4_to_betas(State = #vqstate { q4 = Q4 }) ->
q4 = Q4a }
end, Q4, State).
-maybe_push_alphas_to_betas(_Generator, _Consumer, _Q, State =
- #vqstate { ram_msg_count = RamMsgCount,
- target_ram_msg_count = TargetRamMsgCount })
+maybe_push_alphas_to_betas(
+ _Generator, _Consumer, _Q,
+ State = #vqstate { ram_msg_count = RamMsgCount,
+ target_ram_msg_count = TargetRamMsgCount })
when TargetRamMsgCount == undefined orelse TargetRamMsgCount >= RamMsgCount ->
State;
maybe_push_alphas_to_betas(