summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2009-08-28 18:32:46 +0100
committerMatthias Radestock <matthias@lshift.net>2009-08-28 18:32:46 +0100
commitb96bcd7d3dccd320bd2a6b4023495ca53b7c6092 (patch)
treeef757fe6cd4ab40e6bdbf62b20b3942771db8d3a /src
parenta30c1802f985ccfc14aafe568df8e00bd792ab85 (diff)
downloadrabbitmq-server-git-b96bcd7d3dccd320bd2a6b4023495ca53b7c6092.tar.gz
cosmetic changes to shutdown marker code
move it to the right place reorganise constants section
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_disk_queue.erl189
1 files changed, 95 insertions, 94 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 7051ea0585..42a635a14b 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -54,38 +54,40 @@
-include("rabbit.hrl").
--define(WRITE_OK_SIZE_BITS, 8).
--define(WRITE_OK_TRANSIENT, 255).
--define(WRITE_OK_PERSISTENT, 254).
--define(INTEGER_SIZE_BYTES, 8).
--define(INTEGER_SIZE_BITS, (8 * ?INTEGER_SIZE_BYTES)).
--define(MSG_LOC_NAME, rabbit_disk_queue_msg_location).
--define(FILE_SUMMARY_ETS_NAME, rabbit_disk_queue_file_summary).
--define(SEQUENCE_ETS_NAME, rabbit_disk_queue_sequences).
--define(CACHE_ETS_NAME, rabbit_disk_queue_cache).
--define(FILE_EXTENSION, ".rdq").
--define(FILE_EXTENSION_TMP, ".rdt").
--define(FILE_EXTENSION_DETS, ".dets").
--define(FILE_PACKING_ADJUSTMENT, (1 + (2* (?INTEGER_SIZE_BYTES)))).
--define(MINIMUM_MEMORY_REPORT_TIME_INTERVAL, 10000). %% 10 seconds in millisecs
--define(BATCH_SIZE, 10000).
--define(CACHE_MAX_SIZE, 10485760).
--define(WRITE_HANDLE_OPEN_MODE, [append, raw, binary, delayed_write]).
--define(SHUTDOWN_MESSAGE_KEY, shutdown_token).
--define(SHUTDOWN_MESSAGE, #dq_msg_loc { queue_and_seq_id = ?SHUTDOWN_MESSAGE_KEY,
- msg_id = infinity_and_beyond,
- is_delivered = never
- }).
-
--define(SERVER, ?MODULE).
-
--define(MAX_READ_FILE_HANDLES, 256).
--define(FILE_SIZE_LIMIT, (256*1024*1024)).
+-define(WRITE_OK_SIZE_BITS, 8).
+-define(WRITE_OK_TRANSIENT, 255).
+-define(WRITE_OK_PERSISTENT, 254).
+-define(INTEGER_SIZE_BYTES, 8).
+-define(INTEGER_SIZE_BITS, (8 * ?INTEGER_SIZE_BYTES)).
+-define(MSG_LOC_NAME, rabbit_disk_queue_msg_location).
+-define(FILE_SUMMARY_ETS_NAME, rabbit_disk_queue_file_summary).
+-define(SEQUENCE_ETS_NAME, rabbit_disk_queue_sequences).
+-define(CACHE_ETS_NAME, rabbit_disk_queue_cache).
+-define(FILE_EXTENSION, ".rdq").
+-define(FILE_EXTENSION_TMP, ".rdt").
+-define(FILE_EXTENSION_DETS, ".dets").
+-define(FILE_PACKING_ADJUSTMENT, (1 + (2* (?INTEGER_SIZE_BYTES)))).
+-define(BATCH_SIZE, 10000).
+-define(CACHE_MAX_SIZE, 10485760).
+-define(WRITE_HANDLE_OPEN_MODE, [append, raw, binary, delayed_write]).
+-define(MAX_READ_FILE_HANDLES, 256).
+-define(FILE_SIZE_LIMIT, (256*1024*1024)).
+
+
+-define(SHUTDOWN_MESSAGE_KEY, shutdown_token).
+-define(SHUTDOWN_MESSAGE,
+ #dq_msg_loc { queue_and_seq_id = ?SHUTDOWN_MESSAGE_KEY,
+ msg_id = infinity_and_beyond,
+ is_delivered = never
+ }).
+-define(MINIMUM_MEMORY_REPORT_TIME_INTERVAL, 10000). %% 10 seconds in millisecs
-define(SYNC_INTERVAL, 5). %% milliseconds
-define(HIBERNATE_AFTER_MIN, 1000).
-define(DESIRED_HIBERNATE, 10000).
+-define(SERVER, ?MODULE).
+
-record(dqstate,
{msg_location_dets, %% where are messages?
msg_location_ets, %% as above, but for ets version
@@ -862,72 +864,6 @@ msg_to_bin(Msg = #basic_message { content = Content }) ->
bin_to_msg(MsgBin) ->
binary_to_term(MsgBin).
-
-store_safe_shutdown() ->
- ok = rabbit_misc:execute_mnesia_transaction(
- fun() ->
- mnesia:write(rabbit_disk_queue,
- ?SHUTDOWN_MESSAGE, write)
- end).
-
-detect_shutdown_state_and_adjust_delivered_flags() ->
- MarkDelivered =
- rabbit_misc:execute_mnesia_transaction(
- fun() ->
- case mnesia:read(rabbit_disk_queue,
- ?SHUTDOWN_MESSAGE_KEY, read) of
- [?SHUTDOWN_MESSAGE] ->
- mnesia:delete(rabbit_disk_queue,
- ?SHUTDOWN_MESSAGE_KEY, write),
- false;
- [] ->
- true
- end
- end),
- %% if we crash here, then on startup we'll not find the
- %% SHUTDOWN_MESSAGE so will mark everything delivered, which is
- %% the safe thing to do.
- case MarkDelivered of
- true -> mark_messages_delivered();
- false -> ok
- end.
-
-mark_messages_delivered() ->
- mark_message_delivered('$start_of_table').
-
-%% A single huge transaction is a bad idea because of memory
-%% use. Equally, using dirty operations is a bad idea because you
-%% shouldn't do writes when doing mnesia:dirty_next, because the
-%% ordering can change. So we use transactions of bounded
-%% size. However, even this does necessitate restarting between
-%% transactions.
-mark_message_delivered('$end_of_table') ->
- ok;
-mark_message_delivered(_Key) ->
- mark_message_delivered(
- rabbit_misc:execute_mnesia_transaction(
- fun () ->
- ok = mnesia:write_lock_table(rabbit_disk_queue),
- mark_message_delivered(mnesia:first(rabbit_disk_queue),
- ?BATCH_SIZE)
- end)).
-
-mark_message_delivered(Key, 0) ->
- Key;
-mark_message_delivered(Key = '$end_of_table', _N) ->
- Key;
-mark_message_delivered(Key, N) ->
- [Obj] = mnesia:read(rabbit_disk_queue, Key, write),
- M = case Obj #dq_msg_loc.is_delivered of
- true -> N;
- false ->
- ok = mnesia:write(rabbit_disk_queue,
- Obj #dq_msg_loc { is_delivered = true },
- write),
- N - 1
- end,
- mark_message_delivered(mnesia:next(rabbit_disk_queue, Key), M).
-
%%----------------------------------------------------------------------------
%% internal functions
%%----------------------------------------------------------------------------
@@ -1569,9 +1505,74 @@ delete_empty_files(File, Acc, #dqstate { file_summary = FileSummary }) ->
end.
%%----------------------------------------------------------------------------
-%% disk recovery
+%% recovery
%%----------------------------------------------------------------------------
+store_safe_shutdown() ->
+ ok = rabbit_misc:execute_mnesia_transaction(
+ fun() ->
+ mnesia:write(rabbit_disk_queue,
+ ?SHUTDOWN_MESSAGE, write)
+ end).
+
+detect_shutdown_state_and_adjust_delivered_flags() ->
+ MarkDelivered =
+ rabbit_misc:execute_mnesia_transaction(
+ fun() ->
+ case mnesia:read(rabbit_disk_queue,
+ ?SHUTDOWN_MESSAGE_KEY, read) of
+ [?SHUTDOWN_MESSAGE] ->
+ mnesia:delete(rabbit_disk_queue,
+ ?SHUTDOWN_MESSAGE_KEY, write),
+ false;
+ [] ->
+ true
+ end
+ end),
+ %% if we crash here, then on startup we'll not find the
+ %% SHUTDOWN_MESSAGE so will mark everything delivered, which is
+ %% the safe thing to do.
+ case MarkDelivered of
+ true -> mark_messages_delivered();
+ false -> ok
+ end.
+
+mark_messages_delivered() ->
+ mark_message_delivered('$start_of_table').
+
+%% A single huge transaction is a bad idea because of memory
+%% use. Equally, using dirty operations is a bad idea because you
+%% shouldn't do writes when doing mnesia:dirty_next, because the
+%% ordering can change. So we use transactions of bounded
+%% size. However, even this does necessitate restarting between
+%% transactions.
+mark_message_delivered('$end_of_table') ->
+ ok;
+mark_message_delivered(_Key) ->
+ mark_message_delivered(
+ rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ ok = mnesia:write_lock_table(rabbit_disk_queue),
+ mark_message_delivered(mnesia:first(rabbit_disk_queue),
+ ?BATCH_SIZE)
+ end)).
+
+mark_message_delivered(Key, 0) ->
+ Key;
+mark_message_delivered(Key = '$end_of_table', _N) ->
+ Key;
+mark_message_delivered(Key, N) ->
+ [Obj] = mnesia:read(rabbit_disk_queue, Key, write),
+ M = case Obj #dq_msg_loc.is_delivered of
+ true -> N;
+ false ->
+ ok = mnesia:write(rabbit_disk_queue,
+ Obj #dq_msg_loc { is_delivered = true },
+ write),
+ N - 1
+ end,
+ mark_message_delivered(mnesia:next(rabbit_disk_queue, Key), M).
+
add_index() ->
case mnesia:add_table_index(rabbit_disk_queue, msg_id) of
{atomic, ok} -> ok;