diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit.erl | 16 | ||||
| -rw-r--r-- | src/rabbit_access_control.erl | 14 | ||||
| -rw-r--r-- | src/rabbit_amqqueue.erl | 35 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 318 | ||||
| -rw-r--r-- | src/rabbit_basic.erl | 75 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 80 | ||||
| -rw-r--r-- | src/rabbit_error_logger.erl | 10 | ||||
| -rw-r--r-- | src/rabbit_exchange.erl | 143 | ||||
| -rw-r--r-- | src/rabbit_limiter.erl | 18 | ||||
| -rw-r--r-- | src/rabbit_log.erl | 12 | ||||
| -rw-r--r-- | src/rabbit_misc.erl | 20 | ||||
| -rw-r--r-- | src/rabbit_mnesia.erl | 13 | ||||
| -rw-r--r-- | src/rabbit_persister.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_reader.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_router.erl | 76 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 4 |
16 files changed, 486 insertions, 352 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index 0d1ce689ea..e910e24efc 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -136,7 +136,7 @@ start(normal, []) -> ok = rabbit_amqqueue:start(), - {ok, MemoryAlarms} = application:get_env(memory_alarms), + {ok, MemoryAlarms} = application:get_env(memory_alarms), ok = rabbit_alarm:start(MemoryAlarms), ok = rabbit_binary_generator: @@ -226,10 +226,14 @@ print_banner() -> [Product, Version, ?PROTOCOL_VERSION_MAJOR, ?PROTOCOL_VERSION_MINOR, ?COPYRIGHT_MESSAGE, ?INFORMATION_MESSAGE]), - io:format("Logging to ~p~nSASL logging to ~p~n~n", - [log_location(kernel), log_location(sasl)]). - - + Settings = [{"node", node()}, + {"log", log_location(kernel)}, + {"sasl log", log_location(sasl)}, + {"database dir", rabbit_mnesia:dir()}], + DescrLen = lists:max([length(K) || {K, _V} <- Settings]), + Format = "~-" ++ integer_to_list(DescrLen) ++ "s: ~s~n", + lists:foreach(fun ({K, V}) -> io:format(Format, [K, V]) end, Settings), + io:nl(). start_child(Mod) -> {ok,_} = supervisor:start_child(rabbit_sup, @@ -315,7 +319,7 @@ rotate_logs(File, Suffix, OldHandler, NewHandler) -> log_rotation_result({error, MainLogError}, {error, SaslLogError}) -> {error, {{cannot_rotate_main_logs, MainLogError}, - {cannot_rotate_sasl_logs, SaslLogError}}}; + {cannot_rotate_sasl_logs, SaslLogError}}}; log_rotation_result({error, MainLogError}, ok) -> {error, {cannot_rotate_main_logs, MainLogError}}; log_rotation_result(ok, {error, SaslLogError}) -> diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl index 54348d9a1c..6ff7a1046c 100644 --- a/src/rabbit_access_control.erl +++ b/src/rabbit_access_control.erl @@ -45,11 +45,13 @@ -ifdef(use_specs). +-type(permission_atom() :: 'configure' | 'read' | 'write'). + -spec(check_login/2 :: (binary(), binary()) -> user()). -spec(user_pass_login/2 :: (username(), password()) -> user()). -spec(check_vhost_access/2 :: (user(), vhost()) -> 'ok'). -spec(check_resource_access/3 :: - (username(), r(atom()), non_neg_integer()) -> 'ok'). + (username(), r(atom()), permission_atom()) -> 'ok'). -spec(add_user/2 :: (username(), password()) -> 'ok'). -spec(delete_user/1 :: (username()) -> 'ok'). -spec(change_password/2 :: (username(), password()) -> 'ok'). @@ -137,6 +139,10 @@ check_vhost_access(#user{username = Username}, VHostPath) -> [VHostPath, Username]) end. +permission_index(configure) -> #permission.configure; +permission_index(write) -> #permission.write; +permission_index(read) -> #permission.read. + check_resource_access(Username, R = #resource{kind = exchange, name = <<"">>}, Permission) -> @@ -158,7 +164,7 @@ check_resource_access(Username, [#user_permission{permission = P}] -> case regexp:match( binary_to_list(Name), - binary_to_list(element(Permission, P))) of + binary_to_list(element(permission_index(Permission), P))) of {match, _, _} -> true; nomatch -> false end @@ -239,8 +245,8 @@ add_vhost(VHostPath) -> [{<<"">>, direct}, {<<"amq.direct">>, direct}, {<<"amq.topic">>, topic}, - {<<"amq.match">>, headers}, %% per 0-9-1 pdf - {<<"amq.headers">>, headers}, %% per 0-9-1 xml + {<<"amq.match">>, headers}, %% per 0-9-1 pdf + {<<"amq.headers">>, headers}, %% per 0-9-1 xml {<<"amq.fanout">>, fanout}]], ok; [_] -> diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index eb076e94d6..198e2782b4 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -31,10 +31,11 @@ -module(rabbit_amqqueue). --export([start/0, recover/0, declare/4, delete/3, purge/1, internal_delete/1]). +-export([start/0, recover/0, declare/4, delete/3, purge/1]). +-export([internal_declare/2, internal_delete/1]). -export([pseudo_queue/2]). -export([lookup/1, with/2, with_or_die/2, - stat/1, stat_all/0, deliver/5, redeliver/2, requeue/3, ack/4]). + stat/1, stat_all/0, deliver/2, redeliver/2, requeue/3, ack/4]). -export([list/1, info/1, info/2, info_all/1, info_all/2]). -export([claim_queue/2]). -export([basic_get/3, basic_consume/8, basic_cancel/4]). @@ -84,7 +85,7 @@ {'error', 'in_use'} | {'error', 'not_empty'}). -spec(purge/1 :: (amqqueue()) -> qlen()). --spec(deliver/5 :: (bool(), bool(), maybe(txn()), message(), pid()) -> bool()). +-spec(deliver/2 :: (pid(), delivery()) -> bool()). -spec(redeliver/2 :: (pid(), [{message(), bool()}]) -> 'ok'). -spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok'). -spec(ack/4 :: (pid(), maybe(txn()), [msg_id()], pid()) -> 'ok'). @@ -102,6 +103,7 @@ -spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok'). -spec(notify_sent/2 :: (pid(), pid()) -> 'ok'). -spec(unblock/2 :: (pid(), pid()) -> 'ok'). +-spec(internal_declare/2 :: (amqqueue(), bool()) -> amqqueue()). -spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()). -spec(on_node_down/1 :: (erlang_node()) -> 'ok'). -spec(pseudo_queue/2 :: (binary(), pid()) -> amqqueue()). @@ -157,11 +159,17 @@ declare(QueueName, Durable, AutoDelete, Args) -> auto_delete = AutoDelete, arguments = Args, pid = none}), + internal_declare(Q, true). + +internal_declare(Q = #amqqueue{name = QueueName}, WantDefaultBinding) -> case rabbit_misc:execute_mnesia_transaction( fun () -> case mnesia:wread({rabbit_queue, QueueName}) of [] -> ok = store_queue(Q), - ok = add_default_binding(Q), + case WantDefaultBinding of + true -> add_default_binding(Q); + false -> ok + end, Q; [ExistingQ] -> ExistingQ end @@ -201,9 +209,7 @@ with(Name, F, E) -> with(Name, F) -> with(Name, F, fun () -> {error, not_found} end). with_or_die(Name, F) -> - with(Name, F, fun () -> rabbit_misc:protocol_error( - not_found, "no ~s", [rabbit_misc:rs(Name)]) - end). + with(Name, F, fun () -> rabbit_misc:not_found(Name) end). list(VHostPath) -> mnesia:dirty_match_object( @@ -235,13 +241,16 @@ delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) -> purge(#amqqueue{ pid = QPid }) -> gen_server2:call(QPid, purge, infinity). -deliver(_IsMandatory, true, Txn, Message, QPid) -> - gen_server2:call(QPid, {deliver_immediately, Txn, Message}, infinity); -deliver(true, _IsImmediate, Txn, Message, QPid) -> - gen_server2:call(QPid, {deliver, Txn, Message}, infinity), +deliver(QPid, #delivery{immediate = true, + txn = Txn, sender = ChPid, message = Message}) -> + gen_server2:call(QPid, {deliver_immediately, Txn, Message, ChPid}, + infinity); +deliver(QPid, #delivery{mandatory = true, + txn = Txn, sender = ChPid, message = Message}) -> + gen_server2:call(QPid, {deliver, Txn, Message, ChPid}, infinity), true; -deliver(false, _IsImmediate, Txn, Message, QPid) -> - gen_server2:cast(QPid, {deliver, Txn, Message}), +deliver(QPid, #delivery{txn = Txn, sender = ChPid, message = Message}) -> + gen_server2:cast(QPid, {deliver, Txn, Message, ChPid}), true. redeliver(QPid, Messages) -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index c390b2b7e4..cf0ef44f5c 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -53,19 +53,21 @@ has_had_consumers, next_msg_id, message_buffer, - round_robin}). + active_consumers, + blocked_consumers}). -record(consumer, {tag, ack_required}). -record(tx, {ch_pid, is_persistent, pending_messages, pending_acks}). %% These are held in our process dictionary --record(cr, {consumers, +-record(cr, {consumer_count, ch_pid, limiter_pid, monitor_ref, unacked_messages, is_limit_active, + txn, unsent_message_count}). -define(INFO_KEYS, @@ -98,7 +100,8 @@ init(Q) -> has_had_consumers = false, next_msg_id = 1, message_buffer = queue:new(), - round_robin = queue:new()}, ?HIBERNATE_AFTER}. + active_consumers = queue:new(), + blocked_consumers = queue:new()}, ?HIBERNATE_AFTER}. terminate(_Reason, State) -> %% FIXME: How do we cancel active subscriptions? @@ -128,11 +131,12 @@ ch_record(ChPid) -> case get(Key) of undefined -> MonitorRef = erlang:monitor(process, ChPid), - C = #cr{consumers = [], + C = #cr{consumer_count = 0, ch_pid = ChPid, monitor_ref = MonitorRef, unacked_messages = dict:new(), is_limit_active = false, + txn = none, unsent_message_count = 0}, put(Key, C), C; @@ -146,7 +150,7 @@ all_ch_record() -> [C || {{ch, _}, C} <- get()]. is_ch_blocked(#cr{unsent_message_count = Count, is_limit_active = Limited}) -> - Limited orelse Count > ?UNSENT_MESSAGE_LIMIT. + Limited orelse Count >= ?UNSENT_MESSAGE_LIMIT. ch_record_state_transition(OldCR, NewCR) -> BlockedOld = is_ch_blocked(OldCR), @@ -156,20 +160,25 @@ ch_record_state_transition(OldCR, NewCR) -> true -> ok end. +record_current_channel_tx(ChPid, Txn) -> + %% as a side effect this also starts monitoring the channel (if + %% that wasn't happening already) + store_ch_record((ch_record(ChPid))#cr{txn = Txn}). + deliver_immediately(Message, Delivered, State = #q{q = #amqqueue{name = QName}, - round_robin = RoundRobin, + active_consumers = ActiveConsumers, + blocked_consumers = BlockedConsumers, next_msg_id = NextId}) -> ?LOGDEBUG("AMQQUEUE ~p DELIVERY:~n~p~n", [QName, Message]), - case queue:out(RoundRobin) of + case queue:out(ActiveConsumers) of {{value, QEntry = {ChPid, #consumer{tag = ConsumerTag, ack_required = AckRequired}}}, - RoundRobinTail} -> + ActiveConsumersTail} -> C = #cr{limiter_pid = LimiterPid, unsent_message_count = Count, unacked_messages = UAM} = ch_record(ChPid), - case not(AckRequired) orelse rabbit_limiter:can_send( - LimiterPid, self()) of + case rabbit_limiter:can_send(LimiterPid, self(), AckRequired) of true -> rabbit_channel:deliver( ChPid, ConsumerTag, AckRequired, @@ -181,24 +190,38 @@ deliver_immediately(Message, Delivered, NewC = C#cr{unsent_message_count = Count + 1, unacked_messages = NewUAM}, store_ch_record(NewC), - NewConsumers = + {NewActiveConsumers, NewBlockedConsumers} = case ch_record_state_transition(C, NewC) of - ok -> queue:in(QEntry, RoundRobinTail); - block -> block_consumers(ChPid, RoundRobinTail) + ok -> {queue:in(QEntry, ActiveConsumersTail), + BlockedConsumers}; + block -> + {ActiveConsumers1, BlockedConsumers1} = + move_consumers(ChPid, + ActiveConsumersTail, + BlockedConsumers), + {ActiveConsumers1, + queue:in(QEntry, BlockedConsumers1)} end, - {offered, AckRequired, State#q{round_robin = NewConsumers, - next_msg_id = NextId + 1}}; + {offered, AckRequired, + State#q{active_consumers = NewActiveConsumers, + blocked_consumers = NewBlockedConsumers, + next_msg_id = NextId + 1}}; false -> store_ch_record(C#cr{is_limit_active = true}), - NewConsumers = block_consumers(ChPid, RoundRobinTail), - deliver_immediately(Message, Delivered, - State#q{round_robin = NewConsumers}) + {NewActiveConsumers, NewBlockedConsumers} = + move_consumers(ChPid, + ActiveConsumers, + BlockedConsumers), + deliver_immediately( + Message, Delivered, + State#q{active_consumers = NewActiveConsumers, + blocked_consumers = NewBlockedConsumers}) end; {empty, _} -> {not_offered, State} end. -attempt_delivery(none, Message, State) -> +attempt_delivery(none, _ChPid, Message, State) -> case deliver_immediately(Message, false, State) of {offered, false, State1} -> {true, State1}; @@ -209,13 +232,13 @@ attempt_delivery(none, Message, State) -> {not_offered, State1} -> {false, State1} end; -attempt_delivery(Txn, Message, State) -> +attempt_delivery(Txn, ChPid, Message, State) -> persist_message(Txn, qname(State), Message), - record_pending_message(Txn, Message), + record_pending_message(Txn, ChPid, Message), {true, State}. -deliver_or_enqueue(Txn, Message, State) -> - case attempt_delivery(Txn, Message, State) of +deliver_or_enqueue(Txn, ChPid, Message, State) -> + case attempt_delivery(Txn, ChPid, Message, State) of {true, NewState} -> {true, NewState}; {false, NewState} -> @@ -228,22 +251,24 @@ deliver_or_enqueue_n(Messages, State = #q{message_buffer = MessageBuffer}) -> run_poke_burst(queue:join(MessageBuffer, queue:from_list(Messages)), State). -block_consumers(ChPid, RoundRobin) -> - %%?LOGDEBUG("~p Blocking ~p from ~p~n", [self(), ChPid, queue:to_list(RoundRobin)]), - queue:from_list(lists:filter(fun ({CP, _}) -> CP /= ChPid end, - queue:to_list(RoundRobin))). - -unblock_consumers(ChPid, Consumers, RoundRobin) -> - %%?LOGDEBUG("Unblocking ~p ~p ~p~n", [ChPid, Consumers, queue:to_list(RoundRobin)]), - queue:join(RoundRobin, - queue:from_list([{ChPid, Con} || Con <- Consumers])). +add_consumer(ChPid, Consumer, Queue) -> queue:in({ChPid, Consumer}, Queue). -block_consumer(ChPid, ConsumerTag, RoundRobin) -> - %%?LOGDEBUG("~p Blocking ~p from ~p~n", [self(), ConsumerTag, queue:to_list(RoundRobin)]), +remove_consumer(ChPid, ConsumerTag, Queue) -> + %% TODO: replace this with queue:filter/2 once we move to R12 queue:from_list(lists:filter( fun ({CP, #consumer{tag = CT}}) -> (CP /= ChPid) or (CT /= ConsumerTag) - end, queue:to_list(RoundRobin))). + end, queue:to_list(Queue))). + +remove_consumers(ChPid, Queue) -> + %% TODO: replace this with queue:filter/2 once we move to R12 + queue:from_list(lists:filter(fun ({CP, _}) -> CP /= ChPid end, + queue:to_list(Queue))). + +move_consumers(ChPid, From, To) -> + {Kept, Removed} = lists:partition(fun ({CP, _}) -> CP /= ChPid end, + queue:to_list(From)), + {queue:from_list(Kept), queue:join(To, queue:from_list(Removed))}. possibly_unblock(State, ChPid, Update) -> case lookup_ch(ChPid) of @@ -254,65 +279,48 @@ possibly_unblock(State, ChPid, Update) -> store_ch_record(NewC), case ch_record_state_transition(C, NewC) of ok -> State; - unblock -> NewRR = unblock_consumers(ChPid, - NewC#cr.consumers, - State#q.round_robin), - run_poke_burst(State#q{round_robin = NewRR}) + unblock -> {NewBlockedeConsumers, NewActiveConsumers} = + move_consumers(ChPid, + State#q.blocked_consumers, + State#q.active_consumers), + run_poke_burst( + State#q{active_consumers = NewActiveConsumers, + blocked_consumers = NewBlockedeConsumers}) end end. -check_auto_delete(State = #q{q = #amqqueue{auto_delete = false}}) -> - {continue, State}; -check_auto_delete(State = #q{has_had_consumers = false}) -> - {continue, State}; -check_auto_delete(State = #q{round_robin = RoundRobin}) -> - % The clauses above rule out cases where no-one has consumed from - % this queue yet, and cases where we are not an auto_delete queue - % in any case. Thus it remains to check whether we have any active - % listeners at this point. - case queue:is_empty(RoundRobin) of - true -> - % There are no waiting listeners. It's possible that we're - % completely unused. Check. - case is_unused() of - true -> - % There are no active consumers at this - % point. This is the signal to autodelete. - {stop, State}; - false -> - % There is at least one active consumer, so we - % shouldn't delete ourselves. - {continue, State} - end; - false -> - % There are some waiting listeners, thus we are not - % unused, so can continue life as normal without needing - % to check the process dictionary. - {continue, State} - end. +should_auto_delete(#q{q = #amqqueue{auto_delete = false}}) -> false; +should_auto_delete(#q{has_had_consumers = false}) -> false; +should_auto_delete(State) -> is_unused(State). -handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder, - round_robin = ActiveConsumers}) -> +handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) -> case lookup_ch(DownPid) of not_found -> noreply(State); - #cr{monitor_ref = MonitorRef, ch_pid = ChPid, unacked_messages = UAM} -> - NewActive = block_consumers(ChPid, ActiveConsumers), + #cr{monitor_ref = MonitorRef, ch_pid = ChPid, txn = Txn, + unacked_messages = UAM} -> erlang:demonitor(MonitorRef), erase({ch, ChPid}), - case check_auto_delete( - deliver_or_enqueue_n( - [{Message, true} || - {_Messsage_id, Message} <- dict:to_list(UAM)], - State#q{ - exclusive_consumer = case Holder of - {ChPid, _} -> none; - Other -> Other - end, - round_robin = NewActive})) of - {continue, NewState} -> - noreply(NewState); - {stop, NewState} -> - {stop, normal, NewState} + case Txn of + none -> ok; + _ -> ok = rollback_work(Txn, qname(State)), + erase_tx(Txn) + end, + NewState = + deliver_or_enqueue_n( + [{Message, true} || + {_Messsage_id, Message} <- dict:to_list(UAM)], + State#q{ + exclusive_consumer = case Holder of + {ChPid, _} -> none; + Other -> Other + end, + active_consumers = remove_consumers( + ChPid, State#q.active_consumers), + blocked_consumers = remove_consumers( + ChPid, State#q.blocked_consumers)}), + case should_auto_delete(NewState) of + false -> noreply(NewState); + true -> {stop, normal, NewState} end end. @@ -325,12 +333,12 @@ check_queue_owner(none, _) -> ok; check_queue_owner({ReaderPid, _}, ReaderPid) -> ok; check_queue_owner({_, _}, _) -> mismatch. -check_exclusive_access({_ChPid, _ConsumerTag}, _ExclusiveConsume) -> +check_exclusive_access({_ChPid, _ConsumerTag}, _ExclusiveConsume, _State) -> in_use; -check_exclusive_access(none, false) -> +check_exclusive_access(none, false, _State) -> ok; -check_exclusive_access(none, true) -> - case is_unused() of +check_exclusive_access(none, true, State) -> + case is_unused(State) of true -> ok; false -> in_use end. @@ -355,16 +363,8 @@ run_poke_burst(MessageBuffer, State) -> State#q{message_buffer = MessageBuffer} end. -is_unused() -> - is_unused1(get()). - -is_unused1([]) -> - true; -is_unused1([{{ch, _}, #cr{consumers = Consumers}} | _Rest]) - when Consumers /= [] -> - false; -is_unused1([_ | Rest]) -> - is_unused1(Rest). +is_unused(State) -> queue:is_empty(State#q.active_consumers) andalso + queue:is_empty(State#q.blocked_consumers). maybe_send_reply(_ChPid, undefined) -> ok; maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg). @@ -456,13 +456,17 @@ is_tx_persistent(Txn) -> #tx{is_persistent = Res} = lookup_tx(Txn), Res. -record_pending_message(Txn, Message) -> +record_pending_message(Txn, ChPid, Message) -> Tx = #tx{pending_messages = Pending} = lookup_tx(Txn), - store_tx(Txn, Tx#tx{pending_messages = [{Message, false} | Pending]}). + record_current_channel_tx(ChPid, Txn), + store_tx(Txn, Tx#tx{pending_messages = [{Message, false} | Pending], + ch_pid = ChPid}). record_pending_acks(Txn, ChPid, MsgIds) -> Tx = #tx{pending_acks = Pending} = lookup_tx(Txn), - store_tx(Txn, Tx#tx{pending_acks = [MsgIds | Pending], ch_pid = ChPid}). + record_current_channel_tx(ChPid, Txn), + store_tx(Txn, Tx#tx{pending_acks = [MsgIds | Pending], + ch_pid = ChPid}). process_pending(Txn, State) -> #tx{ch_pid = ChPid, @@ -519,9 +523,8 @@ i(messages, State) -> i(acks_uncommitted, _) -> lists:sum([length(Pending) || #tx{pending_acks = Pending} <- all_tx_record()]); -i(consumers, _) -> - lists:sum([length(Consumers) || - #cr{consumers = Consumers} <- all_ch_record()]); +i(consumers, State) -> + queue:len(State#q.active_consumers) + queue:len(State#q.blocked_consumers); i(transactions, _) -> length(all_tx_record()); i(memory, _) -> @@ -541,7 +544,7 @@ handle_call({info, Items}, _From, State) -> catch Error -> reply({error, Error}, State) end; -handle_call({deliver_immediately, Txn, Message}, _From, State) -> +handle_call({deliver_immediately, Txn, Message, ChPid}, _From, State) -> %% Synchronous, "immediate" delivery mode %% %% FIXME: Is this correct semantics? @@ -555,12 +558,12 @@ handle_call({deliver_immediately, Txn, Message}, _From, State) -> %% just all ready-to-consume queues get the message, with unready %% queues discarding the message? %% - {Delivered, NewState} = attempt_delivery(Txn, Message, State), + {Delivered, NewState} = attempt_delivery(Txn, ChPid, Message, State), reply(Delivered, NewState); -handle_call({deliver, Txn, Message}, _From, State) -> +handle_call({deliver, Txn, Message, ChPid}, _From, State) -> %% Synchronous, "mandatory" delivery mode - {Delivered, NewState} = deliver_or_enqueue(Txn, Message, State), + {Delivered, NewState} = deliver_or_enqueue(Txn, ChPid, Message, State), reply(Delivered, NewState); handle_call({commit, Txn}, From, State) -> @@ -603,78 +606,91 @@ handle_call({basic_get, ChPid, NoAck}, _From, handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg}, _From, State = #q{owner = Owner, - exclusive_consumer = ExistingHolder, - round_robin = RoundRobin}) -> + exclusive_consumer = ExistingHolder}) -> case check_queue_owner(Owner, ReaderPid) of mismatch -> reply({error, queue_owned_by_another_connection}, State); ok -> - case check_exclusive_access(ExistingHolder, ExclusiveConsume) of + case check_exclusive_access(ExistingHolder, ExclusiveConsume, + State) of in_use -> reply({error, exclusive_consume_unavailable}, State); ok -> - C = #cr{consumers = Consumers} = ch_record(ChPid), - Consumer = #consumer{tag = ConsumerTag, ack_required = not(NoAck)}, - store_ch_record(C#cr{consumers = [Consumer | Consumers], + C = #cr{consumer_count = ConsumerCount} = ch_record(ChPid), + Consumer = #consumer{tag = ConsumerTag, + ack_required = not(NoAck)}, + store_ch_record(C#cr{consumer_count = ConsumerCount +1, limiter_pid = LimiterPid}), - if Consumers == [] -> + if ConsumerCount == 0 -> ok = rabbit_limiter:register(LimiterPid, self()); true -> ok end, + ExclusiveConsumer = + if ExclusiveConsume -> {ChPid, ConsumerTag}; + true -> ExistingHolder + end, State1 = State#q{has_had_consumers = true, - exclusive_consumer = - if - ExclusiveConsume -> {ChPid, ConsumerTag}; - true -> ExistingHolder - end, - round_robin = queue:in({ChPid, Consumer}, RoundRobin)}, + exclusive_consumer = ExclusiveConsumer}, ok = maybe_send_reply(ChPid, OkMsg), - reply(ok, run_poke_burst(State1)) + State2 = + case is_ch_blocked(C) of + true -> State1#q{ + blocked_consumers = + add_consumer( + ChPid, Consumer, + State1#q.blocked_consumers)}; + false -> run_poke_burst( + State1#q{ + active_consumers = + add_consumer( + ChPid, Consumer, + State1#q.active_consumers)}) + end, + reply(ok, State2) end end; handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, - State = #q{exclusive_consumer = Holder, - round_robin = RoundRobin}) -> + State = #q{exclusive_consumer = Holder}) -> case lookup_ch(ChPid) of not_found -> ok = maybe_send_reply(ChPid, OkMsg), reply(ok, State); - C = #cr{consumers = Consumers, limiter_pid = LimiterPid} -> - NewConsumers = lists:filter - (fun (#consumer{tag = CT}) -> CT /= ConsumerTag end, - Consumers), - store_ch_record(C#cr{consumers = NewConsumers}), - if NewConsumers == [] -> + C = #cr{consumer_count = ConsumerCount, limiter_pid = LimiterPid} -> + store_ch_record(C#cr{consumer_count = ConsumerCount - 1}), + if ConsumerCount == 1 -> ok = rabbit_limiter:unregister(LimiterPid, self()); true -> ok end, ok = maybe_send_reply(ChPid, OkMsg), - case check_auto_delete( - State#q{exclusive_consumer = cancel_holder(ChPid, - ConsumerTag, - Holder), - round_robin = block_consumer(ChPid, - ConsumerTag, - RoundRobin)}) of - {continue, State1} -> - reply(ok, State1); - {stop, State1} -> - {stop, normal, ok, State1} + NewState = + State#q{exclusive_consumer = cancel_holder(ChPid, + ConsumerTag, + Holder), + active_consumers = remove_consumer( + ChPid, ConsumerTag, + State#q.active_consumers), + blocked_consumers = remove_consumer( + ChPid, ConsumerTag, + State#q.blocked_consumers)}, + case should_auto_delete(NewState) of + false -> reply(ok, NewState); + true -> {stop, normal, ok, NewState} end end; handle_call(stat, _From, State = #q{q = #amqqueue{name = Name}, message_buffer = MessageBuffer, - round_robin = RoundRobin}) -> - reply({ok, Name, queue:len(MessageBuffer), queue:len(RoundRobin)}, State); + active_consumers = ActiveConsumers}) -> + reply({ok, Name, queue:len(MessageBuffer), queue:len(ActiveConsumers)}, + State); handle_call({delete, IfUnused, IfEmpty}, _From, State = #q{message_buffer = MessageBuffer}) -> IsEmpty = queue:is_empty(MessageBuffer), - IsUnused = is_unused(), + IsUnused = is_unused(State), if IfEmpty and not(IsEmpty) -> reply({error, not_empty}, State); @@ -693,7 +709,7 @@ handle_call({claim_queue, ReaderPid}, _From, State = #q{owner = Owner, exclusive_consumer = Holder}) -> case Owner of none -> - case check_exclusive_access(Holder, true) of + case check_exclusive_access(Holder, true, State) of in_use -> %% FIXME: Is this really the right answer? What if %% an active consumer's reader is actually the @@ -711,9 +727,9 @@ handle_call({claim_queue, ReaderPid}, _From, State = #q{owner = Owner, reply(locked, State) end. -handle_cast({deliver, Txn, Message}, State) -> +handle_cast({deliver, Txn, Message, ChPid}, State) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. - {_Delivered, NewState} = deliver_or_enqueue(Txn, Message, State), + {_Delivered, NewState} = deliver_or_enqueue(Txn, ChPid, Message, State), noreply(NewState); handle_cast({ack, Txn, MsgIds, ChPid}, State) -> @@ -769,10 +785,10 @@ handle_cast({limit, ChPid, LimiterPid}, State) -> noreply( possibly_unblock( State, ChPid, - fun (C = #cr{consumers = Consumers, + fun (C = #cr{consumer_count = ConsumerCount, limiter_pid = OldLimiterPid, is_limit_active = Limited}) -> - if Consumers =/= [] andalso OldLimiterPid == undefined -> + if ConsumerCount =/= 0 andalso OldLimiterPid == undefined -> ok = rabbit_limiter:register(LimiterPid, self()); true -> ok diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl new file mode 100644 index 0000000000..761b3863b4 --- /dev/null +++ b/src/rabbit_basic.erl @@ -0,0 +1,75 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_basic). +-include("rabbit.hrl"). +-include("rabbit_framing.hrl"). + +-export([publish/1, message/4, delivery/4]). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(publish/1 :: (delivery()) -> + {ok, routing_result(), [pid()]} | not_found()). +-spec(delivery/4 :: (bool(), bool(), maybe(txn()), message()) -> delivery()). +-spec(message/4 :: (exchange_name(), routing_key(), binary(), binary()) -> + message()). + +-endif. + +%%---------------------------------------------------------------------------- + +publish(Delivery = #delivery{ + message = #basic_message{exchange_name = ExchangeName}}) -> + case rabbit_exchange:lookup(ExchangeName) of + {ok, X} -> + {RoutingRes, DeliveredQPids} = rabbit_exchange:publish(X, Delivery), + {ok, RoutingRes, DeliveredQPids}; + Other -> + Other + end. + +delivery(Mandatory, Immediate, Txn, Message) -> + #delivery{mandatory = Mandatory, immediate = Immediate, txn = Txn, + sender = self(), message = Message}. + +message(ExchangeName, RoutingKeyBin, ContentTypeBin, BodyBin) -> + {ClassId, _MethodId} = rabbit_framing:method_id('basic.publish'), + Content = #content{class_id = ClassId, + properties = #'P_basic'{content_type = ContentTypeBin}, + properties_bin = none, + payload_fragments_rev = [BodyBin]}, + #basic_message{exchange_name = ExchangeName, + routing_key = RoutingKeyBin, + content = Content, + persistent_key = none}. diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index b2716ec478..3089bb6293 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -231,13 +231,13 @@ clear_permission_cache() -> ok. check_configure_permitted(Resource, #ch{ username = Username}) -> - check_resource_access(Username, Resource, #permission.configure). + check_resource_access(Username, Resource, configure). check_write_permitted(Resource, #ch{ username = Username}) -> - check_resource_access(Username, Resource, #permission.write). + check_resource_access(Username, Resource, write). check_read_permitted(Resource, #ch{ username = Username}) -> - check_resource_access(Username, Resource, #permission.read). + check_resource_access(Username, Resource, read). expand_queue_name_shortcut(<<>>, #ch{ most_recently_declared_queue = <<>> }) -> rabbit_misc:protocol_error( @@ -306,7 +306,9 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, routing_key = RoutingKey, mandatory = Mandatory, immediate = Immediate}, - Content, State = #ch{ virtual_host = VHostPath}) -> + Content, State = #ch{ virtual_host = VHostPath, + transaction_id = TxnKey, + writer_pid = WriterPid}) -> ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_write_permitted(ExchangeName, State), Exchange = rabbit_exchange:lookup_or_die(ExchangeName), @@ -317,12 +319,30 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, true -> rabbit_guid:guid(); false -> none end, - {noreply, publish(Mandatory, Immediate, - #basic_message{exchange_name = ExchangeName, - routing_key = RoutingKey, - content = DecodedContent, - persistent_key = PersistentKey}, - rabbit_exchange:route(Exchange, RoutingKey, DecodedContent), State)}; + Message = #basic_message{exchange_name = ExchangeName, + routing_key = RoutingKey, + content = DecodedContent, + persistent_key = PersistentKey}, + {RoutingRes, DeliveredQPids} = + rabbit_exchange:publish( + Exchange, + rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message)), + case RoutingRes of + routed -> + ok; + unroutable -> + %% FIXME: 312 should be replaced by the ?NO_ROUTE + %% definition, when we move to >=0-9 + ok = basic_return(Message, WriterPid, 312, <<"unroutable">>); + not_delivered -> + %% FIXME: 313 should be replaced by the ?NO_CONSUMERS + %% definition, when we move to >=0-9 + ok = basic_return(Message, WriterPid, 313, <<"not_delivered">>) + end, + {noreply, case TxnKey of + none -> State; + _ -> add_tx_participants(DeliveredQPids, State) + end}; handle_method(#'basic.ack'{delivery_tag = DeliveryTag, multiple = Multiple}, @@ -551,6 +571,13 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin, {ok, FoundX} -> FoundX; {error, not_found} -> check_name('exchange', ExchangeNameBin), + case rabbit_misc:r_arg(VHostPath, exchange, Args, + <<"alternate-exchange">>) of + undefined -> ok; + AName -> check_read_permitted(ExchangeName, State), + check_write_permitted(AName, State), + ok + end, rabbit_exchange:declare(ExchangeName, CheckedType, Durable, @@ -579,8 +606,7 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin, check_configure_permitted(ExchangeName, State), case rabbit_exchange:delete(ExchangeName, IfUnused) of {error, not_found} -> - rabbit_misc:protocol_error( - not_found, "no ~s", [rabbit_misc:rs(ExchangeName)]); + rabbit_misc:not_found(ExchangeName); {error, in_use} -> die_precondition_failed( "~s in use", [rabbit_misc:rs(ExchangeName)]); @@ -746,11 +772,9 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, check_read_permitted(ExchangeName, State), case Fun(ExchangeName, QueueName, ActualRoutingKey, Arguments) of {error, exchange_not_found} -> - rabbit_misc:protocol_error( - not_found, "no ~s", [rabbit_misc:rs(ExchangeName)]); + rabbit_misc:not_found(ExchangeName); {error, queue_not_found} -> - rabbit_misc:protocol_error( - not_found, "no ~s", [rabbit_misc:rs(QueueName)]); + rabbit_misc:not_found(QueueName); {error, exchange_and_queue_not_found} -> rabbit_misc:protocol_error( not_found, "no ~s and no ~s", [rabbit_misc:rs(ExchangeName), @@ -767,30 +791,6 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, ok -> return_ok(State, NoWait, ReturnMethod) end. -publish(Mandatory, Immediate, Message, QPids, - State = #ch{transaction_id = TxnKey, writer_pid = WriterPid}) -> - Handled = deliver(QPids, Mandatory, Immediate, TxnKey, - Message, WriterPid), - case TxnKey of - none -> State; - _ -> add_tx_participants(Handled, State) - end. - -deliver(QPids, Mandatory, Immediate, Txn, Message, WriterPid) -> - case rabbit_router:deliver(QPids, Mandatory, Immediate, Txn, Message) of - {ok, DeliveredQPids} -> DeliveredQPids; - {error, unroutable} -> - %% FIXME: 312 should be replaced by the ?NO_ROUTE - %% definition, when we move to >=0-9 - ok = basic_return(Message, WriterPid, 312, <<"unroutable">>), - []; - {error, not_delivered} -> - %% FIXME: 313 should be replaced by the ?NO_CONSUMERS - %% definition, when we move to >=0-9 - ok = basic_return(Message, WriterPid, 313, <<"not_delivered">>), - [] - end. - basic_return(#basic_message{exchange_name = ExchangeName, routing_key = RoutingKey, content = Content}, diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl index dc5824f1c9..76016a8cb2 100644 --- a/src/rabbit_error_logger.erl +++ b/src/rabbit_error_logger.erl @@ -74,7 +74,11 @@ publish(_Other, _Format, _Data, _State) -> ok. publish1(RoutingKey, Format, Data, LogExch) -> - {ok, _QueueNames} = rabbit_exchange:simple_publish( - false, false, LogExch, RoutingKey, <<"text/plain">>, - list_to_binary(io_lib:format(Format, Data))), + {ok, _RoutingRes, _DeliveredQPids} = + rabbit_basic:publish( + rabbit_basic:delivery( + false, false, none, + rabbit_basic:message( + LogExch, RoutingKey, <<"text/plain">>, + list_to_binary(io_lib:format(Format, Data))))), ok. diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index fc89cfca51..8fb9eae304 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -36,8 +36,7 @@ -export([recover/0, declare/5, lookup/1, lookup_or_die/1, list/1, info/1, info/2, info_all/1, info_all/2, - simple_publish/6, simple_publish/3, - route/3]). + publish/2]). -export([add_binding/4, delete_binding/4, list_bindings/1]). -export([delete/2]). -export([delete_queue_bindings/1, delete_transient_queue_bindings/1]). @@ -57,8 +56,6 @@ -ifdef(use_specs). --type(publish_res() :: {'ok', [pid()]} | - not_found() | {'error', 'unroutable' | 'not_delivered'}). -type(bind_res() :: 'ok' | {'error', 'queue_not_found' | 'exchange_not_found' | @@ -75,11 +72,7 @@ -spec(info/2 :: (exchange(), [info_key()]) -> [info()]). -spec(info_all/1 :: (vhost()) -> [[info()]]). -spec(info_all/2 :: (vhost(), [info_key()]) -> [[info()]]). --spec(simple_publish/6 :: - (bool(), bool(), exchange_name(), routing_key(), binary(), binary()) -> - publish_res()). --spec(simple_publish/3 :: (bool(), bool(), message()) -> publish_res()). --spec(route/3 :: (exchange(), routing_key(), decoded_content()) -> [pid()]). +-spec(publish/2 :: (exchange(), delivery()) -> {routing_result(), [pid()]}). -spec(add_binding/4 :: (exchange_name(), queue_name(), routing_key(), amqp_table()) -> bind_res() | {'error', 'durability_settings_incompatible'}). @@ -164,9 +157,7 @@ lookup(Name) -> lookup_or_die(Name) -> case lookup(Name) of {ok, X} -> X; - {error, not_found} -> - rabbit_misc:protocol_error( - not_found, "no ~s", [rabbit_misc:rs(Name)]) + {error, not_found} -> rabbit_misc:not_found(Name) end. list(VHostPath) -> @@ -196,36 +187,41 @@ info_all(VHostPath) -> map(VHostPath, fun (X) -> info(X) end). info_all(VHostPath, Items) -> map(VHostPath, fun (X) -> info(X, Items) end). -%% Usable by Erlang code that wants to publish messages. -simple_publish(Mandatory, Immediate, ExchangeName, RoutingKeyBin, - ContentTypeBin, BodyBin) -> - {ClassId, _MethodId} = rabbit_framing:method_id('basic.publish'), - Content = #content{class_id = ClassId, - properties = #'P_basic'{content_type = ContentTypeBin}, - properties_bin = none, - payload_fragments_rev = [BodyBin]}, - Message = #basic_message{exchange_name = ExchangeName, - routing_key = RoutingKeyBin, - content = Content, - persistent_key = none}, - simple_publish(Mandatory, Immediate, Message). - -%% Usable by Erlang code that wants to publish messages. -simple_publish(Mandatory, Immediate, - Message = #basic_message{exchange_name = ExchangeName, - routing_key = RoutingKey, - content = Content}) -> - case lookup(ExchangeName) of - {ok, Exchange} -> - QPids = route(Exchange, RoutingKey, Content), - rabbit_router:deliver(QPids, Mandatory, Immediate, - none, Message); - {error, Error} -> {error, Error} +publish(X, Delivery) -> + publish(X, [], Delivery). + +publish(X, Seen, Delivery = #delivery{ + message = #basic_message{routing_key = RK, content = C}}) -> + case rabbit_router:deliver(route(X, RK, C), Delivery) of + {_, []} = R -> + #exchange{name = XName, arguments = Args} = X, + case rabbit_misc:r_arg(XName, exchange, Args, + <<"alternate-exchange">>) of + undefined -> + R; + AName -> + NewSeen = [XName | Seen], + case lists:member(AName, NewSeen) of + true -> + R; + false -> + case lookup(AName) of + {ok, AX} -> + publish(AX, NewSeen, Delivery); + {error, not_found} -> + rabbit_log:warning( + "alternate exchange for ~s " + "does not exist: ~s", + [rabbit_misc:rs(XName), + rabbit_misc:rs(AName)]), + R + end + end + end; + R -> + R end. -sort_arguments(Arguments) -> - lists:keysort(1, Arguments). - %% return the list of qpids to which a message with a given routing %% key, sent to a particular exchange, should be delivered. %% @@ -239,9 +235,9 @@ route(X = #exchange{type = topic}, RoutingKey, _Content) -> route(X = #exchange{type = headers}, _RoutingKey, Content) -> Headers = case (Content#content.properties)#'P_basic'.headers of - undefined -> []; - H -> sort_arguments(H) - end, + undefined -> []; + H -> sort_arguments(H) + end, match_bindings(X, fun (#binding{args = Spec}) -> headers_match(Spec, Headers) end); @@ -252,6 +248,9 @@ route(X = #exchange{type = fanout}, _RoutingKey, _Content) -> route(X = #exchange{type = direct}, RoutingKey, _Content) -> match_routing_key(X, RoutingKey). +sort_arguments(Arguments) -> + lists:keysort(1, Arguments). + %% TODO: Maybe this should be handled by a cursor instead. %% TODO: This causes a full scan for each entry with the same exchange match_bindings(#exchange{name = Name}, Match) -> @@ -383,32 +382,40 @@ call_with_exchange_and_queue(Exchange, Queue, Fun) -> end). add_binding(ExchangeName, QueueName, RoutingKey, Arguments) -> - call_with_exchange_and_queue( - ExchangeName, QueueName, - fun (X, Q) -> + binding_action( + ExchangeName, QueueName, RoutingKey, Arguments, + fun (X, Q, B) -> if Q#amqqueue.durable and not(X#exchange.durable) -> {error, durability_settings_incompatible}; - true -> ok = sync_binding( - ExchangeName, QueueName, RoutingKey, Arguments, - Q#amqqueue.durable, fun mnesia:write/3) + true -> ok = sync_binding(B, Q#amqqueue.durable, + fun mnesia:write/3) end end). delete_binding(ExchangeName, QueueName, RoutingKey, Arguments) -> + binding_action( + ExchangeName, QueueName, RoutingKey, Arguments, + fun (X, Q, B) -> + case mnesia:match_object(rabbit_route, #route{binding = B}, + write) of + [] -> {error, binding_not_found}; + _ -> ok = sync_binding(B, Q#amqqueue.durable, + fun mnesia:delete_object/3), + maybe_auto_delete(X) + end + end). + +binding_action(ExchangeName, QueueName, RoutingKey, Arguments, Fun) -> call_with_exchange_and_queue( ExchangeName, QueueName, fun (X, Q) -> - ok = sync_binding( - ExchangeName, QueueName, RoutingKey, Arguments, - Q#amqqueue.durable, fun mnesia:delete_object/3), - maybe_auto_delete(X) + Fun(X, Q, #binding{exchange_name = ExchangeName, + queue_name = QueueName, + key = RoutingKey, + args = sort_arguments(Arguments)}) end). -sync_binding(ExchangeName, QueueName, RoutingKey, Arguments, Durable, Fun) -> - Binding = #binding{exchange_name = ExchangeName, - queue_name = QueueName, - key = RoutingKey, - args = sort_arguments(Arguments)}, +sync_binding(Binding, Durable, Fun) -> ok = case Durable of true -> Fun(rabbit_durable_route, #route{binding = Binding}, write); @@ -474,7 +481,7 @@ parse_x_match(Other) -> %% Horrendous matching algorithm. Depends for its merge-like %% (linear-time) behaviour on the lists:keysort (sort_arguments) that -%% route/3 and sync_binding/6 do. +%% route/3 and {add,delete}_binding/4 do. %% %% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! %% In other words: REQUIRES BOTH PATTERN AND DATA TO BE SORTED ASCENDING BY KEY. @@ -482,14 +489,14 @@ parse_x_match(Other) -> %% headers_match(Pattern, Data) -> MatchKind = case lists:keysearch(<<"x-match">>, 1, Pattern) of - {value, {_, longstr, MK}} -> parse_x_match(MK); - {value, {_, Type, MK}} -> - rabbit_log:warning("Invalid x-match field type ~p " + {value, {_, longstr, MK}} -> parse_x_match(MK); + {value, {_, Type, MK}} -> + rabbit_log:warning("Invalid x-match field type ~p " "(value ~p); expected longstr", - [Type, MK]), - default_headers_match_kind(); - _ -> default_headers_match_kind() - end, + [Type, MK]), + default_headers_match_kind(); + _ -> default_headers_match_kind() + end, headers_match(Pattern, Data, true, false, MatchKind). headers_match([], _Data, AllMatch, _AnyMatch, all) -> @@ -516,8 +523,8 @@ headers_match([{PK, PT, PV} | PRest], [{DK, DT, DV} | DRest], %% the corresponding data field. I've interpreted that to %% mean a type of "void" for the pattern field. PT == void -> {AllMatch, true}; - %% Similarly, it's not specified, but I assume that a - %% mismatched type causes a mismatched value. + %% Similarly, it's not specified, but I assume that a + %% mismatched type causes a mismatched value. PT =/= DT -> {false, AnyMatch}; PV == DV -> {AllMatch, true}; true -> {false, AnyMatch} diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 3f9b6ebb9b..9f3dcbd071 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -36,7 +36,7 @@ -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]). -export([start_link/1, shutdown/1]). --export([limit/2, can_send/2, ack/2, register/2, unregister/2]). +-export([limit/2, can_send/3, ack/2, register/2, unregister/2]). %%---------------------------------------------------------------------------- @@ -47,7 +47,7 @@ -spec(start_link/1 :: (pid()) -> pid()). -spec(shutdown/1 :: (maybe_pid()) -> 'ok'). -spec(limit/2 :: (maybe_pid(), non_neg_integer()) -> 'ok'). --spec(can_send/2 :: (maybe_pid(), pid()) -> bool()). +-spec(can_send/3 :: (maybe_pid(), pid(), bool()) -> bool()). -spec(ack/2 :: (maybe_pid(), non_neg_integer()) -> 'ok'). -spec(register/2 :: (maybe_pid(), pid()) -> 'ok'). -spec(unregister/2 :: (maybe_pid(), pid()) -> 'ok'). @@ -85,12 +85,13 @@ limit(LimiterPid, PrefetchCount) -> %% Ask the limiter whether the queue can deliver a message without %% breaching a limit -can_send(undefined, _QPid) -> +can_send(undefined, _QPid, _AckRequired) -> true; -can_send(LimiterPid, QPid) -> +can_send(LimiterPid, QPid, AckRequired) -> rabbit_misc:with_exit_handler( fun () -> true end, - fun () -> gen_server2:call(LimiterPid, {can_send, QPid}, infinity) end). + fun () -> gen_server2:call(LimiterPid, {can_send, QPid, AckRequired}, + infinity) end). %% Let the limiter know that the channel has received some acks from a %% consumer @@ -110,10 +111,13 @@ unregister(LimiterPid, QPid) -> gen_server2:cast(LimiterPid, {unregister, QPid}) init([ChPid]) -> {ok, #lim{ch_pid = ChPid} }. -handle_call({can_send, QPid}, _From, State = #lim{volume = Volume}) -> +handle_call({can_send, QPid, AckRequired}, _From, + State = #lim{volume = Volume}) -> case limit_reached(State) of true -> {reply, false, limit_queue(QPid, State)}; - false -> {reply, true, State#lim{volume = Volume + 1}} + false -> {reply, true, State#lim{volume = if AckRequired -> Volume + 1; + true -> Volume + end}} end. handle_cast(shutdown, State) -> diff --git a/src/rabbit_log.erl b/src/rabbit_log.erl index f408336e94..dd5b498b07 100644 --- a/src/rabbit_log.erl +++ b/src/rabbit_log.erl @@ -75,7 +75,7 @@ debug(Fmt, Args) when is_list(Args) -> message(Direction, Channel, MethodRecord, Content) -> gen_server:cast(?SERVER, - {message, Direction, Channel, MethodRecord, Content}). + {message, Direction, Channel, MethodRecord, Content}). info(Fmt) -> gen_server:cast(?SERVER, {info, Fmt}). @@ -112,11 +112,11 @@ handle_cast({debug, Fmt, Args}, State) -> {noreply, State}; handle_cast({message, Direction, Channel, MethodRecord, Content}, State) -> io:format("~s ch~p ~p~n", - [case Direction of - in -> "-->"; - out -> "<--" end, - Channel, - {MethodRecord, Content}]), + [case Direction of + in -> "-->"; + out -> "<--" end, + Channel, + {MethodRecord, Content}]), {noreply, State}; handle_cast({info, Fmt}, State) -> error_logger:info_msg(Fmt), diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index eced0b3cbe..72e16f0fc0 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -36,9 +36,10 @@ -export([method_record_type/1, polite_pause/0, polite_pause/1]). -export([die/1, frame_error/2, protocol_error/3, protocol_error/4]). +-export([not_found/1]). -export([get_config/1, get_config/2, set_config/2]). -export([dirty_read/1]). --export([r/3, r/2, rs/1]). +-export([r/3, r/2, r_arg/4, rs/1]). -export([enable_cover/0, report_cover/0]). -export([throw_on_error/2, with_exit_handler/2, filter_exit_map/2]). -export([with_user/2, with_vhost/2, with_user_and_vhost/3]). @@ -72,16 +73,19 @@ (atom() | amqp_error(), string(), [any()]) -> no_return()). -spec(protocol_error/4 :: (atom() | amqp_error(), string(), [any()], atom()) -> no_return()). +-spec(not_found/1 :: (r(atom())) -> no_return()). -spec(get_config/1 :: (atom()) -> {'ok', any()} | not_found()). -spec(get_config/2 :: (atom(), A) -> A). -spec(set_config/2 :: (atom(), any()) -> 'ok'). -spec(dirty_read/1 :: ({atom(), any()}) -> {'ok', any()} | not_found()). --spec(r/3 :: (vhost() | r(atom()), K, resource_name()) -> r(K) - when is_subtype(K, atom())). +-spec(r/3 :: (vhost() | r(atom()), K, resource_name()) -> + r(K) when is_subtype(K, atom())). -spec(r/2 :: (vhost(), K) -> #resource{virtual_host :: vhost(), kind :: K, name :: '_'} when is_subtype(K, atom())). +-spec(r_arg/4 :: (vhost() | r(atom()), K, amqp_table(), binary()) -> + undefined | r(K) when is_subtype(K, atom())). -spec(rs/1 :: (r(atom())) -> string()). -spec(enable_cover/0 :: () -> 'ok' | {'error', any()}). -spec(report_cover/0 :: () -> 'ok'). @@ -139,6 +143,8 @@ protocol_error(Error, Explanation, Params, Method) -> CompleteExplanation = lists:flatten(io_lib:format(Explanation, Params)), exit({amqp, Error, CompleteExplanation, Method}). +not_found(R) -> protocol_error(not_found, "no ~s", [rs(R)]). + get_config(Key) -> case dirty_read({rabbit_config, Key}) of {ok, {rabbit_config, Key, V}} -> {ok, V}; @@ -169,6 +175,14 @@ r(VHostPath, Kind, Name) when is_binary(Name) andalso is_binary(VHostPath) -> r(VHostPath, Kind) when is_binary(VHostPath) -> #resource{virtual_host = VHostPath, kind = Kind, name = '_'}. +r_arg(#resource{virtual_host = VHostPath}, Kind, Table, Key) -> + r_arg(VHostPath, Kind, Table, Key); +r_arg(VHostPath, Kind, Table, Key) -> + case lists:keysearch(Key, 1, Table) of + {value, {_, longstr, NameBin}} -> r(VHostPath, Kind, NameBin); + false -> undefined + end. + rs(#resource{virtual_host = VHostPath, kind = Kind, name = Name}) -> lists:flatten(io_lib:format("~s '~s' in vhost '~s'", [Kind, Name, VHostPath])). diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 15213861bd..575ecb0adc 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -31,7 +31,7 @@ -module(rabbit_mnesia). --export([ensure_mnesia_dir/0, status/0, init/0, is_db_empty/0, +-export([ensure_mnesia_dir/0, dir/0, status/0, init/0, is_db_empty/0, cluster/1, reset/0, force_reset/0]). -export([table_names/0]). @@ -47,6 +47,7 @@ -ifdef(use_specs). -spec(status/0 :: () -> [{'nodes' | 'running_nodes', [erlang_node()]}]). +-spec(dir/0 :: () -> string()). -spec(ensure_mnesia_dir/0 :: () -> 'ok'). -spec(init/0 :: () -> 'ok'). -spec(is_db_empty/0 :: () -> bool()). @@ -148,8 +149,10 @@ table_definitions() -> table_names() -> [Tab || {Tab, _} <- table_definitions()]. +dir() -> mnesia:system_info(directory). + ensure_mnesia_dir() -> - MnesiaDir = mnesia:system_info(directory) ++ "/", + MnesiaDir = dir() ++ "/", case filelib:ensure_dir(MnesiaDir) of {error, Reason} -> throw({error, {cannot_create_mnesia_dir, MnesiaDir, Reason}}); @@ -185,7 +188,7 @@ check_schema_integrity() -> %% it doesn't. cluster_nodes_config_filename() -> - mnesia:system_info(directory) ++ "/cluster_nodes.config". + dir() ++ "/cluster_nodes.config". create_cluster_nodes_config(ClusterNodes) -> FileName = cluster_nodes_config_filename(), @@ -301,7 +304,7 @@ create_schema() -> move_db() -> mnesia:stop(), - MnesiaDir = filename:dirname(mnesia:system_info(directory) ++ "/"), + MnesiaDir = filename:dirname(dir() ++ "/"), {{Year, Month, Day}, {Hour, Minute, Second}} = erlang:universaltime(), BackupDir = lists:flatten( io_lib:format("~s_~w~2..0w~2..0w~2..0w~2..0w~2..0w", @@ -418,7 +421,7 @@ reset(Force) -> ok = delete_cluster_nodes_config(), %% remove persistet messages and any other garbage we find lists:foreach(fun file:delete/1, - filelib:wildcard(mnesia:system_info(directory) ++ "/*")), + filelib:wildcard(dir() ++ "/*")), ok. leave_cluster([], _) -> ok; diff --git a/src/rabbit_persister.erl b/src/rabbit_persister.erl index f4fa45993a..d0d60ddf3d 100644 --- a/src/rabbit_persister.erl +++ b/src/rabbit_persister.erl @@ -259,7 +259,7 @@ log(State = #pstate{deadline = ExistingDeadline, pending_logs = Logs}, pending_logs = [Message | Logs]}. base_filename() -> - mnesia:system_info(directory) ++ "/rabbit_persister.LOG". + rabbit_mnesia:dir() ++ "/rabbit_persister.LOG". take_snapshot(LogHandle, OldFileName, Snapshot) -> ok = disk_log:sync(LogHandle), diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index ba6d6e6a42..a09783bec4 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -231,7 +231,7 @@ start_connection(Parent, Deb, ClientSock) -> connection_state = pre_init}, handshake, 8)) catch - Ex -> (if Ex == connection_closed_abruptly -> + Ex -> (if Ex == connection_closed_abruptly -> fun rabbit_log:warning/2; true -> fun rabbit_log:error/2 diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index 0b06a063a7..10f80cc301 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -35,7 +35,7 @@ -behaviour(gen_server2). -export([start_link/0, - deliver/5]). + deliver/2]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -50,8 +50,7 @@ -ifdef(use_specs). -spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}). --spec(deliver/5 :: ([pid()], bool(), bool(), maybe(txn()), message()) -> - {'ok', [pid()]} | {'error', 'unroutable' | 'not_delivered'}). +-spec(deliver/2 :: ([pid()], delivery()) -> {routing_result(), [pid()]}). -endif. @@ -62,13 +61,13 @@ start_link() -> -ifdef(BUG19758). -deliver(QPids, Mandatory, Immediate, Txn, Message) -> - check_delivery(Mandatory, Immediate, - run_bindings(QPids, Mandatory, Immediate, Txn, Message)). +deliver(QPids, Delivery) -> + check_delivery(Delivery#delivery.mandatory, Delivery#delivery.immediate, + run_bindings(QPids, Delivery)). -else. -deliver(QPids, Mandatory, Immediate, Txn, Message) -> +deliver(QPids, Delivery) -> %% we reduce inter-node traffic by grouping the qpids by node and %% only delivering one copy of the message to each node involved, %% which then in turn delivers it to its queues. @@ -81,16 +80,14 @@ deliver(QPids, Mandatory, Immediate, Txn, Message) -> [QPid], D) end, dict:new(), QPids)), - Mandatory, Immediate, Txn, Message). + Delivery). -deliver_per_node([{Node, QPids}], Mandatory, Immediate, - Txn, Message) - when Node == node() -> +deliver_per_node([{Node, QPids}], Delivery) when Node == node() -> %% optimisation - check_delivery(Mandatory, Immediate, - run_bindings(QPids, Mandatory, Immediate, Txn, Message)); -deliver_per_node(NodeQPids, Mandatory = false, Immediate = false, - Txn, Message) -> + check_delivery(Delivery#delivery.mandatory, Delivery#delivery.immediate, + run_bindings(QPids, Delivery)); +deliver_per_node(NodeQPids, Delivery = #delivery{mandatory = false, + immediate = false}) -> %% optimisation: when Mandatory = false and Immediate = false, %% rabbit_amqqueue:deliver in run_bindings below will deliver the %% message to the queue process asynchronously, and return true, @@ -98,22 +95,19 @@ deliver_per_node(NodeQPids, Mandatory = false, Immediate = false, %% therefore safe to use a fire-and-forget cast here and return %% the QPids - the semantics is preserved. This scales much better %% than the non-immediate case below. - {ok, lists:flatmap( - fun ({Node, QPids}) -> - gen_server2:cast( - {?SERVER, Node}, - {deliver, QPids, Mandatory, Immediate, Txn, Message}), - QPids - end, - NodeQPids)}; -deliver_per_node(NodeQPids, Mandatory, Immediate, - Txn, Message) -> + {routed, + lists:flatmap( + fun ({Node, QPids}) -> + gen_server2:cast({?SERVER, Node}, {deliver, QPids, Delivery}), + QPids + end, + NodeQPids)}; +deliver_per_node(NodeQPids, Delivery) -> R = rabbit_misc:upmap( fun ({Node, QPids}) -> - try gen_server2:call( - {?SERVER, Node}, - {deliver, QPids, Mandatory, Immediate, Txn, Message}, - infinity) + try gen_server2:call({?SERVER, Node}, + {deliver, QPids, Delivery}, + infinity) catch _Class:_Reason -> %% TODO: figure out what to log (and do!) here @@ -130,7 +124,8 @@ deliver_per_node(NodeQPids, Mandatory, Immediate, end, {false, []}, R), - check_delivery(Mandatory, Immediate, {Routed, lists:append(Handled)}). + check_delivery(Delivery#delivery.mandatory, Delivery#delivery.immediate, + {Routed, lists:append(Handled)}). -endif. @@ -139,19 +134,17 @@ deliver_per_node(NodeQPids, Mandatory, Immediate, init([]) -> {ok, no_state}. -handle_call({deliver, QPids, Mandatory, Immediate, Txn, Message}, - From, State) -> +handle_call({deliver, QPids, Delivery}, From, State) -> spawn( fun () -> - R = run_bindings(QPids, Mandatory, Immediate, Txn, Message), + R = run_bindings(QPids, Delivery), gen_server2:reply(From, R) end), {noreply, State}. -handle_cast({deliver, QPids, Mandatory, Immediate, Txn, Message}, - State) -> +handle_cast({deliver, QPids, Delivery}, State) -> %% in order to preserve message ordering we must not spawn here - run_bindings(QPids, Mandatory, Immediate, Txn, Message), + run_bindings(QPids, Delivery), {noreply, State}. handle_info(_Info, State) -> @@ -165,11 +158,10 @@ code_change(_OldVsn, State, _Extra) -> %%-------------------------------------------------------------------- -run_bindings(QPids, IsMandatory, IsImmediate, Txn, Message) -> +run_bindings(QPids, Delivery) -> lists:foldl( fun (QPid, {Routed, Handled}) -> - case catch rabbit_amqqueue:deliver(IsMandatory, IsImmediate, - Txn, Message, QPid) of + case catch rabbit_amqqueue:deliver(QPid, Delivery) of true -> {true, [QPid | Handled]}; false -> {true, Handled}; {'EXIT', _Reason} -> {Routed, Handled} @@ -179,6 +171,6 @@ run_bindings(QPids, IsMandatory, IsImmediate, Txn, Message) -> QPids). %% check_delivery(Mandatory, Immediate, {WasRouted, QPids}) -check_delivery(true, _ , {false, []}) -> {error, unroutable}; -check_delivery(_ , true, {_ , []}) -> {error, not_delivered}; -check_delivery(_ , _ , {_ , Qs}) -> {ok, Qs}. +check_delivery(true, _ , {false, []}) -> {unroutable, []}; +check_delivery(_ , true, {_ , []}) -> {not_delivered, []}; +check_delivery(_ , _ , {_ , Qs}) -> {routed, Qs}. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 8f0a3a8973..01757509ec 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -261,7 +261,7 @@ test_log_management() -> %% original log files are not writable ok = make_files_non_writable([MainLog, SaslLog]), {error, {{cannot_rotate_main_logs, _}, - {cannot_rotate_sasl_logs, _}}} = control_action(rotate_logs, []), + {cannot_rotate_sasl_logs, _}}} = control_action(rotate_logs, []), %% logging directed to tty (handlers were removed in last test) ok = clean_logs([MainLog, SaslLog], Suffix), @@ -280,7 +280,7 @@ test_log_management() -> ok = application:set_env(sasl, sasl_error_logger, {file, SaslLog}), ok = application:set_env(kernel, error_logger, {file, MainLog}), ok = add_log_handlers([{rabbit_error_logger_file_h, MainLog}, - {rabbit_sasl_report_file_h, SaslLog}]), + {rabbit_sasl_report_file_h, SaslLog}]), passed. test_log_management_during_startup() -> |
