summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRob Harrop <rharrop@vmware.com>2010-09-28 16:46:21 +0100
committerRob Harrop <rharrop@vmware.com>2010-09-28 16:46:21 +0100
commit65e18b36147518d26faa0e77fa57469c1094b929 (patch)
tree5dd211ff472a88afa060d175ddd15f1e06b17ed5
parent7a98a7066a88073f9b606978b3b748dc427590d9 (diff)
parentf18e91e08b265c952b1d46b9d1b49e8216d3ab47 (diff)
downloadrabbitmq-server-git-65e18b36147518d26faa0e77fa57469c1094b929.tar.gz
merge with default
-rw-r--r--src/rabbit_amqqueue.erl8
-rw-r--r--src/rabbit_amqqueue_process.erl17
-rw-r--r--src/rabbit_queue_collector.erl2
-rw-r--r--src/rabbit_queue_index.erl43
4 files changed, 48 insertions, 22 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 309fd7176e..83a13f2ccb 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -31,7 +31,7 @@
-module(rabbit_amqqueue).
--export([start/0, stop/0, declare/5, delete/3, purge/1]).
+-export([start/0, stop/0, declare/5, delete_exclusive/1, delete/3, purge/1]).
-export([internal_declare/2, internal_delete/1,
maybe_run_queue_via_backing_queue/2,
update_ram_duration/1, set_ram_duration_target/2,
@@ -115,6 +115,9 @@
(rabbit_types:amqqueue())
-> {'ok', non_neg_integer(), non_neg_integer()}).
-spec(emit_stats/1 :: (rabbit_types:amqqueue()) -> 'ok').
+-spec(delete_exclusive/1 :: (rabbit_types:amqqueue())
+ -> rabbit_types:ok_or_error2(qlen(),
+ 'not_exclusive')).
-spec(delete/3 ::
(rabbit_types:amqqueue(), 'false', 'false')
-> qlen();
@@ -371,6 +374,9 @@ stat(#amqqueue{pid = QPid}) -> delegate_call(QPid, stat, infinity).
emit_stats(#amqqueue{pid = QPid}) ->
delegate_cast(QPid, emit_stats).
+delete_exclusive(#amqqueue{ pid = QPid }) ->
+ gen_server2:call(QPid, delete_exclusive, infinity).
+
delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) ->
delegate_call(QPid, {delete, IfUnused, IfEmpty}, infinity).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 4b4153e099..b2519b7aff 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -45,7 +45,7 @@
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2, handle_pre_hibernate/1, prioritise_call/3,
- prioritise_cast/2]).
+ prioritise_cast/2, prioritise_info/2]).
-import(queue).
-import(erlang).
@@ -667,6 +667,7 @@ prioritise_call(Msg, _From, _State) ->
info -> 9;
{info, _Items} -> 9;
consumers -> 9;
+ delete_exclusive -> 8;
{maybe_run_queue_via_backing_queue, _Fun} -> 6;
_ -> 0
end.
@@ -685,6 +686,10 @@ prioritise_cast(Msg, _State) ->
_ -> 0
end.
+prioritise_info({'DOWN', _MonitorRef, process, DownPid, _Reason},
+ #q{q = #amqqueue{exclusive_owner = DownPid}}) -> 8;
+prioritise_info(_Msg, _State) -> 0.
+
handle_call({init, Recover}, From,
State = #q{q = #amqqueue{exclusive_owner = none}}) ->
declare(Recover, From, State);
@@ -857,6 +862,16 @@ handle_call(stat, _From, State = #q{backing_queue = BQ,
active_consumers = ActiveConsumers}) ->
reply({ok, BQ:len(BQS), queue:len(ActiveConsumers)}, State);
+handle_call(delete_exclusive, _From,
+ State = #q{ backing_queue_state = BQS,
+ backing_queue = BQ,
+ q = #amqqueue{exclusive_owner = Owner}
+ }) when Owner =/= none ->
+ {stop, normal, {ok, BQ:len(BQS)}, State};
+
+handle_call(delete_exclusive, _From, State) ->
+ reply({error, not_exclusive}, State);
+
handle_call({delete, IfUnused, IfEmpty}, _From,
State = #q{backing_queue_state = BQS, backing_queue = BQ}) ->
IsEmpty = BQ:is_empty(BQS),
diff --git a/src/rabbit_queue_collector.erl b/src/rabbit_queue_collector.erl
index 0a49b94d09..0b8efc8f83 100644
--- a/src/rabbit_queue_collector.erl
+++ b/src/rabbit_queue_collector.erl
@@ -81,7 +81,7 @@ handle_call(delete_all, _From, State = #state{queues = Queues}) ->
fun () -> ok end,
fun () ->
erlang:demonitor(MonitorRef),
- rabbit_amqqueue:delete(Q, false, false)
+ rabbit_amqqueue:delete_exclusive(Q)
end)
|| {MonitorRef, Q} <- dict:to_list(Queues)],
{reply, ok, State}.
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 820378a512..6568aa705f 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -164,7 +164,9 @@
-define(PUB, {_, _, _}). %% {Guid, MsgProperties, IsPersistent}
--define(READ_MODE, [binary, raw, read, {read_ahead, ?SEGMENT_TOTAL_SIZE}]).
+-define(READ_MODE, [binary, raw, read]).
+-define(READ_AHEAD_MODE, [{read_ahead, ?SEGMENT_TOTAL_SIZE} | ?READ_MODE]).
+-define(WRITE_MODE, [write | ?READ_MODE]).
%%----------------------------------------------------------------------------
@@ -226,8 +228,13 @@
%% public API
%%----------------------------------------------------------------------------
-init(Name, Recover, MsgStoreRecovered, ContainsCheckFun) ->
- State = #qistate { dir = Dir } = blank_state(Name, not Recover),
+init(Name, false, _MsgStoreRecovered, _ContainsCheckFun) ->
+ State = #qistate { dir = Dir } = blank_state(Name),
+ false = filelib:is_file(Dir), %% is_file == is file or dir
+ {0, [], State};
+
+init(Name, true, MsgStoreRecovered, ContainsCheckFun) ->
+ State = #qistate { dir = Dir } = blank_state(Name),
Terms = case read_shutdown_terms(Dir) of
{error, _} -> [];
{ok, Terms1} -> Terms1
@@ -366,15 +373,8 @@ recover(DurableQueues) ->
%% startup and shutdown
%%----------------------------------------------------------------------------
-blank_state(QueueName, EnsureFresh) ->
- StrName = queue_name_to_dir_name(QueueName),
- Dir = filename:join(queues_dir(), StrName),
- ok = case EnsureFresh of
- true -> false = filelib:is_file(Dir), %% is_file == is file or dir
- ok;
- false -> ok
- end,
- ok = filelib:ensure_dir(filename:join(Dir, "nothing")),
+blank_state(QueueName) ->
+ Dir = filename:join(queues_dir(), queue_name_to_dir_name(QueueName)),
{ok, MaxJournal} =
application:get_env(rabbit, queue_index_max_journal_entries),
#qistate { dir = Dir,
@@ -383,17 +383,21 @@ blank_state(QueueName, EnsureFresh) ->
dirty_count = 0,
max_journal_entries = MaxJournal }.
+clean_file_name(Dir) -> filename:join(Dir, ?CLEAN_FILENAME).
+
detect_clean_shutdown(Dir) ->
- case file:delete(filename:join(Dir, ?CLEAN_FILENAME)) of
+ case file:delete(clean_file_name(Dir)) of
ok -> true;
{error, enoent} -> false
end.
read_shutdown_terms(Dir) ->
- rabbit_misc:read_term_file(filename:join(Dir, ?CLEAN_FILENAME)).
+ rabbit_misc:read_term_file(clean_file_name(Dir)).
store_clean_shutdown(Terms, Dir) ->
- rabbit_misc:write_term_file(filename:join(Dir, ?CLEAN_FILENAME), Terms).
+ CleanFileName = clean_file_name(Dir),
+ ok = filelib:ensure_dir(CleanFileName),
+ rabbit_misc:write_term_file(CleanFileName, Terms).
init_clean(RecoveredCounts, State) ->
%% Load the journal. Since this is a clean recovery this (almost)
@@ -512,7 +516,7 @@ queue_index_walker({next, Gatherer}) when is_pid(Gatherer) ->
queue_index_walker_reader(QueueName, Gatherer) ->
State = #qistate { segments = Segments, dir = Dir } =
- recover_journal(blank_state(QueueName, false)),
+ recover_journal(blank_state(QueueName)),
[ok = segment_entries_foldr(
fun (_RelSeq,
{{Guid, _MsgProps, true}, _IsDelivered, no_ack},
@@ -620,7 +624,7 @@ append_journal_to_segment(#segment { journal_entries = JEntries,
path = Path } = Segment) ->
case array:sparse_size(JEntries) of
0 -> Segment;
- _ -> {ok, Hdl} = file_handle_cache:open(Path, [write | ?READ_MODE],
+ _ -> {ok, Hdl} = file_handle_cache:open(Path, ?WRITE_MODE,
[{write_buffer, infinity}]),
array:sparse_foldl(fun write_entry_to_segment/3, Hdl, JEntries),
ok = file_handle_cache:close(Hdl),
@@ -630,7 +634,8 @@ append_journal_to_segment(#segment { journal_entries = JEntries,
get_journal_handle(State = #qistate { journal_handle = undefined,
dir = Dir }) ->
Path = filename:join(Dir, ?JOURNAL_FILENAME),
- {ok, Hdl} = file_handle_cache:open(Path, [write | ?READ_MODE],
+ ok = filelib:ensure_dir(Path),
+ {ok, Hdl} = file_handle_cache:open(Path, ?WRITE_MODE,
[{write_buffer, infinity}]),
{Hdl, State #qistate { journal_handle = Hdl }};
get_journal_handle(State = #qistate { journal_handle = Hdl }) ->
@@ -825,7 +830,7 @@ segment_entries_foldr(Fun, Init,
load_segment(KeepAcked, #segment { path = Path }) ->
case filelib:is_file(Path) of
false -> {array_new(), 0};
- true -> {ok, Hdl} = file_handle_cache:open(Path, ?READ_MODE, []),
+ true -> {ok, Hdl} = file_handle_cache:open(Path, ?READ_AHEAD_MODE, []),
{ok, 0} = file_handle_cache:position(Hdl, bof),
Res = load_segment_entries(KeepAcked, Hdl, array_new(), 0),
ok = file_handle_cache:close(Hdl),