summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2010-07-05 13:43:56 +0100
committerAlexandru Scvortov <alexandru@rabbitmq.com>2010-07-05 13:43:56 +0100
commit954568f0704c96856c10605b3bb3f68f884ea604 (patch)
tree4894a9155434dc23f410bbb7b23ebea24fd56258
parent807937bc528ef7f449cfce310a9ee75988daa5a8 (diff)
parentcf86b976d4b0be38c973352e1130ca993b7b01d5 (diff)
downloadrabbitmq-server-git-954568f0704c96856c10605b3bb3f68f884ea604.tar.gz
merge default into bug21954
-rw-r--r--Makefile11
-rw-r--r--src/rabbit_amqqueue.erl45
-rw-r--r--src/rabbit_amqqueue_process.erl5
-rw-r--r--src/rabbit_channel.erl75
-rw-r--r--src/rabbit_framing_channel.erl34
-rw-r--r--src/rabbit_reader.erl10
-rw-r--r--src/rabbit_tests.erl6
7 files changed, 96 insertions, 90 deletions
diff --git a/Makefile b/Makefile
index 962aa34761..6edd7abcdb 100644
--- a/Makefile
+++ b/Makefile
@@ -15,7 +15,7 @@ INCLUDES=$(wildcard $(INCLUDE_DIR)/*.hrl) $(INCLUDE_DIR)/rabbit_framing.hrl
SOURCES=$(wildcard $(SOURCE_DIR)/*.erl) $(SOURCE_DIR)/rabbit_framing.erl $(USAGES_ERL)
BEAM_TARGETS=$(patsubst $(SOURCE_DIR)/%.erl, $(EBIN_DIR)/%.beam, $(SOURCES))
TARGETS=$(EBIN_DIR)/rabbit.app $(INCLUDE_DIR)/rabbit_framing.hrl $(BEAM_TARGETS)
-WEB_URL=http://stage.rabbitmq.com/
+WEB_URL=http://www.rabbitmq.com/
MANPAGES=$(patsubst %.xml, %.gz, $(wildcard $(DOCS_DIR)/*.[0-9].xml))
WEB_MANPAGES=$(patsubst %.xml, %.man.xml, $(wildcard $(DOCS_DIR)/*.[0-9].xml) $(DOCS_DIR)/rabbitmq-service.xml)
USAGES_XML=$(DOCS_DIR)/rabbitmqctl.1.xml $(DOCS_DIR)/rabbitmq-multi.1.xml
@@ -202,7 +202,7 @@ srcdist: distclean
>> $(TARGET_SRC_DIR)/INSTALL
cp README.in $(TARGET_SRC_DIR)/README
elinks -dump -no-references -no-numbering $(WEB_URL)build-server.html \
- >> $(TARGET_SRC_DIR)/BUILD
+ >> $(TARGET_SRC_DIR)/README
sed -i.save 's/%%VSN%%/$(VERSION)/' $(TARGET_SRC_DIR)/ebin/rabbit_app.in && rm -f $(TARGET_SRC_DIR)/ebin/rabbit_app.in.save
cp -r $(AMQP_CODEGEN_DIR)/* $(TARGET_SRC_DIR)/codegen/
@@ -223,9 +223,10 @@ distclean: clean
# xmlto can not read from standard input, so we mess with a tmp file.
%.gz: %.xml $(DOCS_DIR)/examples-to-end.xsl
- xsltproc $(DOCS_DIR)/examples-to-end.xsl $< > $<.tmp && \
- xmlto man -o $(DOCS_DIR) --stringparam man.indent.verbatims=0 $<.tmp && \
- gzip -f $(DOCS_DIR)/`basename $< .xml`
+ xmlto --version | grep -E '^xmlto version 0\.0\.([0-9]|1[1-8])$$' >/dev/null || opt='--stringparam man.indent.verbatims=0' ; \
+ xsltproc $(DOCS_DIR)/examples-to-end.xsl $< > $<.tmp && \
+ xmlto man -o $(DOCS_DIR) $$opt $<.tmp && \
+ gzip -f $(DOCS_DIR)/`basename $< .xml`
rm -f $<.tmp
# Use tmp files rather than a pipeline so that we get meaningful errors
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index c4ba9344c6..ffa46642d3 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -37,8 +37,9 @@
update_ram_duration/1, set_ram_duration_target/2,
set_maximum_since_use/2]).
-export([pseudo_queue/2]).
--export([lookup/1, with/2, with_or_die/2,
- stat/1, stat_all/0, deliver/2, requeue/3, ack/4]).
+-export([lookup/1, with/2, with_or_die/2, assert_equivalence/5,
+ check_exclusive_access/2, with_exclusive_access_or_die/3,
+ stat/1, deliver/2, requeue/3, ack/4]).
-export([list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]).
-export([consumers/1, consumers_all/1]).
-export([basic_get/3, basic_consume/7, basic_cancel/4]).
@@ -69,7 +70,6 @@
arguments :: rabbit_framing:amqp_table(),
pid :: rabbit:maybe(pid())}).
--type(qstats() :: {'ok', name(), non_neg_integer(), non_neg_integer()}).
-type(qlen() :: {'ok', non_neg_integer()}).
-type(qfun(A) :: fun ((amqqueue()) -> A)).
-type(qmsg() :: {name(), pid(), msg_id(), boolean(), rabbit:message()}).
@@ -83,6 +83,11 @@
-spec(lookup/1 :: (name()) -> {'ok', amqqueue()} | rabbit_misc:not_found()).
-spec(with/2 :: (name(), qfun(A)) -> A | rabbit_misc:not_found()).
-spec(with_or_die/2 :: (name(), qfun(A)) -> A).
+-spec(assert_equivalence/5 :: (amqqueue(), boolean(), boolean(),
+ rabbit_framing:amqp_table(), rabbit:maybe(pid))
+ -> ok).
+-spec(check_exclusive_access/2 :: (amqqueue(), pid()) -> 'ok').
+-spec(with_exclusive_access_or_die/3 :: (name(), pid(), qfun(A)) -> A).
-spec(list/1 :: (rabbit:vhost()) -> [amqqueue()]).
-spec(info_keys/0 :: () -> [rabbit:info_key()]).
-spec(info/1 :: (amqqueue()) -> [rabbit:info()]).
@@ -93,8 +98,8 @@
-spec(consumers/1 :: (amqqueue()) -> [{pid(), rabbit:ctag(), boolean()}]).
-spec(consumers_all/1 :: (rabbit:vhost())
-> [{name(), pid(), rabbit:ctag(), boolean()}]).
--spec(stat/1 :: (amqqueue()) -> qstats()).
--spec(stat_all/0 :: () -> [qstats()]).
+-spec(stat/1 ::
+ (amqqueue()) -> {'ok', non_neg_integer(), non_neg_integer()}).
-spec(delete/3 ::
(amqqueue(), 'false', 'false') -> qlen();
(amqqueue(), 'true' , 'false') -> qlen() | {'error', 'in_use'};
@@ -228,6 +233,31 @@ with(Name, F) ->
with_or_die(Name, F) ->
with(Name, F, fun () -> rabbit_misc:not_found(Name) end).
+assert_equivalence(#amqqueue{durable = Durable, auto_delete = AutoDelete} = Q,
+ Durable, AutoDelete, _Args, Owner) ->
+ check_exclusive_access(Q, Owner, strict);
+assert_equivalence(#amqqueue{name = QueueName},
+ _Durable, _AutoDelete, _Args, _Owner) ->
+ rabbit_misc:protocol_error(
+ precondition_failed, "parameters for ~s not equivalent",
+ [rabbit_misc:rs(QueueName)]).
+
+check_exclusive_access(Q, Owner) -> check_exclusive_access(Q, Owner, lax).
+
+check_exclusive_access(#amqqueue{exclusive_owner = Owner}, Owner, _MatchType) ->
+ ok;
+check_exclusive_access(#amqqueue{exclusive_owner = none}, _ReaderPid, lax) ->
+ ok;
+check_exclusive_access(#amqqueue{name = QueueName}, _ReaderPid, _MatchType) ->
+ rabbit_misc:protocol_error(
+ resource_locked,
+ "cannot obtain exclusive access to locked ~s",
+ [rabbit_misc:rs(QueueName)]).
+
+with_exclusive_access_or_die(Name, ReaderPid, F) ->
+ with_or_die(Name,
+ fun (Q) -> check_exclusive_access(Q, ReaderPid), F(Q) end).
+
list(VHostPath) ->
mnesia:dirty_match_object(
rabbit_queue,
@@ -262,9 +292,6 @@ consumers_all(VHostPath) ->
stat(#amqqueue{pid = QPid}) -> delegate_call(QPid, stat, infinity).
-stat_all() ->
- lists:map(fun stat/1, rabbit_misc:dirty_read_all(rabbit_queue)).
-
delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) ->
delegate_call(QPid, {delete, IfUnused, IfEmpty}, infinity).
@@ -410,7 +437,7 @@ delegate_call(Pid, Msg, Timeout) ->
delegate:invoke(Pid, fun (P) -> gen_server2:call(P, Msg, Timeout) end).
delegate_pcall(Pid, Pri, Msg, Timeout) ->
- delegate:invoke(Pid,
+ delegate:invoke(Pid,
fun (P) -> gen_server2:pcall(P, Pri, Msg, Timeout) end).
delegate_pcast(Pid, Pri, Msg) ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 70e6e75584..3bf48b4cf8 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -692,11 +692,10 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
end
end;
-handle_call(stat, _From, State = #q{q = #amqqueue{name = Name},
- backing_queue = BQ,
+handle_call(stat, _From, State = #q{backing_queue = BQ,
backing_queue_state = BQS,
active_consumers = ActiveConsumers}) ->
- reply({ok, Name, BQ:len(BQS), queue:len(ActiveConsumers)}, State);
+ reply({ok, BQ:len(BQS), queue:len(ActiveConsumers)}, State);
handle_call({delete, IfUnused, IfEmpty}, _From,
State = #q{backing_queue_state = BQS, backing_queue = BQ}) ->
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index a446bbab01..915a41ee15 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -329,19 +329,6 @@ check_write_permitted(Resource, #ch{ username = Username}) ->
check_read_permitted(Resource, #ch{ username = Username}) ->
check_resource_access(Username, Resource, read).
-check_exclusive_access(#amqqueue{exclusive_owner = Owner}, Owner, _MatchType) ->
- ok;
-check_exclusive_access(#amqqueue{exclusive_owner = none}, _ReaderPid, lax) ->
- ok;
-check_exclusive_access(#amqqueue{name = QName}, _ReaderPid, _MatchType) ->
- rabbit_misc:protocol_error(
- resource_locked,
- "cannot obtain exclusive access to locked ~s", [rabbit_misc:rs(QName)]).
-
-with_exclusive_access_or_die(QName, ReaderPid, F) ->
- rabbit_amqqueue:with_or_die(
- QName, fun (Q) -> check_exclusive_access(Q, ReaderPid, lax), F(Q) end).
-
expand_queue_name_shortcut(<<>>, #ch{ most_recently_declared_queue = <<>> }) ->
rabbit_misc:protocol_error(
not_found, "no previously declared queue", []);
@@ -444,12 +431,9 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
Exchange,
rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message)),
case RoutingRes of
- routed ->
- ok;
- unroutable ->
- ok = basic_return(Message, WriterPid, no_route);
- not_delivered ->
- ok = basic_return(Message, WriterPid, no_consumers)
+ routed -> ok;
+ unroutable -> ok = basic_return(Message, WriterPid, no_route);
+ not_delivered -> ok = basic_return(Message, WriterPid, no_consumers)
end,
{noreply, case TxnKey of
none -> State;
@@ -480,7 +464,7 @@ handle_method(#'basic.get'{queue = QueueNameBin,
next_tag = DeliveryTag }) ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
check_read_permitted(QueueName, State),
- case with_exclusive_access_or_die(
+ case rabbit_amqqueue:with_exclusive_access_or_die(
QueueName, ReaderPid,
fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of
{ok, MessageCount,
@@ -499,7 +483,7 @@ handle_method(#'basic.get'{queue = QueueNameBin,
Content),
{noreply, State1#ch{next_tag = DeliveryTag + 1}};
empty ->
- {reply, #'basic.get_empty'{cluster_id = <<>>}, State}
+ {reply, #'basic.get_empty'{}, State}
end;
handle_method(#'basic.consume'{queue = QueueNameBin,
@@ -524,7 +508,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
%% We get the queue process to send the consume_ok on our
%% behalf. This is for symmetry with basic.cancel - see
%% the comment in that method for why.
- case with_exclusive_access_or_die(
+ case rabbit_amqqueue:with_exclusive_access_or_die(
QueueName, ReaderPid,
fun (Q) ->
rabbit_amqqueue:basic_consume(
@@ -710,13 +694,13 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin,
return_ok(State, NoWait, #'exchange.delete_ok'{})
end;
-handle_method(Declare = #'queue.declare'{queue = QueueNameBin,
- passive = false,
- durable = Durable,
- exclusive = ExclusiveDeclare,
- auto_delete = AutoDelete,
- nowait = NoWait,
- arguments = Args},
+handle_method(#'queue.declare'{queue = QueueNameBin,
+ passive = false,
+ durable = Durable,
+ exclusive = ExclusiveDeclare,
+ auto_delete = AutoDelete,
+ nowait = NoWait,
+ arguments = Args} = Declare,
_, State = #ch{virtual_host = VHostPath,
reader_pid = ReaderPid,
queue_collector_pid = CollectorPid}) ->
@@ -730,18 +714,15 @@ handle_method(Declare = #'queue.declare'{queue = QueueNameBin,
end,
QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin),
check_configure_permitted(QueueName, State),
- case rabbit_amqqueue:with(QueueName,
- fun (Q) -> {rabbit_amqqueue:stat(Q), Q} end) of
- {{ok, QueueName, MessageCount, ConsumerCount},
- #amqqueue{durable = Durable1, auto_delete = AutoDelete1} = Q}
- when Durable =:= Durable1, AutoDelete =:= AutoDelete1 ->
- check_exclusive_access(Q, Owner, strict),
+ case rabbit_amqqueue:with(
+ QueueName,
+ fun (Q) -> ok = rabbit_amqqueue:assert_equivalence(
+ Q, Durable, AutoDelete, Args, Owner),
+ rabbit_amqqueue:stat(Q)
+ end) of
+ {ok, MessageCount, ConsumerCount} ->
return_queue_declare_ok(QueueName, NoWait, MessageCount,
ConsumerCount, State);
- {{ok, QueueName, _MessageCount, _ConsumerCount}, #amqqueue{}} ->
- rabbit_misc:protocol_error(
- precondition_failed, "parameters for ~s not equivalent",
- [rabbit_misc:rs(QueueName)]);
{error, not_found} ->
case rabbit_amqqueue:declare(QueueName, Durable, AutoDelete,
Args, Owner) of
@@ -752,13 +733,13 @@ handle_method(Declare = #'queue.declare'{queue = QueueNameBin,
%% the connection shuts down.
ok = case Owner of
none -> ok;
- _ -> rabbit_reader_queue_collector:register_exclusive_queue(CollectorPid, Q)
+ _ -> rabbit_reader_queue_collector:register_exclusive_queue(CollectorPid, Q)
end,
return_queue_declare_ok(QueueName, NoWait, 0, 0, State);
{existing, _Q} ->
%% must have been created between the stat and the
%% declare. Loop around again.
- handle_method(Declare, undefined, State)
+ handle_method(Declare, none, State)
end
end;
@@ -769,10 +750,10 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
reader_pid = ReaderPid}) ->
QueueName = rabbit_misc:r(VHostPath, queue, QueueNameBin),
check_configure_permitted(QueueName, State),
- {{ok, QueueName, MessageCount, ConsumerCount}, #amqqueue{} = Q} =
+ {{ok, MessageCount, ConsumerCount}, #amqqueue{} = Q} =
rabbit_amqqueue:with_or_die(
QueueName, fun (Q) -> {rabbit_amqqueue:stat(Q), Q} end),
- check_exclusive_access(Q, ReaderPid, lax),
+ ok = rabbit_amqqueue:check_exclusive_access(Q, ReaderPid),
return_queue_declare_ok(QueueName, NoWait, MessageCount, ConsumerCount,
State);
@@ -783,7 +764,7 @@ handle_method(#'queue.delete'{queue = QueueNameBin,
_, State = #ch{reader_pid = ReaderPid}) ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
check_configure_permitted(QueueName, State),
- case with_exclusive_access_or_die(
+ case rabbit_amqqueue:with_exclusive_access_or_die(
QueueName, ReaderPid,
fun (Q) -> rabbit_amqqueue:delete(Q, IfUnused, IfEmpty) end) of
{error, in_use} ->
@@ -819,7 +800,7 @@ handle_method(#'queue.purge'{queue = QueueNameBin,
_, State = #ch{reader_pid = ReaderPid}) ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
check_read_permitted(QueueName, State),
- {ok, PurgedMessageCount} = with_exclusive_access_or_die(
+ {ok, PurgedMessageCount} = rabbit_amqqueue:with_exclusive_access_or_die(
QueueName, ReaderPid,
fun (Q) -> rabbit_amqqueue:purge(Q) end),
return_ok(State, NoWait,
@@ -927,7 +908,9 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
check_read_permitted(ExchangeName, State),
case Fun(ExchangeName, QueueName, ActualRoutingKey, Arguments,
- fun (_X, Q) -> check_exclusive_access(Q, ReaderPid, lax) end) of
+ fun (_X, Q) ->
+ rabbit_amqqueue:check_exclusive_access(Q, ReaderPid)
+ end) of
{error, exchange_not_found} ->
rabbit_misc:not_found(ExchangeName);
{error, queue_not_found} ->
diff --git a/src/rabbit_framing_channel.erl b/src/rabbit_framing_channel.erl
index b7c6aa96fa..bc1a2a0835 100644
--- a/src/rabbit_framing_channel.erl
+++ b/src/rabbit_framing_channel.erl
@@ -76,29 +76,27 @@ mainloop(ChannelPid) ->
{method, MethodName, FieldsBin} = read_frame(ChannelPid),
Method = rabbit_framing:decode_method_fields(MethodName, FieldsBin),
case rabbit_framing:method_has_content(MethodName) of
- true -> rabbit_channel:do(ChannelPid, Method,
- collect_content(ChannelPid, MethodName));
+ true -> {ClassId, _MethodId} = rabbit_framing:method_id(MethodName),
+ rabbit_channel:do(ChannelPid, Method,
+ collect_content(ChannelPid, ClassId));
false -> rabbit_channel:do(ChannelPid, Method)
end,
?MODULE:mainloop(ChannelPid).
-collect_content(ChannelPid, MethodName) ->
- {ClassId, _MethodId} = rabbit_framing:method_id(MethodName),
+collect_content(ChannelPid, ClassId) ->
case read_frame(ChannelPid) of
- {content_header, HeaderClassId, 0, BodySize, PropertiesBin} ->
- if HeaderClassId == ClassId ->
- Payload = collect_content_payload(ChannelPid, BodySize, []),
- #content{class_id = ClassId,
- properties = none,
- properties_bin = PropertiesBin,
- payload_fragments_rev = Payload};
- true ->
- rabbit_misc:protocol_error(
- command_invalid,
- "expected content header for class ~w, "
- "got one for class ~w instead",
- [ClassId, HeaderClassId])
- end;
+ {content_header, ClassId, 0, BodySize, PropertiesBin} ->
+ Payload = collect_content_payload(ChannelPid, BodySize, []),
+ #content{class_id = ClassId,
+ properties = none,
+ properties_bin = PropertiesBin,
+ payload_fragments_rev = Payload};
+ {content_header, HeaderClassId, 0, _BodySize, _PropertiesBin} ->
+ rabbit_misc:protocol_error(
+ command_invalid,
+ "expected content header for class ~w, "
+ "got one for class ~w instead",
+ [ClassId, HeaderClassId]);
_ ->
rabbit_misc:protocol_error(
command_invalid,
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index a83adcd45e..81798c5ace 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -678,11 +678,13 @@ handle_method0(#'connection.close'{},
State = #v1{connection_state = running}) ->
lists:foreach(fun rabbit_framing_channel:shutdown/1, all_channels()),
maybe_close(State#v1{connection_state = closing});
-handle_method0(#'connection.close'{}, State = #v1{connection_state = CS})
+handle_method0(#'connection.close'{},
+ State = #v1{connection_state = CS,
+ sock = Sock})
when CS =:= closing; CS =:= closed ->
%% We're already closed or closing, so we don't need to cleanup
%% anything.
- ok = send_on_channel0(State#v1.sock, #'connection.close_ok'{}),
+ ok = send_on_channel0(Sock, #'connection.close_ok'{}),
State;
handle_method0(#'connection.close_ok'{},
State = #v1{connection_state = closed}) ->
@@ -805,8 +807,8 @@ map_exception(Channel, Reason) ->
ShouldClose = SuggestedClose or (Channel == 0),
{ClassId, MethodId} = case FailedMethod of
{_, _} -> FailedMethod;
- none -> {0, 0};
- _ -> rabbit_framing:method_id(FailedMethod)
+ none -> {0, 0};
+ _ -> rabbit_framing:method_id(FailedMethod)
end,
{CloseChannel, CloseMethod} =
case ShouldClose of
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 3705522d8a..d3813bc7f3 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -953,11 +953,7 @@ test_memory_pressure() ->
ok = test_memory_pressure_receive_flow(true),
%% if we publish at this point, the channel should die
- Content = #content{class_id = element(1, rabbit_framing:method_id(
- 'basic.publish')),
- properties = none,
- properties_bin = <<>>,
- payload_fragments_rev = []},
+ Content = rabbit_basic:build_content([], <<>>),
ok = rabbit_channel:do(Ch0, #'basic.publish'{}, Content),
expect_normal_channel_termination(MRef0, Ch0),