diff options
| author | Emile Joubert <emile@rabbitmq.com> | 2010-06-15 15:01:11 +0100 |
|---|---|---|
| committer | Emile Joubert <emile@rabbitmq.com> | 2010-06-15 15:01:11 +0100 |
| commit | 2adbfa30d8f58d9c66f0bb3259af775f1dd6d353 (patch) | |
| tree | d98048f26096100ac1f5a3475a6e5a53f4fa67b8 /src | |
| parent | 916e35763a4c5e0149bdb1987c220b2f3201c7ce (diff) | |
| parent | 79f8803b480cab6974c6f212e3644a8f0893596b (diff) | |
| download | rabbitmq-server-git-2adbfa30d8f58d9c66f0bb3259af775f1dd6d353.tar.gz | |
Merged default into amqp_0_9_1
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 7 | ||||
| -rw-r--r-- | src/rabbit_binary_generator.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 31 | ||||
| -rw-r--r-- | src/rabbit_exchange.erl | 60 | ||||
| -rw-r--r-- | src/rabbit_exchange_type.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_exchange_type_direct.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_exchange_type_fanout.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_exchange_type_headers.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_exchange_type_topic.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_writer.erl | 10 |
10 files changed, 65 insertions, 67 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index e7c926643f..5fdf0ffa90 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -716,16 +716,17 @@ handle_call(purge, _From, State = #q{backing_queue = BQ, {Count, BQS1} = BQ:purge(BQS), reply({ok, Count}, State#q{backing_queue_state = BQS1}); -handle_call({requeue, AckTags, ChPid}, _From, State) -> +handle_call({requeue, AckTags, ChPid}, From, State) -> + gen_server2:reply(From, ok), case lookup_ch(ChPid) of not_found -> rabbit_log:warning("Ignoring requeue from unknown ch: ~p~n", [ChPid]), - reply(ok, State); + noreply(State); C = #cr{acktags = ChAckTags} -> ChAckTags1 = subtract_acks(ChAckTags, AckTags), store_ch_record(C#cr{acktags = ChAckTags1}), - reply(ok, requeue_and_run(AckTags, State)) + noreply(requeue_and_run(AckTags, State)) end; handle_call({maybe_run_queue_via_backing_queue, Fun}, _From, State) -> diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl index 27a1275a31..81cf3ceec7 100644 --- a/src/rabbit_binary_generator.erl +++ b/src/rabbit_binary_generator.erl @@ -57,7 +57,7 @@ -type(frame() :: [binary()]). -spec(build_simple_method_frame/2 :: - (channel_number(), amqp_method()) -> frame()). + (channel_number(), amqp_method_record()) -> frame()). -spec(build_simple_content_frames/3 :: (channel_number(), content(), non_neg_integer()) -> [frame()]). -spec(build_heartbeat_frame/0 :: () -> frame()). diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 58c56cc072..3dfc026b99 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -75,8 +75,8 @@ -spec(start_link/6 :: (channel_number(), pid(), pid(), username(), vhost(), pid()) -> pid()). --spec(do/2 :: (pid(), amqp_method()) -> 'ok'). --spec(do/3 :: (pid(), amqp_method(), maybe(content())) -> 'ok'). +-spec(do/2 :: (pid(), amqp_method_record()) -> 'ok'). +-spec(do/3 :: (pid(), amqp_method_record(), maybe(content())) -> 'ok'). -spec(shutdown/1 :: (pid()) -> 'ok'). -spec(send_command/2 :: (pid(), amqp_method()) -> 'ok'). -spec(deliver/4 :: (pid(), ctag(), boolean(), qmsg()) -> 'ok'). @@ -344,7 +344,7 @@ with_exclusive_access_or_die(QName, ReaderPid, F) -> expand_queue_name_shortcut(<<>>, #ch{ most_recently_declared_queue = <<>> }) -> rabbit_misc:protocol_error( - not_allowed, "no previously declared queue", []); + not_found, "no previously declared queue", []); expand_queue_name_shortcut(<<>>, #ch{ virtual_host = VHostPath, most_recently_declared_queue = MRDQ }) -> rabbit_misc:r(VHostPath, queue, MRDQ); @@ -354,7 +354,7 @@ expand_queue_name_shortcut(QueueNameBin, #ch{ virtual_host = VHostPath }) -> expand_routing_key_shortcut(<<>>, <<>>, #ch{ most_recently_declared_queue = <<>> }) -> rabbit_misc:protocol_error( - not_allowed, "no previously declared queue", []); + not_found, "no previously declared queue", []); expand_routing_key_shortcut(<<>>, <<>>, #ch{ most_recently_declared_queue = MRDQ }) -> MRDQ; @@ -460,13 +460,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, handle_method(#'basic.ack'{delivery_tag = DeliveryTag, multiple = Multiple}, _, State = #ch{transaction_id = TxnKey, - next_tag = NextDeliveryTag, unacked_message_q = UAMQ}) -> - if DeliveryTag >= NextDeliveryTag -> - rabbit_misc:protocol_error( - command_invalid, "unknown delivery tag ~w", [DeliveryTag]); - true -> ok - end, {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple), Participants = ack(TxnKey, Acked), {noreply, case TxnKey of @@ -528,9 +522,9 @@ handle_method(#'basic.consume'{queue = QueueNameBin, Other -> Other end, - %% In order to ensure that the consume_ok gets sent before - %% any messages are sent to the consumer, we get the queue - %% process to send the consume_ok on our behalf. + %% We get the queue process to send the consume_ok on our + %% behalf. This is for symmetry with basic.cancel - see + %% the comment in that method for why. case with_exclusive_access_or_die( QueueName, ReaderPid, fun (Q) -> @@ -843,14 +837,14 @@ handle_method(#'tx.select'{}, _, State) -> handle_method(#'tx.commit'{}, _, #ch{transaction_id = none}) -> rabbit_misc:protocol_error( - not_allowed, "channel is not transactional", []); + precondition_failed, "channel is not transactional", []); handle_method(#'tx.commit'{}, _, State) -> {reply, #'tx.commit_ok'{}, internal_commit(State)}; handle_method(#'tx.rollback'{}, _, #ch{transaction_id = none}) -> rabbit_misc:protocol_error( - not_allowed, "channel is not transactional", []); + precondition_failed, "channel is not transactional", []); handle_method(#'tx.rollback'{}, _, State) -> {reply, #'tx.rollback_ok'{}, internal_rollback(State)}; @@ -951,10 +945,6 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, not_found, "no binding ~s between ~s and ~s", [RoutingKey, rabbit_misc:rs(ExchangeName), rabbit_misc:rs(QueueName)]); - {error, durability_settings_incompatible} -> - rabbit_misc:protocol_error( - not_allowed, "durability settings of ~s incompatible with ~s", - [rabbit_misc:rs(QueueName), rabbit_misc:rs(ExchangeName)]); ok -> return_ok(State, NoWait, ReturnMethod) end. @@ -989,7 +979,8 @@ collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) -> QTail, DeliveryTag, Multiple) end; {empty, _} -> - {ToAcc, PrefixAcc} + rabbit_misc:protocol_error( + not_found, "unknown delivery tag ~w", [DeliveryTag]) end. add_tx_participants(MoreP, State = #ch{tx_participants = Participants}) -> diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 841161dbca..eb6f3e49f9 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -40,7 +40,8 @@ -export([delete/2]). -export([delete_queue_bindings/1, delete_transient_queue_bindings/1]). -export([assert_equivalence/4]). --export([check_type/1, assert_type/2]). +-export([assert_args_equivalence/2]). +-export([check_type/1]). %% EXTENDED API -export([list_exchange_bindings/1]). @@ -65,6 +66,7 @@ -spec(declare/4 :: (exchange_name(), exchange_type(), boolean(), amqp_table()) -> exchange()). -spec(check_type/1 :: (binary()) -> atom()). -spec(assert_equivalence/4 :: (exchange(), atom(), boolean(), amqp_table()) -> 'ok'). +-spec(assert_args_equivalence/2 :: (exchange(), amqp_table()) -> 'ok'). -spec(lookup/1 :: (exchange_name()) -> {'ok', exchange()} | not_found()). -spec(lookup_or_die/1 :: (exchange_name()) -> exchange()). -spec(list/1 :: (vhost()) -> [exchange()]). @@ -76,7 +78,7 @@ -spec(publish/2 :: (exchange(), delivery()) -> {routing_result(), [pid()]}). -spec(add_binding/5 :: (exchange_name(), queue_name(), routing_key(), amqp_table(), inner_fun()) -> - bind_res() | {'error', 'durability_settings_incompatible'}). + bind_res()). -spec(delete_binding/5 :: (exchange_name(), queue_name(), routing_key(), amqp_table(), inner_fun()) -> bind_res() | {'error', 'binding_not_found'}). @@ -184,28 +186,22 @@ check_type(TypeBin) -> T end. -assert_equivalence(X = #exchange{ durable = ActualDurable }, - RequiredType, RequiredDurable, RequiredArgs) - when ActualDurable == RequiredDurable -> - ok = assert_type(X, RequiredType), - ok = assert_args_equivalence(X, RequiredArgs); -assert_equivalence(#exchange{ name = Name }, _Type, _Durable, _Args) -> +assert_equivalence(X = #exchange{ durable = Durable, + type = Type}, + Type, Durable, + RequiredArgs) -> + ok = (type_to_module(Type)):assert_args_equivalence(X, RequiredArgs); +assert_equivalence(#exchange{ name = Name }, _Type, _Durable, + _Args) -> rabbit_misc:protocol_error( - not_allowed, "cannot redeclare ~s with different durable value", + precondition_failed, + "cannot redeclare ~s with different type, durable or autodelete value", [rabbit_misc:rs(Name)]). -assert_type(#exchange{ type = ActualType }, RequiredType) - when ActualType == RequiredType -> - ok; -assert_type(#exchange{ name = Name, type = ActualType }, RequiredType) -> - rabbit_misc:protocol_error( - not_allowed, "cannot redeclare ~s of type '~s' with type '~s'", - [rabbit_misc:rs(Name), ActualType, RequiredType]). - alternate_exchange_value(Args) -> lists:keysearch(<<"alternate-exchange">>, 1, Args). -assert_args_equivalence(#exchange{ name = Name, +assert_args_equivalence(#exchange{ name = Name, arguments = Args }, RequiredArgs) -> %% The spec says "Arguments are compared for semantic @@ -213,9 +209,9 @@ assert_args_equivalence(#exchange{ name = Name, %% "alternate-exchange". Ae1 = alternate_exchange_value(RequiredArgs), Ae2 = alternate_exchange_value(Args), - if Ae1==Ae2 -> ok; - true -> rabbit_misc:protocol_error( - not_allowed, + if Ae1==Ae2 -> ok; + true -> rabbit_misc:protocol_error( + precondition_failed, "cannot redeclare ~s with inequivalent args", [rabbit_misc:rs(Name)]) end. @@ -400,19 +396,17 @@ add_binding(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) -> fun (X, Q, B) -> %% this argument is used to check queue exclusivity; %% in general, we want to fail on that in preference to - %% failing on e.g., the durability being different. + %% anything else InnerFun(X, Q), - if Q#amqqueue.durable and not(X#exchange.durable) -> - {error, durability_settings_incompatible}; - true -> - case mnesia:read({rabbit_route, B}) of - [] -> - sync_binding(B, Q#amqqueue.durable, - fun mnesia:write/3), - {new, X, B}; - [_R] -> - {existing, X, B} - end + case mnesia:read({rabbit_route, B}) of + [] -> + sync_binding(B, + X#exchange.durable andalso + Q#amqqueue.durable, + fun mnesia:write/3), + {new, X, B}; + [_R] -> + {existing, X, B} end end) of {new, Exchange = #exchange{ type = Type }, Binding} -> diff --git a/src/rabbit_exchange_type.erl b/src/rabbit_exchange_type.erl index 699250f797..85760edce4 100644 --- a/src/rabbit_exchange_type.erl +++ b/src/rabbit_exchange_type.erl @@ -54,7 +54,11 @@ behaviour_info(callbacks) -> {add_binding, 2}, %% called after bindings have been deleted. - {remove_bindings, 2} + {remove_bindings, 2}, + + %% called when comparing exchanges for equivalence - should return ok or + %% exit with #amqp_error{} + {assert_args_equivalence, 2} ]; behaviour_info(_Other) -> diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl index c3fb2588a1..4f6eb85199 100644 --- a/src/rabbit_exchange_type_direct.erl +++ b/src/rabbit_exchange_type_direct.erl @@ -36,7 +36,7 @@ -export([description/0, publish/2]). -export([validate/1, create/1, recover/2, delete/2, - add_binding/2, remove_bindings/2]). + add_binding/2, remove_bindings/2, assert_args_equivalence/2]). -include("rabbit_exchange_type_spec.hrl"). -rabbit_boot_step({?MODULE, @@ -61,3 +61,5 @@ recover(_X, _Bs) -> ok. delete(_X, _Bs) -> ok. add_binding(_X, _B) -> ok. remove_bindings(_X, _Bs) -> ok. +assert_args_equivalence(X, Args) -> + rabbit_exchange:assert_args_equivalence(X, Args). diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl index 62c862a503..4f9712b14e 100644 --- a/src/rabbit_exchange_type_fanout.erl +++ b/src/rabbit_exchange_type_fanout.erl @@ -36,7 +36,7 @@ -export([description/0, publish/2]). -export([validate/1, create/1, recover/2, delete/2, - add_binding/2, remove_bindings/2]). + add_binding/2, remove_bindings/2, assert_args_equivalence/2]). -include("rabbit_exchange_type_spec.hrl"). -rabbit_boot_step({?MODULE, @@ -59,3 +59,5 @@ recover(_X, _Bs) -> ok. delete(_X, _Bs) -> ok. add_binding(_X, _B) -> ok. remove_bindings(_X, _Bs) -> ok. +assert_args_equivalence(X, Args) -> + rabbit_exchange:assert_args_equivalence(X, Args). diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl index 0991bf0d24..315e800021 100644 --- a/src/rabbit_exchange_type_headers.erl +++ b/src/rabbit_exchange_type_headers.erl @@ -37,7 +37,7 @@ -export([description/0, publish/2]). -export([validate/1, create/1, recover/2, delete/2, - add_binding/2, remove_bindings/2]). + add_binding/2, remove_bindings/2, assert_args_equivalence/2]). -include("rabbit_exchange_type_spec.hrl"). -rabbit_boot_step({?MODULE, @@ -135,3 +135,5 @@ recover(_X, _Bs) -> ok. delete(_X, _Bs) -> ok. add_binding(_X, _B) -> ok. remove_bindings(_X, _Bs) -> ok. +assert_args_equivalence(X, Args) -> + rabbit_exchange:assert_args_equivalence(X, Args). diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl index e42c451829..0e22d5458e 100644 --- a/src/rabbit_exchange_type_topic.erl +++ b/src/rabbit_exchange_type_topic.erl @@ -36,7 +36,7 @@ -export([description/0, publish/2]). -export([validate/1, create/1, recover/2, delete/2, - add_binding/2, remove_bindings/2]). + add_binding/2, remove_bindings/2, assert_args_equivalence/2]). -include("rabbit_exchange_type_spec.hrl"). -rabbit_boot_step({?MODULE, @@ -99,3 +99,5 @@ recover(_X, _Bs) -> ok. delete(_X, _Bs) -> ok. add_binding(_X, _B) -> ok. remove_bindings(_X, _Bs) -> ok. +assert_args_equivalence(X, Args) -> + rabbit_exchange:assert_args_equivalence(X, Args). diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl index 54c60f5be0..3d10dc121e 100644 --- a/src/rabbit_writer.erl +++ b/src/rabbit_writer.erl @@ -50,17 +50,17 @@ -spec(start/3 :: (socket(), channel_number(), non_neg_integer()) -> pid()). -spec(start_link/3 :: (socket(), channel_number(), non_neg_integer()) -> pid()). --spec(send_command/2 :: (pid(), amqp_method()) -> 'ok'). --spec(send_command/3 :: (pid(), amqp_method(), content()) -> 'ok'). +-spec(send_command/2 :: (pid(), amqp_method_record()) -> 'ok'). +-spec(send_command/3 :: (pid(), amqp_method_record(), content()) -> 'ok'). -spec(send_command_and_signal_back/3 :: (pid(), amqp_method(), pid()) -> 'ok'). -spec(send_command_and_signal_back/4 :: (pid(), amqp_method(), content(), pid()) -> 'ok'). -spec(send_command_and_notify/5 :: - (pid(), pid(), pid(), amqp_method(), content()) -> 'ok'). + (pid(), pid(), pid(), amqp_method_record(), content()) -> 'ok'). -spec(internal_send_command/3 :: - (socket(), channel_number(), amqp_method()) -> 'ok'). + (socket(), channel_number(), amqp_method_record()) -> 'ok'). -spec(internal_send_command/5 :: - (socket(), channel_number(), amqp_method(), + (socket(), channel_number(), amqp_method_record(), content(), non_neg_integer()) -> 'ok'). -endif. |
