summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-08-28 12:49:29 +0100
committerMatthew Sackman <matthew@lshift.net>2009-08-28 12:49:29 +0100
commit827ebdeac9073462f405825d3aa4c798121d5062 (patch)
treea5baf2ee1965765e62db51cd95ea0eaedbf04dc7 /src
parent85c7dd830e8a30e7b0b4c52889a896691e9b2fd1 (diff)
parentd6bfe2c30e41c89ea02e899c7313bd5bae8e270f (diff)
downloadrabbitmq-server-git-827ebdeac9073462f405825d3aa4c798121d5062.tar.gz
merge heads - gratuitous shuffling
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_disk_queue.erl210
1 files changed, 117 insertions, 93 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 78505af793..5d7c7a358d 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -50,6 +50,8 @@
-export([stop/0, stop_and_obliterate/0, set_mode/1, to_disk_only_mode/0,
to_ram_disk_mode/0]).
+%%----------------------------------------------------------------------------
+
-include("rabbit.hrl").
-define(WRITE_OK_SIZE_BITS, 8).
@@ -246,7 +248,7 @@
%% alternating full files and files with only one tiny message in
%% them).
-%% ---- SPECS ----
+%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -286,7 +288,9 @@
-endif.
-%% ---- PUBLIC API ----
+%%----------------------------------------------------------------------------
+%% public API
+%%----------------------------------------------------------------------------
start_link() ->
gen_server2:start_link({local, ?SERVER}, ?MODULE,
@@ -360,7 +364,9 @@ cache_info() ->
set_mode(Mode) ->
gen_server2:pcast(?SERVER, 10, {set_mode, Mode}).
-%% ---- GEN-SERVER INTERNAL API ----
+%%----------------------------------------------------------------------------
+%% gen_server behaviour
+%%----------------------------------------------------------------------------
init([FileSizeLimit, ReadFileHandlesLimit]) ->
%% If the gen_server is part of a supervision tree and is ordered
@@ -584,7 +590,9 @@ shutdown(State = #dqstate { msg_location_dets = MsgLocationDets,
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-%% ---- UTILITY FUNCTIONS ----
+%%----------------------------------------------------------------------------
+%% memory management helper functions
+%%----------------------------------------------------------------------------
stop_memory_timer(State = #dqstate { memory_report_timer_ref = undefined }) ->
State;
@@ -668,37 +676,50 @@ to_ram_disk_mode(State = #dqstate { operation_mode = disk_only,
mnesia_bytes_per_record = undefined,
ets_bytes_per_record = undefined }.
-noreply(NewState) ->
- noreply1(start_memory_timer(NewState)).
+%%----------------------------------------------------------------------------
+%% message cache helper functions
+%%----------------------------------------------------------------------------
-noreply1(NewState = #dqstate { on_sync_txns = [],
- commit_timer_ref = undefined }) ->
- {noreply, NewState, hibernate};
-noreply1(NewState = #dqstate { commit_timer_ref = undefined }) ->
- {noreply, start_commit_timer(NewState), 0};
-noreply1(NewState = #dqstate { on_sync_txns = [] }) ->
- {noreply, stop_commit_timer(NewState), hibernate};
-noreply1(NewState) ->
- {noreply, NewState, 0}.
+remove_cache_entry(MsgId, #dqstate { message_cache = Cache }) ->
+ true = ets:delete(Cache, MsgId),
+ ok.
-reply(Reply, NewState) ->
- reply1(Reply, start_memory_timer(NewState)).
+fetch_and_increment_cache(MsgId, #dqstate { message_cache = Cache }) ->
+ case ets:lookup(Cache, MsgId) of
+ [] ->
+ not_found;
+ [{MsgId, Message, _RefCount}] ->
+ NewRefCount = ets:update_counter(Cache, MsgId, {3, 1}),
+ {Message, NewRefCount}
+ end.
-reply1(Reply, NewState = #dqstate { on_sync_txns = [],
- commit_timer_ref = undefined }) ->
- {reply, Reply, NewState, hibernate};
-reply1(Reply, NewState = #dqstate { commit_timer_ref = undefined }) ->
- {reply, Reply, start_commit_timer(NewState), 0};
-reply1(Reply, NewState = #dqstate { on_sync_txns = [] }) ->
- {reply, Reply, stop_commit_timer(NewState), hibernate};
-reply1(Reply, NewState) ->
- {reply, Reply, NewState, 0}.
+decrement_cache(MsgId, #dqstate { message_cache = Cache }) ->
+ true = try case ets:update_counter(Cache, MsgId, {3, -1}) of
+ N when N =< 0 -> true = ets:delete(Cache, MsgId);
+ _N -> true
+ end
+ catch error:badarg ->
+ %% MsgId is not in there because although it's been
+ %% delivered, it's never actually been read (think:
+ %% persistent message in mixed queue)
+ true
+ end,
+ ok.
-form_filename(Name) ->
- filename:join(base_directory(), Name).
+insert_into_cache(Message = #basic_message { guid = MsgId },
+ #dqstate { message_cache = Cache }) ->
+ case cache_is_full(Cache) of
+ true -> ok;
+ false -> true = ets:insert_new(Cache, {MsgId, Message, 1}),
+ ok
+ end.
-base_directory() ->
- filename:join(rabbit_mnesia:dir(), "rabbit_disk_queue/").
+cache_is_full(Cache) ->
+ ets:info(Cache, memory) > ?CACHE_MAX_SIZE.
+
+%%----------------------------------------------------------------------------
+%% dets/ets agnosticism
+%%----------------------------------------------------------------------------
dets_ets_lookup(#dqstate { msg_location_dets = MsgLocationDets,
operation_mode = disk_only }, Key) ->
@@ -737,6 +758,42 @@ dets_ets_match_object(#dqstate { msg_location_ets = MsgLocationEts,
operation_mode = ram_disk }, Obj) ->
ets:match_object(MsgLocationEts, Obj).
+%%----------------------------------------------------------------------------
+%% general helper functions
+%%----------------------------------------------------------------------------
+
+noreply(NewState) ->
+ noreply1(start_memory_timer(NewState)).
+
+noreply1(NewState = #dqstate { on_sync_txns = [],
+ commit_timer_ref = undefined }) ->
+ {noreply, NewState, hibernate};
+noreply1(NewState = #dqstate { commit_timer_ref = undefined }) ->
+ {noreply, start_commit_timer(NewState), 0};
+noreply1(NewState = #dqstate { on_sync_txns = [] }) ->
+ {noreply, stop_commit_timer(NewState), hibernate};
+noreply1(NewState) ->
+ {noreply, NewState, 0}.
+
+reply(Reply, NewState) ->
+ reply1(Reply, start_memory_timer(NewState)).
+
+reply1(Reply, NewState = #dqstate { on_sync_txns = [],
+ commit_timer_ref = undefined }) ->
+ {reply, Reply, NewState, hibernate};
+reply1(Reply, NewState = #dqstate { commit_timer_ref = undefined }) ->
+ {reply, Reply, start_commit_timer(NewState), 0};
+reply1(Reply, NewState = #dqstate { on_sync_txns = [] }) ->
+ {reply, Reply, stop_commit_timer(NewState), hibernate};
+reply1(Reply, NewState) ->
+ {reply, Reply, NewState, 0}.
+
+form_filename(Name) ->
+ filename:join(base_directory(), Name).
+
+base_directory() ->
+ filename:join(rabbit_mnesia:dir(), "rabbit_disk_queue/").
+
with_read_handle_at(File, Offset, Fun, State =
#dqstate { read_file_hc_cache = HC,
current_file_name = CurName,
@@ -752,24 +809,6 @@ with_read_handle_at(File, Offset, Fun, State =
rabbit_file_handle_cache:with_file_handle_at(FilePath, Offset, Fun, HC),
{Result, State1 #dqstate { read_file_hc_cache = HC1 }}.
-sequence_lookup(Sequences, Q) ->
- case ets:lookup(Sequences, Q) of
- [] ->
- {0, 0};
- [{Q, ReadSeqId, WriteSeqId}] ->
- {ReadSeqId, WriteSeqId}
- end.
-
-start_commit_timer(State = #dqstate { commit_timer_ref = undefined }) ->
- {ok, TRef} = timer:apply_after(?SYNC_INTERVAL, ?MODULE, filesync, []),
- State #dqstate { commit_timer_ref = TRef }.
-
-stop_commit_timer(State = #dqstate { commit_timer_ref = undefined }) ->
- State;
-stop_commit_timer(State = #dqstate { commit_timer_ref = TRef }) ->
- {ok, cancel} = timer:cancel(TRef),
- State #dqstate { commit_timer_ref = undefined }.
-
sync_current_file_handle(State = #dqstate { current_dirty = false,
on_sync_txns = [] }) ->
State;
@@ -788,6 +827,24 @@ sync_current_file_handle(State = #dqstate { current_file_handle = CurHdl,
State1 #dqstate { current_dirty = false, on_sync_txns = [],
last_sync_offset = SyncOffset1 }.
+sequence_lookup(Sequences, Q) ->
+ case ets:lookup(Sequences, Q) of
+ [] ->
+ {0, 0};
+ [{Q, ReadSeqId, WriteSeqId}] ->
+ {ReadSeqId, WriteSeqId}
+ end.
+
+start_commit_timer(State = #dqstate { commit_timer_ref = undefined }) ->
+ {ok, TRef} = timer:apply_after(?SYNC_INTERVAL, ?MODULE, filesync, []),
+ State #dqstate { commit_timer_ref = TRef }.
+
+stop_commit_timer(State = #dqstate { commit_timer_ref = undefined }) ->
+ State;
+stop_commit_timer(State = #dqstate { commit_timer_ref = TRef }) ->
+ {ok, cancel} = timer:cancel(TRef),
+ State #dqstate { commit_timer_ref = undefined }.
+
msg_to_bin(Msg = #basic_message { content = Content }) ->
ClearedContent = rabbit_binary_parser:clear_decoded_content(Content),
term_to_binary(Msg #basic_message { content = ClearedContent }).
@@ -795,44 +852,9 @@ msg_to_bin(Msg = #basic_message { content = Content }) ->
bin_to_msg(MsgBin) ->
binary_to_term(MsgBin).
-remove_cache_entry(MsgId, #dqstate { message_cache = Cache }) ->
- true = ets:delete(Cache, MsgId),
- ok.
-
-fetch_and_increment_cache(MsgId, #dqstate { message_cache = Cache }) ->
- case ets:lookup(Cache, MsgId) of
- [] ->
- not_found;
- [{MsgId, Message, _RefCount}] ->
- NewRefCount = ets:update_counter(Cache, MsgId, {3, 1}),
- {Message, NewRefCount}
- end.
-
-decrement_cache(MsgId, #dqstate { message_cache = Cache }) ->
- true = try case ets:update_counter(Cache, MsgId, {3, -1}) of
- N when N =< 0 -> true = ets:delete(Cache, MsgId);
- _N -> true
- end
- catch error:badarg ->
- %% MsgId is not in there because although it's been
- %% delivered, it's never actually been read (think:
- %% persistent message in mixed queue)
- true
- end,
- ok.
-
-insert_into_cache(Message = #basic_message { guid = MsgId },
- #dqstate { message_cache = Cache }) ->
- case cache_is_full(Cache) of
- true -> ok;
- false -> true = ets:insert_new(Cache, {MsgId, Message, 1}),
- ok
- end.
-
-cache_is_full(Cache) ->
- ets:info(Cache, memory) > ?CACHE_MAX_SIZE.
-
-%% ---- INTERNAL RAW FUNCTIONS ----
+%%----------------------------------------------------------------------------
+%% internal functions
+%%----------------------------------------------------------------------------
internal_fetch_body(Q, MarkDelivered, Advance, State) ->
case queue_head(Q, MarkDelivered, Advance, State) of
@@ -1208,7 +1230,9 @@ internal_delete_non_durable_queues(
end
end, {ok, State}, Sequences).
-%% ---- ROLLING OVER THE APPEND FILE ----
+%%----------------------------------------------------------------------------
+%% garbage collection / compaction / aggregation
+%%----------------------------------------------------------------------------
maybe_roll_to_new_file(Offset,
State = #dqstate { file_size_limit = FileSizeLimit,
@@ -1236,8 +1260,6 @@ maybe_roll_to_new_file(Offset,
maybe_roll_to_new_file(_, State) ->
{ok, State}.
-%% ---- GARBAGE COLLECTION / COMPACTION / AGGREGATION ----
-
compact(FilesSet, State) ->
%% smallest number, hence eldest, hence left-most, first
Files = lists:sort(sets:to_list(FilesSet)),
@@ -1470,7 +1492,9 @@ delete_empty_files(File, Acc, #dqstate { file_summary = FileSummary }) ->
_ -> [File|Acc]
end.
-%% ---- DISK RECOVERY ----
+%%----------------------------------------------------------------------------
+%% disk recovery
+%%----------------------------------------------------------------------------
add_index() ->
case mnesia:add_table_index(rabbit_disk_queue, msg_id) of
@@ -1674,8 +1698,6 @@ load_messages(Left, [File|Files],
{File, ValidTotalSize, ContiguousTop, Left, Right}),
load_messages(File, Files, State).
-%% ---- DISK RECOVERY OF FAILED COMPACTION ----
-
recover_crashed_compactions(Files, TmpFiles) ->
lists:foreach(fun (TmpFile) ->
ok = recover_crashed_compactions1(Files, TmpFile) end,
@@ -1818,7 +1840,9 @@ get_disk_queue_files() ->
DQTFilesSorted = lists:sort(fun file_name_sort/2, DQTFiles),
{DQFilesSorted, DQTFilesSorted}.
-%% ---- RAW READING AND WRITING OF FILES ----
+%%----------------------------------------------------------------------------
+%% raw reading and writing of files
+%%----------------------------------------------------------------------------
append_message(FileHdl, MsgId, MsgBody, IsPersistent) when is_binary(MsgBody) ->
BodySize = size(MsgBody),