diff options
| author | Rob Harrop <rharrop@vmware.com> | 2010-09-28 16:46:21 +0100 |
|---|---|---|
| committer | Rob Harrop <rharrop@vmware.com> | 2010-09-28 16:46:21 +0100 |
| commit | 65e18b36147518d26faa0e77fa57469c1094b929 (patch) | |
| tree | 5dd211ff472a88afa060d175ddd15f1e06b17ed5 | |
| parent | 7a98a7066a88073f9b606978b3b748dc427590d9 (diff) | |
| parent | f18e91e08b265c952b1d46b9d1b49e8216d3ab47 (diff) | |
| download | rabbitmq-server-git-65e18b36147518d26faa0e77fa57469c1094b929.tar.gz | |
merge with default
| -rw-r--r-- | src/rabbit_amqqueue.erl | 8 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 17 | ||||
| -rw-r--r-- | src/rabbit_queue_collector.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_queue_index.erl | 43 |
4 files changed, 48 insertions, 22 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 309fd7176e..83a13f2ccb 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -31,7 +31,7 @@ -module(rabbit_amqqueue). --export([start/0, stop/0, declare/5, delete/3, purge/1]). +-export([start/0, stop/0, declare/5, delete_exclusive/1, delete/3, purge/1]). -export([internal_declare/2, internal_delete/1, maybe_run_queue_via_backing_queue/2, update_ram_duration/1, set_ram_duration_target/2, @@ -115,6 +115,9 @@ (rabbit_types:amqqueue()) -> {'ok', non_neg_integer(), non_neg_integer()}). -spec(emit_stats/1 :: (rabbit_types:amqqueue()) -> 'ok'). +-spec(delete_exclusive/1 :: (rabbit_types:amqqueue()) + -> rabbit_types:ok_or_error2(qlen(), + 'not_exclusive')). -spec(delete/3 :: (rabbit_types:amqqueue(), 'false', 'false') -> qlen(); @@ -371,6 +374,9 @@ stat(#amqqueue{pid = QPid}) -> delegate_call(QPid, stat, infinity). emit_stats(#amqqueue{pid = QPid}) -> delegate_cast(QPid, emit_stats). +delete_exclusive(#amqqueue{ pid = QPid }) -> + gen_server2:call(QPid, delete_exclusive, infinity). + delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) -> delegate_call(QPid, {delete, IfUnused, IfEmpty}, infinity). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 4b4153e099..b2519b7aff 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -45,7 +45,7 @@ -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1, prioritise_call/3, - prioritise_cast/2]). + prioritise_cast/2, prioritise_info/2]). -import(queue). -import(erlang). @@ -667,6 +667,7 @@ prioritise_call(Msg, _From, _State) -> info -> 9; {info, _Items} -> 9; consumers -> 9; + delete_exclusive -> 8; {maybe_run_queue_via_backing_queue, _Fun} -> 6; _ -> 0 end. @@ -685,6 +686,10 @@ prioritise_cast(Msg, _State) -> _ -> 0 end. +prioritise_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, + #q{q = #amqqueue{exclusive_owner = DownPid}}) -> 8; +prioritise_info(_Msg, _State) -> 0. + handle_call({init, Recover}, From, State = #q{q = #amqqueue{exclusive_owner = none}}) -> declare(Recover, From, State); @@ -857,6 +862,16 @@ handle_call(stat, _From, State = #q{backing_queue = BQ, active_consumers = ActiveConsumers}) -> reply({ok, BQ:len(BQS), queue:len(ActiveConsumers)}, State); +handle_call(delete_exclusive, _From, + State = #q{ backing_queue_state = BQS, + backing_queue = BQ, + q = #amqqueue{exclusive_owner = Owner} + }) when Owner =/= none -> + {stop, normal, {ok, BQ:len(BQS)}, State}; + +handle_call(delete_exclusive, _From, State) -> + reply({error, not_exclusive}, State); + handle_call({delete, IfUnused, IfEmpty}, _From, State = #q{backing_queue_state = BQS, backing_queue = BQ}) -> IsEmpty = BQ:is_empty(BQS), diff --git a/src/rabbit_queue_collector.erl b/src/rabbit_queue_collector.erl index 0a49b94d09..0b8efc8f83 100644 --- a/src/rabbit_queue_collector.erl +++ b/src/rabbit_queue_collector.erl @@ -81,7 +81,7 @@ handle_call(delete_all, _From, State = #state{queues = Queues}) -> fun () -> ok end, fun () -> erlang:demonitor(MonitorRef), - rabbit_amqqueue:delete(Q, false, false) + rabbit_amqqueue:delete_exclusive(Q) end) || {MonitorRef, Q} <- dict:to_list(Queues)], {reply, ok, State}. diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 820378a512..6568aa705f 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -164,7 +164,9 @@ -define(PUB, {_, _, _}). %% {Guid, MsgProperties, IsPersistent} --define(READ_MODE, [binary, raw, read, {read_ahead, ?SEGMENT_TOTAL_SIZE}]). +-define(READ_MODE, [binary, raw, read]). +-define(READ_AHEAD_MODE, [{read_ahead, ?SEGMENT_TOTAL_SIZE} | ?READ_MODE]). +-define(WRITE_MODE, [write | ?READ_MODE]). %%---------------------------------------------------------------------------- @@ -226,8 +228,13 @@ %% public API %%---------------------------------------------------------------------------- -init(Name, Recover, MsgStoreRecovered, ContainsCheckFun) -> - State = #qistate { dir = Dir } = blank_state(Name, not Recover), +init(Name, false, _MsgStoreRecovered, _ContainsCheckFun) -> + State = #qistate { dir = Dir } = blank_state(Name), + false = filelib:is_file(Dir), %% is_file == is file or dir + {0, [], State}; + +init(Name, true, MsgStoreRecovered, ContainsCheckFun) -> + State = #qistate { dir = Dir } = blank_state(Name), Terms = case read_shutdown_terms(Dir) of {error, _} -> []; {ok, Terms1} -> Terms1 @@ -366,15 +373,8 @@ recover(DurableQueues) -> %% startup and shutdown %%---------------------------------------------------------------------------- -blank_state(QueueName, EnsureFresh) -> - StrName = queue_name_to_dir_name(QueueName), - Dir = filename:join(queues_dir(), StrName), - ok = case EnsureFresh of - true -> false = filelib:is_file(Dir), %% is_file == is file or dir - ok; - false -> ok - end, - ok = filelib:ensure_dir(filename:join(Dir, "nothing")), +blank_state(QueueName) -> + Dir = filename:join(queues_dir(), queue_name_to_dir_name(QueueName)), {ok, MaxJournal} = application:get_env(rabbit, queue_index_max_journal_entries), #qistate { dir = Dir, @@ -383,17 +383,21 @@ blank_state(QueueName, EnsureFresh) -> dirty_count = 0, max_journal_entries = MaxJournal }. +clean_file_name(Dir) -> filename:join(Dir, ?CLEAN_FILENAME). + detect_clean_shutdown(Dir) -> - case file:delete(filename:join(Dir, ?CLEAN_FILENAME)) of + case file:delete(clean_file_name(Dir)) of ok -> true; {error, enoent} -> false end. read_shutdown_terms(Dir) -> - rabbit_misc:read_term_file(filename:join(Dir, ?CLEAN_FILENAME)). + rabbit_misc:read_term_file(clean_file_name(Dir)). store_clean_shutdown(Terms, Dir) -> - rabbit_misc:write_term_file(filename:join(Dir, ?CLEAN_FILENAME), Terms). + CleanFileName = clean_file_name(Dir), + ok = filelib:ensure_dir(CleanFileName), + rabbit_misc:write_term_file(CleanFileName, Terms). init_clean(RecoveredCounts, State) -> %% Load the journal. Since this is a clean recovery this (almost) @@ -512,7 +516,7 @@ queue_index_walker({next, Gatherer}) when is_pid(Gatherer) -> queue_index_walker_reader(QueueName, Gatherer) -> State = #qistate { segments = Segments, dir = Dir } = - recover_journal(blank_state(QueueName, false)), + recover_journal(blank_state(QueueName)), [ok = segment_entries_foldr( fun (_RelSeq, {{Guid, _MsgProps, true}, _IsDelivered, no_ack}, @@ -620,7 +624,7 @@ append_journal_to_segment(#segment { journal_entries = JEntries, path = Path } = Segment) -> case array:sparse_size(JEntries) of 0 -> Segment; - _ -> {ok, Hdl} = file_handle_cache:open(Path, [write | ?READ_MODE], + _ -> {ok, Hdl} = file_handle_cache:open(Path, ?WRITE_MODE, [{write_buffer, infinity}]), array:sparse_foldl(fun write_entry_to_segment/3, Hdl, JEntries), ok = file_handle_cache:close(Hdl), @@ -630,7 +634,8 @@ append_journal_to_segment(#segment { journal_entries = JEntries, get_journal_handle(State = #qistate { journal_handle = undefined, dir = Dir }) -> Path = filename:join(Dir, ?JOURNAL_FILENAME), - {ok, Hdl} = file_handle_cache:open(Path, [write | ?READ_MODE], + ok = filelib:ensure_dir(Path), + {ok, Hdl} = file_handle_cache:open(Path, ?WRITE_MODE, [{write_buffer, infinity}]), {Hdl, State #qistate { journal_handle = Hdl }}; get_journal_handle(State = #qistate { journal_handle = Hdl }) -> @@ -825,7 +830,7 @@ segment_entries_foldr(Fun, Init, load_segment(KeepAcked, #segment { path = Path }) -> case filelib:is_file(Path) of false -> {array_new(), 0}; - true -> {ok, Hdl} = file_handle_cache:open(Path, ?READ_MODE, []), + true -> {ok, Hdl} = file_handle_cache:open(Path, ?READ_AHEAD_MODE, []), {ok, 0} = file_handle_cache:position(Hdl, bof), Res = load_segment_entries(KeepAcked, Hdl, array_new(), 0), ok = file_handle_cache:close(Hdl), |
