diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-08-28 12:49:29 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-08-28 12:49:29 +0100 |
| commit | 827ebdeac9073462f405825d3aa4c798121d5062 (patch) | |
| tree | a5baf2ee1965765e62db51cd95ea0eaedbf04dc7 /src | |
| parent | 85c7dd830e8a30e7b0b4c52889a896691e9b2fd1 (diff) | |
| parent | d6bfe2c30e41c89ea02e899c7313bd5bae8e270f (diff) | |
| download | rabbitmq-server-git-827ebdeac9073462f405825d3aa4c798121d5062.tar.gz | |
merge heads - gratuitous shuffling
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_disk_queue.erl | 210 |
1 file changed, 117 insertions, 93 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index 78505af793..5d7c7a358d 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -50,6 +50,8 @@ -export([stop/0, stop_and_obliterate/0, set_mode/1, to_disk_only_mode/0, to_ram_disk_mode/0]). +%%---------------------------------------------------------------------------- + -include("rabbit.hrl"). -define(WRITE_OK_SIZE_BITS, 8). @@ -246,7 +248,7 @@ %% alternating full files and files with only one tiny message in %% them). -%% ---- SPECS ---- +%%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -286,7 +288,9 @@ -endif. -%% ---- PUBLIC API ---- +%%---------------------------------------------------------------------------- +%% public API +%%---------------------------------------------------------------------------- start_link() -> gen_server2:start_link({local, ?SERVER}, ?MODULE, @@ -360,7 +364,9 @@ cache_info() -> set_mode(Mode) -> gen_server2:pcast(?SERVER, 10, {set_mode, Mode}). -%% ---- GEN-SERVER INTERNAL API ---- +%%---------------------------------------------------------------------------- +%% gen_server behaviour +%%---------------------------------------------------------------------------- init([FileSizeLimit, ReadFileHandlesLimit]) -> %% If the gen_server is part of a supervision tree and is ordered @@ -584,7 +590,9 @@ shutdown(State = #dqstate { msg_location_dets = MsgLocationDets, code_change(_OldVsn, State, _Extra) -> {ok, State}. -%% ---- UTILITY FUNCTIONS ---- +%%---------------------------------------------------------------------------- +%% memory management helper functions +%%---------------------------------------------------------------------------- stop_memory_timer(State = #dqstate { memory_report_timer_ref = undefined }) -> State; @@ -668,37 +676,50 @@ to_ram_disk_mode(State = #dqstate { operation_mode = disk_only, mnesia_bytes_per_record = undefined, ets_bytes_per_record = undefined }. 
-noreply(NewState) -> - noreply1(start_memory_timer(NewState)). +%%---------------------------------------------------------------------------- +%% message cache helper functions +%%---------------------------------------------------------------------------- -noreply1(NewState = #dqstate { on_sync_txns = [], - commit_timer_ref = undefined }) -> - {noreply, NewState, hibernate}; -noreply1(NewState = #dqstate { commit_timer_ref = undefined }) -> - {noreply, start_commit_timer(NewState), 0}; -noreply1(NewState = #dqstate { on_sync_txns = [] }) -> - {noreply, stop_commit_timer(NewState), hibernate}; -noreply1(NewState) -> - {noreply, NewState, 0}. +remove_cache_entry(MsgId, #dqstate { message_cache = Cache }) -> + true = ets:delete(Cache, MsgId), + ok. -reply(Reply, NewState) -> - reply1(Reply, start_memory_timer(NewState)). +fetch_and_increment_cache(MsgId, #dqstate { message_cache = Cache }) -> + case ets:lookup(Cache, MsgId) of + [] -> + not_found; + [{MsgId, Message, _RefCount}] -> + NewRefCount = ets:update_counter(Cache, MsgId, {3, 1}), + {Message, NewRefCount} + end. -reply1(Reply, NewState = #dqstate { on_sync_txns = [], - commit_timer_ref = undefined }) -> - {reply, Reply, NewState, hibernate}; -reply1(Reply, NewState = #dqstate { commit_timer_ref = undefined }) -> - {reply, Reply, start_commit_timer(NewState), 0}; -reply1(Reply, NewState = #dqstate { on_sync_txns = [] }) -> - {reply, Reply, stop_commit_timer(NewState), hibernate}; -reply1(Reply, NewState) -> - {reply, Reply, NewState, 0}. +decrement_cache(MsgId, #dqstate { message_cache = Cache }) -> + true = try case ets:update_counter(Cache, MsgId, {3, -1}) of + N when N =< 0 -> true = ets:delete(Cache, MsgId); + _N -> true + end + catch error:badarg -> + %% MsgId is not in there because although it's been + %% delivered, it's never actually been read (think: + %% persistent message in mixed queue) + true + end, + ok. -form_filename(Name) -> - filename:join(base_directory(), Name). 
+insert_into_cache(Message = #basic_message { guid = MsgId }, + #dqstate { message_cache = Cache }) -> + case cache_is_full(Cache) of + true -> ok; + false -> true = ets:insert_new(Cache, {MsgId, Message, 1}), + ok + end. -base_directory() -> - filename:join(rabbit_mnesia:dir(), "rabbit_disk_queue/"). +cache_is_full(Cache) -> + ets:info(Cache, memory) > ?CACHE_MAX_SIZE. + +%%---------------------------------------------------------------------------- +%% dets/ets agnosticism +%%---------------------------------------------------------------------------- dets_ets_lookup(#dqstate { msg_location_dets = MsgLocationDets, operation_mode = disk_only }, Key) -> @@ -737,6 +758,42 @@ dets_ets_match_object(#dqstate { msg_location_ets = MsgLocationEts, operation_mode = ram_disk }, Obj) -> ets:match_object(MsgLocationEts, Obj). +%%---------------------------------------------------------------------------- +%% general helper functions +%%---------------------------------------------------------------------------- + +noreply(NewState) -> + noreply1(start_memory_timer(NewState)). + +noreply1(NewState = #dqstate { on_sync_txns = [], + commit_timer_ref = undefined }) -> + {noreply, NewState, hibernate}; +noreply1(NewState = #dqstate { commit_timer_ref = undefined }) -> + {noreply, start_commit_timer(NewState), 0}; +noreply1(NewState = #dqstate { on_sync_txns = [] }) -> + {noreply, stop_commit_timer(NewState), hibernate}; +noreply1(NewState) -> + {noreply, NewState, 0}. + +reply(Reply, NewState) -> + reply1(Reply, start_memory_timer(NewState)). + +reply1(Reply, NewState = #dqstate { on_sync_txns = [], + commit_timer_ref = undefined }) -> + {reply, Reply, NewState, hibernate}; +reply1(Reply, NewState = #dqstate { commit_timer_ref = undefined }) -> + {reply, Reply, start_commit_timer(NewState), 0}; +reply1(Reply, NewState = #dqstate { on_sync_txns = [] }) -> + {reply, Reply, stop_commit_timer(NewState), hibernate}; +reply1(Reply, NewState) -> + {reply, Reply, NewState, 0}. 
+ +form_filename(Name) -> + filename:join(base_directory(), Name). + +base_directory() -> + filename:join(rabbit_mnesia:dir(), "rabbit_disk_queue/"). + with_read_handle_at(File, Offset, Fun, State = #dqstate { read_file_hc_cache = HC, current_file_name = CurName, @@ -752,24 +809,6 @@ with_read_handle_at(File, Offset, Fun, State = rabbit_file_handle_cache:with_file_handle_at(FilePath, Offset, Fun, HC), {Result, State1 #dqstate { read_file_hc_cache = HC1 }}. -sequence_lookup(Sequences, Q) -> - case ets:lookup(Sequences, Q) of - [] -> - {0, 0}; - [{Q, ReadSeqId, WriteSeqId}] -> - {ReadSeqId, WriteSeqId} - end. - -start_commit_timer(State = #dqstate { commit_timer_ref = undefined }) -> - {ok, TRef} = timer:apply_after(?SYNC_INTERVAL, ?MODULE, filesync, []), - State #dqstate { commit_timer_ref = TRef }. - -stop_commit_timer(State = #dqstate { commit_timer_ref = undefined }) -> - State; -stop_commit_timer(State = #dqstate { commit_timer_ref = TRef }) -> - {ok, cancel} = timer:cancel(TRef), - State #dqstate { commit_timer_ref = undefined }. - sync_current_file_handle(State = #dqstate { current_dirty = false, on_sync_txns = [] }) -> State; @@ -788,6 +827,24 @@ sync_current_file_handle(State = #dqstate { current_file_handle = CurHdl, State1 #dqstate { current_dirty = false, on_sync_txns = [], last_sync_offset = SyncOffset1 }. +sequence_lookup(Sequences, Q) -> + case ets:lookup(Sequences, Q) of + [] -> + {0, 0}; + [{Q, ReadSeqId, WriteSeqId}] -> + {ReadSeqId, WriteSeqId} + end. + +start_commit_timer(State = #dqstate { commit_timer_ref = undefined }) -> + {ok, TRef} = timer:apply_after(?SYNC_INTERVAL, ?MODULE, filesync, []), + State #dqstate { commit_timer_ref = TRef }. + +stop_commit_timer(State = #dqstate { commit_timer_ref = undefined }) -> + State; +stop_commit_timer(State = #dqstate { commit_timer_ref = TRef }) -> + {ok, cancel} = timer:cancel(TRef), + State #dqstate { commit_timer_ref = undefined }. 
+ msg_to_bin(Msg = #basic_message { content = Content }) -> ClearedContent = rabbit_binary_parser:clear_decoded_content(Content), term_to_binary(Msg #basic_message { content = ClearedContent }). @@ -795,44 +852,9 @@ msg_to_bin(Msg = #basic_message { content = Content }) -> bin_to_msg(MsgBin) -> binary_to_term(MsgBin). -remove_cache_entry(MsgId, #dqstate { message_cache = Cache }) -> - true = ets:delete(Cache, MsgId), - ok. - -fetch_and_increment_cache(MsgId, #dqstate { message_cache = Cache }) -> - case ets:lookup(Cache, MsgId) of - [] -> - not_found; - [{MsgId, Message, _RefCount}] -> - NewRefCount = ets:update_counter(Cache, MsgId, {3, 1}), - {Message, NewRefCount} - end. - -decrement_cache(MsgId, #dqstate { message_cache = Cache }) -> - true = try case ets:update_counter(Cache, MsgId, {3, -1}) of - N when N =< 0 -> true = ets:delete(Cache, MsgId); - _N -> true - end - catch error:badarg -> - %% MsgId is not in there because although it's been - %% delivered, it's never actually been read (think: - %% persistent message in mixed queue) - true - end, - ok. - -insert_into_cache(Message = #basic_message { guid = MsgId }, - #dqstate { message_cache = Cache }) -> - case cache_is_full(Cache) of - true -> ok; - false -> true = ets:insert_new(Cache, {MsgId, Message, 1}), - ok - end. - -cache_is_full(Cache) -> - ets:info(Cache, memory) > ?CACHE_MAX_SIZE. - -%% ---- INTERNAL RAW FUNCTIONS ---- +%%---------------------------------------------------------------------------- +%% internal functions +%%---------------------------------------------------------------------------- internal_fetch_body(Q, MarkDelivered, Advance, State) -> case queue_head(Q, MarkDelivered, Advance, State) of @@ -1208,7 +1230,9 @@ internal_delete_non_durable_queues( end end, {ok, State}, Sequences). 
-%% ---- ROLLING OVER THE APPEND FILE ---- +%%---------------------------------------------------------------------------- +%% garbage collection / compaction / aggregation +%%---------------------------------------------------------------------------- maybe_roll_to_new_file(Offset, State = #dqstate { file_size_limit = FileSizeLimit, @@ -1236,8 +1260,6 @@ maybe_roll_to_new_file(Offset, maybe_roll_to_new_file(_, State) -> {ok, State}. -%% ---- GARBAGE COLLECTION / COMPACTION / AGGREGATION ---- - compact(FilesSet, State) -> %% smallest number, hence eldest, hence left-most, first Files = lists:sort(sets:to_list(FilesSet)), @@ -1470,7 +1492,9 @@ delete_empty_files(File, Acc, #dqstate { file_summary = FileSummary }) -> _ -> [File|Acc] end. -%% ---- DISK RECOVERY ---- +%%---------------------------------------------------------------------------- +%% disk recovery +%%---------------------------------------------------------------------------- add_index() -> case mnesia:add_table_index(rabbit_disk_queue, msg_id) of @@ -1674,8 +1698,6 @@ load_messages(Left, [File|Files], {File, ValidTotalSize, ContiguousTop, Left, Right}), load_messages(File, Files, State). -%% ---- DISK RECOVERY OF FAILED COMPACTION ---- - recover_crashed_compactions(Files, TmpFiles) -> lists:foreach(fun (TmpFile) -> ok = recover_crashed_compactions1(Files, TmpFile) end, @@ -1818,7 +1840,9 @@ get_disk_queue_files() -> DQTFilesSorted = lists:sort(fun file_name_sort/2, DQTFiles), {DQFilesSorted, DQTFilesSorted}. -%% ---- RAW READING AND WRITING OF FILES ---- +%%---------------------------------------------------------------------------- +%% raw reading and writing of files +%%---------------------------------------------------------------------------- append_message(FileHdl, MsgId, MsgBody, IsPersistent) when is_binary(MsgBody) -> BodySize = size(MsgBody), |
