diff options
| author | Matthias Radestock <matthias@lshift.net> | 2010-05-04 18:16:49 +0100 |
|---|---|---|
| committer | Matthias Radestock <matthias@lshift.net> | 2010-05-04 18:16:49 +0100 |
| commit | 4ea87dc8437d84bba51b8ea81f902549b845d71b (patch) | |
| tree | 9c820d38fc245475d9f377f1e5f4ecd6976d4b0e /src | |
| parent | f78fd8fdf03f7081973bcf39e71276bf48f60a2d (diff) | |
| download | rabbitmq-server-git-4ea87dc8437d84bba51b8ea81f902549b845d71b.tar.gz | |
cosmetic
Diffstat (limited to 'src')
| -rw-r--r-- | src/bpqueue.erl | 24 | ||||
| -rw-r--r-- | src/rabbit_msg_file.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_msg_store.erl | 45 | ||||
| -rw-r--r-- | src/rabbit_msg_store_gc.erl | 10 | ||||
| -rw-r--r-- | src/rabbit_queue_index.erl | 46 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 46 |
6 files changed, 92 insertions, 85 deletions
diff --git a/src/bpqueue.erl b/src/bpqueue.erl index 9cd0f23021..3010cb1182 100644 --- a/src/bpqueue.erl +++ b/src/bpqueue.erl @@ -48,8 +48,8 @@ -type(bpqueue() :: {non_neg_integer(), queue()}). -type(prefix() :: any()). -type(value() :: any()). --type(result() :: {'empty', bpqueue()} | - {{'value', prefix(), value()}, bpqueue()}). +-type(result() :: ({'empty', bpqueue()} | + {{'value', prefix(), value()}, bpqueue()})). -spec(new/0 :: () -> bpqueue()). -spec(is_empty/1 :: (bpqueue()) -> boolean()). @@ -63,14 +63,18 @@ -spec(foldr/3 :: (fun ((prefix(), value(), B) -> B), B, bpqueue()) -> B). -spec(from_list/1 :: ([{prefix(), [value()]}]) -> bpqueue()). -spec(to_list/1 :: (bpqueue()) -> [{prefix(), [value()]}]). --spec(map_fold_filter_l/4 :: - (fun ((prefix()) -> boolean()), - fun ((value(), B) -> ({prefix(), value(), B} | 'stop')), B, - bpqueue()) -> {bpqueue(), B}). --spec(map_fold_filter_r/4 :: - (fun ((prefix()) -> boolean()), - fun ((value(), B) -> ({prefix(), value(), B} | 'stop')), B, - bpqueue()) -> {bpqueue(), B}). +-spec(map_fold_filter_l/4 :: ((fun ((prefix()) -> boolean())), + (fun ((value(), B) -> + ({prefix(), value(), B} | 'stop'))), + B, + bpqueue()) -> + {bpqueue(), B}). +-spec(map_fold_filter_r/4 :: ((fun ((prefix()) -> boolean())), + (fun ((value(), B) -> + ({prefix(), value(), B} | 'stop'))), + B, + bpqueue()) -> + {bpqueue(), B}). -endif. diff --git a/src/rabbit_msg_file.erl b/src/rabbit_msg_file.erl index 46288ccd77..301f4a9f61 100644 --- a/src/rabbit_msg_file.erl +++ b/src/rabbit_msg_file.erl @@ -86,9 +86,9 @@ read(FileHdl, TotalSize) -> BodyBinSize = Size - ?GUID_SIZE_BYTES, case file_handle_cache:read(FileHdl, TotalSize) of {ok, <<Size:?INTEGER_SIZE_BITS, - Guid:?GUID_SIZE_BYTES/binary, - MsgBodyBin:BodyBinSize/binary, - ?WRITE_OK_MARKER:?WRITE_OK_SIZE_BITS>>} -> + Guid:?GUID_SIZE_BYTES/binary, + MsgBodyBin:BodyBinSize/binary, + ?WRITE_OK_MARKER:?WRITE_OK_SIZE_BITS>>} -> {ok, {Guid, binary_to_term(MsgBodyBin)}}; KO -> KO end. diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 6bff9ae6b2..c4a9885f92 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -69,8 +69,7 @@ index_module, %% the module for index ops index_state, %% where are messages? current_file, %% current file name as number - current_file_handle, %% current file handle - %% since the last fsync? + current_file_handle, %% current file handle since the last fsync? file_handle_cache, %% file handle cache on_sync, %% pending sync requests sync_timer_ref, %% TRef for our interval timer @@ -85,7 +84,7 @@ cur_file_cache_ets, %% tid of current file cache table client_refs, %% set of references of all registered clients recovered_state %% boolean: did we recover state? - }). + }). -record(client_msstate, { file_handle_cache, @@ -96,7 +95,7 @@ file_summary_ets, dedup_cache_ets, cur_file_cache_ets - }). + }). -record(file_summary, {file, valid_total_size, contiguous_top, left, right, file_size, @@ -119,11 +118,11 @@ -spec(start_link/4 :: (atom(), file_path(), [binary()] | 'undefined', startup_fun_state()) -> - {'ok', pid()} | 'ignore' | {'error', any()}). + {'ok', pid()} | 'ignore' | {'error', any()}). -spec(write/4 :: (server(), guid(), msg(), client_msstate()) -> - {'ok', client_msstate()}). + {'ok', client_msstate()}). -spec(read/3 :: (server(), guid(), client_msstate()) -> - {{'ok', msg()} | 'not_found', client_msstate()}). + {{'ok', msg()} | 'not_found', client_msstate()}). -spec(contains/2 :: (server(), guid()) -> boolean()). -spec(remove/2 :: (server(), [guid()]) -> 'ok'). -spec(release/2 :: (server(), [guid()]) -> 'ok'). @@ -305,14 +304,14 @@ start_link(Server, Dir, ClientRefs, StartupFunState) -> [Server, Dir, ClientRefs, StartupFunState], [{timeout, infinity}]). -write(Server, Guid, Msg, CState = - #client_msstate { cur_file_cache_ets = CurFileCacheEts }) -> +write(Server, Guid, Msg, + CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts }) -> ok = add_to_cache(CurFileCacheEts, Guid, Msg), {gen_server2:cast(Server, {write, Guid, Msg}), CState}. -read(Server, Guid, CState = - #client_msstate { dedup_cache_ets = DedupCacheEts, - cur_file_cache_ets = CurFileCacheEts }) -> +read(Server, Guid, + CState = #client_msstate { dedup_cache_ets = DedupCacheEts, + cur_file_cache_ets = CurFileCacheEts }) -> %% 1. Check the dedup cache case fetch_and_increment_cache(DedupCacheEts, Guid) of not_found -> @@ -393,9 +392,10 @@ add_to_cache(CurFileCacheEts, Guid, Msg) -> end end. -client_read1(Server, #msg_location { guid = Guid, file = File } = - MsgLocation, Defer, CState = - #client_msstate { file_summary_ets = FileSummaryEts }) -> +client_read1(Server, + #msg_location { guid = Guid, file = File } = MsgLocation, + Defer, + CState = #client_msstate { file_summary_ets = FileSummaryEts }) -> case ets:lookup(FileSummaryEts, File) of [] -> %% File has been GC'd and no longer exists. Go around again. read(Server, Guid, CState); @@ -404,7 +404,8 @@ client_read1(Server, #msg_location { guid = Guid, file = File } = end. client_read2(_Server, false, undefined, - #msg_location { guid = Guid, ref_count = RefCount }, Defer, + #msg_location { guid = Guid, ref_count = RefCount }, + Defer, CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts, dedup_cache_ets = DedupCacheEts }) -> case ets:lookup(CurFileCacheEts, Guid) of @@ -421,10 +422,10 @@ client_read2(_Server, true, _Right, _MsgLocation, Defer, _CState) -> Defer(); client_read2(Server, false, _Right, #msg_location { guid = Guid, ref_count = RefCount, file = File }, - Defer, CState = - #client_msstate { file_handles_ets = FileHandlesEts, - file_summary_ets = FileSummaryEts, - dedup_cache_ets = DedupCacheEts }) -> + Defer, + CState = #client_msstate { file_handles_ets = FileHandlesEts, + file_summary_ets = FileSummaryEts, + dedup_cache_ets = DedupCacheEts }) -> %% It's entirely possible that everything we're doing from here on %% is for the wrong file, or a non-existent file, as a GC may have %% finished. @@ -486,7 +487,7 @@ client_read2(Server, false, _Right, end. close_all_indicated(#client_msstate { file_handles_ets = FileHandlesEts } = - CState) -> + CState) -> Objs = ets:match_object(FileHandlesEts, {{self(), '_'}, close}), lists:foldl(fun ({Key = {_Self, File}, close}, CStateM) -> true = ets:delete(FileHandlesEts, Key), @@ -559,7 +560,7 @@ init([Server, BaseDir, ClientRefs, {MsgRefDeltaGen, MsgRefDeltaGenInit}]) -> cur_file_cache_ets = CurFileCacheEts, client_refs = ClientRefs1, recovered_state = Recovered - }, + }, ok = count_msg_refs(Recovered, MsgRefDeltaGen, MsgRefDeltaGenInit, State), FileNames = diff --git a/src/rabbit_msg_store_gc.erl b/src/rabbit_msg_store_gc.erl index ca5e2c6ff1..8a275c39d9 100644 --- a/src/rabbit_msg_store_gc.erl +++ b/src/rabbit_msg_store_gc.erl @@ -80,10 +80,12 @@ init([Parent, Dir, IndexState, IndexModule, FileSummaryEts]) -> handle_call(stop, _From, State) -> {stop, normal, ok, State}. -handle_cast({gc, Source, Destination}, State = - #gcstate { parent = Parent, dir = Dir, index_module = Index, - index_state = IndexState, - file_summary_ets = FileSummaryEts }) -> +handle_cast({gc, Source, Destination}, + State = #gcstate { dir = Dir, + index_state = IndexState, + index_module = Index, + parent = Parent, + file_summary_ets = FileSummaryEts }) -> Reclaimed = rabbit_msg_store:gc(Source, Destination, {FileSummaryEts, Dir, Index, IndexState}), ok = rabbit_msg_store:gc_done(Parent, Reclaimed, Source, Destination), diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 8d22d36af6..369a52d9e9 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -160,7 +160,7 @@ segments, journal_handle, dirty_count - }). + }). -record(segment, { pubs, @@ -169,7 +169,7 @@ journal_entries, path, num - }). + }). -include("rabbit_msg_store.hrl"). @@ -185,14 +185,14 @@ journal_entries :: array(), path :: file_path(), num :: non_neg_integer() - })). + })). -type(seq_id() :: integer()). -type(seg_dict() :: {dict(), [segment()]}). -type(qistate() :: #qistate { dir :: file_path(), segments :: 'undefined' | seg_dict(), journal_handle :: hdl(), dirty_count :: integer() - }). + }). -spec(init/3 :: (queue_name(), boolean(), fun ((guid()) -> boolean())) -> {'undefined' | @@ -212,7 +212,7 @@ -spec(find_lowest_seq_id_seg_and_next_seq_id/1 :: (qistate()) -> {non_neg_integer(), non_neg_integer(), qistate()}). -spec(prepare_msg_store_seed_funs/1 :: - ([queue_name()]) -> + ([queue_name()]) -> {{[binary()] | 'undefined', startup_fun_state()}, {[binary()] | 'undefined', startup_fun_state()}}). @@ -553,7 +553,7 @@ blank_state(QueueName) -> segments = segments_new(), journal_handle = undefined, dirty_count = 0 - }. + }. array_new() -> array:new([{default, undefined}, fixed, {size, ?SEGMENT_ENTRY_COUNT}]). @@ -617,7 +617,7 @@ segment_new(Seg, Dir) -> journal_entries = array_new(), path = seg_num_to_path(Dir, Seg), num = Seg - }. + }. segment_find_or_new(Seg, Dir, Segments) -> case segment_find(Seg, Segments) of @@ -683,15 +683,15 @@ write_entry_to_segment(RelSeq, {Pub, Del, Ack}, Hdl) -> {Guid, IsPersistent} -> file_handle_cache:append( Hdl, [<<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS, - (bool_to_int(IsPersistent)):1, - RelSeq:?REL_SEQ_BITS>>, Guid]) + (bool_to_int(IsPersistent)):1, + RelSeq:?REL_SEQ_BITS>>, Guid]) end, ok = case {Del, Ack} of {no_del, no_ack} -> ok; _ -> Binary = <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, - RelSeq:?REL_SEQ_BITS>>, + RelSeq:?REL_SEQ_BITS>>, file_handle_cache:append( Hdl, case {Del, Ack} of {del, ack} -> [Binary, Binary]; @@ -710,14 +710,14 @@ terminate(StoreShutdown, Terms, State = _ -> file_handle_cache:close(JournalHdl) end, SegTerms = segment_fold( - fun (Seg, #segment { handle = Hdl, pubs = PubCount, - acks = AckCount }, SegTermsAcc) -> - ok = case Hdl of - undefined -> ok; - _ -> file_handle_cache:close(Hdl) - end, - [{Seg, {PubCount, AckCount}} | SegTermsAcc] - end, [], Segments), + fun (Seg, #segment { handle = Hdl, pubs = PubCount, + acks = AckCount }, SegTermsAcc) -> + ok = case Hdl of + undefined -> ok; + _ -> file_handle_cache:close(Hdl) + end, + [{Seg, {PubCount, AckCount}} | SegTermsAcc] + end, [], Segments), case StoreShutdown of true -> store_clean_shutdown([{segments, SegTerms} | Terms], Dir); false -> ok @@ -756,13 +756,13 @@ load_segment(KeepAcks, load_segment_entries(KeepAcks, Hdl, SegEntries, PubCount, AckCount) -> case file_handle_cache:read(Hdl, ?REL_SEQ_ONLY_ENTRY_LENGTH_BYTES) of {ok, <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, - RelSeq:?REL_SEQ_BITS>>} -> + RelSeq:?REL_SEQ_BITS>>} -> {AckCount1, SegEntries1} = deliver_or_ack_msg(KeepAcks, RelSeq, AckCount, SegEntries), load_segment_entries(KeepAcks, Hdl, SegEntries1, PubCount, AckCount1); {ok, <<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS, - IsPersistentNum:1, RelSeq:?REL_SEQ_BITS>>} -> + IsPersistentNum:1, RelSeq:?REL_SEQ_BITS>>} -> %% because we specify /binary, and binaries are complete %% bytes, the size spec is in bytes, not bits. {ok, Guid} = file_handle_cache:read(Hdl, ?GUID_BYTES), @@ -834,9 +834,9 @@ load_journal_entries(State = #qistate { journal_handle = Hdl }) -> <<Guid:?GUID_BYTES/binary>> = <<GuidNum:?GUID_BITS>>, Publish = {Guid, case Prefix of - ?PUB_PERSIST_JPREFIX -> true; - ?PUB_TRANS_JPREFIX -> false - end}, + ?PUB_PERSIST_JPREFIX -> true; + ?PUB_TRANS_JPREFIX -> false + end}, load_journal_entries( add_to_journal(SeqId, Publish, State)); _ErrOrEoF -> %% err, we've lost at least a publish diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index ba493e02a2..6895700cc2 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -162,7 +162,7 @@ persistent_count, transient_threshold, pending_ack - }). + }). -record(msg_status, { seq_id, @@ -172,13 +172,13 @@ is_delivered, msg_on_disk, index_on_disk - }). + }). -record(delta, { start_seq_id, count, end_seq_id %% note the end_seq_id is always >, not >= - }). + }). -record(tx, { pending_messages, pending_acks }). @@ -332,8 +332,8 @@ terminate(State) -> rabbit_msg_store:client_terminate(MSCStateT), Terms = [{persistent_ref, PRef}, {transient_ref, TRef}, {persistent_count, PCount}], - State1 #vqstate { index_state = - rabbit_queue_index:terminate(Terms, IndexState), + State1 #vqstate { index_state = rabbit_queue_index:terminate( + Terms, IndexState), msg_store_clients = undefined }. %% the only difference between purge and delete is that delete also @@ -359,7 +359,7 @@ delete_and_terminate(State) -> delete1(PersistentStore, TransientThreshold, NextSeqId, 0, DeltaSeqId, IndexState3), IndexState4 - end, + end, IndexState5 = rabbit_queue_index:terminate_and_erase(IndexState2), rabbit_msg_store:delete_client(PersistentStore, PRef), rabbit_msg_store:delete_client(?TRANSIENT_MSG_STORE, TRef), @@ -432,9 +432,9 @@ fetch(AckRequired, State = {loaded, State1} -> fetch(AckRequired, State1) end; {{value, MsgStatus = #msg_status { - msg = Msg, guid = Guid, seq_id = SeqId, - is_persistent = IsPersistent, is_delivered = IsDelivered, - msg_on_disk = MsgOnDisk, index_on_disk = IndexOnDisk }}, + msg = Msg, guid = Guid, seq_id = SeqId, + is_persistent = IsPersistent, is_delivered = IsDelivered, + msg_on_disk = MsgOnDisk, index_on_disk = IndexOnDisk }}, Q4a} -> AckTag = case AckRequired of @@ -592,8 +592,8 @@ requeue(AckTags, State = #vqstate { persistent_store = PersistentStore }) -> persistent_count = PCount }} = lists:foldl( fun (SeqId, {SeqIdsAcc, Dict, StateN = - #vqstate { msg_store_clients = MSCStateN, - pending_ack = PAN}}) -> + #vqstate { msg_store_clients = MSCStateN, + pending_ack = PAN }}) -> PAN1 = dict:erase(SeqId, PAN), StateN1 = StateN #vqstate { pending_ack = PAN1 }, case dict:find(SeqId, PAN) of @@ -618,9 +618,9 @@ requeue(AckTags, State = #vqstate { persistent_store = PersistentStore }) -> false -> {SeqIdsAcc, ?TRANSIENT_MSG_STORE} end, - {SeqIdsAcc1, - rabbit_misc:dict_cons(MsgStore, Guid, Dict), - StateN3} + {SeqIdsAcc1, + rabbit_misc:dict_cons(MsgStore, Guid, Dict), + StateN3} end end, {[], dict:new(), State}, AckTags), IndexState1 = rabbit_queue_index:write_acks(SeqIds, IndexState), @@ -644,7 +644,7 @@ set_ram_duration_target( DurationTarget, State = #vqstate { avg_egress_rate = AvgEgressRate, avg_ingress_rate = AvgIngressRate, target_ram_msg_count = TargetRamMsgCount - }) -> + }) -> Rate = AvgEgressRate + AvgIngressRate, TargetRamMsgCount1 = case DurationTarget of @@ -819,7 +819,7 @@ betas_from_segment_entries(List, SeqIdLimit, TransientThreshold, IndexState) -> is_delivered = IsDelivered, msg_on_disk = true, index_on_disk = true - } | FilteredAcc], + } | FilteredAcc], IndexStateAcc}; false -> {FilteredAcc, IndexStateAcc} @@ -898,10 +898,10 @@ should_force_index_to_disk(State = msg_store_callback(PersistentGuids, IsTransientPubs, Pubs, AckTags, Fun) -> Self = self(), Fun = fun () -> rabbit_amqqueue:maybe_run_queue_via_backing_queue( - Self, fun (StateN) -> tx_commit_post_msg_store( - IsTransientPubs, Pubs, - AckTags, Fun, StateN) - end) + Self, fun (StateN) -> tx_commit_post_msg_store( + IsTransientPubs, Pubs, + AckTags, Fun, StateN) + end) end, fun () -> spawn(fun () -> ok = rabbit_misc:with_exit_handler( fun () -> rabbit_msg_store:remove( @@ -1195,9 +1195,9 @@ publish(index, MsgStatus, #vqstate { store_beta_entry(MsgStatus2, State1); publish(neither, MsgStatus = #msg_status { seq_id = SeqId }, State = - #vqstate { index_state = IndexState, q1 = Q1, q2 = Q2, - delta = Delta, msg_store_clients = MSCState, - persistent_store = PersistentStore }) -> + #vqstate { index_state = IndexState, q1 = Q1, q2 = Q2, + delta = Delta, msg_store_clients = MSCState, + persistent_store = PersistentStore }) -> {MsgStatus1 = #msg_status { msg_on_disk = true }, MSCState1} = maybe_write_msg_to_disk(PersistentStore, true, MsgStatus, MSCState), {#msg_status { index_on_disk = true }, IndexState1} = |
