diff options
| author | Matthias Radestock <matthias@lshift.net> | 2009-08-28 18:32:46 +0100 |
|---|---|---|
| committer | Matthias Radestock <matthias@lshift.net> | 2009-08-28 18:32:46 +0100 |
| commit | b96bcd7d3dccd320bd2a6b4023495ca53b7c6092 (patch) | |
| tree | ef757fe6cd4ab40e6bdbf62b20b3942771db8d3a /src | |
| parent | a30c1802f985ccfc14aafe568df8e00bd792ab85 (diff) | |
| download | rabbitmq-server-git-b96bcd7d3dccd320bd2a6b4023495ca53b7c6092.tar.gz | |
cosmetic changes to shutdown marker code
move it to the right place
reorganise constants section
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_disk_queue.erl | 189 |
1 files changed, 95 insertions, 94 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index 7051ea0585..42a635a14b 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -54,38 +54,40 @@ -include("rabbit.hrl"). --define(WRITE_OK_SIZE_BITS, 8). --define(WRITE_OK_TRANSIENT, 255). --define(WRITE_OK_PERSISTENT, 254). --define(INTEGER_SIZE_BYTES, 8). --define(INTEGER_SIZE_BITS, (8 * ?INTEGER_SIZE_BYTES)). --define(MSG_LOC_NAME, rabbit_disk_queue_msg_location). --define(FILE_SUMMARY_ETS_NAME, rabbit_disk_queue_file_summary). --define(SEQUENCE_ETS_NAME, rabbit_disk_queue_sequences). --define(CACHE_ETS_NAME, rabbit_disk_queue_cache). --define(FILE_EXTENSION, ".rdq"). --define(FILE_EXTENSION_TMP, ".rdt"). --define(FILE_EXTENSION_DETS, ".dets"). --define(FILE_PACKING_ADJUSTMENT, (1 + (2* (?INTEGER_SIZE_BYTES)))). --define(MINIMUM_MEMORY_REPORT_TIME_INTERVAL, 10000). %% 10 seconds in millisecs --define(BATCH_SIZE, 10000). --define(CACHE_MAX_SIZE, 10485760). --define(WRITE_HANDLE_OPEN_MODE, [append, raw, binary, delayed_write]). --define(SHUTDOWN_MESSAGE_KEY, shutdown_token). --define(SHUTDOWN_MESSAGE, #dq_msg_loc { queue_and_seq_id = ?SHUTDOWN_MESSAGE_KEY, - msg_id = infinity_and_beyond, - is_delivered = never - }). - --define(SERVER, ?MODULE). - --define(MAX_READ_FILE_HANDLES, 256). --define(FILE_SIZE_LIMIT, (256*1024*1024)). +-define(WRITE_OK_SIZE_BITS, 8). +-define(WRITE_OK_TRANSIENT, 255). +-define(WRITE_OK_PERSISTENT, 254). +-define(INTEGER_SIZE_BYTES, 8). +-define(INTEGER_SIZE_BITS, (8 * ?INTEGER_SIZE_BYTES)). +-define(MSG_LOC_NAME, rabbit_disk_queue_msg_location). +-define(FILE_SUMMARY_ETS_NAME, rabbit_disk_queue_file_summary). +-define(SEQUENCE_ETS_NAME, rabbit_disk_queue_sequences). +-define(CACHE_ETS_NAME, rabbit_disk_queue_cache). +-define(FILE_EXTENSION, ".rdq"). +-define(FILE_EXTENSION_TMP, ".rdt"). +-define(FILE_EXTENSION_DETS, ".dets"). +-define(FILE_PACKING_ADJUSTMENT, (1 + (2* (?INTEGER_SIZE_BYTES)))). +-define(BATCH_SIZE, 10000). +-define(CACHE_MAX_SIZE, 10485760). +-define(WRITE_HANDLE_OPEN_MODE, [append, raw, binary, delayed_write]). +-define(MAX_READ_FILE_HANDLES, 256). +-define(FILE_SIZE_LIMIT, (256*1024*1024)). + + +-define(SHUTDOWN_MESSAGE_KEY, shutdown_token). +-define(SHUTDOWN_MESSAGE, + #dq_msg_loc { queue_and_seq_id = ?SHUTDOWN_MESSAGE_KEY, + msg_id = infinity_and_beyond, + is_delivered = never + }). +-define(MINIMUM_MEMORY_REPORT_TIME_INTERVAL, 10000). %% 10 seconds in millisecs -define(SYNC_INTERVAL, 5). %% milliseconds -define(HIBERNATE_AFTER_MIN, 1000). -define(DESIRED_HIBERNATE, 10000). +-define(SERVER, ?MODULE). + -record(dqstate, {msg_location_dets, %% where are messages? msg_location_ets, %% as above, but for ets version @@ -862,72 +864,6 @@ msg_to_bin(Msg = #basic_message { content = Content }) -> bin_to_msg(MsgBin) -> binary_to_term(MsgBin). - -store_safe_shutdown() -> - ok = rabbit_misc:execute_mnesia_transaction( - fun() -> - mnesia:write(rabbit_disk_queue, - ?SHUTDOWN_MESSAGE, write) - end). - -detect_shutdown_state_and_adjust_delivered_flags() -> - MarkDelivered = - rabbit_misc:execute_mnesia_transaction( - fun() -> - case mnesia:read(rabbit_disk_queue, - ?SHUTDOWN_MESSAGE_KEY, read) of - [?SHUTDOWN_MESSAGE] -> - mnesia:delete(rabbit_disk_queue, - ?SHUTDOWN_MESSAGE_KEY, write), - false; - [] -> - true - end - end), - %% if we crash here, then on startup we'll not find the - %% SHUTDOWN_MESSAGE so will mark everything delivered, which is - %% the safe thing to do. - case MarkDelivered of - true -> mark_messages_delivered(); - false -> ok - end. - -mark_messages_delivered() -> - mark_message_delivered('$start_of_table'). - -%% A single huge transaction is a bad idea because of memory -%% use. Equally, using dirty operations is a bad idea because you -%% shouldn't do writes when doing mnesia:dirty_next, because the -%% ordering can change. So we use transactions of bounded -%% size. However, even this does necessitate restarting between -%% transactions. -mark_message_delivered('$end_of_table') -> - ok; -mark_message_delivered(_Key) -> - mark_message_delivered( - rabbit_misc:execute_mnesia_transaction( - fun () -> - ok = mnesia:write_lock_table(rabbit_disk_queue), - mark_message_delivered(mnesia:first(rabbit_disk_queue), - ?BATCH_SIZE) - end)). - -mark_message_delivered(Key, 0) -> - Key; -mark_message_delivered(Key = '$end_of_table', _N) -> - Key; -mark_message_delivered(Key, N) -> - [Obj] = mnesia:read(rabbit_disk_queue, Key, write), - M = case Obj #dq_msg_loc.is_delivered of - true -> N; - false -> - ok = mnesia:write(rabbit_disk_queue, - Obj #dq_msg_loc { is_delivered = true }, - write), - N - 1 - end, - mark_message_delivered(mnesia:next(rabbit_disk_queue, Key), M). - %%---------------------------------------------------------------------------- %% internal functions %%---------------------------------------------------------------------------- @@ -1569,9 +1505,74 @@ delete_empty_files(File, Acc, #dqstate { file_summary = FileSummary }) -> end. %%---------------------------------------------------------------------------- -%% disk recovery +%% recovery %%---------------------------------------------------------------------------- +store_safe_shutdown() -> + ok = rabbit_misc:execute_mnesia_transaction( + fun() -> + mnesia:write(rabbit_disk_queue, + ?SHUTDOWN_MESSAGE, write) + end). + +detect_shutdown_state_and_adjust_delivered_flags() -> + MarkDelivered = + rabbit_misc:execute_mnesia_transaction( + fun() -> + case mnesia:read(rabbit_disk_queue, + ?SHUTDOWN_MESSAGE_KEY, read) of + [?SHUTDOWN_MESSAGE] -> + mnesia:delete(rabbit_disk_queue, + ?SHUTDOWN_MESSAGE_KEY, write), + false; + [] -> + true + end + end), + %% if we crash here, then on startup we'll not find the + %% SHUTDOWN_MESSAGE so will mark everything delivered, which is + %% the safe thing to do. + case MarkDelivered of + true -> mark_messages_delivered(); + false -> ok + end. + +mark_messages_delivered() -> + mark_message_delivered('$start_of_table'). + +%% A single huge transaction is a bad idea because of memory +%% use. Equally, using dirty operations is a bad idea because you +%% shouldn't do writes when doing mnesia:dirty_next, because the +%% ordering can change. So we use transactions of bounded +%% size. However, even this does necessitate restarting between +%% transactions. +mark_message_delivered('$end_of_table') -> + ok; +mark_message_delivered(_Key) -> + mark_message_delivered( + rabbit_misc:execute_mnesia_transaction( + fun () -> + ok = mnesia:write_lock_table(rabbit_disk_queue), + mark_message_delivered(mnesia:first(rabbit_disk_queue), + ?BATCH_SIZE) + end)). + +mark_message_delivered(Key, 0) -> + Key; +mark_message_delivered(Key = '$end_of_table', _N) -> + Key; +mark_message_delivered(Key, N) -> + [Obj] = mnesia:read(rabbit_disk_queue, Key, write), + M = case Obj #dq_msg_loc.is_delivered of + true -> N; + false -> + ok = mnesia:write(rabbit_disk_queue, + Obj #dq_msg_loc { is_delivered = true }, + write), + N - 1 + end, + mark_message_delivered(mnesia:next(rabbit_disk_queue, Key), M). + add_index() -> case mnesia:add_table_index(rabbit_disk_queue, msg_id) of {atomic, ok} -> ok; |
