diff options
Diffstat (limited to 'src')
31 files changed, 1676 insertions, 645 deletions
diff --git a/src/pg_local.erl b/src/pg_local.erl index fa41fe46b3..1501331d6b 100644 --- a/src/pg_local.erl +++ b/src/pg_local.erl @@ -206,7 +206,7 @@ ensure_started() -> case whereis(?MODULE) of undefined -> C = {pg_local, {?MODULE, start_link, []}, permanent, - 1000, worker, [?MODULE]}, + 16#ffffffff, worker, [?MODULE]}, supervisor:start_child(kernel_safe_sup, C); PgLocalPid -> {ok, PgLocalPid} diff --git a/src/rabbit.erl b/src/rabbit.erl index 35d3ce4a5a..259ac0401a 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -51,20 +51,39 @@ -rabbit_boot_step({database, [{mfa, {rabbit_mnesia, init, []}}, - {enables, kernel_ready}]}). + {enables, external_infrastructure}]}). + +-rabbit_boot_step({worker_pool, + [{description, "worker pool"}, + {mfa, {rabbit_sup, start_child, [worker_pool_sup]}}, + {enables, external_infrastructure}]}). + +-rabbit_boot_step({external_infrastructure, + [{description, "external infrastructure ready"}]}). + +-rabbit_boot_step({rabbit_exchange_type_registry, + [{description, "exchange type registry"}, + {mfa, {rabbit_sup, start_child, + [rabbit_exchange_type_registry]}}, + {enables, kernel_ready}, + {requires, external_infrastructure}]}). -rabbit_boot_step({rabbit_log, [{description, "logging server"}, - {mfa, {rabbit_sup, start_child, [rabbit_log]}}, - {enables, kernel_ready}]}). + {mfa, {rabbit_sup, start_restartable_child, + [rabbit_log]}}, + {enables, kernel_ready}, + {requires, external_infrastructure}]}). -rabbit_boot_step({rabbit_hooks, [{description, "internal event notification system"}, {mfa, {rabbit_hooks, start, []}}, - {enables, kernel_ready}]}). + {enables, kernel_ready}, + {requires, external_infrastructure}]}). -rabbit_boot_step({kernel_ready, - [{description, "kernel ready"}]}). + [{description, "kernel ready"}, + {requires, external_infrastructure}]}). -rabbit_boot_step({rabbit_alarm, [{description, "alarm handler"}, @@ -72,23 +91,18 @@ {requires, kernel_ready}, {enables, core_initialized}]}). --rabbit_boot_step({rabbit_amqqueue_sup, - [{description, "queue supervisor"}, - {mfa, {rabbit_amqqueue, start, []}}, - {requires, kernel_ready}, - {enables, core_initialized}]}). - -rabbit_boot_step({rabbit_router, [{description, "cluster router"}, - {mfa, {rabbit_sup, start_child, [rabbit_router]}}, + {mfa, {rabbit_sup, start_restartable_child, + [rabbit_router]}}, {requires, kernel_ready}, {enables, core_initialized}]}). -rabbit_boot_step({rabbit_node_monitor, [{description, "node monitor"}, - {mfa, {rabbit_sup, start_child, [rabbit_node_monitor]}}, + {mfa, {rabbit_sup, start_restartable_child, + [rabbit_node_monitor]}}, {requires, kernel_ready}, - {requires, rabbit_amqqueue_sup}, {enables, core_initialized}]}). -rabbit_boot_step({core_initialized, @@ -104,18 +118,20 @@ {mfa, {rabbit_exchange, recover, []}}, {requires, empty_db_check}]}). --rabbit_boot_step({queue_recovery, - [{description, "queue recovery"}, - {mfa, {rabbit_amqqueue, recover, []}}, - {requires, exchange_recovery}]}). +-rabbit_boot_step({queue_sup_queue_recovery, + [{description, "queue supervisor and queue recovery"}, + {mfa, {rabbit_amqqueue, start, []}}, + {requires, empty_db_check}]}). -rabbit_boot_step({persister, - [{mfa, {rabbit_sup, start_child, [rabbit_persister]}}, - {requires, queue_recovery}]}). + [{mfa, {rabbit_sup, start_child, + [rabbit_persister]}}, + {requires, queue_sup_queue_recovery}]}). -rabbit_boot_step({guid_generator, [{description, "guid generator"}, - {mfa, {rabbit_sup, start_child, [rabbit_guid]}}, + {mfa, {rabbit_sup, start_restartable_child, + [rabbit_guid]}}, {requires, persister}, {enables, routing_ready}]}). @@ -187,15 +203,12 @@ stop() -> ok = rabbit_misc:stop_applications(?APPS). stop_and_halt() -> - spawn(fun () -> - SleepTime = 1000, - rabbit_log:info("Stop-and-halt request received; " - "halting in ~p milliseconds~n", - [SleepTime]), - timer:sleep(SleepTime), - init:stop() - end), - case catch stop() of _ -> ok end. + try + stop() + after + init:stop() + end, + ok. status() -> [{running_applications, application:which_applications()}] ++ diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl index 3b9eeec18a..7e96d9a3a8 100644 --- a/src/rabbit_alarm.erl +++ b/src/rabbit_alarm.erl @@ -57,10 +57,9 @@ start() -> ok = alarm_handler:add_alarm_handler(?MODULE, []), {ok, MemoryWatermark} = application:get_env(vm_memory_high_watermark), ok = case MemoryWatermark == 0 of - true -> - ok; - false -> - rabbit_sup:start_child(vm_memory_monitor, [MemoryWatermark]) + true -> ok; + false -> rabbit_sup:start_restartable_child(vm_memory_monitor, + [MemoryWatermark]) end, ok. diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 3f25d72e4f..e13dd9edb5 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -31,7 +31,7 @@ -module(rabbit_amqqueue). --export([start/0, recover/0, declare/4, delete/3, purge/1]). +-export([start/0, declare/4, delete/3, purge/1]). -export([internal_declare/2, internal_delete/1]). -export([pseudo_queue/2]). -export([lookup/1, with/2, with_or_die/2, @@ -40,7 +40,7 @@ -export([consumers/1, consumers_all/1]). -export([claim_queue/2]). -export([basic_get/3, basic_consume/8, basic_cancel/4]). --export([notify_sent/2, unblock/2]). +-export([notify_sent/2, unblock/2, flush_all/2]). -export([commit_all/2, rollback_all/2, notify_down_all/2, limit_all/3]). -export([on_node_down/1]). @@ -63,7 +63,6 @@ 'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}). -spec(start/0 :: () -> 'ok'). --spec(recover/0 :: () -> 'ok'). -spec(declare/4 :: (queue_name(), boolean(), boolean(), amqp_table()) -> amqqueue()). -spec(lookup/1 :: (queue_name()) -> {'ok', amqqueue()} | not_found()). @@ -98,7 +97,7 @@ -spec(limit_all/3 :: ([pid()], pid(), pid() | 'undefined') -> ok_or_errors()). -spec(claim_queue/2 :: (amqqueue(), pid()) -> 'ok' | 'locked'). -spec(basic_get/3 :: (amqqueue(), pid(), boolean()) -> - {'ok', non_neg_integer(), msg()} | 'empty'). + {'ok', non_neg_integer(), qmsg()} | 'empty'). -spec(basic_consume/8 :: (amqqueue(), boolean(), pid(), pid(), pid() | 'undefined', ctag(), boolean(), any()) -> @@ -107,6 +106,7 @@ -spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok'). -spec(notify_sent/2 :: (pid(), pid()) -> 'ok'). -spec(unblock/2 :: (pid(), pid()) -> 'ok'). +-spec(flush_all/2 :: ([pid()], pid()) -> 'ok'). -spec(internal_declare/2 :: (amqqueue(), boolean()) -> amqqueue()). -spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()). -spec(on_node_down/1 :: (erlang_node()) -> 'ok'). @@ -117,45 +117,47 @@ %%---------------------------------------------------------------------------- start() -> + DurableQueues = find_durable_queues(), {ok,_} = supervisor:start_child( rabbit_sup, {rabbit_amqqueue_sup, {rabbit_amqqueue_sup, start_link, []}, transient, infinity, supervisor, [rabbit_amqqueue_sup]}), + _RealDurableQueues = recover_durable_queues(DurableQueues), ok. -recover() -> - ok = recover_durable_queues(), - ok. - -recover_durable_queues() -> +find_durable_queues() -> Node = node(), - lists:foreach( - fun (RecoveredQ) -> + %% TODO: use dirty ops instead + rabbit_misc:execute_mnesia_transaction( + fun () -> + qlc:e(qlc:q([Q || Q = #amqqueue{pid = Pid} + <- mnesia:table(rabbit_durable_queue), + node(Pid) == Node])) + end). + +recover_durable_queues(DurableQueues) -> + lists:foldl( + fun (RecoveredQ, Acc) -> Q = start_queue_process(RecoveredQ), %% We need to catch the case where a client connected to %% another node has deleted the queue (and possibly %% re-created it). case rabbit_misc:execute_mnesia_transaction( - fun () -> case mnesia:match_object( - rabbit_durable_queue, RecoveredQ, read) of - [_] -> ok = store_queue(Q), - true; - [] -> false - end + fun () -> + case mnesia:match_object( + rabbit_durable_queue, RecoveredQ, + read) of + [_] -> ok = store_queue(Q), + true; + [] -> false + end end) of - true -> ok; - false -> exit(Q#amqqueue.pid, shutdown) + true -> [Q | Acc]; + false -> exit(Q#amqqueue.pid, shutdown), + Acc end - end, - %% TODO: use dirty ops instead - rabbit_misc:execute_mnesia_transaction( - fun () -> - qlc:e(qlc:q([Q || Q = #amqqueue{pid = Pid} - <- mnesia:table(rabbit_durable_queue), - node(Pid) == Node])) - end)), - ok. + end, [], DurableQueues). declare(QueueName, Durable, AutoDelete, Args) -> Q = start_queue_process(#amqqueue{name = QueueName, @@ -284,7 +286,7 @@ requeue(QPid, MsgIds, ChPid) -> gen_server2:cast(QPid, {requeue, MsgIds, ChPid}). ack(QPid, Txn, MsgIds, ChPid) -> - gen_server2:pcast(QPid, 8, {ack, Txn, MsgIds, ChPid}). + gen_server2:pcast(QPid, 7, {ack, Txn, MsgIds, ChPid}). commit_all(QPids, Txn) -> safe_pmap_ok( @@ -329,39 +331,53 @@ basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> infinity). notify_sent(QPid, ChPid) -> - gen_server2:pcast(QPid, 8, {notify_sent, ChPid}). + gen_server2:pcast(QPid, 7, {notify_sent, ChPid}). unblock(QPid, ChPid) -> - gen_server2:pcast(QPid, 8, {unblock, ChPid}). + gen_server2:pcast(QPid, 7, {unblock, ChPid}). + +flush_all(QPids, ChPid) -> + safe_pmap_ok( + fun (_) -> ok end, + fun (QPid) -> gen_server2:cast(QPid, {flush, ChPid}) end, + QPids). internal_delete(QueueName) -> - rabbit_misc:execute_mnesia_transaction( - fun () -> - case mnesia:wread({rabbit_queue, QueueName}) of - [] -> {error, not_found}; - [_] -> - ok = rabbit_exchange:delete_queue_bindings(QueueName), - ok = mnesia:delete({rabbit_queue, QueueName}), - ok = mnesia:delete({rabbit_durable_queue, QueueName}), - ok - end - end). + case + rabbit_misc:execute_mnesia_transaction( + fun () -> + case mnesia:wread({rabbit_queue, QueueName}) of + [] -> {error, not_found}; + [_] -> + ok = mnesia:delete({rabbit_queue, QueueName}), + ok = mnesia:delete({rabbit_durable_queue, QueueName}), + %% we want to execute some things, as + %% decided by rabbit_exchange, after the + %% transaction. + rabbit_exchange:delete_queue_bindings(QueueName) + end + end) of + Err = {error, _} -> Err; + PostHook -> + PostHook(), + ok + end. on_node_down(Node) -> - rabbit_misc:execute_mnesia_transaction( - fun () -> - qlc:fold( - fun (QueueName, Acc) -> - ok = rabbit_exchange:delete_transient_queue_bindings( - QueueName), - ok = mnesia:delete({rabbit_queue, QueueName}), - Acc - end, - ok, - qlc:q([QueueName || #amqqueue{name = QueueName, pid = Pid} - <- mnesia:table(rabbit_queue), - node(Pid) == Node])) - end). + [Hook() || + Hook <- rabbit_misc:execute_mnesia_transaction( + fun () -> + qlc:e(qlc:q([delete_queue(QueueName) || + #amqqueue{name = QueueName, pid = Pid} + <- mnesia:table(rabbit_queue), + node(Pid) == Node])) + end)], + ok. + +delete_queue(QueueName) -> + Post = rabbit_exchange:delete_transient_queue_bindings(QueueName), + ok = mnesia:delete({rabbit_queue, QueueName}), + Post. pseudo_queue(QueueName, Pid) -> #amqqueue{name = QueueName, diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index e4791f9524..ba41f55030 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -36,8 +36,6 @@ -behaviour(gen_server2). -define(UNSENT_MESSAGE_LIMIT, 100). --define(HIBERNATE_AFTER_MIN, 1000). --define(DESIRED_HIBERNATE, 10000). -export([start_link/1, info_keys/0]). @@ -376,7 +374,7 @@ maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg). qname(#q{q = #amqqueue{name = QName}}) -> QName. -persist_message(_Txn, _QName, #basic_message{persistent_key = none}) -> +persist_message(_Txn, _QName, #basic_message{is_persistent = false}) -> ok; persist_message(Txn, QName, Message) -> M = Message#basic_message{ @@ -384,29 +382,28 @@ persist_message(Txn, QName, Message) -> content = rabbit_binary_parser:clear_decoded_content( Message#basic_message.content)}, persist_work(Txn, QName, - [{publish, M, {QName, M#basic_message.persistent_key}}]). + [{publish, M, {QName, M#basic_message.guid}}]). persist_delivery(_QName, _Message, true) -> ok; -persist_delivery(_QName, #basic_message{persistent_key = none}, +persist_delivery(_QName, #basic_message{is_persistent = false}, _IsDelivered) -> ok; -persist_delivery(QName, #basic_message{persistent_key = PKey}, +persist_delivery(QName, #basic_message{guid = Guid}, _IsDelivered) -> - persist_work(none, QName, [{deliver, {QName, PKey}}]). + persist_work(none, QName, [{deliver, {QName, Guid}}]). persist_acks(Txn, QName, Messages) -> persist_work(Txn, QName, - [{ack, {QName, PKey}} || - #basic_message{persistent_key = PKey} <- Messages, - PKey =/= none]). + [{ack, {QName, Guid}} || #basic_message{ + guid = Guid, is_persistent = true} <- Messages]). -persist_auto_ack(_QName, #basic_message{persistent_key = none}) -> +persist_auto_ack(_QName, #basic_message{is_persistent = false}) -> ok; -persist_auto_ack(QName, #basic_message{persistent_key = PKey}) -> +persist_auto_ack(QName, #basic_message{guid = Guid}) -> %% auto-acks are always non-transactional - rabbit_persister:dirty_work([{ack, {QName, PKey}}]). + rabbit_persister:dirty_work([{ack, {QName, Guid}}]). persist_work(_Txn,_QName, []) -> ok; @@ -826,7 +823,11 @@ handle_cast({limit, ChPid, LimiterPid}, State) -> end, NewLimited = Limited andalso LimiterPid =/= undefined, C#cr{limiter_pid = LimiterPid, is_limit_active = NewLimited} - end)). + end)); + +handle_cast({flush, ChPid}, State) -> + ok = rabbit_channel:flushed(ChPid, self()), + noreply(State). handle_info({'DOWN', MonitorRef, process, DownPid, _Reason}, State = #q{owner = {DownPid, MonitorRef}}) -> diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index 9ebb6e72e0..4ab7a2a0b1 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -36,6 +36,7 @@ -export([publish/1, message/4, properties/1, delivery/4]). -export([publish/4, publish/7]). -export([build_content/2, from_content/1]). +-export([is_message_persistent/1]). %%---------------------------------------------------------------------------- @@ -48,7 +49,7 @@ -spec(delivery/4 :: (boolean(), boolean(), maybe(txn()), message()) -> delivery()). -spec(message/4 :: (exchange_name(), routing_key(), properties_input(), - binary()) -> message()). + binary()) -> (message() | {'error', any()})). -spec(properties/1 :: (properties_input()) -> amqp_properties()). -spec(publish/4 :: (exchange_name(), routing_key(), properties_input(), binary()) -> publish_result()). @@ -57,6 +58,8 @@ publish_result()). -spec(build_content/2 :: (amqp_properties(), binary()) -> content()). -spec(from_content/1 :: (content()) -> {amqp_properties(), binary()}). +-spec(is_message_persistent/1 :: + (decoded_content()) -> (boolean() | {'invalid', non_neg_integer()})). -endif. @@ -93,10 +96,17 @@ from_content(Content) -> message(ExchangeName, RoutingKeyBin, RawProperties, BodyBin) -> Properties = properties(RawProperties), - #basic_message{exchange_name = ExchangeName, - routing_key = RoutingKeyBin, - content = build_content(Properties, BodyBin), - persistent_key = none}. + Content = build_content(Properties, BodyBin), + case is_message_persistent(Content) of + {invalid, Other} -> + {error, {invalid_delivery_mode, Other}}; + IsPersistent when is_boolean(IsPersistent) -> + #basic_message{exchange_name = ExchangeName, + routing_key = RoutingKeyBin, + content = Content, + guid = rabbit_guid:guid(), + is_persistent = IsPersistent} + end. properties(P = #'P_basic'{}) -> P; @@ -130,3 +140,12 @@ publish(ExchangeName, RoutingKeyBin, Mandatory, Immediate, Txn, Properties, publish(delivery(Mandatory, Immediate, Txn, message(ExchangeName, RoutingKeyBin, properties(Properties), BodyBin))). + +is_message_persistent(#content{properties = #'P_basic'{ + delivery_mode = Mode}}) -> + case Mode of + 1 -> false; + 2 -> true; + undefined -> false; + Other -> {invalid, Other} + end. diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 585c59dc9b..9aeb4623f1 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -1,4 +1,4 @@ -%% The contents of this file are subject to the Mozilla Public Licenses +%% The contents of this file are subject to the Mozilla Public License %% Version 1.1 (the "License"); you may not use this file except in %% compliance with the License. You may obtain a copy of the License at %% http://www.mozilla.org/MPL/ @@ -36,7 +36,7 @@ -behaviour(gen_server2). -export([start_link/5, do/2, do/3, shutdown/1]). --export([send_command/2, deliver/4, conserve_memory/2]). +-export([send_command/2, deliver/4, conserve_memory/2, flushed/2]). -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]). -export([init/1, terminate/2, code_change/3, @@ -45,11 +45,8 @@ -record(ch, {state, channel, reader_pid, writer_pid, limiter_pid, transaction_id, tx_participants, next_tag, uncommitted_ack_q, unacked_message_q, - username, virtual_host, - most_recently_declared_queue, consumer_mapping}). - --define(HIBERNATE_AFTER_MIN, 1000). --define(DESIRED_HIBERNATE, 10000). + username, virtual_host, most_recently_declared_queue, + consumer_mapping, blocking}). -define(MAX_PERMISSION_CACHE_SIZE, 12). @@ -75,8 +72,9 @@ -spec(do/3 :: (pid(), amqp_method(), maybe(content())) -> 'ok'). -spec(shutdown/1 :: (pid()) -> 'ok'). -spec(send_command/2 :: (pid(), amqp_method()) -> 'ok'). --spec(deliver/4 :: (pid(), ctag(), boolean(), msg()) -> 'ok'). +-spec(deliver/4 :: (pid(), ctag(), boolean(), qmsg()) -> 'ok'). -spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok'). +-spec(flushed/2 :: (pid(), pid()) -> 'ok'). -spec(list/0 :: () -> [pid()]). -spec(info_keys/0 :: () -> [info_key()]). -spec(info/1 :: (pid()) -> [info()]). @@ -110,7 +108,10 @@ deliver(Pid, ConsumerTag, AckRequired, Msg) -> gen_server2:cast(Pid, {deliver, ConsumerTag, AckRequired, Msg}). conserve_memory(Pid, Conserve) -> - gen_server2:pcast(Pid, 9, {conserve_memory, Conserve}). + gen_server2:pcast(Pid, 8, {conserve_memory, Conserve}). + +flushed(Pid, QPid) -> + gen_server2:cast(Pid, {flushed, QPid}). list() -> pg_local:get_members(rabbit_channels). @@ -152,7 +153,8 @@ init([Channel, ReaderPid, WriterPid, Username, VHost]) -> username = Username, virtual_host = VHost, most_recently_declared_queue = <<>>, - consumer_mapping = dict:new()}, + consumer_mapping = dict:new(), + blocking = dict:new()}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -190,6 +192,9 @@ handle_cast({method, Method, Content}, State) -> {stop, {Reason, erlang:get_stacktrace()}, State} end; +handle_cast({flushed, QPid}, State) -> + {noreply, queue_blocked(QPid, State)}; + handle_cast(terminate, State) -> {stop, normal, State}; @@ -215,7 +220,9 @@ handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}}, State#ch.reader_pid ! {channel_exit, State#ch.channel, Reason}, {stop, normal, State}; handle_info({'EXIT', _Pid, Reason}, State) -> - {stop, Reason, State}. + {stop, Reason, State}; +handle_info({'DOWN', _MRef, process, QPid, _Reason}, State) -> + {noreply, queue_blocked(QPid, State)}. handle_pre_hibernate(State) -> ok = clear_permission_cache(), @@ -331,6 +338,20 @@ check_name(Kind, NameBin = <<"amq.", _/binary>>) -> check_name(_Kind, NameBin) -> NameBin. +queue_blocked(QPid, State = #ch{blocking = Blocking}) -> + case dict:find(QPid, Blocking) of + error -> State; + {ok, MRef} -> true = erlang:demonitor(MRef), + Blocking1 = dict:erase(QPid, Blocking), + ok = case dict:size(Blocking1) of + 0 -> rabbit_writer:send_command( + State#ch.writer_pid, + #'channel.flow_ok'{active = false}); + _ -> ok + end, + State#ch{blocking = Blocking1} + end. + handle_method(#'channel.open'{}, _, State = #ch{state = starting}) -> {reply, #'channel.open_ok'{}, State#ch{state = running}}; @@ -362,14 +383,12 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, %% We decode the content's properties here because we're almost %% certain to want to look at delivery-mode and priority. DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content), - PersistentKey = case is_message_persistent(DecodedContent) of - true -> rabbit_guid:guid(); - false -> none - end, + IsPersistent = is_message_persistent(DecodedContent), Message = #basic_message{exchange_name = ExchangeName, routing_key = RoutingKey, content = DecodedContent, - persistent_key = PersistentKey}, + guid = rabbit_guid:guid(), + is_persistent = IsPersistent}, {RoutingRes, DeliveredQPids} = rabbit_exchange:publish( Exchange, @@ -540,25 +559,17 @@ handle_method(#'basic.qos'{prefetch_size = Size}, _, _State) when Size /= 0 -> "prefetch_size!=0 (~w)", [Size]); handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, - _, State = #ch{ limiter_pid = LimiterPid, - unacked_message_q = UAMQ }) -> - NewLimiterPid = case {LimiterPid, PrefetchCount} of - {undefined, 0} -> - undefined; - {undefined, _} -> - LPid = rabbit_limiter:start_link(self(), - queue:len(UAMQ)), - ok = limit_queues(LPid, State), - LPid; - {_, 0} -> - ok = rabbit_limiter:shutdown(LimiterPid), - ok = limit_queues(undefined, State), - undefined; - {_, _} -> - LimiterPid - end, - ok = rabbit_limiter:limit(NewLimiterPid, PrefetchCount), - {reply, #'basic.qos_ok'{}, State#ch{limiter_pid = NewLimiterPid}}; + _, State = #ch{limiter_pid = LimiterPid}) -> + LimiterPid1 = case {LimiterPid, PrefetchCount} of + {undefined, 0} -> undefined; + {undefined, _} -> start_limiter(State); + {_, _} -> LimiterPid + end, + LimiterPid2 = case rabbit_limiter:limit(LimiterPid1, PrefetchCount) of + ok -> LimiterPid1; + stopped -> unlimit_queues(State) + end, + {reply, #'basic.qos_ok'{}, State#ch{limiter_pid = LimiterPid2}}; handle_method(#'basic.recover'{requeue = true}, _, State = #ch{ transaction_id = none, @@ -791,9 +802,31 @@ handle_method(#'tx.rollback'{}, _, #ch{transaction_id = none}) -> handle_method(#'tx.rollback'{}, _, State) -> {reply, #'tx.rollback_ok'{}, internal_rollback(State)}; -handle_method(#'channel.flow'{active = _}, _, State) -> - %% FIXME: implement - {reply, #'channel.flow_ok'{active = true}, State}; +handle_method(#'channel.flow'{active = true}, _, + State = #ch{limiter_pid = LimiterPid}) -> + LimiterPid1 = case rabbit_limiter:unblock(LimiterPid) of + ok -> LimiterPid; + stopped -> unlimit_queues(State) + end, + {reply, #'channel.flow_ok'{active = true}, + State#ch{limiter_pid = LimiterPid1}}; + +handle_method(#'channel.flow'{active = false}, _, + State = #ch{limiter_pid = LimiterPid, + consumer_mapping = Consumers}) -> + LimiterPid1 = case LimiterPid of + undefined -> start_limiter(State); + Other -> Other + end, + ok = rabbit_limiter:block(LimiterPid1), + QPids = consumer_queues(Consumers), + Queues = [{QPid, erlang:monitor(process, QPid)} || QPid <- QPids], + ok = rabbit_amqqueue:flush_all(QPids, self()), + case Queues of + [] -> {reply, #'channel.flow_ok'{active = false}, State}; + _ -> {noreply, State#ch{limiter_pid = LimiterPid1, + blocking = dict:from_list(Queues)}} + end; handle_method(#'channel.flow_ok'{active = _}, _, State) -> %% TODO: We may want to correlate this to channel.flow messages we @@ -928,21 +961,27 @@ fold_per_queue(F, Acc0, UAQ) -> D = rabbit_misc:queue_fold( fun ({_DTag, _CTag, {_QName, QPid, MsgId, _Redelivered, _Message}}, D) -> - %% dict:append would be simpler and avoid the - %% lists:reverse in handle_message({recover, true}, - %% ...). However, it is significantly slower when - %% going beyond a few thousand elements. - dict:update(QPid, - fun (MsgIds) -> [MsgId | MsgIds] end, - [MsgId], - D) + %% dict:append would avoid the lists:reverse in + %% handle_message({recover, true}, ...). However, it + %% is significantly slower when going beyond a few + %% thousand elements. + rabbit_misc:dict_cons(QPid, MsgId, D) end, dict:new(), UAQ), dict:fold(fun (QPid, MsgIds, Acc) -> F(QPid, MsgIds, Acc) end, Acc0, D). +start_limiter(State = #ch{unacked_message_q = UAMQ}) -> + LPid = rabbit_limiter:start_link(self(), queue:len(UAMQ)), + ok = limit_queues(LPid, State), + LPid. + notify_queues(#ch{consumer_mapping = Consumers}) -> rabbit_amqqueue:notify_down_all(consumer_queues(Consumers), self()). +unlimit_queues(State) -> + ok = limit_queues(undefined, State), + undefined. + limit_queues(LPid, #ch{consumer_mapping = Consumers}) -> rabbit_amqqueue:limit_all(consumer_queues(Consumers), self(), LPid). @@ -972,16 +1011,15 @@ notify_limiter(LimiterPid, Acked) -> Count -> rabbit_limiter:ack(LimiterPid, Count) end. -is_message_persistent(#content{properties = #'P_basic'{ - delivery_mode = Mode}}) -> - case Mode of - 1 -> false; - 2 -> true; - undefined -> false; - Other -> rabbit_log:warning("Unknown delivery mode ~p - " - "treating as 1, non-persistent~n", - [Other]), - false +is_message_persistent(Content) -> + case rabbit_basic:is_message_persistent(Content) of + {invalid, Other} -> + rabbit_log:warning("Unknown delivery mode ~p - " + "treating as 1, non-persistent~n", + [Other]), + false; + IsPersistent when is_boolean(IsPersistent) -> + IsPersistent end. lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) -> diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 6aac442888..d1834b3b73 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -46,6 +46,7 @@ -spec(stop/0 :: () -> 'ok'). -spec(action/4 :: (atom(), erlang_node(), [string()], fun ((string(), [any()]) -> 'ok')) -> 'ok'). +-spec(usage/0 :: () -> no_return()). -endif. @@ -130,86 +131,7 @@ stop() -> ok. usage() -> - io:format("Usage: rabbitmqctl [-q] [-n <node>] <command> [<arg> ...] - -Available commands: - - stop - stops the RabbitMQ application and halts the node - stop_app - stops the RabbitMQ application, leaving the node running - start_app - starts the RabbitMQ application on an already-running node - reset - resets node to default configuration, deleting all data - force_reset - cluster <ClusterNode> ... - status - rotate_logs [Suffix] - close_connection <ConnectionPid> <ExplanationString> - - add_user <UserName> <Password> - delete_user <UserName> - change_password <UserName> <NewPassword> - list_users - - add_vhost <VHostPath> - delete_vhost <VHostPath> - list_vhosts - - set_permissions [-p <VHostPath>] <UserName> <Regexp> <Regexp> <Regexp> - clear_permissions [-p <VHostPath>] <UserName> - list_permissions [-p <VHostPath>] - list_user_permissions <UserName> - - list_queues [-p <VHostPath>] [<QueueInfoItem> ...] - list_exchanges [-p <VHostPath>] [<ExchangeInfoItem> ...] - list_bindings [-p <VHostPath>] - list_connections [<ConnectionInfoItem> ...] - list_channels [<ChannelInfoItem> ...] - list_consumers [-p <VHostPath>] - -Quiet output mode is selected with the \"-q\" flag. Informational -messages are suppressed when quiet mode is in effect. - -<node> should be the name of the master node of the RabbitMQ -cluster. It defaults to the node named \"rabbit\" on the local -host. On a host named \"server.example.com\", the master node will -usually be rabbit@server (unless RABBITMQ_NODENAME has been set to -some non-default value at broker startup time). The output of hostname --s is usually the correct suffix to use after the \"@\" sign. - -The list_queues, list_exchanges and list_bindings commands accept an -optional virtual host parameter for which to display results. The -default value is \"/\". - -<QueueInfoItem> must be a member of the list [name, durable, -auto_delete, arguments, pid, owner_pid, exclusive_consumer_pid, -exclusive_consumer_tag, messages_ready, messages_unacknowledged, -messages_uncommitted, messages, acks_uncommitted, consumers, -transactions, memory]. The default is to display name and (number of) -messages. - -<ExchangeInfoItem> must be a member of the list [name, type, durable, -auto_delete, arguments]. The default is to display name and type. - -The output format for \"list_bindings\" is a list of rows containing -exchange name, queue name, routing key and arguments, in that order. - -<ConnectionInfoItem> must be a member of the list [pid, address, port, -peer_address, peer_port, state, channels, user, vhost, timeout, -frame_max, client_properties, recv_oct, recv_cnt, send_oct, send_cnt, -send_pend]. The default is to display user, peer_address, peer_port -and state. - -<ChannelInfoItem> must be a member of the list [pid, connection, -number, user, vhost, transactional, consumer_count, -messages_unacknowledged, acks_uncommitted, prefetch_count]. The -default is to display pid, user, transactional, consumer_count, -messages_unacknowledged. - -The output format for \"list_consumers\" is a list of rows containing, -in order, the queue name, channel process id, consumer tag, and a -boolean indicating whether acknowledgements are expected from the -consumer. - -"), + io:format("~s", [rabbit_ctl_usage:usage()]), halt(1). action(stop, Node, [], Inform) -> diff --git a/src/rabbit_dialyzer.erl b/src/rabbit_dialyzer.erl index 078cf620f4..f19e8d025a 100644 --- a/src/rabbit_dialyzer.erl +++ b/src/rabbit_dialyzer.erl @@ -38,9 +38,9 @@ -ifdef(use_specs). --spec(create_basic_plt/1 :: (string()) -> 'ok'). --spec(add_to_plt/2 :: (string(), string()) -> 'ok'). --spec(dialyze_files/2 :: (string(), string()) -> 'ok'). +-spec(create_basic_plt/1 :: (file_path()) -> 'ok'). +-spec(add_to_plt/2 :: (file_path(), string()) -> 'ok'). +-spec(dialyze_files/2 :: (file_path(), string()) -> 'ok'). -spec(halt_with_code/1 :: (atom()) -> no_return()). -endif. diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 832acd16c4..1cfba00eb2 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -30,7 +30,6 @@ %% -module(rabbit_exchange). --include_lib("stdlib/include/qlc.hrl"). -include("rabbit.hrl"). -include("rabbit_framing.hrl"). @@ -40,7 +39,7 @@ -export([add_binding/4, delete_binding/4, list_bindings/1]). -export([delete/2]). -export([delete_queue_bindings/1, delete_transient_queue_bindings/1]). --export([check_type/1, assert_type/2, topic_matches/2, headers_match/2]). +-export([check_type/1, assert_type/2]). %% EXTENDED API -export([list_exchange_bindings/1]). @@ -49,7 +48,6 @@ -import(mnesia). -import(sets). -import(lists). --import(qlc). -import(regexp). %%---------------------------------------------------------------------------- @@ -82,10 +80,8 @@ bind_res() | {'error', 'binding_not_found'}). -spec(list_bindings/1 :: (vhost()) -> [{exchange_name(), queue_name(), routing_key(), amqp_table()}]). --spec(delete_queue_bindings/1 :: (queue_name()) -> 'ok'). --spec(delete_transient_queue_bindings/1 :: (queue_name()) -> 'ok'). --spec(topic_matches/2 :: (binary(), binary()) -> boolean()). --spec(headers_match/2 :: (amqp_table(), amqp_table()) -> boolean()). +-spec(delete_queue_bindings/1 :: (queue_name()) -> fun(() -> none())). +-spec(delete_transient_queue_bindings/1 :: (queue_name()) -> fun(() -> none())). -spec(delete/2 :: (exchange_name(), boolean()) -> 'ok' | not_found() | {'error', 'in_use'}). -spec(list_queue_bindings/1 :: (queue_name()) -> @@ -100,17 +96,37 @@ -define(INFO_KEYS, [name, type, durable, auto_delete, arguments]. recover() -> - ok = rabbit_misc:table_foreach( - fun(Exchange) -> ok = mnesia:write(rabbit_exchange, - Exchange, write) - end, rabbit_durable_exchange), - ok = rabbit_misc:table_foreach( - fun(Route) -> {_, ReverseRoute} = route_with_reverse(Route), - ok = mnesia:write(rabbit_route, - Route, write), - ok = mnesia:write(rabbit_reverse_route, - ReverseRoute, write) - end, rabbit_durable_route). + Exs = rabbit_misc:table_fold( + fun(Exchange, Acc) -> + ok = mnesia:write(rabbit_exchange, Exchange, write), + [Exchange | Acc] + end, [], rabbit_durable_exchange), + Bs = rabbit_misc:table_fold( + fun(Route = #route{binding = B}, Acc) -> + {_, ReverseRoute} = route_with_reverse(Route), + ok = mnesia:write(rabbit_route, + Route, write), + ok = mnesia:write(rabbit_reverse_route, + ReverseRoute, write), + [B | Acc] + end, [], rabbit_durable_route), + recover_with_bindings(Bs, Exs), + ok. + +recover_with_bindings(Bs, Exs) -> + recover_with_bindings( + lists:keysort(#binding.exchange_name, Bs), + lists:keysort(#exchange.name, Exs), []). + +recover_with_bindings([B = #binding{exchange_name = Name} | Rest], + Xs = [#exchange{name = Name} | _], + Bindings) -> + recover_with_bindings(Rest, Xs, [B | Bindings]); +recover_with_bindings(Bs, [X = #exchange{type = Type} | Xs], Bindings) -> + (type_to_module(Type)):recover(X, Bindings), + recover_with_bindings(Bs, Xs, []); +recover_with_bindings([], [], []) -> + ok. declare(ExchangeName, Type, Durable, AutoDelete, Args) -> Exchange = #exchange{name = ExchangeName, @@ -118,31 +134,53 @@ declare(ExchangeName, Type, Durable, AutoDelete, Args) -> durable = Durable, auto_delete = AutoDelete, arguments = Args}, - rabbit_misc:execute_mnesia_transaction( - fun () -> - case mnesia:wread({rabbit_exchange, ExchangeName}) of - [] -> ok = mnesia:write(rabbit_exchange, Exchange, write), - if Durable -> - ok = mnesia:write(rabbit_durable_exchange, - Exchange, write); - true -> ok - end, - Exchange; - [ExistingX] -> ExistingX - end - end). + %% We want to upset things if it isn't ok; this is different from + %% the other hooks invocations, where we tend to ignore the return + %% value. + TypeModule = type_to_module(Type), + ok = TypeModule:validate(Exchange), + case rabbit_misc:execute_mnesia_transaction( + fun () -> + case mnesia:wread({rabbit_exchange, ExchangeName}) of + [] -> + ok = mnesia:write(rabbit_exchange, Exchange, write), + ok = case Durable of + true -> + mnesia:write(rabbit_durable_exchange, + Exchange, write); + false -> + ok + end, + {new, Exchange}; + [ExistingX] -> + {existing, ExistingX} + end + end) of + {new, X} -> TypeModule:create(X), + X; + {existing, X} -> X; + Err -> Err + end. -check_type(<<"fanout">>) -> - fanout; -check_type(<<"direct">>) -> - direct; -check_type(<<"topic">>) -> - topic; -check_type(<<"headers">>) -> - headers; -check_type(T) -> - rabbit_misc:protocol_error( - command_invalid, "invalid exchange type '~s'", [T]). +%% Used with atoms from records; e.g., the type is expected to exist. +type_to_module(T) -> + case rabbit_exchange_type_registry:lookup_module(T) of + {ok, Module} -> Module; + {error, not_found} -> rabbit_misc:protocol_error( + command_invalid, + "invalid exchange type '~s'", [T]) + end. + +%% Used with binaries sent over the wire; the type may not exist. +check_type(TypeBin) -> + case rabbit_exchange_type_registry:binary_to_type(TypeBin) of + {error, not_found} -> + rabbit_misc:protocol_error( + command_invalid, "unknown exchange type '~s'", [TypeBin]); + T -> + _Module = type_to_module(T), + T + end. assert_type(#exchange{ type = ActualType }, RequiredType) when ActualType == RequiredType -> @@ -157,7 +195,7 @@ lookup(Name) -> lookup_or_die(Name) -> case lookup(Name) of - {ok, X} -> X; + {ok, X} -> X; {error, not_found} -> rabbit_misc:not_found(Name) end. @@ -193,9 +231,8 @@ info_all(VHostPath, Items) -> map(VHostPath, fun (X) -> info(X, Items) end). publish(X, Delivery) -> publish(X, [], Delivery). -publish(X, Seen, Delivery = #delivery{ - message = #basic_message{routing_key = RK, content = C}}) -> - case rabbit_router:deliver(route(X, RK, C), Delivery) of +publish(X = #exchange{type = Type}, Seen, Delivery) -> + case (type_to_module(Type)):publish(X, Delivery) of {_, []} = R -> #exchange{name = XName, arguments = Args} = X, case rabbit_misc:r_arg(XName, exchange, Args, @@ -205,95 +242,24 @@ publish(X, Seen, Delivery = #delivery{ AName -> NewSeen = [XName | Seen], case lists:member(AName, NewSeen) of - true -> - R; - false -> - case lookup(AName) of - {ok, AX} -> - publish(AX, NewSeen, Delivery); - {error, not_found} -> - rabbit_log:warning( - "alternate exchange for ~s " - "does not exist: ~s", - [rabbit_misc:rs(XName), - rabbit_misc:rs(AName)]), - R - end + true -> R; + false -> case lookup(AName) of + {ok, AX} -> + publish(AX, NewSeen, Delivery); + {error, not_found} -> + rabbit_log:warning( + "alternate exchange for ~s " + "does not exist: ~s", + [rabbit_misc:rs(XName), + rabbit_misc:rs(AName)]), + R + end end end; R -> R end. -%% return the list of qpids to which a message with a given routing -%% key, sent to a particular exchange, should be delivered. -%% -%% The function ensures that a qpid appears in the return list exactly -%% as many times as a message should be delivered to it. With the -%% current exchange types that is at most once. -route(X = #exchange{type = topic}, RoutingKey, _Content) -> - match_bindings(X, fun (#binding{key = BindingKey}) -> - topic_matches(BindingKey, RoutingKey) - end); - -route(X = #exchange{type = headers}, _RoutingKey, Content) -> - Headers = case (Content#content.properties)#'P_basic'.headers of - undefined -> []; - H -> sort_arguments(H) - end, - match_bindings(X, fun (#binding{args = Spec}) -> - headers_match(Spec, Headers) - end); - -route(X = #exchange{type = fanout}, _RoutingKey, _Content) -> - match_routing_key(X, '_'); - -route(X = #exchange{type = direct}, RoutingKey, _Content) -> - match_routing_key(X, RoutingKey). - -sort_arguments(Arguments) -> - lists:keysort(1, Arguments). - -%% TODO: Maybe this should be handled by a cursor instead. -%% TODO: This causes a full scan for each entry with the same exchange -match_bindings(#exchange{name = Name}, Match) -> - Query = qlc:q([QName || #route{binding = Binding = #binding{ - exchange_name = ExchangeName, - queue_name = QName}} <- - mnesia:table(rabbit_route), - ExchangeName == Name, - Match(Binding)]), - lookup_qpids( - try - mnesia:async_dirty(fun qlc:e/1, [Query]) - catch exit:{aborted, {badarg, _}} -> - %% work around OTP-7025, which was fixed in R12B-1, by - %% falling back on a less efficient method - [QName || #route{binding = Binding = #binding{ - queue_name = QName}} <- - mnesia:dirty_match_object( - rabbit_route, - #route{binding = #binding{exchange_name = Name, - _ = '_'}}), - Match(Binding)] - end). - -match_routing_key(#exchange{name = Name}, RoutingKey) -> - MatchHead = #route{binding = #binding{exchange_name = Name, - queue_name = '$1', - key = RoutingKey, - _ = '_'}}, - lookup_qpids(mnesia:dirty_select(rabbit_route, [{MatchHead, [], ['$1']}])). - -lookup_qpids(Queues) -> - sets:fold( - fun(Key, Acc) -> - case mnesia:dirty_read({rabbit_queue, Key}) of - [#amqqueue{pid = QPid}] -> [QPid | Acc]; - [] -> Acc - end - end, [], sets:from_list(Queues)). - %% TODO: Should all of the route and binding management not be %% refactored to its own module, especially seeing as unbind will have %% to be implemented for 0.91 ? @@ -302,13 +268,13 @@ delete_exchange_bindings(ExchangeName) -> [begin ok = mnesia:delete_object(rabbit_reverse_route, reverse_route(Route), write), - ok = delete_forward_routes(Route) + ok = delete_forward_routes(Route), + Route#route.binding end || Route <- mnesia:match_object( rabbit_route, #route{binding = #binding{exchange_name = ExchangeName, _ = '_'}}, - write)], - ok. + write)]. delete_queue_bindings(QueueName) -> delete_queue_bindings(QueueName, fun delete_forward_routes/1). @@ -317,21 +283,55 @@ delete_transient_queue_bindings(QueueName) -> delete_queue_bindings(QueueName, fun delete_transient_forward_routes/1). delete_queue_bindings(QueueName, FwdDeleteFun) -> - Exchanges = exchanges_for_queue(QueueName), - [begin - ok = FwdDeleteFun(reverse_route(Route)), - ok = mnesia:delete_object(rabbit_reverse_route, Route, write) - end || Route <- mnesia:match_object( - rabbit_reverse_route, - reverse_route( - #route{binding = #binding{queue_name = QueueName, - _ = '_'}}), - write)], - [begin - [X] = mnesia:read({rabbit_exchange, ExchangeName}), - ok = maybe_auto_delete(X) - end || ExchangeName <- Exchanges], - ok. + DeletedBindings = + [begin + Route = reverse_route(ReverseRoute), + ok = FwdDeleteFun(Route), + ok = mnesia:delete_object(rabbit_reverse_route, + ReverseRoute, write), + Route#route.binding + end || ReverseRoute + <- mnesia:match_object( + rabbit_reverse_route, + reverse_route(#route{binding = #binding{ + queue_name = QueueName, + _ = '_'}}), + write)], + Cleanup = cleanup_deleted_queue_bindings( + lists:keysort(#binding.exchange_name, DeletedBindings), []), + fun () -> + lists:foreach( + fun ({{IsDeleted, X = #exchange{ type = Type }}, Bs}) -> + Module = type_to_module(Type), + case IsDeleted of + auto_deleted -> Module:delete(X, Bs); + no_delete -> Module:remove_bindings(X, Bs) + end + end, Cleanup) + end. + +%% Requires that its input binding list is sorted in exchange-name +%% order, so that the grouping of bindings (for passing to +%% cleanup_deleted_queue_bindings1) works properly. +cleanup_deleted_queue_bindings([], Acc) -> + Acc; +cleanup_deleted_queue_bindings( + [B = #binding{exchange_name = ExchangeName} | Bs], Acc) -> + cleanup_deleted_queue_bindings(ExchangeName, Bs, [B], Acc). + +cleanup_deleted_queue_bindings( + ExchangeName, [B = #binding{exchange_name = ExchangeName} | Bs], + Bindings, Acc) -> + cleanup_deleted_queue_bindings(ExchangeName, Bs, [B | Bindings], Acc); +cleanup_deleted_queue_bindings(ExchangeName, Deleted, Bindings, Acc) -> + %% either Deleted is [], or its head has a non-matching ExchangeName + NewAcc = [cleanup_deleted_queue_bindings1(ExchangeName, Bindings) | Acc], + cleanup_deleted_queue_bindings(Deleted, NewAcc). + +cleanup_deleted_queue_bindings1(ExchangeName, Bindings) -> + [X] = mnesia:read({rabbit_exchange, ExchangeName}), + {maybe_auto_delete(X), Bindings}. + delete_forward_routes(Route) -> ok = mnesia:delete_object(rabbit_route, Route, write), @@ -340,15 +340,6 @@ delete_forward_routes(Route) -> delete_transient_forward_routes(Route) -> ok = mnesia:delete_object(rabbit_route, Route, write). -exchanges_for_queue(QueueName) -> - MatchHead = reverse_route( - #route{binding = #binding{exchange_name = '$1', - queue_name = QueueName, - _ = '_'}}), - sets:to_list( - sets:from_list( - mnesia:select(rabbit_reverse_route, [{MatchHead, [], ['$1']}]))). - contains(Table, MatchHead) -> try continue(mnesia:select(Table, [{MatchHead, [], ['$_']}], 1, read)) @@ -385,37 +376,61 @@ call_with_exchange_and_queue(Exchange, Queue, Fun) -> end). add_binding(ExchangeName, QueueName, RoutingKey, Arguments) -> - binding_action( - ExchangeName, QueueName, RoutingKey, Arguments, - fun (X, Q, B) -> - if Q#amqqueue.durable and not(X#exchange.durable) -> - {error, durability_settings_incompatible}; - true -> ok = sync_binding(B, Q#amqqueue.durable, - fun mnesia:write/3) - end - end). + case binding_action( + ExchangeName, QueueName, RoutingKey, Arguments, + fun (X, Q, B) -> + if Q#amqqueue.durable and not(X#exchange.durable) -> + {error, durability_settings_incompatible}; + true -> + case mnesia:read(rabbit_route, B) of + [] -> + sync_binding(B, Q#amqqueue.durable, + fun mnesia:write/3), + {new, X, B}; + [_R] -> + {existing, X, B} + end + end + end) of + {new, Exchange = #exchange{ type = Type }, Binding} -> + (type_to_module(Type)):add_binding(Exchange, Binding); + {existing, _, _} -> + ok; + Err = {error, _} -> + Err + end. delete_binding(ExchangeName, QueueName, RoutingKey, Arguments) -> - binding_action( - ExchangeName, QueueName, RoutingKey, Arguments, - fun (X, Q, B) -> - case mnesia:match_object(rabbit_route, #route{binding = B}, - write) of - [] -> {error, binding_not_found}; - _ -> ok = sync_binding(B, Q#amqqueue.durable, - fun mnesia:delete_object/3), - maybe_auto_delete(X) - end - end). + case binding_action( + ExchangeName, QueueName, RoutingKey, Arguments, + fun (X, Q, B) -> + case mnesia:match_object(rabbit_route, #route{binding = B}, + write) of + [] -> {error, binding_not_found}; + _ -> ok = sync_binding(B, Q#amqqueue.durable, + fun mnesia:delete_object/3), + {maybe_auto_delete(X), B} + end + end) of + Err = {error, _} -> + Err; + {{Action, X = #exchange{ type = Type }}, B} -> + Module = type_to_module(Type), + case Action of + auto_delete -> Module:delete(X, [B]); + no_delete -> Module:remove_bindings(X, [B]) + end + end. binding_action(ExchangeName, QueueName, RoutingKey, Arguments, Fun) -> call_with_exchange_and_queue( ExchangeName, QueueName, fun (X, Q) -> - Fun(X, Q, #binding{exchange_name = ExchangeName, - queue_name = QueueName, - key = RoutingKey, - args = sort_arguments(Arguments)}) + Fun(X, Q, #binding{ + exchange_name = ExchangeName, + queue_name = QueueName, + key = RoutingKey, + args = rabbit_misc:sort_field_table(Arguments)}) end). sync_binding(Binding, Durable, Fun) -> @@ -440,8 +455,8 @@ list_bindings(VHostPath) -> rabbit_route, #route{binding = #binding{ exchange_name = rabbit_misc:r(VHostPath, exchange), - _ = '_'}, - _ = '_'})]. + _ = '_'}, + _ = '_'})]. route_with_reverse(#route{binding = Binding}) -> route_with_reverse(Binding); @@ -456,136 +471,60 @@ reverse_route(#reverse_route{reverse_binding = Binding}) -> #route{binding = reverse_binding(Binding)}. reverse_binding(#reverse_binding{exchange_name = Exchange, - queue_name = Queue, - key = Key, - args = Args}) -> + queue_name = Queue, + key = Key, + args = Args}) -> #binding{exchange_name = Exchange, - queue_name = Queue, - key = Key, - args = Args}; + queue_name = Queue, + key = Key, + args = Args}; reverse_binding(#binding{exchange_name = Exchange, - queue_name = Queue, - key = Key, - args = Args}) -> + queue_name = Queue, + key = Key, + args = Args}) -> #reverse_binding{exchange_name = Exchange, - queue_name = Queue, - key = Key, - args = Args}. - -default_headers_match_kind() -> all. - -parse_x_match(<<"all">>) -> all; -parse_x_match(<<"any">>) -> any; -parse_x_match(Other) -> - rabbit_log:warning("Invalid x-match field value ~p; expected all or any", - [Other]), - default_headers_match_kind(). - -%% Horrendous matching algorithm. Depends for its merge-like -%% (linear-time) behaviour on the lists:keysort (sort_arguments) that -%% route/3 and {add,delete}_binding/4 do. -%% -%% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! -%% In other words: REQUIRES BOTH PATTERN AND DATA TO BE SORTED ASCENDING BY KEY. -%% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! -%% -headers_match(Pattern, Data) -> - MatchKind = case lists:keysearch(<<"x-match">>, 1, Pattern) of - {value, {_, longstr, MK}} -> parse_x_match(MK); - {value, {_, Type, MK}} -> - rabbit_log:warning("Invalid x-match field type ~p " - "(value ~p); expected longstr", - [Type, MK]), - default_headers_match_kind(); - _ -> default_headers_match_kind() - end, - headers_match(Pattern, Data, true, false, MatchKind). - -headers_match([], _Data, AllMatch, _AnyMatch, all) -> - AllMatch; -headers_match([], _Data, _AllMatch, AnyMatch, any) -> - AnyMatch; -headers_match([{<<"x-", _/binary>>, _PT, _PV} | PRest], Data, - AllMatch, AnyMatch, MatchKind) -> - headers_match(PRest, Data, AllMatch, AnyMatch, MatchKind); -headers_match(_Pattern, [], _AllMatch, AnyMatch, MatchKind) -> - headers_match([], [], false, AnyMatch, MatchKind); -headers_match(Pattern = [{PK, _PT, _PV} | _], [{DK, _DT, _DV} | DRest], - AllMatch, AnyMatch, MatchKind) when PK > DK -> - headers_match(Pattern, DRest, AllMatch, AnyMatch, MatchKind); -headers_match([{PK, _PT, _PV} | PRest], Data = [{DK, _DT, _DV} | _], - _AllMatch, AnyMatch, MatchKind) when PK < DK -> - headers_match(PRest, Data, false, AnyMatch, MatchKind); -headers_match([{PK, PT, PV} | PRest], [{DK, DT, DV} | DRest], - AllMatch, AnyMatch, MatchKind) when PK == DK -> - {AllMatch1, AnyMatch1} = - if - %% It's not properly specified, but a "no value" in a - %% pattern field is supposed to mean simple presence of - %% the corresponding data field. I've interpreted that to - %% mean a type of "void" for the pattern field. - PT == void -> {AllMatch, true}; - %% Similarly, it's not specified, but I assume that a - %% mismatched type causes a mismatched value. - PT =/= DT -> {false, AnyMatch}; - PV == DV -> {AllMatch, true}; - true -> {false, AnyMatch} - end, - headers_match(PRest, DRest, AllMatch1, AnyMatch1, MatchKind). - -split_topic_key(Key) -> - {ok, KeySplit} = regexp:split(binary_to_list(Key), "\\."), - KeySplit. - -topic_matches(PatternKey, RoutingKey) -> - P = split_topic_key(PatternKey), - R = split_topic_key(RoutingKey), - topic_matches1(P, R). - -topic_matches1(["#"], _R) -> - true; -topic_matches1(["#" | PTail], R) -> - last_topic_match(PTail, [], lists:reverse(R)); -topic_matches1([], []) -> - true; -topic_matches1(["*" | PatRest], [_ | ValRest]) -> - topic_matches1(PatRest, ValRest); -topic_matches1([PatElement | PatRest], [ValElement | ValRest]) when PatElement == ValElement -> - topic_matches1(PatRest, ValRest); -topic_matches1(_, _) -> - false. - -last_topic_match(P, R, []) -> - topic_matches1(P, R); -last_topic_match(P, R, [BacktrackNext | BacktrackList]) -> - topic_matches1(P, R) or last_topic_match(P, [BacktrackNext | R], BacktrackList). - -delete(ExchangeName, _IfUnused = true) -> - call_with_exchange(ExchangeName, fun conditional_delete/1); -delete(ExchangeName, _IfUnused = false) -> - call_with_exchange(ExchangeName, fun unconditional_delete/1). - -maybe_auto_delete(#exchange{auto_delete = false}) -> - ok; + queue_name = Queue, + key = Key, + args = Args}. + +delete(ExchangeName, IfUnused) -> + Fun = case IfUnused of + true -> fun conditional_delete/1; + false -> fun unconditional_delete/1 + end, + case call_with_exchange(ExchangeName, Fun) of + {deleted, X = #exchange{type = Type}, Bs} -> + (type_to_module(Type)):delete(X, Bs), + ok; + Error = {error, _InUseOrNotFound} -> + Error + end. + +maybe_auto_delete(Exchange = #exchange{auto_delete = false}) -> + {no_delete, Exchange}; maybe_auto_delete(Exchange = #exchange{auto_delete = true}) -> - conditional_delete(Exchange), - ok. + case conditional_delete(Exchange) of + {error, in_use} -> {no_delete, Exchange}; + {deleted, Exchange, []} -> {auto_deleted, Exchange} + end. conditional_delete(Exchange = #exchange{name = ExchangeName}) -> Match = #route{binding = #binding{exchange_name = ExchangeName, _ = '_'}}, %% we need to check for durable routes here too in case a bunch of %% routes to durable queues have been removed temporarily as a %% result of a node failure - case contains(rabbit_route, Match) orelse contains(rabbit_durable_route, Match) of + case contains(rabbit_route, Match) orelse + contains(rabbit_durable_route, Match) of false -> unconditional_delete(Exchange); true -> {error, in_use} end. -unconditional_delete(#exchange{name = ExchangeName}) -> - ok = delete_exchange_bindings(ExchangeName), +unconditional_delete(Exchange = #exchange{name = ExchangeName}) -> + Bindings = delete_exchange_bindings(ExchangeName), ok = mnesia:delete({rabbit_durable_exchange, ExchangeName}), - ok = mnesia:delete({rabbit_exchange, ExchangeName}). + ok = mnesia:delete({rabbit_exchange, ExchangeName}), + {deleted, Exchange, Bindings}. %%---------------------------------------------------------------------------- %% EXTENDED API diff --git a/src/rabbit_exchange_type.erl b/src/rabbit_exchange_type.erl new file mode 100644 index 0000000000..a8c071e681 --- /dev/null +++ b/src/rabbit_exchange_type.erl @@ -0,0 +1,61 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_exchange_type). + +-export([behaviour_info/1]). + +behaviour_info(callbacks) -> + [ + {description, 0}, + {publish, 2}, + + %% called BEFORE declaration, to check args etc; may exit with #amqp_error{} + {validate, 1}, + + %% called after declaration when previously absent + {create, 1}, + + %% called when recovering + {recover, 2}, + + %% called after exchange deletion. + {delete, 2}, + + %% called after a binding has been added + {add_binding, 2}, + + %% called after bindings have been deleted. + {remove_bindings, 2} + + ]; +behaviour_info(_Other) -> + undefined. diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl new file mode 100644 index 0000000000..9b71e0e1d1 --- /dev/null +++ b/src/rabbit_exchange_type_direct.erl @@ -0,0 +1,63 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_exchange_type_direct). +-include("rabbit.hrl"). + +-behaviour(rabbit_exchange_type). + +-export([description/0, publish/2]). +-export([validate/1, create/1, recover/2, delete/2, + add_binding/2, remove_bindings/2]). +-include("rabbit_exchange_type_spec.hrl"). + +-rabbit_boot_step({?MODULE, + [{description, "exchange type direct"}, + {mfa, {rabbit_exchange_type_registry, register, + [<<"direct">>, ?MODULE]}}, + {requires, rabbit_exchange_type_registry}, + {enables, kernel_ready}]}). + +description() -> + [{name, <<"direct">>}, + {description, <<"AMQP direct exchange, as per the AMQP specification">>}]. + +publish(#exchange{name = Name}, Delivery = + #delivery{message = #basic_message{routing_key = RoutingKey}}) -> + rabbit_router:deliver(rabbit_router:match_routing_key(Name, RoutingKey), + Delivery). + +validate(_X) -> ok. +create(_X) -> ok. +recover(_X, _Bs) -> ok. +delete(_X, _Bs) -> ok. +add_binding(_X, _B) -> ok. +remove_bindings(_X, _Bs) -> ok. diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl new file mode 100644 index 0000000000..311654ab21 --- /dev/null +++ b/src/rabbit_exchange_type_fanout.erl @@ -0,0 +1,61 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_exchange_type_fanout). +-include("rabbit.hrl"). + +-behaviour(rabbit_exchange_type). + +-export([description/0, publish/2]). +-export([validate/1, create/1, recover/2, delete/2, + add_binding/2, remove_bindings/2]). +-include("rabbit_exchange_type_spec.hrl"). + +-rabbit_boot_step({?MODULE, + [{description, "exchange type fanout"}, + {mfa, {rabbit_exchange_type_registry, register, + [<<"fanout">>, ?MODULE]}}, + {requires, rabbit_exchange_type_registry}, + {enables, kernel_ready}]}). + +description() -> + [{name, <<"fanout">>}, + {description, <<"AMQP fanout exchange, as per the AMQP specification">>}]. + +publish(#exchange{name = Name}, Delivery) -> + rabbit_router:deliver(rabbit_router:match_routing_key(Name, '_'), Delivery). + +validate(_X) -> ok. +create(_X) -> ok. +recover(_X, _Bs) -> ok. +delete(_X, _Bs) -> ok. +add_binding(_X, _B) -> ok. +remove_bindings(_X, _Bs) -> ok. diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl new file mode 100644 index 0000000000..285dab1a03 --- /dev/null +++ b/src/rabbit_exchange_type_headers.erl @@ -0,0 +1,137 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_exchange_type_headers). +-include("rabbit.hrl"). +-include("rabbit_framing.hrl"). + +-behaviour(rabbit_exchange_type). + +-export([description/0, publish/2]). +-export([validate/1, create/1, recover/2, delete/2, + add_binding/2, remove_bindings/2]). +-include("rabbit_exchange_type_spec.hrl"). + +-rabbit_boot_step({?MODULE, + [{description, "exchange type headers"}, + {mfa, {rabbit_exchange_type_registry, register, + [<<"headers">>, ?MODULE]}}, + {requires, rabbit_exchange_type_registry}, + {enables, kernel_ready}]}). + +-ifdef(use_specs). +-spec(headers_match/2 :: (amqp_table(), amqp_table()) -> boolean()). +-endif. + +description() -> + [{name, <<"headers">>}, + {description, <<"AMQP headers exchange, as per the AMQP specification">>}]. + +publish(#exchange{name = Name}, + Delivery = #delivery{message = #basic_message{content = Content}}) -> + Headers = case (Content#content.properties)#'P_basic'.headers of + undefined -> []; + H -> rabbit_misc:sort_field_table(H) + end, + rabbit_router:deliver(rabbit_router:match_bindings( + Name, fun (#binding{args = Spec}) -> + headers_match(Spec, Headers) + end), + Delivery). + +default_headers_match_kind() -> all. + +parse_x_match(<<"all">>) -> all; +parse_x_match(<<"any">>) -> any; +parse_x_match(Other) -> + rabbit_log:warning("Invalid x-match field value ~p; expected all or any", + [Other]), + default_headers_match_kind(). + +%% Horrendous matching algorithm. Depends for its merge-like +%% (linear-time) behaviour on the lists:keysort +%% (rabbit_misc:sort_field_table) that route/3 and +%% rabbit_exchange:{add,delete}_binding/4 do. +%% +%% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! +%% In other words: REQUIRES BOTH PATTERN AND DATA TO BE SORTED ASCENDING BY KEY. +%% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! +%% +headers_match(Pattern, Data) -> + MatchKind = case lists:keysearch(<<"x-match">>, 1, Pattern) of + {value, {_, longstr, MK}} -> parse_x_match(MK); + {value, {_, Type, MK}} -> + rabbit_log:warning("Invalid x-match field type ~p " + "(value ~p); expected longstr", + [Type, MK]), + default_headers_match_kind(); + _ -> default_headers_match_kind() + end, + headers_match(Pattern, Data, true, false, MatchKind). + +headers_match([], _Data, AllMatch, _AnyMatch, all) -> + AllMatch; +headers_match([], _Data, _AllMatch, AnyMatch, any) -> + AnyMatch; +headers_match([{<<"x-", _/binary>>, _PT, _PV} | PRest], Data, + AllMatch, AnyMatch, MatchKind) -> + headers_match(PRest, Data, AllMatch, AnyMatch, MatchKind); +headers_match(_Pattern, [], _AllMatch, AnyMatch, MatchKind) -> + headers_match([], [], false, AnyMatch, MatchKind); +headers_match(Pattern = [{PK, _PT, _PV} | _], [{DK, _DT, _DV} | DRest], + AllMatch, AnyMatch, MatchKind) when PK > DK -> + headers_match(Pattern, DRest, AllMatch, AnyMatch, MatchKind); +headers_match([{PK, _PT, _PV} | PRest], Data = [{DK, _DT, _DV} | _], + _AllMatch, AnyMatch, MatchKind) when PK < DK -> + headers_match(PRest, Data, false, AnyMatch, MatchKind); +headers_match([{PK, PT, PV} | PRest], [{DK, DT, DV} | DRest], + AllMatch, AnyMatch, MatchKind) when PK == DK -> + {AllMatch1, AnyMatch1} = + if + %% It's not properly specified, but a "no value" in a + %% pattern field is supposed to mean simple presence of + %% the corresponding data field. I've interpreted that to + %% mean a type of "void" for the pattern field. + PT == void -> {AllMatch, true}; + %% Similarly, it's not specified, but I assume that a + %% mismatched type causes a mismatched value. + PT =/= DT -> {false, AnyMatch}; + PV == DV -> {AllMatch, true}; + true -> {false, AnyMatch} + end, + headers_match(PRest, DRest, AllMatch1, AnyMatch1, MatchKind). + +validate(_X) -> ok. +create(_X) -> ok. +recover(_X, _Bs) -> ok. +delete(_X, _Bs) -> ok. +add_binding(_X, _B) -> ok. +remove_bindings(_X, _Bs) -> ok. diff --git a/src/rabbit_exchange_type_registry.erl b/src/rabbit_exchange_type_registry.erl new file mode 100644 index 0000000000..175d15ad83 --- /dev/null +++ b/src/rabbit_exchange_type_registry.erl @@ -0,0 +1,129 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_exchange_type_registry). + +-behaviour(gen_server). + +-export([start_link/0]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-export([register/2, binary_to_type/1, lookup_module/1]). + +-define(SERVER, ?MODULE). +-define(ETS_NAME, ?MODULE). + +-ifdef(use_specs). + +-spec(start_link/0 :: () -> 'ignore' | {'error', term()} | {'ok', pid()}). +-spec(register/2 :: (binary(), atom()) -> 'ok'). +-spec(binary_to_type/1 :: (binary()) -> atom() | {'error', 'not_found'}). +-spec(lookup_module/1 :: (atom()) -> {'ok', atom()} | {'error', 'not_found'}). + +-endif. + +%%--------------------------------------------------------------------------- + +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + +%%--------------------------------------------------------------------------- + +register(TypeName, ModuleName) -> + gen_server:call(?SERVER, {register, TypeName, ModuleName}). + +%% This is used with user-supplied arguments (e.g., on exchange +%% declare), so we restrict it to existing atoms only. This means it +%% can throw a badarg, indicating that the type cannot have been +%% registered. +binary_to_type(TypeBin) when is_binary(TypeBin) -> + case catch list_to_existing_atom(binary_to_list(TypeBin)) of + {'EXIT', {badarg, _}} -> {error, not_found}; + TypeAtom -> TypeAtom + end. + +lookup_module(T) when is_atom(T) -> + case ets:lookup(?ETS_NAME, T) of + [{_, Module}] -> + {ok, Module}; + [] -> + {error, not_found} + end. + +%%--------------------------------------------------------------------------- + +internal_binary_to_type(TypeBin) when is_binary(TypeBin) -> + list_to_atom(binary_to_list(TypeBin)). + +internal_register(TypeName, ModuleName) + when is_binary(TypeName), is_atom(ModuleName) -> + ok = sanity_check_module(ModuleName), + true = ets:insert(?ETS_NAME, + {internal_binary_to_type(TypeName), ModuleName}), + ok. + +sanity_check_module(Module) -> + case catch lists:member(rabbit_exchange_type, + lists:flatten( + [Bs || {Attr, Bs} <- + Module:module_info(attributes), + Attr =:= behavior orelse + Attr =:= behaviour])) of + {'EXIT', {undef, _}} -> {error, not_module}; + false -> {error, not_exchange_type}; + true -> ok + end. + +%%--------------------------------------------------------------------------- + +init([]) -> + ?ETS_NAME = ets:new(?ETS_NAME, [protected, set, named_table]), + {ok, none}. + +handle_call({register, TypeName, ModuleName}, _From, State) -> + ok = internal_register(TypeName, ModuleName), + {reply, ok, State}; +handle_call(Request, _From, State) -> + {stop, {unhandled_call, Request}, State}. + +handle_cast(Request, State) -> + {stop, {unhandled_cast, Request}, State}. + +handle_info(Message, State) -> + {stop, {unhandled_info, Message}, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl new file mode 100644 index 0000000000..8a3dceeaeb --- /dev/null +++ b/src/rabbit_exchange_type_topic.erl @@ -0,0 +1,101 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_exchange_type_topic). +-include("rabbit.hrl"). + +-behaviour(rabbit_exchange_type). + +-export([description/0, publish/2]). +-export([validate/1, create/1, recover/2, delete/2, + add_binding/2, remove_bindings/2]). +-include("rabbit_exchange_type_spec.hrl"). + +-rabbit_boot_step({?MODULE, + [{description, "exchange type topic"}, + {mfa, {rabbit_exchange_type_registry, register, + [<<"topic">>, ?MODULE]}}, + {requires, rabbit_exchange_type_registry}, + {enables, kernel_ready}]}). + +-export([topic_matches/2]). + +-ifdef(use_specs). +-spec(topic_matches/2 :: (binary(), binary()) -> boolean()). +-endif. + +description() -> + [{name, <<"topic">>}, + {description, <<"AMQP topic exchange, as per the AMQP specification">>}]. + +publish(#exchange{name = Name}, Delivery = + #delivery{message = #basic_message{routing_key = RoutingKey}}) -> + rabbit_router:deliver(rabbit_router:match_bindings( + Name, fun (#binding{key = BindingKey}) -> + topic_matches(BindingKey, RoutingKey) + end), + Delivery). + +split_topic_key(Key) -> + {ok, KeySplit} = regexp:split(binary_to_list(Key), "\\."), + KeySplit. + +topic_matches(PatternKey, RoutingKey) -> + P = split_topic_key(PatternKey), + R = split_topic_key(RoutingKey), + topic_matches1(P, R). + +topic_matches1(["#"], _R) -> + true; +topic_matches1(["#" | PTail], R) -> + last_topic_match(PTail, [], lists:reverse(R)); +topic_matches1([], []) -> + true; +topic_matches1(["*" | PatRest], [_ | ValRest]) -> + topic_matches1(PatRest, ValRest); +topic_matches1([PatElement | PatRest], [ValElement | ValRest]) + when PatElement == ValElement -> + topic_matches1(PatRest, ValRest); +topic_matches1(_, _) -> + false. + +last_topic_match(P, R, []) -> + topic_matches1(P, R); +last_topic_match(P, R, [BacktrackNext | BacktrackList]) -> + topic_matches1(P, R) or + last_topic_match(P, [BacktrackNext | R], BacktrackList). + +validate(_X) -> ok. +create(_X) -> ok. +recover(_X, _Bs) -> ok. +delete(_X, _Bs) -> ok. +add_binding(_X, _B) -> ok. +remove_bindings(_X, _Bs) -> ok. diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index c9f8183fc9..878af02976 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -37,7 +37,7 @@ handle_info/2]). -export([start_link/2, shutdown/1]). -export([limit/2, can_send/3, ack/2, register/2, unregister/2]). --export([get_limit/1]). +-export([get_limit/1, block/1, unblock/1]). %%---------------------------------------------------------------------------- @@ -47,12 +47,14 @@ -spec(start_link/2 :: (pid(), non_neg_integer()) -> pid()). -spec(shutdown/1 :: (maybe_pid()) -> 'ok'). --spec(limit/2 :: (maybe_pid(), non_neg_integer()) -> 'ok'). +-spec(limit/2 :: (maybe_pid(), non_neg_integer()) -> 'ok' | 'stopped'). -spec(can_send/3 :: (maybe_pid(), pid(), boolean()) -> boolean()). -spec(ack/2 :: (maybe_pid(), non_neg_integer()) -> 'ok'). -spec(register/2 :: (maybe_pid(), pid()) -> 'ok'). -spec(unregister/2 :: (maybe_pid(), pid()) -> 'ok'). -spec(get_limit/1 :: (maybe_pid()) -> non_neg_integer()). +-spec(block/1 :: (maybe_pid()) -> 'ok'). +-spec(unblock/1 :: (maybe_pid()) -> 'ok' | 'stopped'). -endif. @@ -60,6 +62,7 @@ -record(lim, {prefetch_count = 0, ch_pid, + blocked = false, queues = dict:new(), % QPid -> {MonitorRef, Notify} volume = 0}). %% 'Notify' is a boolean that indicates whether a queue should be @@ -77,13 +80,14 @@ start_link(ChPid, UnackedMsgCount) -> shutdown(undefined) -> ok; shutdown(LimiterPid) -> - unlink(LimiterPid), + true = unlink(LimiterPid), gen_server2:cast(LimiterPid, shutdown). limit(undefined, 0) -> ok; limit(LimiterPid, PrefetchCount) -> - gen_server2:cast(LimiterPid, {limit, PrefetchCount}). + unlink_on_stopped(LimiterPid, + gen_server2:call(LimiterPid, {limit, PrefetchCount})). %% Ask the limiter whether the queue can deliver a message without %% breaching a limit @@ -113,6 +117,17 @@ get_limit(Pid) -> fun () -> 0 end, fun () -> gen_server2:pcall(Pid, 9, get_limit, infinity) end). +block(undefined) -> + ok; +block(LimiterPid) -> + gen_server2:call(LimiterPid, block, infinity). + +unblock(undefined) -> + ok; +unblock(LimiterPid) -> + unlink_on_stopped(LimiterPid, + gen_server2:call(LimiterPid, unblock, infinity)). + %%---------------------------------------------------------------------------- %% gen_server callbacks %%---------------------------------------------------------------------------- @@ -120,29 +135,45 @@ get_limit(Pid) -> init([ChPid, UnackedMsgCount]) -> {ok, #lim{ch_pid = ChPid, volume = UnackedMsgCount}}. +handle_call({can_send, _QPid, _AckRequired}, _From, + State = #lim{blocked = true}) -> + {reply, false, State}; handle_call({can_send, QPid, AckRequired}, _From, State = #lim{volume = Volume}) -> case limit_reached(State) of true -> {reply, false, limit_queue(QPid, State)}; - false -> {reply, true, State#lim{volume = if AckRequired -> Volume + 1; - true -> Volume - end}} + false -> {reply, true, State#lim{volume = if AckRequired -> Volume + 1; + true -> Volume + end}} end; handle_call(get_limit, _From, State = #lim{prefetch_count = PrefetchCount}) -> - {reply, PrefetchCount, State}. + {reply, PrefetchCount, State}; + +handle_call({limit, PrefetchCount}, _From, State) -> + case maybe_notify(State, State#lim{prefetch_count = PrefetchCount}) of + {cont, State1} -> {reply, ok, State1}; + {stop, State1} -> {stop, normal, stopped, State1} + end; + +handle_call(block, _From, State) -> + {reply, ok, State#lim{blocked = true}}; + +handle_call(unblock, _From, State) -> + case maybe_notify(State, State#lim{blocked = false}) of + {cont, State1} -> {reply, ok, State1}; + {stop, State1} -> {stop, normal, stopped, State1} + end. handle_cast(shutdown, State) -> {stop, normal, State}; -handle_cast({limit, PrefetchCount}, State) -> - {noreply, maybe_notify(State, State#lim{prefetch_count = PrefetchCount})}; - handle_cast({ack, Count}, State = #lim{volume = Volume}) -> NewVolume = if Volume == 0 -> 0; true -> Volume - Count end, - {noreply, maybe_notify(State, State#lim{volume = NewVolume})}; + {cont, State1} = maybe_notify(State, State#lim{volume = NewVolume}), + {noreply, State1}; handle_cast({register, QPid}, State) -> {noreply, remember_queue(QPid, State)}; @@ -164,14 +195,21 @@ code_change(_, State, _) -> %%---------------------------------------------------------------------------- maybe_notify(OldState, NewState) -> - case limit_reached(OldState) andalso not(limit_reached(NewState)) of - true -> notify_queues(NewState); - false -> NewState + case (limit_reached(OldState) orelse is_blocked(OldState)) andalso + not (limit_reached(NewState) orelse is_blocked(NewState)) of + true -> NewState1 = notify_queues(NewState), + {case NewState1#lim.prefetch_count of + 0 -> stop; + _ -> cont + end, NewState1}; + false -> {cont, NewState} end. limit_reached(#lim{prefetch_count = Limit, volume = Volume}) -> Limit =/= 0 andalso Volume >= Limit. +is_blocked(#lim{blocked = Blocked}) -> Blocked. + remember_queue(QPid, State = #lim{queues = Queues}) -> case dict:is_key(QPid, Queues) of false -> MRef = erlang:monitor(process, QPid), @@ -209,3 +247,9 @@ notify_queues(State = #lim{ch_pid = ChPid, queues = Queues}) -> ok end, State#lim{queues = NewQueues}. + +unlink_on_stopped(LimiterPid, stopped) -> + ok = rabbit_misc:unlink_and_capture_exit(LimiterPid), + stopped; +unlink_on_stopped(_LimiterPid, Result) -> + Result. diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 0654f58ae7..028b0d73ea 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -49,15 +49,17 @@ -export([ensure_ok/2]). -export([makenode/1, nodeparts/1, cookie_hash/0, tcp_name/3]). -export([intersperse/2, upmap/2, map_in_order/2]). --export([table_foreach/2]). +-export([table_fold/3]). -export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]). -export([read_term_file/1, write_term_file/2]). -export([append_file/2, ensure_parent_dirs_exist/1]). -export([format_stderr/2]). -export([start_applications/1, stop_applications/1]). -export([unfold/2, ceil/1, queue_fold/3]). +-export([sort_field_table/1]). -export([pid_to_string/1, string_to_pid/1]). -export([version_compare/2, version_compare/3]). +-export([recursive_delete/1, dict_cons/3, unlink_and_capture_exit/1]). -import(mnesia). -import(lists). @@ -96,8 +98,8 @@ -spec(rs/1 :: (r(atom())) -> string()). -spec(enable_cover/0 :: () -> ok_or_error()). -spec(report_cover/0 :: () -> 'ok'). --spec(enable_cover/1 :: (string()) -> ok_or_error()). --spec(report_cover/1 :: (string()) -> 'ok'). +-spec(enable_cover/1 :: (file_path()) -> ok_or_error()). +-spec(report_cover/1 :: (file_path()) -> 'ok'). -spec(throw_on_error/2 :: (atom(), thunk({error, any()} | {ok, A} | A)) -> A). -spec(with_exit_handler/2 :: (thunk(A), thunk(A)) -> A). @@ -114,23 +116,31 @@ -spec(intersperse/2 :: (A, [A]) -> [A]). -spec(upmap/2 :: (fun ((A) -> B), [A]) -> [B]). -spec(map_in_order/2 :: (fun ((A) -> B), [A]) -> [B]). --spec(table_foreach/2 :: (fun ((any()) -> any()), atom()) -> 'ok'). +-spec(table_fold/3 :: (fun ((any(), A) -> A), A, atom()) -> A). -spec(dirty_read_all/1 :: (atom()) -> [any()]). -spec(dirty_foreach_key/2 :: (fun ((any()) -> any()), atom()) -> 'ok' | 'aborted'). --spec(dirty_dump_log/1 :: (string()) -> ok_or_error()). --spec(read_term_file/1 :: (string()) -> {'ok', [any()]} | {'error', any()}). --spec(write_term_file/2 :: (string(), [any()]) -> ok_or_error()). --spec(append_file/2 :: (string(), string()) -> ok_or_error()). +-spec(dirty_dump_log/1 :: (file_path()) -> ok_or_error()). +-spec(read_term_file/1 :: (file_path()) -> {'ok', [any()]} | {'error', any()}). +-spec(write_term_file/2 :: (file_path(), [any()]) -> ok_or_error()). +-spec(append_file/2 :: (file_path(), string()) -> ok_or_error()). -spec(ensure_parent_dirs_exist/1 :: (string()) -> 'ok'). -spec(format_stderr/2 :: (string(), [any()]) -> 'ok'). -spec(start_applications/1 :: ([atom()]) -> 'ok'). -spec(stop_applications/1 :: ([atom()]) -> 'ok'). -spec(unfold/2 :: (fun ((A) -> ({'true', B, A} | 'false')), A) -> {[B], A}). --spec(ceil/1 :: (number()) -> number()). +-spec(ceil/1 :: (number()) -> integer()). -spec(queue_fold/3 :: (fun ((any(), B) -> B), B, queue()) -> B). +-spec(sort_field_table/1 :: (amqp_table()) -> amqp_table()). -spec(pid_to_string/1 :: (pid()) -> string()). -spec(string_to_pid/1 :: (string()) -> pid()). +-spec(version_compare/2 :: (string(), string()) -> 'lt' | 'eq' | 'gt'). +-spec(version_compare/3 :: (string(), string(), + ('lt' | 'lte' | 'eq' | 'gte' | 'gt')) -> boolean()). +-spec(recursive_delete/1 :: ([file_path()]) -> + 'ok' | {'error', {file_path(), any()}}). +-spec(dict_cons/3 :: (any(), any(), dict()) -> dict()). +-spec(unlink_and_capture_exit/1 :: (pid()) -> 'ok'). -endif. @@ -305,7 +315,7 @@ execute_mnesia_transaction(TxFun) -> %% Making this a sync_transaction allows us to use dirty_read %% elsewhere and get a consistent result even when that read %% executes on a different node. - case mnesia:sync_transaction(TxFun) of + case worker_pool:submit({mnesia, sync_transaction, [TxFun]}) of {atomic, Result} -> Result; {aborted, Reason} -> throw({error, Reason}) end. @@ -357,20 +367,20 @@ map_in_order(F, L) -> lists:reverse( lists:foldl(fun (E, Acc) -> [F(E) | Acc] end, [], L)). -%% For each entry in a table, execute a function in a transaction. -%% This is often far more efficient than wrapping a tx around the lot. +%% Fold over each entry in a table, executing the cons function in a +%% transaction. This is often far more efficient than wrapping a tx +%% around the lot. %% %% We ignore entries that have been modified or removed. -table_foreach(F, TableName) -> - lists:foreach( - fun (E) -> execute_mnesia_transaction( +table_fold(F, Acc0, TableName) -> + lists:foldl( + fun (E, Acc) -> execute_mnesia_transaction( fun () -> case mnesia:match_object(TableName, E, read) of - [] -> ok; - _ -> F(E) + [] -> Acc; + _ -> F(E, Acc) end end) - end, dirty_read_all(TableName)), - ok. + end, Acc0, dirty_read_all(TableName)). dirty_read_all(TableName) -> mnesia:dirty_select(TableName, [{'$1',[],['$1']}]). @@ -504,6 +514,10 @@ queue_fold(Fun, Init, Q) -> {{value, V}, Q1} -> queue_fold(Fun, Fun(V, Init), Q1) end. +%% Sorts a list of AMQP table fields as per the AMQP spec +sort_field_table(Arguments) -> + lists:keysort(1, Arguments). + %% This provides a string representation of a pid that is the same %% regardless of what node we are running on. The representation also %% permits easy identification of the pid's node. @@ -595,3 +609,46 @@ version_compare(A, B) -> ANum < BNum -> lt; ANum > BNum -> gt end. + +recursive_delete(Files) -> + lists:foldl(fun (Path, ok ) -> recursive_delete1(Path); + (_Path, {error, _Err} = Error) -> Error + end, ok, Files). + +recursive_delete1(Path) -> + case filelib:is_dir(Path) of + false -> case file:delete(Path) of + ok -> ok; + {error, enoent} -> ok; %% Path doesn't exist anyway + {error, Err} -> {error, {Path, Err}} + end; + true -> case file:list_dir(Path) of + {ok, FileNames} -> + case lists:foldl( + fun (FileName, ok) -> + recursive_delete1( + filename:join(Path, FileName)); + (_FileName, Error) -> + Error + end, ok, FileNames) of + ok -> + case file:del_dir(Path) of + ok -> ok; + {error, Err} -> {error, {Path, Err}} + end; + {error, _Err} = Error -> + Error + end; + {error, Err} -> + {error, {Path, Err}} + end + end. + +dict_cons(Key, Value, Dict) -> + dict:update(Key, fun (List) -> [Value | List] end, [Value], Dict). + +unlink_and_capture_exit(Pid) -> + unlink(Pid), + receive {'EXIT', Pid, _} -> ok + after 0 -> ok + end. diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 6ec3cf74b3..55a6761d2d 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -48,7 +48,7 @@ -ifdef(use_specs). -spec(status/0 :: () -> [{'nodes' | 'running_nodes', [erlang_node()]}]). --spec(dir/0 :: () -> string()). +-spec(dir/0 :: () -> file_path()). -spec(ensure_mnesia_dir/0 :: () -> 'ok'). -spec(init/0 :: () -> 'ok'). -spec(is_db_empty/0 :: () -> boolean()). @@ -424,9 +424,8 @@ reset(Force) -> cannot_delete_schema) end, ok = delete_cluster_nodes_config(), - %% remove persistet messages and any other garbage we find - lists:foreach(fun file:delete/1, - filelib:wildcard(dir() ++ "/*")), + %% remove persisted messages and any other garbage we find + ok = rabbit_misc:recursive_delete(filelib:wildcard(dir() ++ "/*")), ok. leave_cluster([], _) -> ok; diff --git a/src/rabbit_multi.erl b/src/rabbit_multi.erl index 8c898498e6..336f74bf9a 100644 --- a/src/rabbit_multi.erl +++ b/src/rabbit_multi.erl @@ -42,6 +42,7 @@ -spec(start/0 :: () -> no_return()). -spec(stop/0 :: () -> 'ok'). +-spec(usage/0 :: () -> no_return()). -endif. @@ -51,7 +52,7 @@ start() -> RpcTimeout = case init:get_argument(maxwait) of {ok,[[N1]]} -> 1000 * list_to_integer(N1); - _ -> 30000 + _ -> ?MAX_WAIT end, case init:get_plain_arguments() of [] -> @@ -86,16 +87,8 @@ stop() -> ok. usage() -> - io:format("Usage: rabbitmq-multi <command> - -Available commands: - - start_all <NodeCount> - start a local cluster of RabbitMQ nodes. - status - print status of all running nodes - stop_all - stops all local RabbitMQ nodes. - rotate_logs [Suffix] - rotate logs for all local and running RabbitMQ nodes. -"), - halt(3). + io:format("~s", [rabbit_multi_usage:usage()]), + halt(1). action(start_all, [NodeCount], RpcTimeout) -> io:format("Starting all nodes...~n", []), diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index cf04f05b7e..7978573d90 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -117,15 +117,25 @@ start() -> transient, infinity, supervisor, [tcp_client_sup]}), ok. +getaddr(Host) -> + %% inet_parse:address takes care of ip string, like "0.0.0.0" + %% inet:getaddr returns immediately for ip tuple {0,0,0,0}, + %% and runs 'inet_gethost' port process for dns lookups. + %% On Windows inet:getaddr runs dns resolver for ip string, which may fail. + case inet_parse:address(Host) of + {ok, IPAddress1} -> IPAddress1; + {error, _} -> + case inet:getaddr(Host, inet) of + {ok, IPAddress2} -> IPAddress2; + {error, Reason} -> + error_logger:error_msg("invalid host ~p - ~p~n", + [Host, Reason]), + throw({error, {invalid_host, Host, Reason}}) + end + end. + check_tcp_listener_address(NamePrefix, Host, Port) -> - IPAddress = - case inet:getaddr(Host, inet) of - {ok, IPAddress1} -> IPAddress1; - {error, Reason} -> - error_logger:error_msg("invalid host ~p - ~p~n", - [Host, Reason]), - throw({error, {invalid_host, Host, Reason}}) - end, + IPAddress = getaddr(Host), if is_integer(Port) andalso (Port >= 0) andalso (Port =< 65535) -> ok; true -> error_logger:error_msg("invalid port ~p - not 0..65535~n", [Port]), @@ -157,7 +167,7 @@ start_listener(Host, Port, Label, OnConnect) -> ok. stop_tcp_listener(Host, Port) -> - {ok, IPAddress} = inet:getaddr(Host, inet), + IPAddress = getaddr(Host), Name = rabbit_misc:tcp_name(rabbit_tcp_listener_sup, IPAddress, Port), ok = supervisor:terminate_child(rabbit_sup, Name), ok = supervisor:delete_child(rabbit_sup, Name), diff --git a/src/rabbit_persister.erl b/src/rabbit_persister.erl index 019d2a269d..8aa5ad8d32 100644 --- a/src/rabbit_persister.erl +++ b/src/rabbit_persister.erl @@ -70,11 +70,11 @@ -ifdef(use_specs). --type(qmsg() :: {amqqueue(), pkey()}). +-type(pmsg() :: {queue_name(), pkey()}). -type(work_item() :: - {publish, message(), qmsg()} | - {deliver, qmsg()} | - {ack, qmsg()}). + {publish, message(), pmsg()} | + {deliver, pmsg()} | + {ack, pmsg()}). -spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}). -spec(transaction/1 :: ([work_item()]) -> 'ok'). @@ -406,7 +406,10 @@ check_version(_Other) -> requeue_messages(Snapshot = #psnapshot{messages = Messages, queues = Queues}) -> - Work = ets:foldl(fun accumulate_requeues/2, dict:new(), Queues), + Work = ets:foldl( + fun ({{QName, PKey}, Delivered}, Acc) -> + rabbit_misc:dict_cons(QName, {PKey, Delivered}, Acc) + end, dict:new(), Queues), %% unstable parallel map, because order doesn't matter L = lists:append( rabbit_misc:upmap( @@ -425,13 +428,6 @@ requeue_messages(Snapshot = #psnapshot{messages = Messages, %% contains the mutated messages and queues tables Snapshot. -accumulate_requeues({{QName, PKey}, Delivered}, Acc) -> - Requeue = {PKey, Delivered}, - dict:update(QName, - fun (Requeues) -> [Requeue | Requeues] end, - [Requeue], - Acc). - requeue(QName, Requeues, Messages) -> case rabbit_amqqueue:lookup(QName) of {ok, #amqqueue{pid = QPid}} -> @@ -474,12 +470,8 @@ internal_integrate_messages(Items, Snapshot) -> internal_integrate1({extend_transaction, Key, MessageList}, Snapshot = #psnapshot {transactions = Transactions}) -> - NewTransactions = - dict:update(Key, - fun (MessageLists) -> [MessageList | MessageLists] end, - [MessageList], - Transactions), - Snapshot#psnapshot{transactions = NewTransactions}; + Snapshot#psnapshot{transactions = rabbit_misc:dict_cons(Key, MessageList, + Transactions)}; internal_integrate1({rollback_transaction, Key}, Snapshot = #psnapshot{transactions = Transactions}) -> Snapshot#psnapshot{transactions = dict:erase(Key, Transactions)}; diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 1a4830e11c..5cf519b795 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -39,6 +39,8 @@ -export([init/1, mainloop/3]). +-export([server_properties/0]). + -export([analyze_frame/2]). -import(gen_tcp). @@ -133,6 +135,7 @@ -spec(info/1 :: (pid()) -> [info()]). -spec(info/2 :: (pid(), [info_key()]) -> [info()]). -spec(shutdown/2 :: (pid(), string()) -> 'ok'). +-spec(server_properties/0 :: () -> amqp_table()). -endif. @@ -198,6 +201,16 @@ teardown_profiling(Value) -> fprof:analyse([{dest, []}, {cols, 100}]) end. +server_properties() -> + {ok, Product} = application:get_key(rabbit, id), + {ok, Version} = application:get_key(rabbit, vsn), + [{list_to_binary(K), longstr, list_to_binary(V)} || + {K, V} <- [{"product", Product}, + {"version", Version}, + {"platform", "Erlang/OTP"}, + {"copyright", ?COPYRIGHT_MESSAGE}, + {"information", ?INFORMATION_MESSAGE}]]. + inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F). socket_op(Sock, Fun) -> @@ -510,21 +523,12 @@ handle_input(handshake, <<"AMQP",1,1,ProtocolMajor,ProtocolMinor>>, case check_version({ProtocolMajor, ProtocolMinor}, {?PROTOCOL_VERSION_MAJOR, ?PROTOCOL_VERSION_MINOR}) of true -> - {ok, Product} = application:get_key(id), - {ok, Version} = application:get_key(vsn), ok = send_on_channel0( Sock, #'connection.start'{ version_major = ?PROTOCOL_VERSION_MAJOR, version_minor = ?PROTOCOL_VERSION_MINOR, - server_properties = - [{list_to_binary(K), longstr, list_to_binary(V)} || - {K, V} <- - [{"product", Product}, - {"version", Version}, - {"platform", "Erlang/OTP"}, - {"copyright", ?COPYRIGHT_MESSAGE}, - {"information", ?INFORMATION_MESSAGE}]], + server_properties = server_properties(), mechanisms = <<"PLAIN AMQPLAIN">>, locales = <<"en_US">> }), {State#v1{connection = Connection#connection{ diff --git a/src/rabbit_restartable_sup.erl b/src/rabbit_restartable_sup.erl new file mode 100644 index 0000000000..06d59249bb --- /dev/null +++ b/src/rabbit_restartable_sup.erl @@ -0,0 +1,47 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_restartable_sup). + +-behaviour(supervisor). + +-export([start_link/2]). + +-export([init/1]). + +-include("rabbit.hrl"). + +start_link(Name, {_M, _F, _A} = Fun) -> + supervisor:start_link({local, Name}, ?MODULE, [Fun]). + +init([{Mod, _F, _A} = Fun]) -> + {ok, {{one_for_one, 10, 10}, + [{Mod, Fun, transient, ?MAX_WAIT, worker, [Mod]}]}}. diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index ee2b82c5bd..a449e19eb4 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -30,12 +30,15 @@ %% -module(rabbit_router). +-include_lib("stdlib/include/qlc.hrl"). -include("rabbit.hrl"). -behaviour(gen_server2). -export([start_link/0, - deliver/2]). + deliver/2, + match_bindings/2, + match_routing_key/2]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -73,13 +76,9 @@ deliver(QPids, Delivery) -> %% which then in turn delivers it to its queues. deliver_per_node( dict:to_list( - lists:foldl( - fun (QPid, D) -> - dict:update(node(QPid), - fun (QPids1) -> [QPid | QPids1] end, - [QPid], D) - end, - dict:new(), QPids)), + lists:foldl(fun (QPid, D) -> + rabbit_misc:dict_cons(node(QPid), QPid, D) + end, dict:new(), QPids)), Delivery). deliver_per_node([{Node, QPids}], Delivery) when Node == node() -> @@ -129,6 +128,46 @@ deliver_per_node(NodeQPids, Delivery) -> -endif. +%% TODO: Maybe this should be handled by a cursor instead. +%% TODO: This causes a full scan for each entry with the same exchange +match_bindings(Name, Match) -> + Query = qlc:q([QName || #route{binding = Binding = #binding{ + exchange_name = ExchangeName, + queue_name = QName}} <- + mnesia:table(rabbit_route), + ExchangeName == Name, + Match(Binding)]), + lookup_qpids( + try + mnesia:async_dirty(fun qlc:e/1, [Query]) + catch exit:{aborted, {badarg, _}} -> + %% work around OTP-7025, which was fixed in R12B-1, by + %% falling back on a less efficient method + [QName || #route{binding = Binding = #binding{ + queue_name = QName}} <- + mnesia:dirty_match_object( + rabbit_route, + #route{binding = #binding{exchange_name = Name, + _ = '_'}}), + Match(Binding)] + end). + +match_routing_key(Name, RoutingKey) -> + MatchHead = #route{binding = #binding{exchange_name = Name, + queue_name = '$1', + key = RoutingKey, + _ = '_'}}, + lookup_qpids(mnesia:dirty_select(rabbit_route, [{MatchHead, [], ['$1']}])). + +lookup_qpids(Queues) -> + sets:fold( + fun(Key, Acc) -> + case mnesia:dirty_read({rabbit_queue, Key}) of + [#amqqueue{pid = QPid}] -> [QPid | Acc]; + [] -> Acc + end + end, [], sets:from_list(Queues)). + %%-------------------------------------------------------------------- init([]) -> diff --git a/src/rabbit_sup.erl b/src/rabbit_sup.erl index a1b8948155..2c5e51125e 100644 --- a/src/rabbit_sup.erl +++ b/src/rabbit_sup.erl @@ -33,10 +33,13 @@ -behaviour(supervisor). --export([start_link/0, start_child/1, start_child/2]). +-export([start_link/0, start_child/1, start_child/2, start_child/3, + start_restartable_child/1, start_restartable_child/2]). -export([init/1]). +-include("rabbit.hrl"). + -define(SERVER, ?MODULE). start_link() -> @@ -46,10 +49,25 @@ start_child(Mod) -> start_child(Mod, []). start_child(Mod, Args) -> + start_child(Mod, Mod, Args). + +start_child(ChildId, Mod, Args) -> {ok, _} = supervisor:start_child(?SERVER, - {Mod, {Mod, start_link, Args}, - transient, 100, worker, [Mod]}), + {ChildId, {Mod, start_link, Args}, + transient, ?MAX_WAIT, worker, [Mod]}), + ok. + +start_restartable_child(Mod) -> + start_restartable_child(Mod, []). + +start_restartable_child(Mod, Args) -> + Name = list_to_atom(atom_to_list(Mod) ++ "_sup"), + {ok, _} = supervisor:start_child( + ?SERVER, + {Name, {rabbit_restartable_sup, start_link, + [Name, {Mod, start_link, Args}]}, + transient, infinity, supervisor, [rabbit_restartable_sup]}), ok. init([]) -> - {ok, {{one_for_one, 10, 10}, []}}. + {ok, {{one_for_all, 0, 1}, []}}. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 51d95c35a3..82f2d19918 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -330,7 +330,8 @@ test_topic_match(P, R) -> test_topic_match(P, R, true). test_topic_match(P, R, Expected) -> - case rabbit_exchange:topic_matches(list_to_binary(P), list_to_binary(R)) of + case rabbit_exchange_type_topic:topic_matches(list_to_binary(P), + list_to_binary(R)) of Expected -> passed; _ -> diff --git a/src/tcp_listener_sup.erl b/src/tcp_listener_sup.erl index 0fe1542616..493925efd5 100644 --- a/src/tcp_listener_sup.erl +++ b/src/tcp_listener_sup.erl @@ -63,4 +63,4 @@ init({IPAddress, Port, SocketOpts, OnStartup, OnShutdown, [IPAddress, Port, SocketOpts, ConcurrentAcceptorCount, Name, OnStartup, OnShutdown, Label]}, - transient, 100, worker, [tcp_listener]}]}}. + transient, 16#ffffffff, worker, [tcp_listener]}]}}. diff --git a/src/worker_pool.erl b/src/worker_pool.erl new file mode 100644 index 0000000000..97e075459f --- /dev/null +++ b/src/worker_pool.erl @@ -0,0 +1,155 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(worker_pool). + +%% Generic worker pool manager. +%% +%% Supports nested submission of jobs (nested jobs always run +%% immediately in current worker process). +%% +%% Possible future enhancements: +%% +%% 1. Allow priorities (basically, change the pending queue to a +%% priority_queue). + +-behaviour(gen_server2). + +-export([start_link/0, submit/1, submit_async/1, idle/1]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}). +-spec(submit/1 :: (fun (() -> A) | {atom(), atom(), [any()]}) -> A). +-spec(submit_async/1 :: + (fun (() -> any()) | {atom(), atom(), [any()]}) -> 'ok'). + +-endif. + +%%---------------------------------------------------------------------------- + +-define(SERVER, ?MODULE). +-define(HIBERNATE_AFTER_MIN, 1000). +-define(DESIRED_HIBERNATE, 10000). + +-record(state, { available, pending }). + +%%---------------------------------------------------------------------------- + +start_link() -> + gen_server2:start_link({local, ?SERVER}, ?MODULE, [], + [{timeout, infinity}]). + +submit(Fun) -> + case get(worker_pool_worker) of + true -> worker_pool_worker:run(Fun); + _ -> Pid = gen_server2:call(?SERVER, next_free, infinity), + worker_pool_worker:submit(Pid, Fun) + end. + +submit_async(Fun) -> + gen_server2:cast(?SERVER, {run_async, Fun}). + +idle(WId) -> + gen_server2:cast(?SERVER, {idle, WId}). + +%%---------------------------------------------------------------------------- + +init([]) -> + {ok, #state { pending = queue:new(), available = queue:new() }, hibernate, + {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. + +handle_call(next_free, From, State = #state { available = Avail, + pending = Pending }) -> + case queue:out(Avail) of + {empty, _Avail} -> + {noreply, + State #state { pending = queue:in({next_free, From}, Pending) }, + hibernate}; + {{value, WId}, Avail1} -> + {reply, get_worker_pid(WId), State #state { available = Avail1 }, + hibernate} + end; + +handle_call(Msg, _From, State) -> + {stop, {unexpected_call, Msg}, State}. + +handle_cast({idle, WId}, State = #state { available = Avail, + pending = Pending }) -> + {noreply, case queue:out(Pending) of + {empty, _Pending} -> + State #state { available = queue:in(WId, Avail) }; + {{value, {next_free, From}}, Pending1} -> + gen_server2:reply(From, get_worker_pid(WId)), + State #state { pending = Pending1 }; + {{value, {run_async, Fun}}, Pending1} -> + worker_pool_worker:submit_async(get_worker_pid(WId), Fun), + State #state { pending = Pending1 } + end, hibernate}; + +handle_cast({run_async, Fun}, State = #state { available = Avail, + pending = Pending }) -> + {noreply, + case queue:out(Avail) of + {empty, _Avail} -> + State #state { pending = queue:in({run_async, Fun}, Pending)}; + {{value, WId}, Avail1} -> + worker_pool_worker:submit_async(get_worker_pid(WId), Fun), + State #state { available = Avail1 } + end, hibernate}; + +handle_cast(Msg, State) -> + {stop, {unexpected_cast, Msg}, State}. + +handle_info(Msg, State) -> + {stop, {unexpected_info, Msg}, State}. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +terminate(_Reason, State) -> + State. + +%%---------------------------------------------------------------------------- + +get_worker_pid(WId) -> + [{WId, Pid, _Type, _Modules} | _] = + lists:dropwhile(fun ({Id, _Pid, _Type, _Modules}) + when Id =:= WId -> false; + (_) -> true + end, + supervisor:which_children(worker_pool_sup)), + Pid. diff --git a/src/worker_pool_sup.erl b/src/worker_pool_sup.erl new file mode 100644 index 0000000000..4ded63a8db --- /dev/null +++ b/src/worker_pool_sup.erl @@ -0,0 +1,69 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(worker_pool_sup). + +-behaviour(supervisor). + +-export([start_link/0, start_link/1]). + +-export([init/1]). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}). +-spec(start_link/1 :: + (non_neg_integer()) -> {'ok', pid()} | 'ignore' | {'error', any()}). + +-endif. + +%%---------------------------------------------------------------------------- + +-define(SERVER, ?MODULE). + +%%---------------------------------------------------------------------------- + +start_link() -> + start_link(erlang:system_info(schedulers)). + +start_link(WCount) -> + supervisor:start_link({local, ?SERVER}, ?MODULE, [WCount]). + +%%---------------------------------------------------------------------------- + +init([WCount]) -> + {ok, {{one_for_one, 10, 10}, + [{worker_pool, {worker_pool, start_link, []}, transient, + 16#ffffffff, worker, [worker_pool]} | + [{N, {worker_pool_worker, start_link, [N]}, transient, 16#ffffffff, + worker, [worker_pool_worker]} || N <- lists:seq(1, WCount)]]}}. diff --git a/src/worker_pool_worker.erl b/src/worker_pool_worker.erl new file mode 100644 index 0000000000..d3a4811926 --- /dev/null +++ b/src/worker_pool_worker.erl @@ -0,0 +1,104 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(worker_pool_worker). + +-behaviour(gen_server2). + +-export([start_link/1, submit/2, submit_async/2, run/1]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(start_link/1 :: (any()) -> {'ok', pid()} | 'ignore' | {'error', any()}). +-spec(submit/2 :: (pid(), fun (() -> A) | {atom(), atom(), [any()]}) -> A). +-spec(submit_async/2 :: + (pid(), fun (() -> any()) | {atom(), atom(), [any()]}) -> 'ok'). + +-endif. + +%%---------------------------------------------------------------------------- + +-define(HIBERNATE_AFTER_MIN, 1000). +-define(DESIRED_HIBERNATE, 10000). + +%%---------------------------------------------------------------------------- + +start_link(WId) -> + gen_server2:start_link(?MODULE, [WId], [{timeout, infinity}]). + +submit(Pid, Fun) -> + gen_server2:call(Pid, {submit, Fun}, infinity). + +submit_async(Pid, Fun) -> + gen_server2:cast(Pid, {submit_async, Fun}). + +init([WId]) -> + ok = worker_pool:idle(WId), + put(worker_pool_worker, true), + {ok, WId, hibernate, + {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. + +handle_call({submit, Fun}, From, WId) -> + gen_server2:reply(From, run(Fun)), + ok = worker_pool:idle(WId), + {noreply, WId, hibernate}; + +handle_call(Msg, _From, State) -> + {stop, {unexpected_call, Msg}, State}. + +handle_cast({submit_async, Fun}, WId) -> + run(Fun), + ok = worker_pool:idle(WId), + {noreply, WId, hibernate}; + +handle_cast(Msg, State) -> + {stop, {unexpected_cast, Msg}, State}. + +handle_info(Msg, State) -> + {stop, {unexpected_info, Msg}, State}. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +terminate(_Reason, State) -> + State. + +%%---------------------------------------------------------------------------- + +run({M, F, A}) -> + apply(M, F, A); +run(Fun) -> + Fun(). |
