diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2010-07-02 12:43:43 +0100 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2010-07-02 12:43:43 +0100 |
| commit | bc482e7ffcc2cea901419f738ac7cb9db4aa6182 (patch) | |
| tree | f127ba69f71c9a0a91fce2fb673f214f2822361e | |
| parent | e404734a6af2f14fb2ac60ca767ff23c136ea6a1 (diff) | |
| parent | 1a78a06f6105172dc1b10d8e5dfbf666d993bcaa (diff) | |
| download | rabbitmq-server-git-bc482e7ffcc2cea901419f738ac7cb9db4aa6182.tar.gz | |
Merge default into bug22889
| -rw-r--r-- | Makefile | 11 | ||||
| -rw-r--r-- | ebin/rabbit_app.in | 4 | ||||
| -rw-r--r-- | src/rabbit_amqqueue.erl | 36 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 122 | ||||
| -rw-r--r-- | src/rabbit_exchange.erl | 14 | ||||
| -rw-r--r-- | src/rabbit_networking.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_router.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 7 |
9 files changed, 109 insertions, 95 deletions
@@ -15,7 +15,7 @@ INCLUDES=$(wildcard $(INCLUDE_DIR)/*.hrl) $(INCLUDE_DIR)/rabbit_framing.hrl $(IN SOURCES=$(wildcard $(SOURCE_DIR)/*.erl) $(SOURCE_DIR)/rabbit_framing_amqp_0_9_1.erl $(SOURCE_DIR)/rabbit_framing_amqp_0_8.erl $(USAGES_ERL) BEAM_TARGETS=$(patsubst $(SOURCE_DIR)/%.erl, $(EBIN_DIR)/%.beam, $(SOURCES)) TARGETS=$(EBIN_DIR)/rabbit.app $(INCLUDE_DIR)/rabbit_framing.hrl $(INCLUDE_DIR)/rabbit_framing_spec.hrl $(BEAM_TARGETS) -WEB_URL=http://stage.rabbitmq.com/ +WEB_URL=http://www.rabbitmq.com/ MANPAGES=$(patsubst %.xml, %.gz, $(wildcard $(DOCS_DIR)/*.[0-9].xml)) WEB_MANPAGES=$(patsubst %.xml, %.man.xml, $(wildcard $(DOCS_DIR)/*.[0-9].xml) $(DOCS_DIR)/rabbitmq-service.xml) USAGES_XML=$(DOCS_DIR)/rabbitmqctl.1.xml $(DOCS_DIR)/rabbitmq-multi.1.xml @@ -209,7 +209,7 @@ srcdist: distclean >> $(TARGET_SRC_DIR)/INSTALL cp README.in $(TARGET_SRC_DIR)/README elinks -dump -no-references -no-numbering $(WEB_URL)build-server.html \ - >> $(TARGET_SRC_DIR)/BUILD + >> $(TARGET_SRC_DIR)/README sed -i.save 's/%%VSN%%/$(VERSION)/' $(TARGET_SRC_DIR)/ebin/rabbit_app.in && rm -f $(TARGET_SRC_DIR)/ebin/rabbit_app.in.save cp -r $(AMQP_CODEGEN_DIR)/* $(TARGET_SRC_DIR)/codegen/ @@ -230,9 +230,10 @@ distclean: clean # xmlto can not read from standard input, so we mess with a tmp file. %.gz: %.xml $(DOCS_DIR)/examples-to-end.xsl - xsltproc $(DOCS_DIR)/examples-to-end.xsl $< > $<.tmp && \ - xmlto man -o $(DOCS_DIR) --stringparam man.indent.verbatims=0 $<.tmp && \ - gzip -f $(DOCS_DIR)/`basename $< .xml` + xmlto --version | grep -E '^xmlto version 0\.0\.([0-9]|1[1-8])$$' >/dev/null || opt='--stringparam man.indent.verbatims=0' ; \ + xsltproc $(DOCS_DIR)/examples-to-end.xsl $< > $<.tmp && \ + xmlto man -o $(DOCS_DIR) $$opt $<.tmp && \ + gzip -f $(DOCS_DIR)/`basename $< .xml` rm -f $<.tmp # Use tmp files rather than a pipeline so that we get meaningful errors diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in index bdf407eb93..ce94cafe62 100644 --- a/ebin/rabbit_app.in +++ b/ebin/rabbit_app.in @@ -11,8 +11,8 @@ rabbit_sup, rabbit_tcp_client_sup]}, {applications, [kernel, stdlib, sasl, mnesia, os_mon]}, -%% we also depend on ssl but it shouldn't be in here as we don't -%% actually want to start it +%% we also depend on crypto, public_key and ssl but they shouldn't be +%% in here as we don't actually want to start it {mod, {rabbit, []}}, {env, [{tcp_listeners, [{"0.0.0.0", 5672}]}, {ssl_listeners, []}, diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index eebcfcb9a9..0aa7445ae6 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -37,7 +37,8 @@ update_ram_duration/1, set_ram_duration_target/2, set_maximum_since_use/2]). -export([pseudo_queue/2]). --export([lookup/1, with/2, with_or_die/2, +-export([lookup/1, with/2, with_or_die/2, assert_equivalence/5, + check_exclusive_access/2, with_exclusive_access_or_die/3, stat/1, stat_all/0, deliver/2, requeue/3, ack/4]). -export([list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]). -export([consumers/1, consumers_all/1]). @@ -66,10 +67,14 @@ -spec(start/0 :: () -> 'ok'). -spec(declare/5 :: (queue_name(), boolean(), boolean(), amqp_table(), - maybe(pid())) -> amqqueue()). + maybe(pid())) -> {'new' | 'existing', amqqueue()}). -spec(lookup/1 :: (queue_name()) -> {'ok', amqqueue()} | not_found()). -spec(with/2 :: (queue_name(), qfun(A)) -> A | not_found()). -spec(with_or_die/2 :: (queue_name(), qfun(A)) -> A). +-spec(assert_equivalence/5 :: (amqqueue(), boolean(), boolean(), amqp_table(), + maybe(pid)) -> ok). +-spec(check_exclusive_access/2 :: (amqqueue(), pid()) -> 'ok'). +-spec(with_exclusive_access_or_die/3 :: (queue_name(), pid(), qfun(A)) -> A). -spec(list/1 :: (vhost()) -> [amqqueue()]). -spec(info_keys/0 :: () -> [info_key()]). -spec(info/1 :: (amqqueue()) -> [info()]). @@ -213,6 +218,31 @@ with(Name, F) -> with_or_die(Name, F) -> with(Name, F, fun () -> rabbit_misc:not_found(Name) end). +assert_equivalence(#amqqueue{durable = Durable, auto_delete = AutoDelete} = Q, + Durable, AutoDelete, _Args, Owner) -> + check_exclusive_access(Q, Owner, strict); +assert_equivalence(#amqqueue{name = QueueName}, + _Durable, _AutoDelete, _Args, _Owner) -> + rabbit_misc:protocol_error( + precondition_failed, "parameters for ~s not equivalent", + [rabbit_misc:rs(QueueName)]). + +check_exclusive_access(Q, Owner) -> check_exclusive_access(Q, Owner, lax). + +check_exclusive_access(#amqqueue{exclusive_owner = Owner}, Owner, _MatchType) -> + ok; +check_exclusive_access(#amqqueue{exclusive_owner = none}, _ReaderPid, lax) -> + ok; +check_exclusive_access(#amqqueue{name = QueueName}, _ReaderPid, _MatchType) -> + rabbit_misc:protocol_error( + resource_locked, + "cannot obtain exclusive access to locked ~s", + [rabbit_misc:rs(QueueName)]). + +with_exclusive_access_or_die(Name, ReaderPid, F) -> + with_or_die(Name, + fun (Q) -> check_exclusive_access(Q, ReaderPid), F(Q) end). + list(VHostPath) -> mnesia:dirty_match_object( rabbit_queue, @@ -395,7 +425,7 @@ delegate_call(Pid, Msg, Timeout) -> delegate:invoke(Pid, fun (P) -> gen_server2:call(P, Msg, Timeout) end). delegate_pcall(Pid, Pri, Msg, Timeout) -> - delegate:invoke(Pid, + delegate:invoke(Pid, fun (P) -> gen_server2:pcall(P, Pri, Msg, Timeout) end). delegate_pcast(Pid, Pri, Msg) -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 5fdf0ffa90..70e6e75584 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -137,7 +137,7 @@ declare(Recover, From, backing_queue = BQ, backing_queue_state = undefined}) -> case rabbit_amqqueue:internal_declare(Q, Recover) of not_found -> {stop, normal, not_found, State}; - Q -> gen_server2:reply(From, Q), + Q -> gen_server2:reply(From, {new, Q}), ok = file_handle_cache:register_callback( rabbit_amqqueue, set_maximum_since_use, [self()]), @@ -146,7 +146,7 @@ declare(Recover, From, set_ram_duration_target, [self()]}), BQS = BQ:init(QName, IsDurable, Recover), noreply(State#q{backing_queue_state = BQS}); - Q1 -> {stop, normal, Q1, State} + Q1 -> {stop, normal, {existing, Q1}, State} end. terminate_shutdown(Fun, State) -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index cd4bfc12b9..1a965f7984 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -284,20 +284,15 @@ terminating(Reason, State = #ch{channel = Channel, reader_pid = Reader}) -> Reader ! {channel_exit, Channel, Reason}, State#ch{state = terminating}. -return_queue_declare_ok(State, NoWait, Q) -> - NewState = State#ch{most_recently_declared_queue = - (Q#amqqueue.name)#resource.name}, +return_queue_declare_ok(#resource{name = ActualName}, + NoWait, MessageCount, ConsumerCount, State) -> + NewState = State#ch{most_recently_declared_queue = ActualName}, case NoWait of true -> {noreply, NewState}; - false -> - {ok, ActualName, MessageCount, ConsumerCount} = - rabbit_misc:with_exit_handler( - fun () -> {ok, Q#amqqueue.name, 0, 0} end, - fun () -> rabbit_amqqueue:stat(Q) end), - Reply = #'queue.declare_ok'{queue = ActualName#resource.name, - message_count = MessageCount, - consumer_count = ConsumerCount}, - {reply, Reply, NewState} + false -> Reply = #'queue.declare_ok'{queue = ActualName, + message_count = MessageCount, + consumer_count = ConsumerCount}, + {reply, Reply, NewState} end. check_resource_access(Username, Resource, Perm) -> @@ -329,19 +324,6 @@ check_write_permitted(Resource, #ch{ username = Username}) -> check_read_permitted(Resource, #ch{ username = Username}) -> check_resource_access(Username, Resource, read). -check_exclusive_access(#amqqueue{exclusive_owner = Owner}, Owner, _MatchType) -> - ok; -check_exclusive_access(#amqqueue{exclusive_owner = none}, _ReaderPid, lax) -> - ok; -check_exclusive_access(#amqqueue{name = QName}, _ReaderPid, _MatchType) -> - rabbit_misc:protocol_error( - resource_locked, - "cannot obtain exclusive access to locked ~s", [rabbit_misc:rs(QName)]). - -with_exclusive_access_or_die(QName, ReaderPid, F) -> - rabbit_amqqueue:with_or_die( - QName, fun (Q) -> check_exclusive_access(Q, ReaderPid, lax), F(Q) end). - expand_queue_name_shortcut(<<>>, #ch{ most_recently_declared_queue = <<>> }) -> rabbit_misc:protocol_error( not_found, "no previously declared queue", []); @@ -480,7 +462,7 @@ handle_method(#'basic.get'{queue = QueueNameBin, next_tag = DeliveryTag }) -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), check_read_permitted(QueueName, State), - case with_exclusive_access_or_die( + case rabbit_amqqueue:with_exclusive_access_or_die( QueueName, ReaderPid, fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of {ok, MessageCount, @@ -524,7 +506,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin, %% We get the queue process to send the consume_ok on our %% behalf. This is for symmetry with basic.cancel - see %% the comment in that method for why. - case with_exclusive_access_or_die( + case rabbit_amqqueue:with_exclusive_access_or_die( QueueName, ReaderPid, fun (Q) -> rabbit_amqqueue:basic_consume( @@ -716,7 +698,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin, exclusive = ExclusiveDeclare, auto_delete = AutoDelete, nowait = NoWait, - arguments = Args}, + arguments = Args} = Declare, _, State = #ch{virtual_host = VHostPath, reader_pid = ReaderPid, queue_collector_pid = CollectorPid}) -> @@ -724,46 +706,40 @@ handle_method(#'queue.declare'{queue = QueueNameBin, true -> ReaderPid; false -> none end, - %% We use this in both branches, because queue_declare may yet return an - %% existing queue. - Finish = fun (#amqqueue{name = QueueName, - durable = Durable1, - auto_delete = AutoDelete1} = Q) - when Durable =:= Durable1, AutoDelete =:= AutoDelete1 -> - check_exclusive_access(Q, Owner, strict), - check_configure_permitted(QueueName, State), - %% We need to notify the reader within the channel - %% process so that we can be sure there are no - %% outstanding exclusive queues being declared as the - %% connection shuts down. - case Owner of - none -> ok; - _ -> ok = rabbit_reader_queue_collector:register_exclusive_queue(CollectorPid, Q) - end, - Q; - %% non-equivalence trumps exclusivity arbitrarily - (#amqqueue{name = QueueName}) -> - rabbit_misc:protocol_error( - precondition_failed, - "parameters for ~s not equivalent", - [rabbit_misc:rs(QueueName)]) - end, - Q = case rabbit_amqqueue:with( - rabbit_misc:r(VHostPath, queue, QueueNameBin), - Finish) of - {error, not_found} -> - ActualNameBin = - case QueueNameBin of + ActualNameBin = case QueueNameBin of <<>> -> rabbit_guid:binstring_guid("amq.gen"); Other -> check_name('queue', Other) end, - QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin), - Finish(rabbit_amqqueue:declare(QueueName, Durable, AutoDelete, - Args, Owner)); - #amqqueue{} = Other -> - Other - end, - return_queue_declare_ok(State, NoWait, Q); + QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin), + check_configure_permitted(QueueName, State), + case rabbit_amqqueue:with( + QueueName, + fun (Q) -> ok = rabbit_amqqueue:assert_equivalence( + Q, Durable, AutoDelete, Args, Owner), + rabbit_amqqueue:stat(Q) + end) of + {ok, QueueName, MessageCount, ConsumerCount} -> + return_queue_declare_ok(QueueName, NoWait, MessageCount, + ConsumerCount, State); + {error, not_found} -> + case rabbit_amqqueue:declare(QueueName, Durable, AutoDelete, + Args, Owner) of + {new, Q = #amqqueue{}} -> + %% We need to notify the reader within the channel + %% process so that we can be sure there are no + %% outstanding exclusive queues being declared as + %% the connection shuts down. + ok = case Owner of + none -> ok; + _ -> rabbit_reader_queue_collector:register_exclusive_queue(CollectorPid, Q) + end, + return_queue_declare_ok(QueueName, NoWait, 0, 0, State); + {existing, _Q} -> + %% must have been created between the stat and the + %% declare. Loop around again. + handle_method(Declare, none, State) + end + end; handle_method(#'queue.declare'{queue = QueueNameBin, passive = true, @@ -772,8 +748,12 @@ handle_method(#'queue.declare'{queue = QueueNameBin, reader_pid = ReaderPid}) -> QueueName = rabbit_misc:r(VHostPath, queue, QueueNameBin), check_configure_permitted(QueueName, State), - Q = with_exclusive_access_or_die(QueueName, ReaderPid, fun (Q) -> Q end), - return_queue_declare_ok(State, NoWait, Q); + {{ok, QueueName, MessageCount, ConsumerCount}, #amqqueue{} = Q} = + rabbit_amqqueue:with_or_die( + QueueName, fun (Q) -> {rabbit_amqqueue:stat(Q), Q} end), + ok = rabbit_amqqueue:check_exclusive_access(Q, ReaderPid), + return_queue_declare_ok(QueueName, NoWait, MessageCount, ConsumerCount, + State); handle_method(#'queue.delete'{queue = QueueNameBin, if_unused = IfUnused, @@ -782,7 +762,7 @@ handle_method(#'queue.delete'{queue = QueueNameBin, _, State = #ch{reader_pid = ReaderPid}) -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), check_configure_permitted(QueueName, State), - case with_exclusive_access_or_die( + case rabbit_amqqueue:with_exclusive_access_or_die( QueueName, ReaderPid, fun (Q) -> rabbit_amqqueue:delete(Q, IfUnused, IfEmpty) end) of {error, in_use} -> @@ -818,7 +798,7 @@ handle_method(#'queue.purge'{queue = QueueNameBin, _, State = #ch{reader_pid = ReaderPid}) -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), check_read_permitted(QueueName, State), - {ok, PurgedMessageCount} = with_exclusive_access_or_die( + {ok, PurgedMessageCount} = rabbit_amqqueue:with_exclusive_access_or_die( QueueName, ReaderPid, fun (Q) -> rabbit_amqqueue:purge(Q) end), return_ok(State, NoWait, @@ -926,7 +906,9 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_read_permitted(ExchangeName, State), case Fun(ExchangeName, QueueName, ActualRoutingKey, Arguments, - fun (_X, Q) -> check_exclusive_access(Q, ReaderPid, lax) end) of + fun (_X, Q) -> + rabbit_amqqueue:check_exclusive_access(Q, ReaderPid) + end) of {error, exchange_not_found} -> rabbit_misc:not_found(ExchangeName); {error, queue_not_found} -> diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 7072055cde..d77bf833ea 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -335,7 +335,7 @@ delete_queue_bindings(QueueName, FwdDeleteFun) -> Module = type_to_module(Type), case IsDeleted of auto_deleted -> Module:delete(X, Bs); - no_delete -> Module:remove_bindings(X, Bs) + not_deleted -> Module:remove_bindings(X, Bs) end end, Cleanup) end. @@ -438,11 +438,11 @@ delete_binding(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) -> end) of Err = {error, _} -> Err; - {{Action, X = #exchange{ type = Type }}, B} -> + {{IsDeleted, X = #exchange{ type = Type }}, B} -> Module = type_to_module(Type), - case Action of - auto_delete -> Module:delete(X, [B]); - no_delete -> Module:remove_bindings(X, [B]) + case IsDeleted of + auto_deleted -> Module:delete(X, [B]); + not_deleted -> Module:remove_bindings(X, [B]) end end. @@ -526,10 +526,10 @@ delete(ExchangeName, IfUnused) -> end. maybe_auto_delete(Exchange = #exchange{auto_delete = false}) -> - {no_delete, Exchange}; + {not_deleted, Exchange}; maybe_auto_delete(Exchange = #exchange{auto_delete = true}) -> case conditional_delete(Exchange) of - {error, in_use} -> {no_delete, Exchange}; + {error, in_use} -> {not_deleted, Exchange}; {deleted, Exchange, []} -> {auto_deleted, Exchange} end. diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index c3d0b7b786..68ffc98ad7 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -102,7 +102,7 @@ boot_ssl() -> {ok, []} -> ok; {ok, SslListeners} -> - ok = rabbit_misc:start_applications([crypto, ssl]), + ok = rabbit_misc:start_applications([crypto, public_key, ssl]), {ok, SslOpts} = application:get_env(ssl_options), [start_ssl_listener(Host, Port, SslOpts) || {Host, Port} <- SslListeners], ok diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index 5cd15a9462..75196bc0d4 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -90,13 +90,13 @@ match_routing_key(Name, RoutingKey) -> lookup_qpids(mnesia:dirty_select(rabbit_route, [{MatchHead, [], ['$1']}])). lookup_qpids(Queues) -> - sets:fold( + lists:foldl( fun (Key, Acc) -> case mnesia:dirty_read({rabbit_queue, Key}) of [#amqqueue{pid = QPid}] -> [QPid | Acc]; [] -> Acc end - end, [], sets:from_list(Queues)). + end, [], lists:usort(Queues)). %%-------------------------------------------------------------------- diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index e74334024e..87eaae9e24 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -792,10 +792,11 @@ test_server_status() -> Writer = spawn(fun () -> receive shutdown -> ok end end), Ch = rabbit_channel:start_link(1, self(), Writer, <<"user">>, <<"/">>, self()), - [Q, Q2] = [#amqqueue{} = rabbit_amqqueue:declare( + [Q, Q2] = [Queue || Name <- [<<"foo">>, <<"bar">>], + {new, Queue = #amqqueue{}} <- + [rabbit_amqqueue:declare( rabbit_misc:r(<<"/">>, queue, Name), - false, false, [], none) || - Name <- [<<"foo">>, <<"bar">>]], + false, false, [], none)]], ok = rabbit_amqqueue:basic_consume(Q, true, Ch, undefined, <<"ctag">>, true, undefined), |
