summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorEmile Joubert <emile@rabbitmq.com>2010-06-15 15:01:11 +0100
committerEmile Joubert <emile@rabbitmq.com>2010-06-15 15:01:11 +0100
commit2adbfa30d8f58d9c66f0bb3259af775f1dd6d353 (patch)
treed98048f26096100ac1f5a3475a6e5a53f4fa67b8 /src
parent916e35763a4c5e0149bdb1987c220b2f3201c7ce (diff)
parent79f8803b480cab6974c6f212e3644a8f0893596b (diff)
downloadrabbitmq-server-git-2adbfa30d8f58d9c66f0bb3259af775f1dd6d353.tar.gz
Merged default into amqp_0_9_1
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl7
-rw-r--r--src/rabbit_binary_generator.erl2
-rw-r--r--src/rabbit_channel.erl31
-rw-r--r--src/rabbit_exchange.erl60
-rw-r--r--src/rabbit_exchange_type.erl6
-rw-r--r--src/rabbit_exchange_type_direct.erl4
-rw-r--r--src/rabbit_exchange_type_fanout.erl4
-rw-r--r--src/rabbit_exchange_type_headers.erl4
-rw-r--r--src/rabbit_exchange_type_topic.erl4
-rw-r--r--src/rabbit_writer.erl10
10 files changed, 65 insertions, 67 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index e7c926643f..5fdf0ffa90 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -716,16 +716,17 @@ handle_call(purge, _From, State = #q{backing_queue = BQ,
{Count, BQS1} = BQ:purge(BQS),
reply({ok, Count}, State#q{backing_queue_state = BQS1});
-handle_call({requeue, AckTags, ChPid}, _From, State) ->
+handle_call({requeue, AckTags, ChPid}, From, State) ->
+ gen_server2:reply(From, ok),
case lookup_ch(ChPid) of
not_found ->
rabbit_log:warning("Ignoring requeue from unknown ch: ~p~n",
[ChPid]),
- reply(ok, State);
+ noreply(State);
C = #cr{acktags = ChAckTags} ->
ChAckTags1 = subtract_acks(ChAckTags, AckTags),
store_ch_record(C#cr{acktags = ChAckTags1}),
- reply(ok, requeue_and_run(AckTags, State))
+ noreply(requeue_and_run(AckTags, State))
end;
handle_call({maybe_run_queue_via_backing_queue, Fun}, _From, State) ->
diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl
index 27a1275a31..81cf3ceec7 100644
--- a/src/rabbit_binary_generator.erl
+++ b/src/rabbit_binary_generator.erl
@@ -57,7 +57,7 @@
-type(frame() :: [binary()]).
-spec(build_simple_method_frame/2 ::
- (channel_number(), amqp_method()) -> frame()).
+ (channel_number(), amqp_method_record()) -> frame()).
-spec(build_simple_content_frames/3 ::
(channel_number(), content(), non_neg_integer()) -> [frame()]).
-spec(build_heartbeat_frame/0 :: () -> frame()).
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 58c56cc072..3dfc026b99 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -75,8 +75,8 @@
-spec(start_link/6 ::
(channel_number(), pid(), pid(), username(), vhost(), pid()) -> pid()).
--spec(do/2 :: (pid(), amqp_method()) -> 'ok').
--spec(do/3 :: (pid(), amqp_method(), maybe(content())) -> 'ok').
+-spec(do/2 :: (pid(), amqp_method_record()) -> 'ok').
+-spec(do/3 :: (pid(), amqp_method_record(), maybe(content())) -> 'ok').
-spec(shutdown/1 :: (pid()) -> 'ok').
-spec(send_command/2 :: (pid(), amqp_method()) -> 'ok').
-spec(deliver/4 :: (pid(), ctag(), boolean(), qmsg()) -> 'ok').
@@ -344,7 +344,7 @@ with_exclusive_access_or_die(QName, ReaderPid, F) ->
expand_queue_name_shortcut(<<>>, #ch{ most_recently_declared_queue = <<>> }) ->
rabbit_misc:protocol_error(
- not_allowed, "no previously declared queue", []);
+ not_found, "no previously declared queue", []);
expand_queue_name_shortcut(<<>>, #ch{ virtual_host = VHostPath,
most_recently_declared_queue = MRDQ }) ->
rabbit_misc:r(VHostPath, queue, MRDQ);
@@ -354,7 +354,7 @@ expand_queue_name_shortcut(QueueNameBin, #ch{ virtual_host = VHostPath }) ->
expand_routing_key_shortcut(<<>>, <<>>,
#ch{ most_recently_declared_queue = <<>> }) ->
rabbit_misc:protocol_error(
- not_allowed, "no previously declared queue", []);
+ not_found, "no previously declared queue", []);
expand_routing_key_shortcut(<<>>, <<>>,
#ch{ most_recently_declared_queue = MRDQ }) ->
MRDQ;
@@ -460,13 +460,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
multiple = Multiple},
_, State = #ch{transaction_id = TxnKey,
- next_tag = NextDeliveryTag,
unacked_message_q = UAMQ}) ->
- if DeliveryTag >= NextDeliveryTag ->
- rabbit_misc:protocol_error(
- command_invalid, "unknown delivery tag ~w", [DeliveryTag]);
- true -> ok
- end,
{Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple),
Participants = ack(TxnKey, Acked),
{noreply, case TxnKey of
@@ -528,9 +522,9 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
Other -> Other
end,
- %% In order to ensure that the consume_ok gets sent before
- %% any messages are sent to the consumer, we get the queue
- %% process to send the consume_ok on our behalf.
+ %% We get the queue process to send the consume_ok on our
+ %% behalf. This is for symmetry with basic.cancel - see
+ %% the comment in that method for why.
case with_exclusive_access_or_die(
QueueName, ReaderPid,
fun (Q) ->
@@ -843,14 +837,14 @@ handle_method(#'tx.select'{}, _, State) ->
handle_method(#'tx.commit'{}, _, #ch{transaction_id = none}) ->
rabbit_misc:protocol_error(
- not_allowed, "channel is not transactional", []);
+ precondition_failed, "channel is not transactional", []);
handle_method(#'tx.commit'{}, _, State) ->
{reply, #'tx.commit_ok'{}, internal_commit(State)};
handle_method(#'tx.rollback'{}, _, #ch{transaction_id = none}) ->
rabbit_misc:protocol_error(
- not_allowed, "channel is not transactional", []);
+ precondition_failed, "channel is not transactional", []);
handle_method(#'tx.rollback'{}, _, State) ->
{reply, #'tx.rollback_ok'{}, internal_rollback(State)};
@@ -951,10 +945,6 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
not_found, "no binding ~s between ~s and ~s",
[RoutingKey, rabbit_misc:rs(ExchangeName),
rabbit_misc:rs(QueueName)]);
- {error, durability_settings_incompatible} ->
- rabbit_misc:protocol_error(
- not_allowed, "durability settings of ~s incompatible with ~s",
- [rabbit_misc:rs(QueueName), rabbit_misc:rs(ExchangeName)]);
ok -> return_ok(State, NoWait, ReturnMethod)
end.
@@ -989,7 +979,8 @@ collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) ->
QTail, DeliveryTag, Multiple)
end;
{empty, _} ->
- {ToAcc, PrefixAcc}
+ rabbit_misc:protocol_error(
+ not_found, "unknown delivery tag ~w", [DeliveryTag])
end.
add_tx_participants(MoreP, State = #ch{tx_participants = Participants}) ->
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 841161dbca..eb6f3e49f9 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -40,7 +40,8 @@
-export([delete/2]).
-export([delete_queue_bindings/1, delete_transient_queue_bindings/1]).
-export([assert_equivalence/4]).
--export([check_type/1, assert_type/2]).
+-export([assert_args_equivalence/2]).
+-export([check_type/1]).
%% EXTENDED API
-export([list_exchange_bindings/1]).
@@ -65,6 +66,7 @@
-spec(declare/4 :: (exchange_name(), exchange_type(), boolean(), amqp_table()) -> exchange()).
-spec(check_type/1 :: (binary()) -> atom()).
-spec(assert_equivalence/4 :: (exchange(), atom(), boolean(), amqp_table()) -> 'ok').
+-spec(assert_args_equivalence/2 :: (exchange(), amqp_table()) -> 'ok').
-spec(lookup/1 :: (exchange_name()) -> {'ok', exchange()} | not_found()).
-spec(lookup_or_die/1 :: (exchange_name()) -> exchange()).
-spec(list/1 :: (vhost()) -> [exchange()]).
@@ -76,7 +78,7 @@
-spec(publish/2 :: (exchange(), delivery()) -> {routing_result(), [pid()]}).
-spec(add_binding/5 ::
(exchange_name(), queue_name(), routing_key(), amqp_table(), inner_fun()) ->
- bind_res() | {'error', 'durability_settings_incompatible'}).
+ bind_res()).
-spec(delete_binding/5 ::
(exchange_name(), queue_name(), routing_key(), amqp_table(), inner_fun()) ->
bind_res() | {'error', 'binding_not_found'}).
@@ -184,28 +186,22 @@ check_type(TypeBin) ->
T
end.
-assert_equivalence(X = #exchange{ durable = ActualDurable },
- RequiredType, RequiredDurable, RequiredArgs)
- when ActualDurable == RequiredDurable ->
- ok = assert_type(X, RequiredType),
- ok = assert_args_equivalence(X, RequiredArgs);
-assert_equivalence(#exchange{ name = Name }, _Type, _Durable, _Args) ->
+assert_equivalence(X = #exchange{ durable = Durable,
+ type = Type},
+ Type, Durable,
+ RequiredArgs) ->
+ ok = (type_to_module(Type)):assert_args_equivalence(X, RequiredArgs);
+assert_equivalence(#exchange{ name = Name }, _Type, _Durable,
+ _Args) ->
rabbit_misc:protocol_error(
- not_allowed, "cannot redeclare ~s with different durable value",
+ precondition_failed,
+ "cannot redeclare ~s with different type, durable or autodelete value",
[rabbit_misc:rs(Name)]).
-assert_type(#exchange{ type = ActualType }, RequiredType)
- when ActualType == RequiredType ->
- ok;
-assert_type(#exchange{ name = Name, type = ActualType }, RequiredType) ->
- rabbit_misc:protocol_error(
- not_allowed, "cannot redeclare ~s of type '~s' with type '~s'",
- [rabbit_misc:rs(Name), ActualType, RequiredType]).
-
alternate_exchange_value(Args) ->
lists:keysearch(<<"alternate-exchange">>, 1, Args).
-assert_args_equivalence(#exchange{ name = Name,
+assert_args_equivalence(#exchange{ name = Name,
arguments = Args },
RequiredArgs) ->
%% The spec says "Arguments are compared for semantic
@@ -213,9 +209,9 @@ assert_args_equivalence(#exchange{ name = Name,
%% "alternate-exchange".
Ae1 = alternate_exchange_value(RequiredArgs),
Ae2 = alternate_exchange_value(Args),
- if Ae1==Ae2 -> ok;
- true -> rabbit_misc:protocol_error(
- not_allowed,
+ if Ae1==Ae2 -> ok;
+ true -> rabbit_misc:protocol_error(
+ precondition_failed,
"cannot redeclare ~s with inequivalent args",
[rabbit_misc:rs(Name)])
end.
@@ -400,19 +396,17 @@ add_binding(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) ->
fun (X, Q, B) ->
%% this argument is used to check queue exclusivity;
%% in general, we want to fail on that in preference to
- %% failing on e.g., the durability being different.
+ %% anything else
InnerFun(X, Q),
- if Q#amqqueue.durable and not(X#exchange.durable) ->
- {error, durability_settings_incompatible};
- true ->
- case mnesia:read({rabbit_route, B}) of
- [] ->
- sync_binding(B, Q#amqqueue.durable,
- fun mnesia:write/3),
- {new, X, B};
- [_R] ->
- {existing, X, B}
- end
+ case mnesia:read({rabbit_route, B}) of
+ [] ->
+ sync_binding(B,
+ X#exchange.durable andalso
+ Q#amqqueue.durable,
+ fun mnesia:write/3),
+ {new, X, B};
+ [_R] ->
+ {existing, X, B}
end
end) of
{new, Exchange = #exchange{ type = Type }, Binding} ->
diff --git a/src/rabbit_exchange_type.erl b/src/rabbit_exchange_type.erl
index 699250f797..85760edce4 100644
--- a/src/rabbit_exchange_type.erl
+++ b/src/rabbit_exchange_type.erl
@@ -54,7 +54,11 @@ behaviour_info(callbacks) ->
{add_binding, 2},
%% called after bindings have been deleted.
- {remove_bindings, 2}
+ {remove_bindings, 2},
+
+ %% called when comparing exchanges for equivalence - should return ok or
+ %% exit with #amqp_error{}
+ {assert_args_equivalence, 2}
];
behaviour_info(_Other) ->
diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl
index c3fb2588a1..4f6eb85199 100644
--- a/src/rabbit_exchange_type_direct.erl
+++ b/src/rabbit_exchange_type_direct.erl
@@ -36,7 +36,7 @@
-export([description/0, publish/2]).
-export([validate/1, create/1, recover/2, delete/2,
- add_binding/2, remove_bindings/2]).
+ add_binding/2, remove_bindings/2, assert_args_equivalence/2]).
-include("rabbit_exchange_type_spec.hrl").
-rabbit_boot_step({?MODULE,
@@ -61,3 +61,5 @@ recover(_X, _Bs) -> ok.
delete(_X, _Bs) -> ok.
add_binding(_X, _B) -> ok.
remove_bindings(_X, _Bs) -> ok.
+assert_args_equivalence(X, Args) ->
+ rabbit_exchange:assert_args_equivalence(X, Args).
diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl
index 62c862a503..4f9712b14e 100644
--- a/src/rabbit_exchange_type_fanout.erl
+++ b/src/rabbit_exchange_type_fanout.erl
@@ -36,7 +36,7 @@
-export([description/0, publish/2]).
-export([validate/1, create/1, recover/2, delete/2,
- add_binding/2, remove_bindings/2]).
+ add_binding/2, remove_bindings/2, assert_args_equivalence/2]).
-include("rabbit_exchange_type_spec.hrl").
-rabbit_boot_step({?MODULE,
@@ -59,3 +59,5 @@ recover(_X, _Bs) -> ok.
delete(_X, _Bs) -> ok.
add_binding(_X, _B) -> ok.
remove_bindings(_X, _Bs) -> ok.
+assert_args_equivalence(X, Args) ->
+ rabbit_exchange:assert_args_equivalence(X, Args).
diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl
index 0991bf0d24..315e800021 100644
--- a/src/rabbit_exchange_type_headers.erl
+++ b/src/rabbit_exchange_type_headers.erl
@@ -37,7 +37,7 @@
-export([description/0, publish/2]).
-export([validate/1, create/1, recover/2, delete/2,
- add_binding/2, remove_bindings/2]).
+ add_binding/2, remove_bindings/2, assert_args_equivalence/2]).
-include("rabbit_exchange_type_spec.hrl").
-rabbit_boot_step({?MODULE,
@@ -135,3 +135,5 @@ recover(_X, _Bs) -> ok.
delete(_X, _Bs) -> ok.
add_binding(_X, _B) -> ok.
remove_bindings(_X, _Bs) -> ok.
+assert_args_equivalence(X, Args) ->
+ rabbit_exchange:assert_args_equivalence(X, Args).
diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl
index e42c451829..0e22d5458e 100644
--- a/src/rabbit_exchange_type_topic.erl
+++ b/src/rabbit_exchange_type_topic.erl
@@ -36,7 +36,7 @@
-export([description/0, publish/2]).
-export([validate/1, create/1, recover/2, delete/2,
- add_binding/2, remove_bindings/2]).
+ add_binding/2, remove_bindings/2, assert_args_equivalence/2]).
-include("rabbit_exchange_type_spec.hrl").
-rabbit_boot_step({?MODULE,
@@ -99,3 +99,5 @@ recover(_X, _Bs) -> ok.
delete(_X, _Bs) -> ok.
add_binding(_X, _B) -> ok.
remove_bindings(_X, _Bs) -> ok.
+assert_args_equivalence(X, Args) ->
+ rabbit_exchange:assert_args_equivalence(X, Args).
diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl
index 54c60f5be0..3d10dc121e 100644
--- a/src/rabbit_writer.erl
+++ b/src/rabbit_writer.erl
@@ -50,17 +50,17 @@
-spec(start/3 :: (socket(), channel_number(), non_neg_integer()) -> pid()).
-spec(start_link/3 :: (socket(), channel_number(), non_neg_integer()) -> pid()).
--spec(send_command/2 :: (pid(), amqp_method()) -> 'ok').
--spec(send_command/3 :: (pid(), amqp_method(), content()) -> 'ok').
+-spec(send_command/2 :: (pid(), amqp_method_record()) -> 'ok').
+-spec(send_command/3 :: (pid(), amqp_method_record(), content()) -> 'ok').
-spec(send_command_and_signal_back/3 :: (pid(), amqp_method(), pid()) -> 'ok').
-spec(send_command_and_signal_back/4 ::
(pid(), amqp_method(), content(), pid()) -> 'ok').
-spec(send_command_and_notify/5 ::
- (pid(), pid(), pid(), amqp_method(), content()) -> 'ok').
+ (pid(), pid(), pid(), amqp_method_record(), content()) -> 'ok').
-spec(internal_send_command/3 ::
- (socket(), channel_number(), amqp_method()) -> 'ok').
+ (socket(), channel_number(), amqp_method_record()) -> 'ok').
-spec(internal_send_command/5 ::
- (socket(), channel_number(), amqp_method(),
+ (socket(), channel_number(), amqp_method_record(),
content(), non_neg_integer()) -> 'ok').
-endif.