diff options
| author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-07-05 13:43:56 +0100 |
|---|---|---|
| committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-07-05 13:43:56 +0100 |
| commit | 954568f0704c96856c10605b3bb3f68f884ea604 (patch) | |
| tree | 4894a9155434dc23f410bbb7b23ebea24fd56258 | |
| parent | 807937bc528ef7f449cfce310a9ee75988daa5a8 (diff) | |
| parent | cf86b976d4b0be38c973352e1130ca993b7b01d5 (diff) | |
| download | rabbitmq-server-git-954568f0704c96856c10605b3bb3f68f884ea604.tar.gz | |
merge default into bug21954
| -rw-r--r-- | Makefile | 11 | ||||
| -rw-r--r-- | src/rabbit_amqqueue.erl | 45 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 75 | ||||
| -rw-r--r-- | src/rabbit_framing_channel.erl | 34 | ||||
| -rw-r--r-- | src/rabbit_reader.erl | 10 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 6 |
7 files changed, 96 insertions, 90 deletions
@@ -15,7 +15,7 @@ INCLUDES=$(wildcard $(INCLUDE_DIR)/*.hrl) $(INCLUDE_DIR)/rabbit_framing.hrl SOURCES=$(wildcard $(SOURCE_DIR)/*.erl) $(SOURCE_DIR)/rabbit_framing.erl $(USAGES_ERL) BEAM_TARGETS=$(patsubst $(SOURCE_DIR)/%.erl, $(EBIN_DIR)/%.beam, $(SOURCES)) TARGETS=$(EBIN_DIR)/rabbit.app $(INCLUDE_DIR)/rabbit_framing.hrl $(BEAM_TARGETS) -WEB_URL=http://stage.rabbitmq.com/ +WEB_URL=http://www.rabbitmq.com/ MANPAGES=$(patsubst %.xml, %.gz, $(wildcard $(DOCS_DIR)/*.[0-9].xml)) WEB_MANPAGES=$(patsubst %.xml, %.man.xml, $(wildcard $(DOCS_DIR)/*.[0-9].xml) $(DOCS_DIR)/rabbitmq-service.xml) USAGES_XML=$(DOCS_DIR)/rabbitmqctl.1.xml $(DOCS_DIR)/rabbitmq-multi.1.xml @@ -202,7 +202,7 @@ srcdist: distclean >> $(TARGET_SRC_DIR)/INSTALL cp README.in $(TARGET_SRC_DIR)/README elinks -dump -no-references -no-numbering $(WEB_URL)build-server.html \ - >> $(TARGET_SRC_DIR)/BUILD + >> $(TARGET_SRC_DIR)/README sed -i.save 's/%%VSN%%/$(VERSION)/' $(TARGET_SRC_DIR)/ebin/rabbit_app.in && rm -f $(TARGET_SRC_DIR)/ebin/rabbit_app.in.save cp -r $(AMQP_CODEGEN_DIR)/* $(TARGET_SRC_DIR)/codegen/ @@ -223,9 +223,10 @@ distclean: clean # xmlto can not read from standard input, so we mess with a tmp file. %.gz: %.xml $(DOCS_DIR)/examples-to-end.xsl - xsltproc $(DOCS_DIR)/examples-to-end.xsl $< > $<.tmp && \ - xmlto man -o $(DOCS_DIR) --stringparam man.indent.verbatims=0 $<.tmp && \ - gzip -f $(DOCS_DIR)/`basename $< .xml` + xmlto --version | grep -E '^xmlto version 0\.0\.([0-9]|1[1-8])$$' >/dev/null || opt='--stringparam man.indent.verbatims=0' ; \ + xsltproc $(DOCS_DIR)/examples-to-end.xsl $< > $<.tmp && \ + xmlto man -o $(DOCS_DIR) $$opt $<.tmp && \ + gzip -f $(DOCS_DIR)/`basename $< .xml` rm -f $<.tmp # Use tmp files rather than a pipeline so that we get meaningful errors diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index c4ba9344c6..ffa46642d3 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -37,8 +37,9 @@ update_ram_duration/1, set_ram_duration_target/2, set_maximum_since_use/2]). -export([pseudo_queue/2]). --export([lookup/1, with/2, with_or_die/2, - stat/1, stat_all/0, deliver/2, requeue/3, ack/4]). +-export([lookup/1, with/2, with_or_die/2, assert_equivalence/5, + check_exclusive_access/2, with_exclusive_access_or_die/3, + stat/1, deliver/2, requeue/3, ack/4]). -export([list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]). -export([consumers/1, consumers_all/1]). -export([basic_get/3, basic_consume/7, basic_cancel/4]). @@ -69,7 +70,6 @@ arguments :: rabbit_framing:amqp_table(), pid :: rabbit:maybe(pid())}). --type(qstats() :: {'ok', name(), non_neg_integer(), non_neg_integer()}). -type(qlen() :: {'ok', non_neg_integer()}). -type(qfun(A) :: fun ((amqqueue()) -> A)). -type(qmsg() :: {name(), pid(), msg_id(), boolean(), rabbit:message()}). @@ -83,6 +83,11 @@ -spec(lookup/1 :: (name()) -> {'ok', amqqueue()} | rabbit_misc:not_found()). -spec(with/2 :: (name(), qfun(A)) -> A | rabbit_misc:not_found()). -spec(with_or_die/2 :: (name(), qfun(A)) -> A). +-spec(assert_equivalence/5 :: (amqqueue(), boolean(), boolean(), + rabbit_framing:amqp_table(), rabbit:maybe(pid)) + -> ok). +-spec(check_exclusive_access/2 :: (amqqueue(), pid()) -> 'ok'). +-spec(with_exclusive_access_or_die/3 :: (name(), pid(), qfun(A)) -> A). -spec(list/1 :: (rabbit:vhost()) -> [amqqueue()]). -spec(info_keys/0 :: () -> [rabbit:info_key()]). -spec(info/1 :: (amqqueue()) -> [rabbit:info()]). @@ -93,8 +98,8 @@ -spec(consumers/1 :: (amqqueue()) -> [{pid(), rabbit:ctag(), boolean()}]). -spec(consumers_all/1 :: (rabbit:vhost()) -> [{name(), pid(), rabbit:ctag(), boolean()}]). --spec(stat/1 :: (amqqueue()) -> qstats()). --spec(stat_all/0 :: () -> [qstats()]). +-spec(stat/1 :: + (amqqueue()) -> {'ok', non_neg_integer(), non_neg_integer()}). -spec(delete/3 :: (amqqueue(), 'false', 'false') -> qlen(); (amqqueue(), 'true' , 'false') -> qlen() | {'error', 'in_use'}; @@ -228,6 +233,31 @@ with(Name, F) -> with_or_die(Name, F) -> with(Name, F, fun () -> rabbit_misc:not_found(Name) end). +assert_equivalence(#amqqueue{durable = Durable, auto_delete = AutoDelete} = Q, + Durable, AutoDelete, _Args, Owner) -> + check_exclusive_access(Q, Owner, strict); +assert_equivalence(#amqqueue{name = QueueName}, + _Durable, _AutoDelete, _Args, _Owner) -> + rabbit_misc:protocol_error( + precondition_failed, "parameters for ~s not equivalent", + [rabbit_misc:rs(QueueName)]). + +check_exclusive_access(Q, Owner) -> check_exclusive_access(Q, Owner, lax). + +check_exclusive_access(#amqqueue{exclusive_owner = Owner}, Owner, _MatchType) -> + ok; +check_exclusive_access(#amqqueue{exclusive_owner = none}, _ReaderPid, lax) -> + ok; +check_exclusive_access(#amqqueue{name = QueueName}, _ReaderPid, _MatchType) -> + rabbit_misc:protocol_error( + resource_locked, + "cannot obtain exclusive access to locked ~s", + [rabbit_misc:rs(QueueName)]). + +with_exclusive_access_or_die(Name, ReaderPid, F) -> + with_or_die(Name, + fun (Q) -> check_exclusive_access(Q, ReaderPid), F(Q) end). + list(VHostPath) -> mnesia:dirty_match_object( rabbit_queue, @@ -262,9 +292,6 @@ consumers_all(VHostPath) -> stat(#amqqueue{pid = QPid}) -> delegate_call(QPid, stat, infinity). -stat_all() -> - lists:map(fun stat/1, rabbit_misc:dirty_read_all(rabbit_queue)). - delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) -> delegate_call(QPid, {delete, IfUnused, IfEmpty}, infinity). @@ -410,7 +437,7 @@ delegate_call(Pid, Msg, Timeout) -> delegate:invoke(Pid, fun (P) -> gen_server2:call(P, Msg, Timeout) end). delegate_pcall(Pid, Pri, Msg, Timeout) -> - delegate:invoke(Pid, + delegate:invoke(Pid, fun (P) -> gen_server2:pcall(P, Pri, Msg, Timeout) end). delegate_pcast(Pid, Pri, Msg) -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 70e6e75584..3bf48b4cf8 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -692,11 +692,10 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, end end; -handle_call(stat, _From, State = #q{q = #amqqueue{name = Name}, - backing_queue = BQ, +handle_call(stat, _From, State = #q{backing_queue = BQ, backing_queue_state = BQS, active_consumers = ActiveConsumers}) -> - reply({ok, Name, BQ:len(BQS), queue:len(ActiveConsumers)}, State); + reply({ok, BQ:len(BQS), queue:len(ActiveConsumers)}, State); handle_call({delete, IfUnused, IfEmpty}, _From, State = #q{backing_queue_state = BQS, backing_queue = BQ}) -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index a446bbab01..915a41ee15 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -329,19 +329,6 @@ check_write_permitted(Resource, #ch{ username = Username}) -> check_read_permitted(Resource, #ch{ username = Username}) -> check_resource_access(Username, Resource, read). -check_exclusive_access(#amqqueue{exclusive_owner = Owner}, Owner, _MatchType) -> - ok; -check_exclusive_access(#amqqueue{exclusive_owner = none}, _ReaderPid, lax) -> - ok; -check_exclusive_access(#amqqueue{name = QName}, _ReaderPid, _MatchType) -> - rabbit_misc:protocol_error( - resource_locked, - "cannot obtain exclusive access to locked ~s", [rabbit_misc:rs(QName)]). - -with_exclusive_access_or_die(QName, ReaderPid, F) -> - rabbit_amqqueue:with_or_die( - QName, fun (Q) -> check_exclusive_access(Q, ReaderPid, lax), F(Q) end). - expand_queue_name_shortcut(<<>>, #ch{ most_recently_declared_queue = <<>> }) -> rabbit_misc:protocol_error( not_found, "no previously declared queue", []); @@ -444,12 +431,9 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, Exchange, rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message)), case RoutingRes of - routed -> - ok; - unroutable -> - ok = basic_return(Message, WriterPid, no_route); - not_delivered -> - ok = basic_return(Message, WriterPid, no_consumers) + routed -> ok; + unroutable -> ok = basic_return(Message, WriterPid, no_route); + not_delivered -> ok = basic_return(Message, WriterPid, no_consumers) end, {noreply, case TxnKey of none -> State; @@ -480,7 +464,7 @@ handle_method(#'basic.get'{queue = QueueNameBin, next_tag = DeliveryTag }) -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), check_read_permitted(QueueName, State), - case with_exclusive_access_or_die( + case rabbit_amqqueue:with_exclusive_access_or_die( QueueName, ReaderPid, fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of {ok, MessageCount, @@ -499,7 +483,7 @@ handle_method(#'basic.get'{queue = QueueNameBin, Content), {noreply, State1#ch{next_tag = DeliveryTag + 1}}; empty -> - {reply, #'basic.get_empty'{cluster_id = <<>>}, State} + {reply, #'basic.get_empty'{}, State} end; handle_method(#'basic.consume'{queue = QueueNameBin, @@ -524,7 +508,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin, %% We get the queue process to send the consume_ok on our %% behalf. This is for symmetry with basic.cancel - see %% the comment in that method for why. - case with_exclusive_access_or_die( + case rabbit_amqqueue:with_exclusive_access_or_die( QueueName, ReaderPid, fun (Q) -> rabbit_amqqueue:basic_consume( @@ -710,13 +694,13 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin, return_ok(State, NoWait, #'exchange.delete_ok'{}) end; -handle_method(Declare = #'queue.declare'{queue = QueueNameBin, - passive = false, - durable = Durable, - exclusive = ExclusiveDeclare, - auto_delete = AutoDelete, - nowait = NoWait, - arguments = Args}, +handle_method(#'queue.declare'{queue = QueueNameBin, + passive = false, + durable = Durable, + exclusive = ExclusiveDeclare, + auto_delete = AutoDelete, + nowait = NoWait, + arguments = Args} = Declare, _, State = #ch{virtual_host = VHostPath, reader_pid = ReaderPid, queue_collector_pid = CollectorPid}) -> @@ -730,18 +714,15 @@ handle_method(Declare = #'queue.declare'{queue = QueueNameBin, end, QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin), check_configure_permitted(QueueName, State), - case rabbit_amqqueue:with(QueueName, - fun (Q) -> {rabbit_amqqueue:stat(Q), Q} end) of - {{ok, QueueName, MessageCount, ConsumerCount}, - #amqqueue{durable = Durable1, auto_delete = AutoDelete1} = Q} - when Durable =:= Durable1, AutoDelete =:= AutoDelete1 -> - check_exclusive_access(Q, Owner, strict), + case rabbit_amqqueue:with( + QueueName, + fun (Q) -> ok = rabbit_amqqueue:assert_equivalence( + Q, Durable, AutoDelete, Args, Owner), + rabbit_amqqueue:stat(Q) + end) of + {ok, MessageCount, ConsumerCount} -> return_queue_declare_ok(QueueName, NoWait, MessageCount, ConsumerCount, State); - {{ok, QueueName, _MessageCount, _ConsumerCount}, #amqqueue{}} -> - rabbit_misc:protocol_error( - precondition_failed, "parameters for ~s not equivalent", - [rabbit_misc:rs(QueueName)]); {error, not_found} -> case rabbit_amqqueue:declare(QueueName, Durable, AutoDelete, Args, Owner) of @@ -752,13 +733,13 @@ handle_method(Declare = #'queue.declare'{queue = QueueNameBin, %% the connection shuts down. ok = case Owner of none -> ok; - _ -> rabbit_reader_queue_collector:register_exclusive_queue(CollectorPid, Q) + _ -> rabbit_reader_queue_collector:register_exclusive_queue(CollectorPid, Q) end, return_queue_declare_ok(QueueName, NoWait, 0, 0, State); {existing, _Q} -> %% must have been created between the stat and the %% declare. Loop around again. - handle_method(Declare, undefined, State) + handle_method(Declare, none, State) end end; @@ -769,10 +750,10 @@ handle_method(#'queue.declare'{queue = QueueNameBin, reader_pid = ReaderPid}) -> QueueName = rabbit_misc:r(VHostPath, queue, QueueNameBin), check_configure_permitted(QueueName, State), - {{ok, QueueName, MessageCount, ConsumerCount}, #amqqueue{} = Q} = + {{ok, MessageCount, ConsumerCount}, #amqqueue{} = Q} = rabbit_amqqueue:with_or_die( QueueName, fun (Q) -> {rabbit_amqqueue:stat(Q), Q} end), - check_exclusive_access(Q, ReaderPid, lax), + ok = rabbit_amqqueue:check_exclusive_access(Q, ReaderPid), return_queue_declare_ok(QueueName, NoWait, MessageCount, ConsumerCount, State); @@ -783,7 +764,7 @@ handle_method(#'queue.delete'{queue = QueueNameBin, _, State = #ch{reader_pid = ReaderPid}) -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), check_configure_permitted(QueueName, State), - case with_exclusive_access_or_die( + case rabbit_amqqueue:with_exclusive_access_or_die( QueueName, ReaderPid, fun (Q) -> rabbit_amqqueue:delete(Q, IfUnused, IfEmpty) end) of {error, in_use} -> @@ -819,7 +800,7 @@ handle_method(#'queue.purge'{queue = QueueNameBin, _, State = #ch{reader_pid = ReaderPid}) -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), check_read_permitted(QueueName, State), - {ok, PurgedMessageCount} = with_exclusive_access_or_die( + {ok, PurgedMessageCount} = rabbit_amqqueue:with_exclusive_access_or_die( QueueName, ReaderPid, fun (Q) -> rabbit_amqqueue:purge(Q) end), return_ok(State, NoWait, @@ -927,7 +908,9 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_read_permitted(ExchangeName, State), case Fun(ExchangeName, QueueName, ActualRoutingKey, Arguments, - fun (_X, Q) -> check_exclusive_access(Q, ReaderPid, lax) end) of + fun (_X, Q) -> + rabbit_amqqueue:check_exclusive_access(Q, ReaderPid) + end) of {error, exchange_not_found} -> rabbit_misc:not_found(ExchangeName); {error, queue_not_found} -> diff --git a/src/rabbit_framing_channel.erl b/src/rabbit_framing_channel.erl index b7c6aa96fa..bc1a2a0835 100644 --- a/src/rabbit_framing_channel.erl +++ b/src/rabbit_framing_channel.erl @@ -76,29 +76,27 @@ mainloop(ChannelPid) -> {method, MethodName, FieldsBin} = read_frame(ChannelPid), Method = rabbit_framing:decode_method_fields(MethodName, FieldsBin), case rabbit_framing:method_has_content(MethodName) of - true -> rabbit_channel:do(ChannelPid, Method, - collect_content(ChannelPid, MethodName)); + true -> {ClassId, _MethodId} = rabbit_framing:method_id(MethodName), + rabbit_channel:do(ChannelPid, Method, + collect_content(ChannelPid, ClassId)); false -> rabbit_channel:do(ChannelPid, Method) end, ?MODULE:mainloop(ChannelPid). -collect_content(ChannelPid, MethodName) -> - {ClassId, _MethodId} = rabbit_framing:method_id(MethodName), +collect_content(ChannelPid, ClassId) -> case read_frame(ChannelPid) of - {content_header, HeaderClassId, 0, BodySize, PropertiesBin} -> - if HeaderClassId == ClassId -> - Payload = collect_content_payload(ChannelPid, BodySize, []), - #content{class_id = ClassId, - properties = none, - properties_bin = PropertiesBin, - payload_fragments_rev = Payload}; - true -> - rabbit_misc:protocol_error( - command_invalid, - "expected content header for class ~w, " - "got one for class ~w instead", - [ClassId, HeaderClassId]) - end; + {content_header, ClassId, 0, BodySize, PropertiesBin} -> + Payload = collect_content_payload(ChannelPid, BodySize, []), + #content{class_id = ClassId, + properties = none, + properties_bin = PropertiesBin, + payload_fragments_rev = Payload}; + {content_header, HeaderClassId, 0, _BodySize, _PropertiesBin} -> + rabbit_misc:protocol_error( + command_invalid, + "expected content header for class ~w, " + "got one for class ~w instead", + [ClassId, HeaderClassId]); _ -> rabbit_misc:protocol_error( command_invalid, diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index a83adcd45e..81798c5ace 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -678,11 +678,13 @@ handle_method0(#'connection.close'{}, State = #v1{connection_state = running}) -> lists:foreach(fun rabbit_framing_channel:shutdown/1, all_channels()), maybe_close(State#v1{connection_state = closing}); -handle_method0(#'connection.close'{}, State = #v1{connection_state = CS}) +handle_method0(#'connection.close'{}, + State = #v1{connection_state = CS, + sock = Sock}) when CS =:= closing; CS =:= closed -> %% We're already closed or closing, so we don't need to cleanup %% anything. - ok = send_on_channel0(State#v1.sock, #'connection.close_ok'{}), + ok = send_on_channel0(Sock, #'connection.close_ok'{}), State; handle_method0(#'connection.close_ok'{}, State = #v1{connection_state = closed}) -> @@ -805,8 +807,8 @@ map_exception(Channel, Reason) -> ShouldClose = SuggestedClose or (Channel == 0), {ClassId, MethodId} = case FailedMethod of {_, _} -> FailedMethod; - none -> {0, 0}; - _ -> rabbit_framing:method_id(FailedMethod) + none -> {0, 0}; + _ -> rabbit_framing:method_id(FailedMethod) end, {CloseChannel, CloseMethod} = case ShouldClose of diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 3705522d8a..d3813bc7f3 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -953,11 +953,7 @@ test_memory_pressure() -> ok = test_memory_pressure_receive_flow(true), %% if we publish at this point, the channel should die - Content = #content{class_id = element(1, rabbit_framing:method_id( - 'basic.publish')), - properties = none, - properties_bin = <<>>, - payload_fragments_rev = []}, + Content = rabbit_basic:build_content([], <<>>), ok = rabbit_channel:do(Ch0, #'basic.publish'{}, Content), expect_normal_channel_termination(MRef0, Ch0), |
