diff options
| author | Matthew Sackman <matthew@rabbitmq.com> | 2010-06-14 19:02:39 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-06-14 19:02:39 +0100 |
| commit | 36e971a26173b2436f435ce91cf68a642dcaee8c (patch) | |
| tree | ad71d39099ef2240f955ffdcf5f0613a50a996c7 /src | |
| parent | e57e0c2b07a0be43bddbbcbd524ca3ad3ca95373 (diff) | |
| parent | 79f8803b480cab6974c6f212e3644a8f0893596b (diff) | |
| download | rabbitmq-server-git-36e971a26173b2436f435ce91cf68a642dcaee8c.tar.gz | |
Merging default into bug 21673
Diffstat (limited to 'src')
| -rw-r--r-- | src/delegate.erl | 10 | ||||
| -rw-r--r-- | src/gen_server2.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_alarm.erl | 11 | ||||
| -rw-r--r-- | src/rabbit_amqqueue.erl | 15 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 25 | ||||
| -rw-r--r-- | src/rabbit_backing_queue.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_binary_generator.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 235 | ||||
| -rw-r--r-- | src/rabbit_control.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_exchange.erl | 101 | ||||
| -rw-r--r-- | src/rabbit_exchange_type.erl | 12 | ||||
| -rw-r--r-- | src/rabbit_exchange_type_direct.erl | 10 | ||||
| -rw-r--r-- | src/rabbit_exchange_type_fanout.erl | 10 | ||||
| -rw-r--r-- | src/rabbit_exchange_type_headers.erl | 10 | ||||
| -rw-r--r-- | src/rabbit_exchange_type_registry.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_exchange_type_topic.erl | 10 | ||||
| -rw-r--r-- | src/rabbit_misc.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_mnesia.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_multi.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_net.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_persister.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_reader_queue_collector.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_router.erl | 11 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 147 | ||||
| -rw-r--r-- | src/rabbit_writer.erl | 10 | ||||
| -rw-r--r-- | src/supervisor2.erl | 8 |
26 files changed, 450 insertions, 215 deletions
diff --git a/src/delegate.erl b/src/delegate.erl index 98353453ff..8af2812781 100644 --- a/src/delegate.erl +++ b/src/delegate.erl @@ -45,8 +45,8 @@ -ifdef(use_specs). -spec(start_link/1 :: (non_neg_integer()) -> {'ok', pid()}). --spec(invoke_no_result/2 :: (pid() | [pid()], fun((pid()) -> any())) -> 'ok'). --spec(invoke/2 :: (pid() | [pid()], fun((pid()) -> A)) -> A). +-spec(invoke_no_result/2 :: (pid() | [pid()], fun ((pid()) -> any())) -> 'ok'). +-spec(invoke/2 :: (pid() | [pid()], fun ((pid()) -> A)) -> A). -spec(process_count/0 :: () -> non_neg_integer()). @@ -73,7 +73,7 @@ invoke(Pid, Fun) when is_pid(Pid) -> invoke(Pids, Fun) when is_list(Pids) -> lists:foldl( - fun({Status, Result, Pid}, {Good, Bad}) -> + fun ({Status, Result, Pid}, {Good, Bad}) -> case Status of ok -> {[{Pid, Result}|Good], Bad}; error -> {Good, [{Pid, Result}|Bad]} @@ -136,10 +136,10 @@ delegate_per_remote_node(NodePids, Fun, DelegateFun) -> %% block forever. [gen_server2:cast( local_server(Node), - {thunk, fun() -> + {thunk, fun () -> Self ! {result, DelegateFun( - Node, fun() -> safe_invoke(Pids, Fun) end)} + Node, fun () -> safe_invoke(Pids, Fun) end)} end}) || {Node, Pids} <- NodePids], [receive {result, Result} -> Result end || _ <- NodePids]. diff --git a/src/gen_server2.erl b/src/gen_server2.erl index 5b899cdbc7..547f0a42e2 100644 --- a/src/gen_server2.erl +++ b/src/gen_server2.erl @@ -639,7 +639,7 @@ do_multi_call(Nodes, Name, Req, Timeout) -> Caller = self(), Receiver = spawn( - fun() -> + fun () -> %% Middleman process. Should be unsensitive to regular %% exit signals. The sychronization is needed in case %% the receiver would exit before the caller started diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl index 7e96d9a3a8..53c713e66b 100644 --- a/src/rabbit_alarm.erl +++ b/src/rabbit_alarm.erl @@ -47,7 +47,7 @@ -type(mfa_tuple() :: {atom(), atom(), list()}). -spec(start/0 :: () -> 'ok'). -spec(stop/0 :: () -> 'ok'). --spec(register/2 :: (pid(), mfa_tuple()) -> 'ok'). +-spec(register/2 :: (pid(), mfa_tuple()) -> boolean()). -endif. @@ -67,9 +67,9 @@ stop() -> ok = alarm_handler:delete_alarm_handler(?MODULE). register(Pid, HighMemMFA) -> - ok = gen_event:call(alarm_handler, ?MODULE, - {register, Pid, HighMemMFA}, - infinity). + gen_event:call(alarm_handler, ?MODULE, + {register, Pid, HighMemMFA}, + infinity). %%---------------------------------------------------------------------------- @@ -84,7 +84,8 @@ handle_call({register, Pid, {M, F, A} = HighMemMFA}, false -> ok end, NewAlertees = dict:store(Pid, HighMemMFA, Alertess), - {ok, ok, State#alarms{alertees = NewAlertees}}; + {ok, State#alarms.vm_memory_high_watermark, + State#alarms{alertees = NewAlertees}}; handle_call(_Request, State) -> {ok, not_understood, State}. diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 483b5a93b7..3c9c41bdeb 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -195,7 +195,8 @@ start_queue_process(Q) -> add_default_binding(#amqqueue{name = QueueName}) -> Exchange = rabbit_misc:r(QueueName, exchange, <<>>), RoutingKey = QueueName#resource.name, - rabbit_exchange:add_binding(Exchange, QueueName, RoutingKey, []), + rabbit_exchange:add_binding(Exchange, QueueName, RoutingKey, [], + fun (_X, _Q) -> ok end), ok. lookup(Name) -> @@ -267,7 +268,7 @@ deliver(QPid, #delivery{txn = Txn, sender = ChPid, message = Message}) -> true. requeue(QPid, MsgIds, ChPid) -> - delegate_cast(QPid, {requeue, MsgIds, ChPid}). + delegate_call(QPid, {requeue, MsgIds, ChPid}, infinity). ack(QPid, Txn, MsgIds, ChPid) -> delegate_pcast(QPid, 7, {ack, Txn, MsgIds, ChPid}). @@ -391,15 +392,13 @@ safe_delegate_call_ok(H, F, Pids) -> end. delegate_call(Pid, Msg, Timeout) -> - delegate:invoke(Pid, fun(P) -> gen_server2:call(P, Msg, Timeout) end). + delegate:invoke(Pid, fun (P) -> gen_server2:call(P, Msg, Timeout) end). delegate_pcall(Pid, Pri, Msg, Timeout) -> - delegate:invoke(Pid, fun(P) -> gen_server2:pcall(P, Pri, Msg, Timeout) end). - -delegate_cast(Pid, Msg) -> - delegate:invoke_no_result(Pid, fun(P) -> gen_server2:cast(P, Msg) end). + delegate:invoke(Pid, + fun (P) -> gen_server2:pcall(P, Pri, Msg, Timeout) end). delegate_pcast(Pid, Pri, Msg) -> delegate:invoke_no_result(Pid, - fun(P) -> gen_server2:pcast(P, Pri, Msg) end). + fun (P) -> gen_server2:pcast(P, Pri, Msg) end). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 3283cb6679..5fdf0ffa90 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -716,6 +716,19 @@ handle_call(purge, _From, State = #q{backing_queue = BQ, {Count, BQS1} = BQ:purge(BQS), reply({ok, Count}, State#q{backing_queue_state = BQS1}); +handle_call({requeue, AckTags, ChPid}, From, State) -> + gen_server2:reply(From, ok), + case lookup_ch(ChPid) of + not_found -> + rabbit_log:warning("Ignoring requeue from unknown ch: ~p~n", + [ChPid]), + noreply(State); + C = #cr{acktags = ChAckTags} -> + ChAckTags1 = subtract_acks(ChAckTags, AckTags), + store_ch_record(C#cr{acktags = ChAckTags1}), + noreply(requeue_and_run(AckTags, State)) + end; + handle_call({maybe_run_queue_via_backing_queue, Fun}, _From, State) -> reply(ok, maybe_run_queue_via_backing_queue(Fun, State)). @@ -743,18 +756,6 @@ handle_cast({ack, Txn, AckTags, ChPid}, handle_cast({rollback, Txn, ChPid}, State) -> noreply(rollback_transaction(Txn, ChPid, State)); -handle_cast({requeue, AckTags, ChPid}, State) -> - case lookup_ch(ChPid) of - not_found -> - rabbit_log:warning("Ignoring requeue from unknown ch: ~p~n", - [ChPid]), - noreply(State); - C = #cr{acktags = ChAckTags} -> - ChAckTags1 = subtract_acks(ChAckTags, AckTags), - store_ch_record(C#cr{acktags = ChAckTags1}), - noreply(requeue_and_run(AckTags, State)) - end; - handle_cast({unblock, ChPid}, State) -> noreply( possibly_unblock(State, ChPid, diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 2dba00ad62..432d62900b 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -18,11 +18,11 @@ %% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial %% Technologies LLC, and Rabbit Technologies Ltd. %% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift %% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% Copyright (C) 2007-2010 Cohesive Financial Technologies %% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. +%% (C) 2007-2010 Rabbit Technologies Ltd. %% %% All Rights Reserved. %% diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl index 27a1275a31..81cf3ceec7 100644 --- a/src/rabbit_binary_generator.erl +++ b/src/rabbit_binary_generator.erl @@ -57,7 +57,7 @@ -type(frame() :: [binary()]). -spec(build_simple_method_frame/2 :: - (channel_number(), amqp_method()) -> frame()). + (channel_number(), amqp_method_record()) -> frame()). -spec(build_simple_content_frames/3 :: (channel_number(), content(), non_neg_integer()) -> [frame()]). -spec(build_heartbeat_frame/0 :: () -> frame()). diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 31bb54c0f4..1ab34f8653 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -39,6 +39,8 @@ -export([send_command/2, deliver/4, conserve_memory/2, flushed/2]). -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]). +-export([flow_timeout/2]). + -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1]). @@ -46,9 +48,12 @@ transaction_id, tx_participants, next_tag, uncommitted_ack_q, unacked_message_q, username, virtual_host, most_recently_declared_queue, - consumer_mapping, blocking, queue_collector_pid}). + consumer_mapping, blocking, queue_collector_pid, flow}). + +-record(flow, {server, client, pending}). -define(MAX_PERMISSION_CACHE_SIZE, 12). +-define(FLOW_OK_TIMEOUT, 10000). %% 10 seconds -define(INFO_KEYS, [pid, @@ -66,15 +71,18 @@ -ifdef(use_specs). +-type(ref() :: any()). + -spec(start_link/6 :: (channel_number(), pid(), pid(), username(), vhost(), pid()) -> pid()). --spec(do/2 :: (pid(), amqp_method()) -> 'ok'). --spec(do/3 :: (pid(), amqp_method(), maybe(content())) -> 'ok'). +-spec(do/2 :: (pid(), amqp_method_record()) -> 'ok'). +-spec(do/3 :: (pid(), amqp_method_record(), maybe(content())) -> 'ok'). -spec(shutdown/1 :: (pid()) -> 'ok'). -spec(send_command/2 :: (pid(), amqp_method()) -> 'ok'). -spec(deliver/4 :: (pid(), ctag(), boolean(), qmsg()) -> 'ok'). -spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok'). -spec(flushed/2 :: (pid(), pid()) -> 'ok'). +-spec(flow_timeout/2 :: (pid(), ref()) -> 'ok'). -spec(list/0 :: () -> [pid()]). -spec(info_keys/0 :: () -> [info_key()]). -spec(info/1 :: (pid()) -> [info()]). @@ -113,6 +121,9 @@ conserve_memory(Pid, Conserve) -> flushed(Pid, QPid) -> gen_server2:cast(Pid, {flushed, QPid}). +flow_timeout(Pid, Ref) -> + gen_server2:pcast(Pid, 7, {flow_timeout, Ref}). + list() -> pg_local:get_members(rabbit_channels). @@ -154,7 +165,9 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid]) -> most_recently_declared_queue = <<>>, consumer_mapping = dict:new(), blocking = dict:new(), - queue_collector_pid = CollectorPid}, + queue_collector_pid = CollectorPid, + flow = #flow{server = true, client = true, + pending = none}}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -181,11 +194,9 @@ handle_cast({method, Method, Content}, State) -> {stop, normal, State#ch{state = terminating}} catch exit:Reason = #amqp_error{} -> - ok = rollback_and_notify(State), MethodName = rabbit_misc:method_record_type(Method), - State#ch.reader_pid ! {channel_exit, State#ch.channel, - Reason#amqp_error{method = MethodName}}, - {stop, normal, State#ch{state = terminating}}; + {stop, normal, terminating(Reason#amqp_error{method = MethodName}, + State)}; exit:normal -> {stop, normal, State}; _:Reason -> @@ -209,11 +220,25 @@ handle_cast({deliver, ConsumerTag, AckRequired, Msg}, ok = internal_deliver(WriterPid, true, ConsumerTag, DeliveryTag, Msg), noreply(State1#ch{next_tag = DeliveryTag + 1}); -handle_cast({conserve_memory, Conserve}, State) -> - ok = clear_permission_cache(), - ok = rabbit_writer:send_command( - State#ch.writer_pid, #'channel.flow'{active = not(Conserve)}), - noreply(State). +handle_cast({conserve_memory, true}, State = #ch{state = starting}) -> + noreply(State); +handle_cast({conserve_memory, false}, State = #ch{state = starting}) -> + ok = rabbit_writer:send_command(State#ch.writer_pid, #'channel.open_ok'{}), + noreply(State#ch{state = running}); +handle_cast({conserve_memory, Conserve}, State = #ch{state = running}) -> + flow_control(not Conserve, State); +handle_cast({conserve_memory, _Conserve}, State) -> + noreply(State); + +handle_cast({flow_timeout, Ref}, + State = #ch{flow = #flow{client = Flow, pending = {Ref, _TRef}}}) -> + {stop, normal, terminating( + rabbit_misc:amqp_error( + precondition_failed, + "timeout waiting for channel.flow_ok{active=~w}", + [not Flow], none), State)}; +handle_cast({flow_timeout, _Ref}, State) -> + {noreply, State}. handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}}, State = #ch{writer_pid = WriterPid}) -> @@ -254,6 +279,11 @@ return_ok(State, false, Msg) -> {reply, Msg, State}. ok_msg(true, _Msg) -> undefined; ok_msg(false, Msg) -> Msg. +terminating(Reason, State = #ch{channel = Channel, reader_pid = Reader}) -> + ok = rollback_and_notify(State), + Reader ! {channel_exit, Channel, Reason}, + State#ch{state = terminating}. + return_queue_declare_ok(State, NoWait, Q) -> NewState = State#ch{most_recently_declared_queue = (Q#amqqueue.name)#resource.name}, @@ -299,25 +329,22 @@ check_write_permitted(Resource, #ch{ username = Username}) -> check_read_permitted(Resource, #ch{ username = Username}) -> check_resource_access(Username, Resource, read). +check_exclusive_access(#amqqueue{exclusive_owner = Owner}, Owner, _MatchType) -> + ok; +check_exclusive_access(#amqqueue{exclusive_owner = none}, _ReaderPid, lax) -> + ok; +check_exclusive_access(#amqqueue{name = QName}, _ReaderPid, _MatchType) -> + rabbit_misc:protocol_error( + resource_locked, + "cannot obtain exclusive access to locked ~s", [rabbit_misc:rs(QName)]). + with_exclusive_access_or_die(QName, ReaderPid, F) -> - case rabbit_amqqueue:with_or_die( - QName, fun (Q = #amqqueue{exclusive_owner = Owner}) - when Owner =:= none orelse Owner =:= ReaderPid -> - F(Q); - (_) -> - {error, wrong_exclusive_owner} - end) of - {error, wrong_exclusive_owner} -> - rabbit_misc:protocol_error( - resource_locked, "cannot obtain exclusive access to locked ~s", - [rabbit_misc:rs(QName)]); - Other -> - Other - end. + rabbit_amqqueue:with_or_die( + QName, fun (Q) -> check_exclusive_access(Q, ReaderPid, lax), F(Q) end). expand_queue_name_shortcut(<<>>, #ch{ most_recently_declared_queue = <<>> }) -> rabbit_misc:protocol_error( - not_allowed, "no previously declared queue", []); + not_found, "no previously declared queue", []); expand_queue_name_shortcut(<<>>, #ch{ virtual_host = VHostPath, most_recently_declared_queue = MRDQ }) -> rabbit_misc:r(VHostPath, queue, MRDQ); @@ -327,7 +354,7 @@ expand_queue_name_shortcut(QueueNameBin, #ch{ virtual_host = VHostPath }) -> expand_routing_key_shortcut(<<>>, <<>>, #ch{ most_recently_declared_queue = <<>> }) -> rabbit_misc:protocol_error( - not_allowed, "no previously declared queue", []); + not_found, "no previously declared queue", []); expand_routing_key_shortcut(<<>>, <<>>, #ch{ most_recently_declared_queue = MRDQ }) -> MRDQ; @@ -369,8 +396,10 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) -> end. handle_method(#'channel.open'{}, _, State = #ch{state = starting}) -> - rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}), - {reply, #'channel.open_ok'{}, State#ch{state = running}}; + case rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}) of + true -> {noreply, State}; + false -> {reply, #'channel.open_ok'{}, State#ch{state = running}} + end; handle_method(#'channel.open'{}, _, _State) -> rabbit_misc:protocol_error( @@ -387,13 +416,17 @@ handle_method(#'channel.close'{}, _, State = #ch{writer_pid = WriterPid}) -> handle_method(#'access.request'{},_, State) -> {reply, #'access.request_ok'{ticket = 1}, State}; -handle_method(#'basic.publish'{exchange = ExchangeNameBin, +handle_method(#'basic.publish'{}, _, #ch{flow = #flow{client = false}}) -> + rabbit_misc:protocol_error( + command_invalid, + "basic.publish received after channel.flow_ok{active=false}", []); +handle_method(#'basic.publish'{exchange = ExchangeNameBin, routing_key = RoutingKey, - mandatory = Mandatory, - immediate = Immediate}, - Content, State = #ch{ virtual_host = VHostPath, - transaction_id = TxnKey, - writer_pid = WriterPid}) -> + mandatory = Mandatory, + immediate = Immediate}, + Content, State = #ch{virtual_host = VHostPath, + transaction_id = TxnKey, + writer_pid = WriterPid}) -> ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_write_permitted(ExchangeName, State), Exchange = rabbit_exchange:lookup_or_die(ExchangeName), @@ -430,13 +463,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, handle_method(#'basic.ack'{delivery_tag = DeliveryTag, multiple = Multiple}, _, State = #ch{transaction_id = TxnKey, - next_tag = NextDeliveryTag, unacked_message_q = UAMQ}) -> - if DeliveryTag >= NextDeliveryTag -> - rabbit_misc:protocol_error( - command_invalid, "unknown delivery tag ~w", [DeliveryTag]); - true -> ok - end, {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple), Participants = ack(TxnKey, Acked), {noreply, case TxnKey of @@ -453,11 +480,12 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag, handle_method(#'basic.get'{queue = QueueNameBin, no_ack = NoAck}, _, State = #ch{ writer_pid = WriterPid, + reader_pid = ReaderPid, next_tag = DeliveryTag }) -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), check_read_permitted(QueueName, State), - case rabbit_amqqueue:with_or_die( - QueueName, + case with_exclusive_access_or_die( + QueueName, ReaderPid, fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of {ok, MessageCount, Msg = {_QName, _QPid, _MsgId, Redelivered, @@ -497,9 +525,9 @@ handle_method(#'basic.consume'{queue = QueueNameBin, Other -> Other end, - %% In order to ensure that the consume_ok gets sent before - %% any messages are sent to the consumer, we get the queue - %% process to send the consume_ok on our behalf. + %% We get the queue process to send the consume_ok on our + %% behalf. This is for symmetry with basic.cancel - see + %% the comment in that method for why. case with_exclusive_access_or_die( QueueName, ReaderPid, fun (Q) -> @@ -653,18 +681,17 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin, AutoDelete, Args) end, - ok = rabbit_exchange:assert_type(X, CheckedType), + ok = rabbit_exchange:assert_equivalence(X, CheckedType, Durable, + AutoDelete, Args), return_ok(State, NoWait, #'exchange.declare_ok'{}); handle_method(#'exchange.declare'{exchange = ExchangeNameBin, - type = TypeNameBin, passive = true, nowait = NoWait}, _, State = #ch{ virtual_host = VHostPath }) -> ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_configure_permitted(ExchangeName, State), - X = rabbit_exchange:lookup_or_die(ExchangeName), - ok = rabbit_exchange:assert_type(X, rabbit_exchange:check_type(TypeNameBin)), + _ = rabbit_exchange:lookup_or_die(ExchangeName), return_ok(State, NoWait, #'exchange.declare_ok'{}); handle_method(#'exchange.delete'{exchange = ExchangeNameBin, @@ -699,25 +726,28 @@ handle_method(#'queue.declare'{queue = QueueNameBin, end, %% We use this in both branches, because queue_declare may yet return an %% existing queue. - Finish = - fun (#amqqueue{name = QueueName, exclusive_owner = Owner1} = Q) - when Owner =:= Owner1 -> - check_configure_permitted(QueueName, State), - %% We need to notify the reader within the channel - %% process so that we can be sure there are no - %% outstanding exclusive queues being declared as the - %% connection shuts down. - case Owner of - none -> ok; - _ -> ok = rabbit_reader_queue_collector:register_exclusive_queue(CollectorPid, Q) - end, - Q; - (#amqqueue{name = QueueName}) -> - rabbit_misc:protocol_error( - resource_locked, - "cannot obtain exclusive access to locked ~s", - [rabbit_misc:rs(QueueName)]) - end, + Finish = fun (#amqqueue{name = QueueName, + durable = Durable1, + auto_delete = AutoDelete1} = Q) + when Durable =:= Durable1, AutoDelete =:= AutoDelete1 -> + check_exclusive_access(Q, Owner, strict), + check_configure_permitted(QueueName, State), + %% We need to notify the reader within the channel + %% process so that we can be sure there are no + %% outstanding exclusive queues being declared as the + %% connection shuts down. + case Owner of + none -> ok; + _ -> ok = rabbit_reader_queue_collector:register_exclusive_queue(CollectorPid, Q) + end, + Q; + %% non-equivalence trumps exclusivity arbitrarily + (#amqqueue{name = QueueName}) -> + rabbit_misc:protocol_error( + precondition_failed, + "parameters for ~s not equivalent", + [rabbit_misc:rs(QueueName)]) + end, Q = case rabbit_amqqueue:with( rabbit_misc:r(VHostPath, queue, QueueNameBin), Finish) of @@ -771,7 +801,7 @@ handle_method(#'queue.bind'{queue = QueueNameBin, routing_key = RoutingKey, nowait = NoWait, arguments = Arguments}, _, State) -> - binding_action(fun rabbit_exchange:add_binding/4, ExchangeNameBin, + binding_action(fun rabbit_exchange:add_binding/5, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, #'queue.bind_ok'{}, NoWait, State); @@ -779,7 +809,7 @@ handle_method(#'queue.unbind'{queue = QueueNameBin, exchange = ExchangeNameBin, routing_key = RoutingKey, arguments = Arguments}, _, State) -> - binding_action(fun rabbit_exchange:delete_binding/4, ExchangeNameBin, + binding_action(fun rabbit_exchange:delete_binding/5, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, #'queue.unbind_ok'{}, false, State); @@ -802,14 +832,14 @@ handle_method(#'tx.select'{}, _, State) -> handle_method(#'tx.commit'{}, _, #ch{transaction_id = none}) -> rabbit_misc:protocol_error( - not_allowed, "channel is not transactional", []); + precondition_failed, "channel is not transactional", []); handle_method(#'tx.commit'{}, _, State) -> {reply, #'tx.commit_ok'{}, internal_commit(State)}; handle_method(#'tx.rollback'{}, _, #ch{transaction_id = none}) -> rabbit_misc:protocol_error( - not_allowed, "channel is not transactional", []); + precondition_failed, "channel is not transactional", []); handle_method(#'tx.rollback'{}, _, State) -> {reply, #'tx.rollback_ok'{}, internal_rollback(State)}; @@ -822,7 +852,6 @@ handle_method(#'channel.flow'{active = true}, _, end, {reply, #'channel.flow_ok'{active = true}, State#ch{limiter_pid = LimiterPid1}}; - handle_method(#'channel.flow'{active = false}, _, State = #ch{limiter_pid = LimiterPid, consumer_mapping = Consumers}) -> @@ -840,11 +869,25 @@ handle_method(#'channel.flow'{active = false}, _, blocking = dict:from_list(Queues)}} end; -handle_method(#'channel.flow_ok'{active = _}, _, State) -> - %% TODO: We may want to correlate this to channel.flow messages we - %% have sent, and complain if we get an unsolicited - %% channel.flow_ok, or the client refuses our flow request. - {noreply, State}; +handle_method(#'channel.flow_ok'{active = Active}, _, + State = #ch{flow = #flow{server = Active, client = Flow, + pending = {_Ref, TRef}} = F}) + when Flow =:= not Active -> + {ok, cancel} = timer:cancel(TRef), + {noreply, State#ch{flow = F#flow{client = Active, pending = none}}}; +handle_method(#'channel.flow_ok'{active = Active}, _, + State = #ch{flow = #flow{server = Flow, client = Flow, + pending = {_Ref, TRef}}}) + when Flow =:= not Active -> + {ok, cancel} = timer:cancel(TRef), + {noreply, issue_flow(Flow, State)}; +handle_method(#'channel.flow_ok'{}, _, #ch{flow = #flow{pending = none}}) -> + rabbit_misc:protocol_error( + command_invalid, "unsolicited channel.flow_ok", []); +handle_method(#'channel.flow_ok'{active = Active}, _, _State) -> + rabbit_misc:protocol_error( + command_invalid, + "received channel.flow_ok{active=~w} has incorrect polarity", [Active]); handle_method(_MethodRecord, _Content, _State) -> rabbit_misc:protocol_error( @@ -852,8 +895,26 @@ handle_method(_MethodRecord, _Content, _State) -> %%---------------------------------------------------------------------------- +flow_control(Active, State = #ch{flow = #flow{server = Flow, pending = none}}) + when Flow =:= not Active -> + ok = clear_permission_cache(), + noreply(issue_flow(Active, State)); +flow_control(Active, State = #ch{flow = F}) -> + noreply(State#ch{flow = F#flow{server = Active}}). + +issue_flow(Active, State) -> + ok = rabbit_writer:send_command( + State#ch.writer_pid, #'channel.flow'{active = Active}), + Ref = make_ref(), + {ok, TRef} = timer:apply_after(?FLOW_OK_TIMEOUT, ?MODULE, flow_timeout, + [self(), Ref]), + State#ch{flow = #flow{server = Active, client = not Active, + pending = {Ref, TRef}}}. + binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, - ReturnMethod, NoWait, State = #ch{virtual_host = VHostPath}) -> + ReturnMethod, NoWait, + State = #ch{virtual_host = VHostPath, + reader_pid = ReaderPid}) -> %% FIXME: connection exception (!) on failure?? %% (see rule named "failure" in spec-XML) %% FIXME: don't allow binding to internal exchanges - @@ -864,7 +925,8 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, State), ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_read_permitted(ExchangeName, State), - case Fun(ExchangeName, QueueName, ActualRoutingKey, Arguments) of + case Fun(ExchangeName, QueueName, ActualRoutingKey, Arguments, + fun (_X, Q) -> check_exclusive_access(Q, ReaderPid, lax) end) of {error, exchange_not_found} -> rabbit_misc:not_found(ExchangeName); {error, queue_not_found} -> @@ -878,10 +940,6 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, not_found, "no binding ~s between ~s and ~s", [RoutingKey, rabbit_misc:rs(ExchangeName), rabbit_misc:rs(QueueName)]); - {error, durability_settings_incompatible} -> - rabbit_misc:protocol_error( - not_allowed, "durability settings of ~s incompatible with ~s", - [rabbit_misc:rs(QueueName), rabbit_misc:rs(ExchangeName)]); ok -> return_ok(State, NoWait, ReturnMethod) end. @@ -916,7 +974,8 @@ collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) -> QTail, DeliveryTag, Multiple) end; {empty, _} -> - {ToAcc, PrefixAcc} + rabbit_misc:protocol_error( + not_found, "unknown delivery tag ~w", [DeliveryTag]) end. add_tx_participants(MoreP, State = #ch{tx_participants = Participants}) -> diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index d1834b3b73..323d4d2fd1 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -59,8 +59,8 @@ start() -> parse_args(FullCommand, #params{quiet = false, node = rabbit_misc:makenode(NodeStr)}), Inform = case Quiet of - true -> fun(_Format, _Args1) -> ok end; - false -> fun(Format, Args1) -> + true -> fun (_Format, _Args1) -> ok end; + false -> fun (Format, Args1) -> io:format(Format ++ " ...~n", Args1) end end, diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 8f41392f83..c5149b08b9 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -36,10 +36,12 @@ -export([recover/0, declare/5, lookup/1, lookup_or_die/1, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2, publish/2]). --export([add_binding/4, delete_binding/4, list_bindings/1]). +-export([add_binding/5, delete_binding/5, list_bindings/1]). -export([delete/2]). -export([delete_queue_bindings/1, delete_transient_queue_bindings/1]). --export([check_type/1, assert_type/2]). +-export([assert_equivalence/5]). +-export([assert_args_equivalence/2]). +-export([check_type/1]). %% EXTENDED API -export([list_exchange_bindings/1]). @@ -58,11 +60,15 @@ 'queue_not_found' | 'exchange_not_found' | 'exchange_and_queue_not_found'}). +-type(inner_fun() :: fun((exchange(), queue()) -> any())). + -spec(recover/0 :: () -> 'ok'). -spec(declare/5 :: (exchange_name(), exchange_type(), boolean(), boolean(), amqp_table()) -> exchange()). -spec(check_type/1 :: (binary()) -> atom()). --spec(assert_type/2 :: (exchange(), atom()) -> 'ok'). +-spec(assert_equivalence/5 :: (exchange(), atom(), boolean(), boolean(), + amqp_table()) -> 'ok'). +-spec(assert_args_equivalence/2 :: (exchange(), amqp_table()) -> 'ok'). -spec(lookup/1 :: (exchange_name()) -> {'ok', exchange()} | not_found()). -spec(lookup_or_die/1 :: (exchange_name()) -> exchange()). -spec(list/1 :: (vhost()) -> [exchange()]). @@ -72,16 +78,17 @@ -spec(info_all/1 :: (vhost()) -> [[info()]]). -spec(info_all/2 :: (vhost(), [info_key()]) -> [[info()]]). -spec(publish/2 :: (exchange(), delivery()) -> {routing_result(), [pid()]}). --spec(add_binding/4 :: - (exchange_name(), queue_name(), routing_key(), amqp_table()) -> - bind_res() | {'error', 'durability_settings_incompatible'}). --spec(delete_binding/4 :: - (exchange_name(), queue_name(), routing_key(), amqp_table()) -> +-spec(add_binding/5 :: + (exchange_name(), queue_name(), routing_key(), amqp_table(), inner_fun()) -> + bind_res()). +-spec(delete_binding/5 :: + (exchange_name(), queue_name(), routing_key(), amqp_table(), inner_fun()) -> bind_res() | {'error', 'binding_not_found'}). -spec(list_bindings/1 :: (vhost()) -> [{exchange_name(), queue_name(), routing_key(), amqp_table()}]). --spec(delete_queue_bindings/1 :: (queue_name()) -> fun(() -> none())). --spec(delete_transient_queue_bindings/1 :: (queue_name()) -> fun(() -> none())). +-spec(delete_queue_bindings/1 :: (queue_name()) -> fun (() -> none())). +-spec(delete_transient_queue_bindings/1 :: (queue_name()) -> + fun (() -> none())). -spec(delete/2 :: (exchange_name(), boolean()) -> 'ok' | not_found() | {'error', 'in_use'}). -spec(list_queue_bindings/1 :: (queue_name()) -> @@ -97,12 +104,12 @@ recover() -> Exs = rabbit_misc:table_fold( - fun(Exchange, Acc) -> + fun (Exchange, Acc) -> ok = mnesia:write(rabbit_exchange, Exchange, write), [Exchange | Acc] end, [], rabbit_durable_exchange), Bs = rabbit_misc:table_fold( - fun(Route = #route{binding = B}, Acc) -> + fun (Route = #route{binding = B}, Acc) -> {_, ReverseRoute} = route_with_reverse(Route), ok = mnesia:write(rabbit_route, Route, write), @@ -182,13 +189,36 @@ check_type(TypeBin) -> T end. -assert_type(#exchange{ type = ActualType }, RequiredType) - when ActualType == RequiredType -> - ok; -assert_type(#exchange{ name = Name, type = ActualType }, RequiredType) -> +assert_equivalence(X = #exchange{ durable = Durable, + auto_delete = AutoDelete, + type = Type}, + Type, Durable, AutoDelete, + RequiredArgs) -> + ok = (type_to_module(Type)):assert_args_equivalence(X, RequiredArgs); +assert_equivalence(#exchange{ name = Name }, _Type, _Durable, _AutoDelete, + _Args) -> rabbit_misc:protocol_error( - not_allowed, "cannot redeclare ~s of type '~s' with type '~s'", - [rabbit_misc:rs(Name), ActualType, RequiredType]). + precondition_failed, + "cannot redeclare ~s with different type, durable or autodelete value", + [rabbit_misc:rs(Name)]). + +alternate_exchange_value(Args) -> + lists:keysearch(<<"alternate-exchange">>, 1, Args). + +assert_args_equivalence(#exchange{ name = Name, + arguments = Args }, + RequiredArgs) -> + %% The spec says "Arguments are compared for semantic + %% equivalence". The only arg we care about is + %% "alternate-exchange". + Ae1 = alternate_exchange_value(RequiredArgs), + Ae2 = alternate_exchange_value(Args), + if Ae1==Ae2 -> ok; + true -> rabbit_misc:protocol_error( + precondition_failed, + "cannot redeclare ~s with inequivalent args", + [rabbit_misc:rs(Name)]) + end. lookup(Name) -> rabbit_misc:dirty_read({rabbit_exchange, Name}). @@ -349,7 +379,7 @@ continue({[], Continuation}) -> continue(mnesia:select(Continuation)). call_with_exchange(Exchange, Fun) -> rabbit_misc:execute_mnesia_transaction( - fun() -> case mnesia:read({rabbit_exchange, Exchange}) of + fun () -> case mnesia:read({rabbit_exchange, Exchange}) of [] -> {error, not_found}; [X] -> Fun(X) end @@ -357,7 +387,7 @@ call_with_exchange(Exchange, Fun) -> call_with_exchange_and_queue(Exchange, Queue, Fun) -> rabbit_misc:execute_mnesia_transaction( - fun() -> case {mnesia:read({rabbit_exchange, Exchange}), + fun () -> case {mnesia:read({rabbit_exchange, Exchange}), mnesia:read({rabbit_queue, Queue})} of {[X], [Q]} -> Fun(X, Q); {[ ], [_]} -> {error, exchange_not_found}; @@ -366,21 +396,23 @@ call_with_exchange_and_queue(Exchange, Queue, Fun) -> end end). -add_binding(ExchangeName, QueueName, RoutingKey, Arguments) -> +add_binding(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) -> case binding_action( ExchangeName, QueueName, RoutingKey, Arguments, fun (X, Q, B) -> - if Q#amqqueue.durable and not(X#exchange.durable) -> - {error, durability_settings_incompatible}; - true -> - case mnesia:read({rabbit_route, B}) of - [] -> - sync_binding(B, Q#amqqueue.durable, - fun mnesia:write/3), - {new, X, B}; - [_R] -> - {existing, X, B} - end + %% this argument is used to check queue exclusivity; + %% in general, we want to fail on that in preference to + %% anything else + InnerFun(X, Q), + case mnesia:read({rabbit_route, B}) of + [] -> + sync_binding(B, + X#exchange.durable andalso + Q#amqqueue.durable, + fun mnesia:write/3), + {new, X, B}; + [_R] -> + {existing, X, B} end end) of {new, Exchange = #exchange{ type = Type }, Binding} -> @@ -391,14 +423,15 @@ add_binding(ExchangeName, QueueName, RoutingKey, Arguments) -> Err end. -delete_binding(ExchangeName, QueueName, RoutingKey, Arguments) -> +delete_binding(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) -> case binding_action( ExchangeName, QueueName, RoutingKey, Arguments, fun (X, Q, B) -> case mnesia:match_object(rabbit_route, #route{binding = B}, write) of [] -> {error, binding_not_found}; - _ -> ok = sync_binding(B, Q#amqqueue.durable, + _ -> InnerFun(X, Q), + ok = sync_binding(B, Q#amqqueue.durable, fun mnesia:delete_object/3), {maybe_auto_delete(X), B} end diff --git a/src/rabbit_exchange_type.erl b/src/rabbit_exchange_type.erl index a8c071e681..85760edce4 100644 --- a/src/rabbit_exchange_type.erl +++ b/src/rabbit_exchange_type.erl @@ -18,11 +18,11 @@ %% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial %% Technologies LLC, and Rabbit Technologies Ltd. %% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift %% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% Copyright (C) 2007-2010 Cohesive Financial Technologies %% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. +%% (C) 2007-2010 Rabbit Technologies Ltd. %% %% All Rights Reserved. %% @@ -54,7 +54,11 @@ behaviour_info(callbacks) -> {add_binding, 2}, %% called after bindings have been deleted. - {remove_bindings, 2} + {remove_bindings, 2}, + + %% called when comparing exchanges for equivalence - should return ok or + %% exit with #amqp_error{} + {assert_args_equivalence, 2} ]; behaviour_info(_Other) -> diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl index 9b71e0e1d1..4f6eb85199 100644 --- a/src/rabbit_exchange_type_direct.erl +++ b/src/rabbit_exchange_type_direct.erl @@ -18,11 +18,11 @@ %% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial %% Technologies LLC, and Rabbit Technologies Ltd. %% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift %% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% Copyright (C) 2007-2010 Cohesive Financial Technologies %% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. +%% (C) 2007-2010 Rabbit Technologies Ltd. %% %% All Rights Reserved. %% @@ -36,7 +36,7 @@ -export([description/0, publish/2]). -export([validate/1, create/1, recover/2, delete/2, - add_binding/2, remove_bindings/2]). + add_binding/2, remove_bindings/2, assert_args_equivalence/2]). -include("rabbit_exchange_type_spec.hrl"). -rabbit_boot_step({?MODULE, @@ -61,3 +61,5 @@ recover(_X, _Bs) -> ok. delete(_X, _Bs) -> ok. add_binding(_X, _B) -> ok. remove_bindings(_X, _Bs) -> ok. +assert_args_equivalence(X, Args) -> + rabbit_exchange:assert_args_equivalence(X, Args). diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl index 311654ab21..4f9712b14e 100644 --- a/src/rabbit_exchange_type_fanout.erl +++ b/src/rabbit_exchange_type_fanout.erl @@ -18,11 +18,11 @@ %% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial %% Technologies LLC, and Rabbit Technologies Ltd. %% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift %% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% Copyright (C) 2007-2010 Cohesive Financial Technologies %% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. +%% (C) 2007-2010 Rabbit Technologies Ltd. %% %% All Rights Reserved. %% @@ -36,7 +36,7 @@ -export([description/0, publish/2]). -export([validate/1, create/1, recover/2, delete/2, - add_binding/2, remove_bindings/2]). + add_binding/2, remove_bindings/2, assert_args_equivalence/2]). -include("rabbit_exchange_type_spec.hrl"). -rabbit_boot_step({?MODULE, @@ -59,3 +59,5 @@ recover(_X, _Bs) -> ok. delete(_X, _Bs) -> ok. add_binding(_X, _B) -> ok. remove_bindings(_X, _Bs) -> ok. +assert_args_equivalence(X, Args) -> + rabbit_exchange:assert_args_equivalence(X, Args). diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl index 285dab1a03..315e800021 100644 --- a/src/rabbit_exchange_type_headers.erl +++ b/src/rabbit_exchange_type_headers.erl @@ -18,11 +18,11 @@ %% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial %% Technologies LLC, and Rabbit Technologies Ltd. %% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift %% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% Copyright (C) 2007-2010 Cohesive Financial Technologies %% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. +%% (C) 2007-2010 Rabbit Technologies Ltd. %% %% All Rights Reserved. %% @@ -37,7 +37,7 @@ -export([description/0, publish/2]). -export([validate/1, create/1, recover/2, delete/2, - add_binding/2, remove_bindings/2]). + add_binding/2, remove_bindings/2, assert_args_equivalence/2]). -include("rabbit_exchange_type_spec.hrl"). -rabbit_boot_step({?MODULE, @@ -135,3 +135,5 @@ recover(_X, _Bs) -> ok. delete(_X, _Bs) -> ok. add_binding(_X, _B) -> ok. remove_bindings(_X, _Bs) -> ok. +assert_args_equivalence(X, Args) -> + rabbit_exchange:assert_args_equivalence(X, Args). diff --git a/src/rabbit_exchange_type_registry.erl b/src/rabbit_exchange_type_registry.erl index 175d15ad83..33ea0e926b 100644 --- a/src/rabbit_exchange_type_registry.erl +++ b/src/rabbit_exchange_type_registry.erl @@ -18,11 +18,11 @@ %% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial %% Technologies LLC, and Rabbit Technologies Ltd. %% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift %% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% Copyright (C) 2007-2010 Cohesive Financial Technologies %% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. +%% (C) 2007-2010 Rabbit Technologies Ltd. %% %% All Rights Reserved. %% diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl index 8a3dceeaeb..0e22d5458e 100644 --- a/src/rabbit_exchange_type_topic.erl +++ b/src/rabbit_exchange_type_topic.erl @@ -18,11 +18,11 @@ %% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial %% Technologies LLC, and Rabbit Technologies Ltd. %% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift %% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% Copyright (C) 2007-2010 Cohesive Financial Technologies %% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. +%% (C) 2007-2010 Rabbit Technologies Ltd. %% %% All Rights Reserved. %% @@ -36,7 +36,7 @@ -export([description/0, publish/2]). -export([validate/1, create/1, recover/2, delete/2, - add_binding/2, remove_bindings/2]). + add_binding/2, remove_bindings/2, assert_args_equivalence/2]). -include("rabbit_exchange_type_spec.hrl"). -rabbit_boot_step({?MODULE, @@ -99,3 +99,5 @@ recover(_X, _Bs) -> ok. delete(_X, _Bs) -> ok. add_binding(_X, _B) -> ok. remove_bindings(_X, _Bs) -> ok. +assert_args_equivalence(X, Args) -> + rabbit_exchange:assert_args_equivalence(X, Args). diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 9a911ab15d..35739dcbdf 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -242,12 +242,12 @@ report_cover([Root]) when is_atom(Root) -> report_cover(Root) -> Dir = filename:join(Root, "cover"), ok = filelib:ensure_dir(filename:join(Dir,"junk")), - lists:foreach(fun(F) -> file:delete(F) end, + lists:foreach(fun (F) -> file:delete(F) end, filelib:wildcard(filename:join(Dir, "*.html"))), {ok, SummaryFile} = file:open(filename:join(Dir, "summary.txt"), [write]), {CT, NCT} = lists:foldl( - fun(M,{CovTot, NotCovTot}) -> + fun (M,{CovTot, NotCovTot}) -> {ok, {M, {Cov, NotCov}}} = cover:analyze(M, module), ok = report_coverage_percentage(SummaryFile, Cov, NotCov, M), @@ -367,7 +367,7 @@ upmap(F, L) -> Parent = self(), Ref = make_ref(), [receive {Ref, Result} -> Result end - || _ <- [spawn(fun() -> Parent ! {Ref, F(X)} end) || X <- L]]. + || _ <- [spawn(fun () -> Parent ! {Ref, F(X)} end) || X <- L]]. map_in_order(F, L) -> lists:reverse( diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 55a6761d2d..a0b7aa4e7f 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -346,7 +346,7 @@ table_has_copy_type(TabDef, DiscType) -> create_local_table_copies(Type) -> lists:foreach( - fun({Tab, TabDef}) -> + fun ({Tab, TabDef}) -> HasDiscCopies = table_has_copy_type(TabDef, disc_copies), HasDiscOnlyCopies = table_has_copy_type(TabDef, disc_only_copies), LocalTab = proplists:get_bool(local_content, TabDef), diff --git a/src/rabbit_multi.erl b/src/rabbit_multi.erl index 336f74bf9a..5db1d77a32 100644 --- a/src/rabbit_multi.erl +++ b/src/rabbit_multi.erl @@ -111,7 +111,7 @@ action(start_all, [NodeCount], RpcTimeout) -> action(status, [], RpcTimeout) -> io:format("Status of all running nodes...~n", []), call_all_nodes( - fun({Node, Pid}) -> + fun ({Node, Pid}) -> RabbitRunning = case is_rabbit_running(Node, RpcTimeout) of false -> not_running; @@ -123,7 +123,7 @@ action(status, [], RpcTimeout) -> action(stop_all, [], RpcTimeout) -> io:format("Stopping all nodes...~n", []), - call_all_nodes(fun({Node, Pid}) -> + call_all_nodes(fun ({Node, Pid}) -> io:format("Stopping node ~p~n", [Node]), rpc:call(Node, rabbit, stop_and_halt, []), case kill_wait(Pid, RpcTimeout, false) of diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl index 406977b42a..975954fcd2 100644 --- a/src/rabbit_net.erl +++ b/src/rabbit_net.erl @@ -66,7 +66,7 @@ async_recv(Sock, Length, Timeout) when is_record(Sock, ssl_socket) -> Pid = self(), Ref = make_ref(), - spawn(fun() -> Pid ! {inet_async, Sock, Ref, + spawn(fun () -> Pid ! {inet_async, Sock, Ref, ssl:recv(Sock#ssl_socket.ssl, Length, Timeout)} end), diff --git a/src/rabbit_persister.erl b/src/rabbit_persister.erl index 3cd42e4753..8d3c2dc082 100644 --- a/src/rabbit_persister.erl +++ b/src/rabbit_persister.erl @@ -236,7 +236,7 @@ log_work(CreateWorkUnit, MessageList, snapshot = Snapshot = #psnapshot{messages = Messages}}) -> Unit = CreateWorkUnit( rabbit_misc:map_in_order( - fun(M = {publish, Message, QK = {_QName, PKey}}) -> + fun (M = {publish, Message, QK = {_QName, PKey}}) -> case ets:lookup(Messages, PKey) of [_] -> {tied, QK}; [] -> ets:insert(Messages, {PKey, Message}), diff --git a/src/rabbit_reader_queue_collector.erl b/src/rabbit_reader_queue_collector.erl index 841549e98f..8d4e8fdb42 100644 --- a/src/rabbit_reader_queue_collector.erl +++ b/src/rabbit_reader_queue_collector.erl @@ -82,8 +82,8 @@ handle_call({register_exclusive_queue, Q}, _From, handle_call(delete_all, _From, State = #state{exclusive_queues = ExclusiveQueues}) -> [rabbit_misc:with_exit_handler( - fun() -> ok end, - fun() -> + fun () -> ok end, + fun () -> erlang:demonitor(MonitorRef), rabbit_amqqueue:delete(Q, false, false) end) diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index 03979d6c60..5cd15a9462 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -57,14 +57,17 @@ deliver(QPids, Delivery = #delivery{mandatory = false, %% is preserved. This scales much better than the non-immediate %% case below. delegate:invoke_no_result( - QPids, fun(Pid) -> rabbit_amqqueue:deliver(Pid, Delivery) end), + QPids, fun (Pid) -> rabbit_amqqueue:deliver(Pid, Delivery) end), {routed, QPids}; deliver(QPids, Delivery) -> {Success, _} = delegate:invoke(QPids, - fun(Pid) -> rabbit_amqqueue:deliver(Pid, Delivery) end), - {Routed, Handled} = lists:foldl(fun fold_deliveries/2, {false, []}, Success), + fun (Pid) -> + rabbit_amqqueue:deliver(Pid, Delivery) + end), + {Routed, Handled} = + lists:foldl(fun fold_deliveries/2, {false, []}, Success), check_delivery(Delivery#delivery.mandatory, Delivery#delivery.immediate, {Routed, Handled}). @@ -88,7 +91,7 @@ match_routing_key(Name, RoutingKey) -> lookup_qpids(Queues) -> sets:fold( - fun(Key, Acc) -> + fun (Key, Acc) -> case mnesia:dirty_read({rabbit_queue, Key}) of [#amqqueue{pid = QPid}] -> [QPid | Acc]; [] -> Acc diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 04ee96d8a1..7ce03e5d6d 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -65,6 +65,7 @@ all_tests() -> passed = test_log_management(), passed = test_app_management(), passed = test_log_management_during_startup(), + passed = test_memory_pressure(), passed = test_cluster_management(), passed = test_user_management(), passed = test_server_status(), @@ -985,7 +986,7 @@ test_hooks() -> {[arg1, arg2], 1, 3} = get(arg_hook_test_fired), %% Invoking Pids - Remote = fun() -> + Remote = fun () -> receive {rabbitmq_hook,[remote_test,test,[],Target]} -> Target ! invoked @@ -1002,11 +1003,137 @@ test_hooks() -> end, passed. +test_memory_pressure_receiver(Pid) -> + receive + shutdown -> + ok; + {send_command, Method} -> + ok = case Method of + #'channel.flow'{} -> ok; + #'basic.qos_ok'{} -> ok; + #'channel.open_ok'{} -> ok + end, + Pid ! Method, + test_memory_pressure_receiver(Pid); + sync -> + Pid ! sync, + test_memory_pressure_receiver(Pid) + end. + +test_memory_pressure_receive_flow(Active) -> + receive #'channel.flow'{active = Active} -> ok + after 1000 -> throw(failed_to_receive_channel_flow) + end, + receive #'channel.flow'{} -> + throw(pipelining_sync_commands_detected) + after 0 -> + ok + end. + +test_memory_pressure_sync(Ch, Writer) -> + ok = rabbit_channel:do(Ch, #'basic.qos'{}), + Writer ! sync, + receive sync -> ok after 1000 -> throw(failed_to_receive_writer_sync) end, + receive #'basic.qos_ok'{} -> ok + after 1000 -> throw(failed_to_receive_basic_qos_ok) + end. + +test_memory_pressure_spawn() -> + Me = self(), + Writer = spawn(fun () -> test_memory_pressure_receiver(Me) end), + Ch = rabbit_channel:start_link(1, self(), Writer, <<"user">>, <<"/">>, + self()), + ok = rabbit_channel:do(Ch, #'channel.open'{}), + MRef = erlang:monitor(process, Ch), + receive #'channel.open_ok'{} -> ok + after 1000 -> throw(failed_to_receive_channel_open_ok) + end, + {Writer, Ch, MRef}. + +expect_normal_channel_termination(MRef, Ch) -> + receive {'DOWN', MRef, process, Ch, normal} -> ok + after 1000 -> throw(channel_failed_to_exit) + end. + +test_memory_pressure() -> + {Writer0, Ch0, MRef0} = test_memory_pressure_spawn(), + [ok = rabbit_channel:conserve_memory(Ch0, Conserve) || + Conserve <- [false, false, true, false, true, true, false]], + ok = test_memory_pressure_sync(Ch0, Writer0), + receive {'DOWN', MRef0, process, Ch0, Info0} -> + throw({channel_died_early, Info0}) + after 0 -> ok + end, + + %% we should have just 1 active=false waiting for us + ok = test_memory_pressure_receive_flow(false), + + %% if we reply with flow_ok, we should immediately get an + %% active=true back + ok = rabbit_channel:do(Ch0, #'channel.flow_ok'{active = false}), + ok = test_memory_pressure_receive_flow(true), + + %% if we publish at this point, the channel should die + Content = #content{class_id = element(1, rabbit_framing:method_id( + 'basic.publish')), + properties = none, + properties_bin = <<>>, + payload_fragments_rev = []}, + ok = rabbit_channel:do(Ch0, #'basic.publish'{}, Content), + expect_normal_channel_termination(MRef0, Ch0), + + {Writer1, Ch1, MRef1} = test_memory_pressure_spawn(), + ok = rabbit_channel:conserve_memory(Ch1, true), + ok = test_memory_pressure_receive_flow(false), + ok = rabbit_channel:do(Ch1, #'channel.flow_ok'{active = false}), + ok = test_memory_pressure_sync(Ch1, Writer1), + ok = rabbit_channel:conserve_memory(Ch1, false), + ok = test_memory_pressure_receive_flow(true), + %% send back the wrong flow_ok. Channel should die. + ok = rabbit_channel:do(Ch1, #'channel.flow_ok'{active = false}), + expect_normal_channel_termination(MRef1, Ch1), + + {_Writer2, Ch2, MRef2} = test_memory_pressure_spawn(), + %% just out of the blue, send a flow_ok. Life should end. + ok = rabbit_channel:do(Ch2, #'channel.flow_ok'{active = true}), + expect_normal_channel_termination(MRef2, Ch2), + + {_Writer3, Ch3, MRef3} = test_memory_pressure_spawn(), + ok = rabbit_channel:conserve_memory(Ch3, true), + receive {'DOWN', MRef3, process, Ch3, _} -> + ok + after 12000 -> + throw(channel_failed_to_exit) + end, + + alarm_handler:set_alarm({vm_memory_high_watermark, []}), + Me = self(), + Writer4 = spawn(fun () -> test_memory_pressure_receiver(Me) end), + Ch4 = rabbit_channel:start_link(1, self(), Writer4, <<"user">>, <<"/">>, + self()), + ok = rabbit_channel:do(Ch4, #'channel.open'{}), + MRef4 = erlang:monitor(process, Ch4), + Writer4 ! sync, + receive sync -> ok after 1000 -> throw(failed_to_receive_writer_sync) end, + receive #'channel.open_ok'{} -> throw(unexpected_channel_open_ok) + after 0 -> ok + end, + alarm_handler:clear_alarm(vm_memory_high_watermark), + Writer4 ! sync, + receive sync -> ok after 1000 -> throw(failed_to_receive_writer_sync) end, + receive #'channel.open_ok'{} -> ok + after 1000 -> throw(failed_to_receive_channel_open_ok) + end, + rabbit_channel:shutdown(Ch4), + expect_normal_channel_termination(MRef4, Ch4), + + passed. + test_delegates_async(SecondaryNode) -> Self = self(), - Sender = fun(Pid) -> Pid ! {invoked, Self} end, + Sender = fun (Pid) -> Pid ! {invoked, Self} end, - Responder = make_responder(fun({invoked, Pid}) -> Pid ! response end), + Responder = make_responder(fun ({invoked, Pid}) -> Pid ! response end), ok = delegate:invoke_no_result(spawn(Responder), Sender), ok = delegate:invoke_no_result(spawn(SecondaryNode, Responder), Sender), @@ -1021,7 +1148,7 @@ test_delegates_async(SecondaryNode) -> make_responder(FMsg) -> make_responder(FMsg, timeout). make_responder(FMsg, Throw) -> - fun() -> + fun () -> receive Msg -> FMsg(Msg) after 1000 -> throw(Throw) end @@ -1050,22 +1177,22 @@ must_exit(Fun) -> end. test_delegates_sync(SecondaryNode) -> - Sender = fun(Pid) -> gen_server:call(Pid, invoked) end, - BadSender = fun(_Pid) -> exit(exception) end, + Sender = fun (Pid) -> gen_server:call(Pid, invoked) end, + BadSender = fun (_Pid) -> exit(exception) end, - Responder = make_responder(fun({'$gen_call', From, invoked}) -> + Responder = make_responder(fun ({'$gen_call', From, invoked}) -> gen_server:reply(From, response) end), - BadResponder = make_responder(fun({'$gen_call', From, invoked}) -> + BadResponder = make_responder(fun ({'$gen_call', From, invoked}) -> gen_server:reply(From, response) end, bad_responder_died), response = delegate:invoke(spawn(Responder), Sender), response = delegate:invoke(spawn(SecondaryNode, Responder), Sender), - must_exit(fun() -> delegate:invoke(spawn(BadResponder), BadSender) end), - must_exit(fun() -> + must_exit(fun () -> delegate:invoke(spawn(BadResponder), BadSender) end), + must_exit(fun () -> delegate:invoke(spawn(SecondaryNode, BadResponder), BadSender) end), LocalGoodPids = spawn_responders(node(), Responder, 2), diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl index 54c60f5be0..3d10dc121e 100644 --- a/src/rabbit_writer.erl +++ b/src/rabbit_writer.erl @@ -50,17 +50,17 @@ -spec(start/3 :: (socket(), channel_number(), non_neg_integer()) -> pid()). -spec(start_link/3 :: (socket(), channel_number(), non_neg_integer()) -> pid()). --spec(send_command/2 :: (pid(), amqp_method()) -> 'ok'). --spec(send_command/3 :: (pid(), amqp_method(), content()) -> 'ok'). +-spec(send_command/2 :: (pid(), amqp_method_record()) -> 'ok'). +-spec(send_command/3 :: (pid(), amqp_method_record(), content()) -> 'ok'). -spec(send_command_and_signal_back/3 :: (pid(), amqp_method(), pid()) -> 'ok'). -spec(send_command_and_signal_back/4 :: (pid(), amqp_method(), content(), pid()) -> 'ok'). -spec(send_command_and_notify/5 :: - (pid(), pid(), pid(), amqp_method(), content()) -> 'ok'). + (pid(), pid(), pid(), amqp_method_record(), content()) -> 'ok'). -spec(internal_send_command/3 :: - (socket(), channel_number(), amqp_method()) -> 'ok'). + (socket(), channel_number(), amqp_method_record()) -> 'ok'). -spec(internal_send_command/5 :: - (socket(), channel_number(), amqp_method(), + (socket(), channel_number(), amqp_method_record(), content(), non_neg_integer()) -> 'ok'). -endif. diff --git a/src/supervisor2.erl b/src/supervisor2.erl index 5575351269..0b1d726562 100644 --- a/src/supervisor2.erl +++ b/src/supervisor2.erl @@ -301,13 +301,13 @@ handle_call({terminate_child, Name}, _From, State) -> handle_call(which_children, _From, State) when ?is_simple(State) -> [#child{child_type = CT, modules = Mods}] = State#state.children, - Reply = lists:map(fun({Pid, _}) -> {undefined, Pid, CT, Mods} end, + Reply = lists:map(fun ({Pid, _}) -> {undefined, Pid, CT, Mods} end, ?DICT:to_list(State#state.dynamics)), {reply, Reply, State}; handle_call(which_children, _From, State) -> Resp = - lists:map(fun(#child{pid = Pid, name = Name, + lists:map(fun (#child{pid = Pid, name = Name, child_type = ChildType, modules = Mods}) -> {Name, Pid, ChildType, Mods} end, @@ -415,7 +415,7 @@ update_childspec1([], Children, KeepOld) -> lists:reverse(Children ++ KeepOld). update_chsp(OldCh, Children) -> - case lists:map(fun(Ch) when OldCh#child.name =:= Ch#child.name -> + case lists:map(fun (Ch) when OldCh#child.name =:= Ch#child.name -> Ch#child{pid = OldCh#child.pid}; (Ch) -> Ch @@ -828,7 +828,7 @@ validShutdown(Shutdown, _) -> throw({invalid_shutdown, Shutdown}). validMods(dynamic) -> true; validMods(Mods) when is_list(Mods) -> - lists:foreach(fun(Mod) -> + lists:foreach(fun (Mod) -> if is_atom(Mod) -> ok; true -> throw({invalid_module, Mod}) |
