summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/bpqueue.erl24
-rw-r--r--src/rabbit_msg_file.erl6
-rw-r--r--src/rabbit_msg_store.erl45
-rw-r--r--src/rabbit_msg_store_gc.erl10
-rw-r--r--src/rabbit_queue_index.erl46
-rw-r--r--src/rabbit_variable_queue.erl46
6 files changed, 92 insertions, 85 deletions
diff --git a/src/bpqueue.erl b/src/bpqueue.erl
index 9cd0f23021..3010cb1182 100644
--- a/src/bpqueue.erl
+++ b/src/bpqueue.erl
@@ -48,8 +48,8 @@
-type(bpqueue() :: {non_neg_integer(), queue()}).
-type(prefix() :: any()).
-type(value() :: any()).
--type(result() :: {'empty', bpqueue()} |
- {{'value', prefix(), value()}, bpqueue()}).
+-type(result() :: ({'empty', bpqueue()} |
+ {{'value', prefix(), value()}, bpqueue()})).
-spec(new/0 :: () -> bpqueue()).
-spec(is_empty/1 :: (bpqueue()) -> boolean()).
@@ -63,14 +63,18 @@
-spec(foldr/3 :: (fun ((prefix(), value(), B) -> B), B, bpqueue()) -> B).
-spec(from_list/1 :: ([{prefix(), [value()]}]) -> bpqueue()).
-spec(to_list/1 :: (bpqueue()) -> [{prefix(), [value()]}]).
--spec(map_fold_filter_l/4 ::
- (fun ((prefix()) -> boolean()),
- fun ((value(), B) -> ({prefix(), value(), B} | 'stop')), B,
- bpqueue()) -> {bpqueue(), B}).
--spec(map_fold_filter_r/4 ::
- (fun ((prefix()) -> boolean()),
- fun ((value(), B) -> ({prefix(), value(), B} | 'stop')), B,
- bpqueue()) -> {bpqueue(), B}).
+-spec(map_fold_filter_l/4 :: ((fun ((prefix()) -> boolean())),
+ (fun ((value(), B) ->
+ ({prefix(), value(), B} | 'stop'))),
+ B,
+ bpqueue()) ->
+ {bpqueue(), B}).
+-spec(map_fold_filter_r/4 :: ((fun ((prefix()) -> boolean())),
+ (fun ((value(), B) ->
+ ({prefix(), value(), B} | 'stop'))),
+ B,
+ bpqueue()) ->
+ {bpqueue(), B}).
-endif.
diff --git a/src/rabbit_msg_file.erl b/src/rabbit_msg_file.erl
index 46288ccd77..301f4a9f61 100644
--- a/src/rabbit_msg_file.erl
+++ b/src/rabbit_msg_file.erl
@@ -86,9 +86,9 @@ read(FileHdl, TotalSize) ->
BodyBinSize = Size - ?GUID_SIZE_BYTES,
case file_handle_cache:read(FileHdl, TotalSize) of
{ok, <<Size:?INTEGER_SIZE_BITS,
- Guid:?GUID_SIZE_BYTES/binary,
- MsgBodyBin:BodyBinSize/binary,
- ?WRITE_OK_MARKER:?WRITE_OK_SIZE_BITS>>} ->
+ Guid:?GUID_SIZE_BYTES/binary,
+ MsgBodyBin:BodyBinSize/binary,
+ ?WRITE_OK_MARKER:?WRITE_OK_SIZE_BITS>>} ->
{ok, {Guid, binary_to_term(MsgBodyBin)}};
KO -> KO
end.
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 6bff9ae6b2..c4a9885f92 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -69,8 +69,7 @@
index_module, %% the module for index ops
index_state, %% where are messages?
current_file, %% current file name as number
- current_file_handle, %% current file handle
- %% since the last fsync?
+ current_file_handle, %% current file handle since the last fsync?
file_handle_cache, %% file handle cache
on_sync, %% pending sync requests
sync_timer_ref, %% TRef for our interval timer
@@ -85,7 +84,7 @@
cur_file_cache_ets, %% tid of current file cache table
client_refs, %% set of references of all registered clients
recovered_state %% boolean: did we recover state?
- }).
+ }).
-record(client_msstate,
{ file_handle_cache,
@@ -96,7 +95,7 @@
file_summary_ets,
dedup_cache_ets,
cur_file_cache_ets
- }).
+ }).
-record(file_summary,
{file, valid_total_size, contiguous_top, left, right, file_size,
@@ -119,11 +118,11 @@
-spec(start_link/4 ::
(atom(), file_path(), [binary()] | 'undefined', startup_fun_state()) ->
- {'ok', pid()} | 'ignore' | {'error', any()}).
+ {'ok', pid()} | 'ignore' | {'error', any()}).
-spec(write/4 :: (server(), guid(), msg(), client_msstate()) ->
- {'ok', client_msstate()}).
+ {'ok', client_msstate()}).
-spec(read/3 :: (server(), guid(), client_msstate()) ->
- {{'ok', msg()} | 'not_found', client_msstate()}).
+ {{'ok', msg()} | 'not_found', client_msstate()}).
-spec(contains/2 :: (server(), guid()) -> boolean()).
-spec(remove/2 :: (server(), [guid()]) -> 'ok').
-spec(release/2 :: (server(), [guid()]) -> 'ok').
@@ -305,14 +304,14 @@ start_link(Server, Dir, ClientRefs, StartupFunState) ->
[Server, Dir, ClientRefs, StartupFunState],
[{timeout, infinity}]).
-write(Server, Guid, Msg, CState =
- #client_msstate { cur_file_cache_ets = CurFileCacheEts }) ->
+write(Server, Guid, Msg,
+ CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts }) ->
ok = add_to_cache(CurFileCacheEts, Guid, Msg),
{gen_server2:cast(Server, {write, Guid, Msg}), CState}.
-read(Server, Guid, CState =
- #client_msstate { dedup_cache_ets = DedupCacheEts,
- cur_file_cache_ets = CurFileCacheEts }) ->
+read(Server, Guid,
+ CState = #client_msstate { dedup_cache_ets = DedupCacheEts,
+ cur_file_cache_ets = CurFileCacheEts }) ->
%% 1. Check the dedup cache
case fetch_and_increment_cache(DedupCacheEts, Guid) of
not_found ->
@@ -393,9 +392,10 @@ add_to_cache(CurFileCacheEts, Guid, Msg) ->
end
end.
-client_read1(Server, #msg_location { guid = Guid, file = File } =
- MsgLocation, Defer, CState =
- #client_msstate { file_summary_ets = FileSummaryEts }) ->
+client_read1(Server,
+ #msg_location { guid = Guid, file = File } = MsgLocation,
+ Defer,
+ CState = #client_msstate { file_summary_ets = FileSummaryEts }) ->
case ets:lookup(FileSummaryEts, File) of
[] -> %% File has been GC'd and no longer exists. Go around again.
read(Server, Guid, CState);
@@ -404,7 +404,8 @@ client_read1(Server, #msg_location { guid = Guid, file = File } =
end.
client_read2(_Server, false, undefined,
- #msg_location { guid = Guid, ref_count = RefCount }, Defer,
+ #msg_location { guid = Guid, ref_count = RefCount },
+ Defer,
CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts,
dedup_cache_ets = DedupCacheEts }) ->
case ets:lookup(CurFileCacheEts, Guid) of
@@ -421,10 +422,10 @@ client_read2(_Server, true, _Right, _MsgLocation, Defer, _CState) ->
Defer();
client_read2(Server, false, _Right,
#msg_location { guid = Guid, ref_count = RefCount, file = File },
- Defer, CState =
- #client_msstate { file_handles_ets = FileHandlesEts,
- file_summary_ets = FileSummaryEts,
- dedup_cache_ets = DedupCacheEts }) ->
+ Defer,
+ CState = #client_msstate { file_handles_ets = FileHandlesEts,
+ file_summary_ets = FileSummaryEts,
+ dedup_cache_ets = DedupCacheEts }) ->
%% It's entirely possible that everything we're doing from here on
%% is for the wrong file, or a non-existent file, as a GC may have
%% finished.
@@ -486,7 +487,7 @@ client_read2(Server, false, _Right,
end.
close_all_indicated(#client_msstate { file_handles_ets = FileHandlesEts } =
- CState) ->
+ CState) ->
Objs = ets:match_object(FileHandlesEts, {{self(), '_'}, close}),
lists:foldl(fun ({Key = {_Self, File}, close}, CStateM) ->
true = ets:delete(FileHandlesEts, Key),
@@ -559,7 +560,7 @@ init([Server, BaseDir, ClientRefs, {MsgRefDeltaGen, MsgRefDeltaGenInit}]) ->
cur_file_cache_ets = CurFileCacheEts,
client_refs = ClientRefs1,
recovered_state = Recovered
- },
+ },
ok = count_msg_refs(Recovered, MsgRefDeltaGen, MsgRefDeltaGenInit, State),
FileNames =
diff --git a/src/rabbit_msg_store_gc.erl b/src/rabbit_msg_store_gc.erl
index ca5e2c6ff1..8a275c39d9 100644
--- a/src/rabbit_msg_store_gc.erl
+++ b/src/rabbit_msg_store_gc.erl
@@ -80,10 +80,12 @@ init([Parent, Dir, IndexState, IndexModule, FileSummaryEts]) ->
handle_call(stop, _From, State) ->
{stop, normal, ok, State}.
-handle_cast({gc, Source, Destination}, State =
- #gcstate { parent = Parent, dir = Dir, index_module = Index,
- index_state = IndexState,
- file_summary_ets = FileSummaryEts }) ->
+handle_cast({gc, Source, Destination},
+ State = #gcstate { dir = Dir,
+ index_state = IndexState,
+ index_module = Index,
+ parent = Parent,
+ file_summary_ets = FileSummaryEts }) ->
Reclaimed = rabbit_msg_store:gc(Source, Destination,
{FileSummaryEts, Dir, Index, IndexState}),
ok = rabbit_msg_store:gc_done(Parent, Reclaimed, Source, Destination),
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 8d22d36af6..369a52d9e9 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -160,7 +160,7 @@
segments,
journal_handle,
dirty_count
- }).
+ }).
-record(segment,
{ pubs,
@@ -169,7 +169,7 @@
journal_entries,
path,
num
- }).
+ }).
-include("rabbit_msg_store.hrl").
@@ -185,14 +185,14 @@
journal_entries :: array(),
path :: file_path(),
num :: non_neg_integer()
- })).
+ })).
-type(seq_id() :: integer()).
-type(seg_dict() :: {dict(), [segment()]}).
-type(qistate() :: #qistate { dir :: file_path(),
segments :: 'undefined' | seg_dict(),
journal_handle :: hdl(),
dirty_count :: integer()
- }).
+ }).
-spec(init/3 :: (queue_name(), boolean(), fun ((guid()) -> boolean())) ->
{'undefined' |
@@ -212,7 +212,7 @@
-spec(find_lowest_seq_id_seg_and_next_seq_id/1 :: (qistate()) ->
{non_neg_integer(), non_neg_integer(), qistate()}).
-spec(prepare_msg_store_seed_funs/1 ::
- ([queue_name()]) ->
+ ([queue_name()]) ->
{{[binary()] | 'undefined', startup_fun_state()},
{[binary()] | 'undefined', startup_fun_state()}}).
@@ -553,7 +553,7 @@ blank_state(QueueName) ->
segments = segments_new(),
journal_handle = undefined,
dirty_count = 0
- }.
+ }.
array_new() ->
array:new([{default, undefined}, fixed, {size, ?SEGMENT_ENTRY_COUNT}]).
@@ -617,7 +617,7 @@ segment_new(Seg, Dir) ->
journal_entries = array_new(),
path = seg_num_to_path(Dir, Seg),
num = Seg
- }.
+ }.
segment_find_or_new(Seg, Dir, Segments) ->
case segment_find(Seg, Segments) of
@@ -683,15 +683,15 @@ write_entry_to_segment(RelSeq, {Pub, Del, Ack}, Hdl) ->
{Guid, IsPersistent} ->
file_handle_cache:append(
Hdl, [<<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS,
- (bool_to_int(IsPersistent)):1,
- RelSeq:?REL_SEQ_BITS>>, Guid])
+ (bool_to_int(IsPersistent)):1,
+ RelSeq:?REL_SEQ_BITS>>, Guid])
end,
ok = case {Del, Ack} of
{no_del, no_ack} ->
ok;
_ ->
Binary = <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
- RelSeq:?REL_SEQ_BITS>>,
+ RelSeq:?REL_SEQ_BITS>>,
file_handle_cache:append(
Hdl, case {Del, Ack} of
{del, ack} -> [Binary, Binary];
@@ -710,14 +710,14 @@ terminate(StoreShutdown, Terms, State =
_ -> file_handle_cache:close(JournalHdl)
end,
SegTerms = segment_fold(
- fun (Seg, #segment { handle = Hdl, pubs = PubCount,
- acks = AckCount }, SegTermsAcc) ->
- ok = case Hdl of
- undefined -> ok;
- _ -> file_handle_cache:close(Hdl)
- end,
- [{Seg, {PubCount, AckCount}} | SegTermsAcc]
- end, [], Segments),
+ fun (Seg, #segment { handle = Hdl, pubs = PubCount,
+ acks = AckCount }, SegTermsAcc) ->
+ ok = case Hdl of
+ undefined -> ok;
+ _ -> file_handle_cache:close(Hdl)
+ end,
+ [{Seg, {PubCount, AckCount}} | SegTermsAcc]
+ end, [], Segments),
case StoreShutdown of
true -> store_clean_shutdown([{segments, SegTerms} | Terms], Dir);
false -> ok
@@ -756,13 +756,13 @@ load_segment(KeepAcks,
load_segment_entries(KeepAcks, Hdl, SegEntries, PubCount, AckCount) ->
case file_handle_cache:read(Hdl, ?REL_SEQ_ONLY_ENTRY_LENGTH_BYTES) of
{ok, <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
- RelSeq:?REL_SEQ_BITS>>} ->
+ RelSeq:?REL_SEQ_BITS>>} ->
{AckCount1, SegEntries1} =
deliver_or_ack_msg(KeepAcks, RelSeq, AckCount, SegEntries),
load_segment_entries(KeepAcks, Hdl, SegEntries1, PubCount,
AckCount1);
{ok, <<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS,
- IsPersistentNum:1, RelSeq:?REL_SEQ_BITS>>} ->
+ IsPersistentNum:1, RelSeq:?REL_SEQ_BITS>>} ->
%% because we specify /binary, and binaries are complete
%% bytes, the size spec is in bytes, not bits.
{ok, Guid} = file_handle_cache:read(Hdl, ?GUID_BYTES),
@@ -834,9 +834,9 @@ load_journal_entries(State = #qistate { journal_handle = Hdl }) ->
<<Guid:?GUID_BYTES/binary>> =
<<GuidNum:?GUID_BITS>>,
Publish = {Guid, case Prefix of
- ?PUB_PERSIST_JPREFIX -> true;
- ?PUB_TRANS_JPREFIX -> false
- end},
+ ?PUB_PERSIST_JPREFIX -> true;
+ ?PUB_TRANS_JPREFIX -> false
+ end},
load_journal_entries(
add_to_journal(SeqId, Publish, State));
_ErrOrEoF -> %% err, we've lost at least a publish
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index ba493e02a2..6895700cc2 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -162,7 +162,7 @@
persistent_count,
transient_threshold,
pending_ack
- }).
+ }).
-record(msg_status,
{ seq_id,
@@ -172,13 +172,13 @@
is_delivered,
msg_on_disk,
index_on_disk
- }).
+ }).
-record(delta,
{ start_seq_id,
count,
end_seq_id %% note the end_seq_id is always >, not >=
- }).
+ }).
-record(tx, { pending_messages, pending_acks }).
@@ -332,8 +332,8 @@ terminate(State) ->
rabbit_msg_store:client_terminate(MSCStateT),
Terms = [{persistent_ref, PRef}, {transient_ref, TRef},
{persistent_count, PCount}],
- State1 #vqstate { index_state =
- rabbit_queue_index:terminate(Terms, IndexState),
+ State1 #vqstate { index_state = rabbit_queue_index:terminate(
+ Terms, IndexState),
msg_store_clients = undefined }.
%% the only difference between purge and delete is that delete also
@@ -359,7 +359,7 @@ delete_and_terminate(State) ->
delete1(PersistentStore, TransientThreshold, NextSeqId, 0,
DeltaSeqId, IndexState3),
IndexState4
- end,
+ end,
IndexState5 = rabbit_queue_index:terminate_and_erase(IndexState2),
rabbit_msg_store:delete_client(PersistentStore, PRef),
rabbit_msg_store:delete_client(?TRANSIENT_MSG_STORE, TRef),
@@ -432,9 +432,9 @@ fetch(AckRequired, State =
{loaded, State1} -> fetch(AckRequired, State1)
end;
{{value, MsgStatus = #msg_status {
- msg = Msg, guid = Guid, seq_id = SeqId,
- is_persistent = IsPersistent, is_delivered = IsDelivered,
- msg_on_disk = MsgOnDisk, index_on_disk = IndexOnDisk }},
+ msg = Msg, guid = Guid, seq_id = SeqId,
+ is_persistent = IsPersistent, is_delivered = IsDelivered,
+ msg_on_disk = MsgOnDisk, index_on_disk = IndexOnDisk }},
Q4a} ->
AckTag = case AckRequired of
@@ -592,8 +592,8 @@ requeue(AckTags, State = #vqstate { persistent_store = PersistentStore }) ->
persistent_count = PCount }} =
lists:foldl(
fun (SeqId, {SeqIdsAcc, Dict, StateN =
- #vqstate { msg_store_clients = MSCStateN,
- pending_ack = PAN}}) ->
+ #vqstate { msg_store_clients = MSCStateN,
+ pending_ack = PAN }}) ->
PAN1 = dict:erase(SeqId, PAN),
StateN1 = StateN #vqstate { pending_ack = PAN1 },
case dict:find(SeqId, PAN) of
@@ -618,9 +618,9 @@ requeue(AckTags, State = #vqstate { persistent_store = PersistentStore }) ->
false ->
{SeqIdsAcc, ?TRANSIENT_MSG_STORE}
end,
- {SeqIdsAcc1,
- rabbit_misc:dict_cons(MsgStore, Guid, Dict),
- StateN3}
+ {SeqIdsAcc1,
+ rabbit_misc:dict_cons(MsgStore, Guid, Dict),
+ StateN3}
end
end, {[], dict:new(), State}, AckTags),
IndexState1 = rabbit_queue_index:write_acks(SeqIds, IndexState),
@@ -644,7 +644,7 @@ set_ram_duration_target(
DurationTarget, State = #vqstate { avg_egress_rate = AvgEgressRate,
avg_ingress_rate = AvgIngressRate,
target_ram_msg_count = TargetRamMsgCount
- }) ->
+ }) ->
Rate = AvgEgressRate + AvgIngressRate,
TargetRamMsgCount1 =
case DurationTarget of
@@ -819,7 +819,7 @@ betas_from_segment_entries(List, SeqIdLimit, TransientThreshold, IndexState) ->
is_delivered = IsDelivered,
msg_on_disk = true,
index_on_disk = true
- } | FilteredAcc],
+ } | FilteredAcc],
IndexStateAcc};
false ->
{FilteredAcc, IndexStateAcc}
@@ -898,10 +898,10 @@ should_force_index_to_disk(State =
msg_store_callback(PersistentGuids, IsTransientPubs, Pubs, AckTags, Fun) ->
Self = self(),
Fun = fun () -> rabbit_amqqueue:maybe_run_queue_via_backing_queue(
- Self, fun (StateN) -> tx_commit_post_msg_store(
- IsTransientPubs, Pubs,
- AckTags, Fun, StateN)
- end)
+ Self, fun (StateN) -> tx_commit_post_msg_store(
+ IsTransientPubs, Pubs,
+ AckTags, Fun, StateN)
+ end)
end,
fun () -> spawn(fun () -> ok = rabbit_misc:with_exit_handler(
fun () -> rabbit_msg_store:remove(
@@ -1195,9 +1195,9 @@ publish(index, MsgStatus, #vqstate {
store_beta_entry(MsgStatus2, State1);
publish(neither, MsgStatus = #msg_status { seq_id = SeqId }, State =
- #vqstate { index_state = IndexState, q1 = Q1, q2 = Q2,
- delta = Delta, msg_store_clients = MSCState,
- persistent_store = PersistentStore }) ->
+ #vqstate { index_state = IndexState, q1 = Q1, q2 = Q2,
+ delta = Delta, msg_store_clients = MSCState,
+ persistent_store = PersistentStore }) ->
{MsgStatus1 = #msg_status { msg_on_disk = true }, MSCState1} =
maybe_write_msg_to_disk(PersistentStore, true, MsgStatus, MSCState),
{#msg_status { index_on_disk = true }, IndexState1} =