diff options
| author | Tony Garnock-Jones <tonyg@lshift.net> | 2008-07-03 13:35:11 +0100 |
|---|---|---|
| committer | Tony Garnock-Jones <tonyg@lshift.net> | 2008-07-03 13:35:11 +0100 |
| commit | 675869a27714307bce377638dfe8f6a5f069e757 (patch) | |
| tree | e4f9872242be02145702775f5c563f2b246f57ce /src | |
| download | rabbitmq-server-git-675869a27714307bce377638dfe8f6a5f069e757.tar.gz | |
Initial commit, from repo-rebase-20080703121916_default (e96543d904a2)
Diffstat (limited to 'src')
35 files changed, 8374 insertions, 0 deletions
diff --git a/src/buffering_proxy.erl b/src/buffering_proxy.erl new file mode 100644 index 0000000000..d250570198 --- /dev/null +++ b/src/buffering_proxy.erl @@ -0,0 +1,92 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd., +%% Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd., Cohesive Financial Technologies +%% LLC., and Rabbit Technologies Ltd. are Copyright (C) 2007-2008 +%% LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit +%% Technologies Ltd.; +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(buffering_proxy). + +-export([start_link/2]). + +%% internal + +-export([mainloop/4, drain/2]). +-export([proxy_loop/3]). + +%%---------------------------------------------------------------------------- + +start_link(M, A) -> + spawn_link( + fun () -> process_flag(trap_exit, true), + ProxyPid = self(), + Ref = make_ref(), + Pid = spawn_link( + fun () -> mainloop(ProxyPid, Ref, M, + M:init(ProxyPid, A)) end), + proxy_loop(Ref, Pid, empty) + end). + +%%---------------------------------------------------------------------------- + +mainloop(ProxyPid, Ref, M, State) -> + ProxyPid ! Ref, + NewState = + receive + {Ref, Messages} -> + lists:foldl(fun (Msg, S) -> + drain(M, M:handle_message(Msg, S)) + end, State, lists:reverse(Messages)); + Msg -> M:handle_message(Msg, State) + end, + ?MODULE:mainloop(ProxyPid, Ref, M, NewState). + +drain(M, State) -> + receive + Msg -> ?MODULE:drain(M, M:handle_message(Msg, State)) + after 0 -> + State + end. + +proxy_loop(Ref, Pid, State) -> + receive + Ref -> + ?MODULE:proxy_loop( + Ref, Pid, + case State of + empty -> waiting; + waiting -> exit(duplicate_next); + Messages -> Pid ! {Ref, Messages}, empty + end); + {'EXIT', Pid, Reason} -> + exit(Reason); + {'EXIT', _, Reason} -> + exit(Pid, Reason), + ?MODULE:proxy_loop(Ref, Pid, State); + Msg -> + ?MODULE:proxy_loop( + Ref, Pid, + case State of + empty -> [Msg]; + waiting -> Pid ! {Ref, [Msg]}, empty; + Messages -> [Msg | Messages] + end) + end. diff --git a/src/rabbit.erl b/src/rabbit.erl new file mode 100644 index 0000000000..e65d532b2a --- /dev/null +++ b/src/rabbit.erl @@ -0,0 +1,282 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd., +%% Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd., Cohesive Financial Technologies +%% LLC., and Rabbit Technologies Ltd. are Copyright (C) 2007-2008 +%% LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit +%% Technologies Ltd.; +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit). + +-behaviour(application). + +-export([start/0, stop/0, stop_and_halt/0, status/0]). + +-export([start/2, stop/1]). + +-import(application). +-import(mnesia). +-import(lists). +-import(inet). +-import(gen_tcp). + +-include("rabbit_framing.hrl"). +-include("rabbit.hrl"). + +-define(APPS, [os_mon, mnesia, rabbit]). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(start/0 :: () -> 'ok'). +-spec(stop/0 :: () -> 'ok'). +-spec(stop_and_halt/0 :: () -> 'ok'). +-spec(status/0 :: () -> + [{running_applications, [{atom(), string(), string()}]} | + {nodes, [node()]} | + {running_nodes, [node()]}]). + +-endif. + +%%---------------------------------------------------------------------------- + +start() -> + try + ok = ensure_working_log_config(), + ok = rabbit_mnesia:ensure_mnesia_dir(), + ok = start_applications(?APPS) + after + %%give the error loggers some time to catch up + timer:sleep(100) + end. + +stop() -> + ok = stop_applications(?APPS). + +stop_and_halt() -> + spawn(fun () -> + SleepTime = 1000, + rabbit_log:info("Stop-and-halt request received; halting in ~p milliseconds~n", + [SleepTime]), + timer:sleep(SleepTime), + init:stop() + end), + case catch stop() of _ -> ok end. + +status() -> + [{running_applications, application:which_applications()}] ++ + rabbit_mnesia:status(). + +%%-------------------------------------------------------------------- + +manage_applications(Iterate, Do, Undo, SkipError, ErrorTag, Apps) -> + Iterate(fun (App, Acc) -> + case Do(App) of + ok -> [App | Acc]; + {error, {SkipError, _}} -> Acc; + {error, Reason} -> + lists:foreach(Undo, Acc), + throw({error, {ErrorTag, App, Reason}}) + end + end, [], Apps), + ok. + +start_applications(Apps) -> + manage_applications(fun lists:foldl/3, + fun application:start/1, + fun application:stop/1, + already_started, + cannot_start_application, + Apps). + +stop_applications(Apps) -> + manage_applications(fun lists:foldr/3, + fun application:stop/1, + fun application:start/1, + not_started, + cannot_stop_application, + Apps). + +start(normal, []) -> + + {ok, SupPid} = rabbit_sup:start_link(), + + print_banner(), + + {ok, ExtraSteps} = application:get_env(extra_startup_steps), + + lists:foreach( + fun ({Msg, Thunk}) -> + io:format("starting ~-20s ...", [Msg]), + Thunk(), + io:format("done~n"); + ({Msg, M, F, A}) -> + io:format("starting ~-20s ...", [Msg]), + apply(M, F, A), + io:format("done~n") + end, + [{"database", + fun () -> ok = rabbit_mnesia:init() end}, + {"core processes", + fun () -> + ok = start_child(rabbit_log), + + ok = rabbit_amqqueue:start(), + + ok = rabbit_binary_generator: + check_empty_content_body_frame_size(), + + ok = start_child(rabbit_router), + ok = start_child(rabbit_node_monitor) + end}, + {"recovery", + fun () -> + ok = maybe_insert_default_data(), + + ok = rabbit_exchange:recover(), + ok = rabbit_amqqueue:recover(), + ok = rabbit_realm:recover() + end}, + {"persister", + fun () -> + ok = start_child(rabbit_persister) + end}, + {"builtin applications", + fun () -> + {ok, DefaultVHost} = application:get_env(default_vhost), + ok = error_logger:add_report_handler( + rabbit_error_logger, [DefaultVHost]), + ok = start_builtin_amq_applications() + end}, + {"TCP listeners", + fun () -> + ok = rabbit_networking:start(), + {ok, TCPListeners} = application:get_env(tcp_listeners), + lists:foreach( + fun ({Host, Port}) -> + ok = rabbit_networking:start_tcp_listener(Host, Port) + end, + TCPListeners) + end}] + ++ ExtraSteps), + + io:format("~nbroker running~n"), + + {ok, SupPid}. + +stop(_State) -> + terminated_ok = error_logger:delete_report_handler(rabbit_error_logger), + ok. + +%--------------------------------------------------------------------------- + +print_banner() -> + {ok, Product} = application:get_key(id), + {ok, Version} = application:get_key(vsn), + io:format("~s ~s (AMQP ~p-~p)~n~s~n~s~n~n", + [Product, Version, + ?PROTOCOL_VERSION_MAJOR, ?PROTOCOL_VERSION_MINOR, + ?COPYRIGHT_MESSAGE, ?INFORMATION_MESSAGE]), + io:format("Logging to ~p~nSASL logging to ~p~n~n", + [error_log_location(), sasl_log_location()]). + +start_child(Mod) -> + {ok,_} = supervisor:start_child(rabbit_sup, + {Mod, {Mod, start_link, []}, + transient, 100, worker, [Mod]}), + ok. + +maybe_insert_default_data() -> + case rabbit_mnesia:is_db_empty() of + true -> insert_default_data(); + false -> ok + end. + +insert_default_data() -> + {ok, DefaultUser} = application:get_env(default_user), + {ok, DefaultPass} = application:get_env(default_pass), + {ok, DefaultVHost} = application:get_env(default_vhost), + ok = rabbit_access_control:add_vhost(DefaultVHost), + ok = insert_default_user(DefaultUser, DefaultPass, + [{DefaultVHost, [<<"/data">>, <<"/admin">>]}]), + ok. + +insert_default_user(Username, Password, VHostSpecs) -> + ok = rabbit_access_control:add_user(Username, Password), + lists:foreach( + fun ({VHostPath, Realms}) -> + ok = rabbit_access_control:map_user_vhost( + Username, VHostPath), + lists:foreach( + fun (Realm) -> + RealmFullName = + rabbit_misc:r(VHostPath, realm, Realm), + ok = rabbit_access_control:map_user_realm( + Username, + rabbit_access_control:full_ticket( + RealmFullName)) + end, Realms) + end, VHostSpecs), + ok. + +start_builtin_amq_applications() -> + %%TODO: we may want to create a separate supervisor for these so + %%they don't bring down the entire app when they die and fail to + %%restart + ok. + +ensure_working_log_config() -> + case error_logger:logfile(filename) of + {error, no_log_file} -> + %% either no log file was configured or opening it failed. + case application:get_env(kernel, error_logger) of + {ok, {file, Filename}} -> + case filelib:ensure_dir(Filename) of + ok -> ok; + {error, Reason1} -> + throw({error, {cannot_log_to_file, + Filename, Reason1}}) + end, + case error_logger:logfile({open, Filename}) of + ok -> ok; + {error, Reason2} -> + throw({error, {cannot_log_to_file, + Filename, Reason2}}) + end; + _ -> ok + end; + _Filename -> ok + end. + +error_log_location() -> + case error_logger:logfile(filename) of + {error,no_log_file} -> tty; + File -> File + end. + +sasl_log_location() -> + case application:get_env(sasl, sasl_error_logger) of + {ok, {file, File}} -> File; + {ok, false} -> undefined; + {ok, tty} -> tty; + {ok, Bad} -> throw({error, {cannot_log_to_file, Bad}}); + _ -> undefined + end. diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl new file mode 100644 index 0000000000..2be07b1996 --- /dev/null +++ b/src/rabbit_access_control.erl @@ -0,0 +1,366 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd., +%% Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd., Cohesive Financial Technologies +%% LLC., and Rabbit Technologies Ltd. are Copyright (C) 2007-2008 +%% LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit +%% Technologies Ltd.; +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_access_control). +-include_lib("stdlib/include/qlc.hrl"). +-include("rabbit.hrl"). + +-export([check_login/2, user_pass_login/2, + check_vhost_access/2, lookup_realm_access/2]). +-export([add_user/2, delete_user/1, change_password/2, list_users/0, + lookup_user/1]). +-export([add_vhost/1, delete_vhost/1, list_vhosts/0, list_vhost_users/1]). +-export([list_user_vhosts/1, map_user_vhost/2, unmap_user_vhost/2]). +-export([list_user_realms/2, map_user_realm/2, full_ticket/1]). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(check_login/2 :: (binary(), binary()) -> user()). +-spec(user_pass_login/2 :: (username(), password()) -> user()). +-spec(check_vhost_access/2 :: (user(), vhost()) -> 'ok'). +-spec(lookup_realm_access/2 :: (user(), realm_name()) -> maybe(ticket())). +-spec(add_user/2 :: (username(), password()) -> 'ok'). +-spec(delete_user/1 :: (username()) -> 'ok'). +-spec(change_password/2 :: (username(), password()) -> 'ok'). +-spec(list_users/0 :: () -> [username()]). +-spec(lookup_user/1 :: (username()) -> {'ok', user()} | not_found()). +-spec(add_vhost/1 :: (vhost()) -> 'ok'). +-spec(delete_vhost/1 :: (vhost()) -> 'ok'). +-spec(list_vhosts/0 :: () -> [vhost()]). +-spec(list_vhost_users/1 :: (vhost()) -> [username()]). +-spec(list_user_vhosts/1 :: (username()) -> [vhost()]). +-spec(map_user_vhost/2 :: (username(), vhost()) -> 'ok'). +-spec(unmap_user_vhost/2 :: (username(), vhost()) -> 'ok'). +-spec(map_user_realm/2 :: (username(), ticket()) -> 'ok'). +-spec(list_user_realms/2 :: (username(), vhost()) -> [{name(), ticket()}]). +-spec(full_ticket/1 :: (realm_name()) -> ticket()). + +-endif. + +%%---------------------------------------------------------------------------- + +%% SASL PLAIN, as used by the Qpid Java client and our clients. Also, +%% apparently, by OpenAMQ. +check_login(<<"PLAIN">>, Response) -> + [User, Pass] = [list_to_binary(T) || + T <- string:tokens(binary_to_list(Response), [0])], + user_pass_login(User, Pass); +%% AMQPLAIN, as used by Qpid Python test suite. The 0-8 spec actually +%% defines this as PLAIN, but in 0-9 that definition is gone, instead +%% referring generically to "SASL security mechanism", i.e. the above. +check_login(<<"AMQPLAIN">>, Response) -> + LoginTable = rabbit_binary_parser:parse_table(Response), + case {lists:keysearch(<<"LOGIN">>, 1, LoginTable), + lists:keysearch(<<"PASSWORD">>, 1, LoginTable)} of + {{value, {_, longstr, User}}, + {value, {_, longstr, Pass}}} -> + user_pass_login(User, Pass); + _ -> + %% Is this an information leak? + rabbit_misc:protocol_error( + access_refused, + "AMQPPLAIN auth info ~w is missing LOGIN or PASSWORD field", + [LoginTable]) + end; + +check_login(Mechanism, _Response) -> + rabbit_misc:protocol_error( + access_refused, "unsupported authentication mechanism '~s'", + [Mechanism]). + +user_pass_login(User, Pass) -> + ?LOGDEBUG("Login with user ~p pass ~p~n", [User, Pass]), + case lookup_user(User) of + {ok, U} -> + if + Pass == U#user.password -> U; + true -> + rabbit_misc:protocol_error( + access_refused, "login refused for user '~s'", [User]) + end; + {error, not_found} -> + rabbit_misc:protocol_error( + access_refused, "login refused for user '~s'", [User]) + end. + +internal_lookup_vhost_access(Username, VHostPath) -> + %% TODO: use dirty ops instead + rabbit_misc:execute_mnesia_transaction( + fun () -> + case mnesia:match_object( + #user_vhost{username = Username, + virtual_host = VHostPath}) of + [] -> not_found; + [R] -> {ok, R} + end + end). + +check_vhost_access(#user{username = Username}, VHostPath) -> + ?LOGDEBUG("Checking VHost access for ~p to ~p~n", [Username, VHostPath]), + case internal_lookup_vhost_access(Username, VHostPath) of + {ok, _R} -> + ok; + not_found -> + rabbit_misc:protocol_error( + access_refused, "access to vhost '~s' refused for user '~s'", + [VHostPath, Username]) + end. + +lookup_realm_access(#user{username = Username}, RealmName = #resource{kind = realm}) -> + %% TODO: use dirty ops instead + rabbit_misc:execute_mnesia_transaction( + fun () -> + case user_realms(Username, RealmName) of + [] -> + none; + [#user_realm{ticket_pattern = TicketPattern}] -> + TicketPattern + end + end). + +add_user(Username, Password) -> + R = rabbit_misc:execute_mnesia_transaction( + fun () -> + case mnesia:read({user, Username}) of + [] -> + ok = mnesia:write(#user{username = Username, + password = Password}); + _ -> + mnesia:abort({user_already_exists, Username}) + end + end), + rabbit_log:info("Created user ~p~n", [Username]), + R. + +delete_user(Username) -> + R = rabbit_misc:execute_mnesia_transaction( + rabbit_misc:with_user( + Username, + fun () -> + ok = mnesia:delete({user, Username}), + ok = mnesia:delete({user_vhost, Username}), + ok = mnesia:delete({user_realm, Username}) + end)), + rabbit_log:info("Deleted user ~p~n", [Username]), + R. + +change_password(Username, Password) -> + R = rabbit_misc:execute_mnesia_transaction( + rabbit_misc:with_user( + Username, + fun () -> + ok = mnesia:write(#user{username = Username, + password = Password}) + end)), + rabbit_log:info("Changed password for user ~p~n", [Username]), + R. + +list_users() -> + mnesia:dirty_all_keys(user). + +lookup_user(Username) -> + rabbit_misc:dirty_read({user, Username}). + +add_vhost(VHostPath) -> + R = rabbit_misc:execute_mnesia_transaction( + fun () -> + case mnesia:read({vhost, VHostPath}) of + [] -> + ok = mnesia:write(#vhost{virtual_host = VHostPath}), + DataRealm = + rabbit_misc:r(VHostPath, realm, <<"/data">>), + AdminRealm = + rabbit_misc:r(VHostPath, realm, <<"/admin">>), + ok = rabbit_realm:add_realm(DataRealm), + ok = rabbit_realm:add_realm(AdminRealm), + #exchange{} = rabbit_exchange:declare( + DataRealm, <<"">>, + direct, true, false, []), + #exchange{} = rabbit_exchange:declare( + DataRealm, <<"amq.direct">>, + direct, true, false, []), + #exchange{} = rabbit_exchange:declare( + DataRealm, <<"amq.topic">>, + topic, true, false, []), + #exchange{} = rabbit_exchange:declare( + DataRealm, <<"amq.fanout">>, + fanout, true, false, []), + ok; + [_] -> + mnesia:abort({vhost_already_exists, VHostPath}) + end + end), + rabbit_log:info("Added vhost ~p~n", [VHostPath]), + R. + +delete_vhost(VHostPath) -> + %%FIXME: We are forced to delete the queues outside the TX below + %%because queue deletion involves sending messages to the queue + %%process, which in turn results in further mnesia actions and + %%eventually the termination of that process. + lists:foreach(fun (Q) -> + {ok,_} = rabbit_amqqueue:delete(Q, false, false) + end, + rabbit_amqqueue:list_vhost_queues(VHostPath)), + R = rabbit_misc:execute_mnesia_transaction( + rabbit_misc:with_vhost( + VHostPath, + fun () -> + ok = internal_delete_vhost(VHostPath) + end)), + rabbit_log:info("Deleted vhost ~p~n", [VHostPath]), + R. + +internal_delete_vhost(VHostPath) -> + lists:foreach(fun (#exchange{name=Name}) -> + ok = rabbit_exchange:delete(Name, false) + end, + rabbit_exchange:list_vhost_exchanges(VHostPath)), + lists:foreach(fun (RealmName) -> + ok = rabbit_realm:delete_realm( + rabbit_misc:r(VHostPath, realm, RealmName)) + end, + rabbit_realm:list_vhost_realms(VHostPath)), + lists:foreach(fun (Username) -> + ok = unmap_user_vhost(Username, VHostPath) + end, + list_vhost_users(VHostPath)), + ok = mnesia:delete({vhost, VHostPath}), + ok. + +list_vhosts() -> + mnesia:dirty_all_keys(vhost). + +list_vhost_users(VHostPath) -> + [Username || + #user_vhost{username = Username} <- + %% TODO: use dirty ops instead + rabbit_misc:execute_mnesia_transaction( + rabbit_misc:with_vhost( + VHostPath, + fun () -> mnesia:index_read(user_vhost, VHostPath, + #user_vhost.virtual_host) + end))]. + +list_user_vhosts(Username) -> + [VHostPath || + #user_vhost{virtual_host = VHostPath} <- + %% TODO: use dirty ops instead + rabbit_misc:execute_mnesia_transaction( + rabbit_misc:with_user( + Username, + fun () -> mnesia:read({user_vhost, Username}) end))]. + +map_user_vhost(Username, VHostPath) -> + rabbit_misc:execute_mnesia_transaction( + rabbit_misc:with_user_and_vhost( + Username, VHostPath, + fun () -> + ok = mnesia:write( + #user_vhost{username = Username, + virtual_host = VHostPath}) + end)). + +unmap_user_vhost(Username, VHostPath) -> + rabbit_misc:execute_mnesia_transaction( + rabbit_misc:with_user_and_vhost( + Username, VHostPath, + fun () -> + lists:foreach(fun mnesia:delete_object/1, + user_realms(Username, + rabbit_misc:r(VHostPath, realm))), + ok = mnesia:delete_object( + #user_vhost{username = Username, + virtual_host = VHostPath}) + end)). + +map_user_realm(Username, + Ticket = #ticket{realm_name = RealmName = + #resource{virtual_host = VHostPath, + kind = realm}}) -> + rabbit_misc:execute_mnesia_transaction( + rabbit_misc:with_user_and_vhost( + Username, VHostPath, + rabbit_misc:with_realm( + RealmName, + fun () -> + lists:foreach(fun mnesia:delete_object/1, + user_realms(Username, RealmName)), + case internal_lookup_vhost_access(Username, VHostPath) of + {ok, _R} -> + case ticket_liveness(Ticket) of + alive -> + ok = mnesia:write( + #user_realm{username = Username, + realm = RealmName, + ticket_pattern = Ticket}); + dead -> + ok + end; + not_found -> + mnesia:abort(not_mapped_to_vhost) + end + end))). + +list_user_realms(Username, VHostPath) -> + [{Name, Pattern} || + #user_realm{realm = #resource{name = Name}, + ticket_pattern = Pattern} <- + %% TODO: use dirty ops instead + rabbit_misc:execute_mnesia_transaction( + rabbit_misc:with_user_and_vhost( + Username, VHostPath, + fun () -> + case internal_lookup_vhost_access( + Username, VHostPath) of + {ok, _R} -> + user_realms(Username, + rabbit_misc:r(VHostPath, realm)); + not_found -> + mnesia:abort(not_mapped_to_vhost) + end + end))]. + +ticket_liveness(#ticket{passive_flag = false, + active_flag = false, + write_flag = false, + read_flag = false}) -> + dead; +ticket_liveness(_) -> + alive. + +full_ticket(RealmName) -> + #ticket{realm_name = RealmName, + passive_flag = true, + active_flag = true, + write_flag = true, + read_flag = true}. + +user_realms(Username, RealmName) -> + mnesia:match_object(#user_realm{username = Username, + realm = RealmName, + _ = '_'}). diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl new file mode 100644 index 0000000000..63f043ba1c --- /dev/null +++ b/src/rabbit_amqqueue.erl @@ -0,0 +1,379 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd., +%% Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd., Cohesive Financial Technologies +%% LLC., and Rabbit Technologies Ltd. are Copyright (C) 2007-2008 +%% LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit +%% Technologies Ltd.; +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_amqqueue). + +-export([start/0, recover/0, declare/5, delete/3, purge/1, internal_delete/1]). +-export([pseudo_queue/3]). +-export([lookup/1, with/2, with_or_die/2, list_vhost_queues/1, + stat/1, stat_all/0, deliver/5, redeliver/2, requeue/3, ack/4, + commit/2, rollback/2]). +-export([add_binding/4, delete_binding/4, binding_forcibly_removed/2]). +-export([claim_queue/2]). +-export([basic_get/3, basic_consume/7, basic_cancel/4]). +-export([notify_sent/2, notify_down/2]). +-export([on_node_down/1]). + +-import(mnesia). +-import(gen_server). +-import(lists). +-import(queue). + +-include("rabbit.hrl"). +-include_lib("stdlib/include/qlc.hrl"). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-type(qstats() :: {'ok', queue_name(), non_neg_integer(), non_neg_integer()}). +-type(qlen() :: {'ok', non_neg_integer()}). +-type(qfun(A) :: fun ((amqqueue()) -> A)). +-type(bind_res() :: {'ok', non_neg_integer()} | + {'error', 'queue_not_found' | 'exchange_not_found'}). +-spec(start/0 :: () -> 'ok'). +-spec(recover/0 :: () -> 'ok'). +-spec(declare/5 :: (realm_name(), name(), bool(), bool(), amqp_table()) -> + amqqueue()). +-spec(add_binding/4 :: + (queue_name(), exchange_name(), routing_key(), amqp_table()) -> + bind_res() | {'error', 'durability_settings_incompatible'}). +-spec(delete_binding/4 :: + (queue_name(), exchange_name(), routing_key(), amqp_table()) -> + bind_res() | {'error', 'binding_not_found'}). +-spec(lookup/1 :: (queue_name()) -> {'ok', amqqueue()} | not_found()). +-spec(with/2 :: (queue_name(), qfun(A)) -> A | not_found()). +-spec(with_or_die/2 :: (queue_name(), qfun(A)) -> A). +-spec(list_vhost_queues/1 :: (vhost()) -> [amqqueue()]). +-spec(stat/1 :: (amqqueue()) -> qstats()). +-spec(stat_all/0 :: () -> [qstats()]). +-spec(delete/3 :: + (amqqueue(), 'false', 'false') -> qlen(); + (amqqueue(), 'true' , 'false') -> qlen() | {'error', 'in_use'}; + (amqqueue(), 'false', 'true' ) -> qlen() | {'error', 'not_empty'}; + (amqqueue(), 'true' , 'true' ) -> qlen() | + {'error', 'in_use'} | + {'error', 'not_empty'}). +-spec(purge/1 :: (amqqueue()) -> qlen()). +-spec(deliver/5 :: (bool(), bool(), maybe(txn()), message(), pid()) -> bool()). +-spec(redeliver/2 :: (pid(), [{message(), bool()}]) -> 'ok'). +-spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok'). +-spec(ack/4 :: (pid(), maybe(txn()), [msg_id()], pid()) -> 'ok'). +-spec(commit/2 :: (pid(), txn()) -> 'ok'). +-spec(rollback/2 :: (pid(), txn()) -> 'ok'). +-spec(notify_down/2 :: (amqqueue(), pid()) -> 'ok'). +-spec(binding_forcibly_removed/2 :: (binding_spec(), queue_name()) -> 'ok'). +-spec(claim_queue/2 :: (amqqueue(), pid()) -> 'ok' | 'locked'). +-spec(basic_get/3 :: (amqqueue(), pid(), bool()) -> + {'ok', non_neg_integer(), msg()} | 'empty'). +-spec(basic_consume/7 :: + (amqqueue(), bool(), pid(), pid(), ctag(), bool(), any()) -> + 'ok' | {'error', 'queue_owned_by_another_connection' | + 'exclusive_consume_unavailable'}). +-spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok'). +-spec(notify_sent/2 :: (pid(), pid()) -> 'ok'). +-spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()). +-spec(on_node_down/1 :: (node()) -> 'ok'). +-spec(pseudo_queue/3 :: (realm_name(), binary(), pid()) -> amqqueue()). + +-endif. + +%%---------------------------------------------------------------------------- + +start() -> + {ok,_} = supervisor:start_child( + rabbit_sup, + {rabbit_amqqueue_sup, + {rabbit_amqqueue_sup, start_link, []}, + transient, infinity, supervisor, [rabbit_amqqueue_sup]}), + ok. + +recover() -> + ok = recover_durable_queues(), + ok. + +recover_durable_queues() -> + Node = node(), + %% TODO: use dirty ops instead + R = rabbit_misc:execute_mnesia_transaction( + fun () -> + qlc:e(qlc:q([Q || Q = #amqqueue{pid = Pid} + <- mnesia:table(durable_queues), + node(Pid) == Node])) + end), + Queues = lists:map(fun start_queue_process/1, R), + rabbit_misc:execute_mnesia_transaction( + fun () -> + lists:foreach(fun recover_queue/1, Queues), + ok + end). + +declare(RealmName, NameBin, Durable, AutoDelete, Args) -> + QName = rabbit_misc:r(RealmName, queue, NameBin), + Q = start_queue_process(#amqqueue{name = QName, + durable = Durable, + auto_delete = AutoDelete, + arguments = Args, + binding_specs = [], + pid = none}), + case rabbit_misc:execute_mnesia_transaction( + fun () -> + case mnesia:wread({amqqueue, QName}) of + [] -> ok = recover_queue(Q), + ok = rabbit_realm:add(RealmName, QName), + Q; + [ExistingQ] -> ExistingQ + end + end) of + Q -> Q; + ExistingQ -> exit(Q#amqqueue.pid, shutdown), + ExistingQ + end. + +store_queue(Q = #amqqueue{durable = true}) -> + ok = mnesia:write(durable_queues, Q, write), + ok = mnesia:write(Q), + ok; +store_queue(Q = #amqqueue{durable = false}) -> + ok = mnesia:write(Q), + ok. + +start_queue_process(Q) -> + {ok, Pid} = supervisor:start_child(rabbit_amqqueue_sup, [Q]), + Q#amqqueue{pid = Pid}. + +recover_queue(Q) -> + ok = store_queue(Q), + ok = recover_bindings(Q), + ok. + +default_binding_spec(#resource{virtual_host = VHost, name = Name}) -> + #binding_spec{exchange_name = rabbit_misc:r(VHost, exchange, <<>>), + routing_key = Name, + arguments = []}. + +recover_bindings(Q = #amqqueue{name = QueueName, binding_specs = Specs}) -> + ok = rabbit_exchange:add_binding(default_binding_spec(QueueName), Q), + lists:foreach(fun (B) -> + ok = rabbit_exchange:add_binding(B, Q) + end, Specs), + ok. + +modify_bindings(QueueName, ExchangeName, RoutingKey, Arguments, + SpecPresentFun, SpecAbsentFun) -> + rabbit_misc:execute_mnesia_transaction( + fun () -> + case mnesia:wread({amqqueue, QueueName}) of + [Q = #amqqueue{binding_specs = Specs0}] -> + Spec = #binding_spec{exchange_name = ExchangeName, + routing_key = RoutingKey, + arguments = Arguments}, + case (case lists:member(Spec, Specs0) of + true -> SpecPresentFun; + false -> SpecAbsentFun + end)(Q, Spec) of + {ok, #amqqueue{binding_specs = Specs}} -> + {ok, length(Specs)}; + {error, not_found} -> + {error, exchange_not_found}; + Other -> Other + end; + [] -> {error, queue_not_found} + end + end). + +update_bindings(Q = #amqqueue{binding_specs = Specs0}, Spec, + UpdateSpecFun, UpdateExchangeFun) -> + Q1 = Q#amqqueue{binding_specs = UpdateSpecFun(Spec, Specs0)}, + case UpdateExchangeFun(Spec, Q1) of + ok -> store_queue(Q1), + {ok, Q1}; + Other -> Other + end. + +add_binding(QueueName, ExchangeName, RoutingKey, Arguments) -> + modify_bindings( + QueueName, ExchangeName, RoutingKey, Arguments, + fun (Q, _Spec) -> {ok, Q} end, + fun (Q, Spec) -> update_bindings( + Q, Spec, + fun (S, Specs) -> [S | Specs] end, + fun rabbit_exchange:add_binding/2) + end). + +delete_binding(QueueName, ExchangeName, RoutingKey, Arguments) -> + modify_bindings( + QueueName, ExchangeName, RoutingKey, Arguments, + fun (Q, Spec) -> update_bindings( + Q, Spec, + fun lists:delete/2, + fun rabbit_exchange:delete_binding/2) + end, + fun (Q, Spec) -> + %% the following is essentially a no-op, though crucially + %% it produces {error, not_found} when the exchange does + %% not exist. + case rabbit_exchange:delete_binding(Spec, Q) of + ok -> {error, binding_not_found}; + Other -> Other + end + end). + +lookup(Name) -> + rabbit_misc:dirty_read({amqqueue, Name}). + +with(Name, F, E) -> + case lookup(Name) of + {ok, Q} -> rabbit_misc:with_exit_handler(E, fun () -> F(Q) end); + {error, not_found} -> E() + end. + +with(Name, F) -> + with(Name, F, fun () -> {error, not_found} end). +with_or_die(Name, F) -> + with(Name, F, fun () -> rabbit_misc:protocol_error( + not_found, "no ~s", [rabbit_misc:rs(Name)]) + end). + +list_vhost_queues(VHostPath) -> + mnesia:dirty_match_object( + #amqqueue{name = rabbit_misc:r(VHostPath, queue), _ = '_'}). + +stat(#amqqueue{pid = QPid}) -> gen_server:call(QPid, stat). + +stat_all() -> + lists:map(fun stat/1, rabbit_misc:dirty_read_all(amqqueue)). + +delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) -> + gen_server:call(QPid, {delete, IfUnused, IfEmpty}). + +purge(#amqqueue{ pid = QPid }) -> gen_server:call(QPid, purge). + +deliver(_IsMandatory, true, Txn, Message, QPid) -> + gen_server:call(QPid, {deliver_immediately, Txn, Message}); +deliver(true, _IsImmediate, Txn, Message, QPid) -> + gen_server:call(QPid, {deliver, Txn, Message}), + true; +deliver(false, _IsImmediate, Txn, Message, QPid) -> + gen_server:cast(QPid, {deliver, Txn, Message}), + true. + +redeliver(QPid, Messages) -> + gen_server:cast(QPid, {redeliver, Messages}). + +requeue(QPid, MsgIds, ChPid) -> + gen_server:cast(QPid, {requeue, MsgIds, ChPid}). + +ack(QPid, Txn, MsgIds, ChPid) -> + gen_server:cast(QPid, {ack, Txn, MsgIds, ChPid}). + +commit(QPid, Txn) -> + gen_server:call(QPid, {commit, Txn}). + +rollback(QPid, Txn) -> + gen_server:cast(QPid, {rollback, Txn}). + +notify_down(#amqqueue{ pid = QPid }, ChPid) -> + gen_server:call(QPid, {notify_down, ChPid}). + +binding_forcibly_removed(BindingSpec, QueueName) -> + rabbit_misc:execute_mnesia_transaction( + fun () -> + case mnesia:wread({amqqueue, QueueName}) of + [] -> ok; + [Q = #amqqueue{binding_specs = Specs}] -> + store_queue(Q#amqqueue{binding_specs = + lists:delete(BindingSpec, Specs)}) + end + end). + +claim_queue(#amqqueue{pid = QPid}, ReaderPid) -> + gen_server:call(QPid, {claim_queue, ReaderPid}). + +basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) -> + gen_server:call(QPid, {basic_get, ChPid, NoAck}). + +basic_consume(#amqqueue{pid = QPid}, NoAck, ReaderPid, ChPid, + ConsumerTag, ExclusiveConsume, OkMsg) -> + gen_server:call(QPid, {basic_consume, NoAck, ReaderPid, ChPid, + ConsumerTag, ExclusiveConsume, OkMsg}). + +basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> + ok = gen_server:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}). + +notify_sent(QPid, ChPid) -> + gen_server:cast(QPid, {notify_sent, ChPid}). + +delete_bindings(Q = #amqqueue{binding_specs = Specs}) -> + lists:foreach(fun (BindingSpec) -> + ok = rabbit_exchange:delete_binding( + BindingSpec, Q) + end, Specs). + +internal_delete(QueueName) -> + rabbit_misc:execute_mnesia_transaction( + fun () -> + case mnesia:wread({amqqueue, QueueName}) of + [] -> {error, not_found}; + [Q] -> + ok = delete_temp(Q), + ok = mnesia:delete({durable_queues, QueueName}), + ok = rabbit_realm:delete_from_all(QueueName), + ok + end + end). + +delete_temp(Q = #amqqueue{name = QueueName}) -> + ok = delete_bindings(Q), + ok = rabbit_exchange:delete_binding( + default_binding_spec(QueueName), Q), + ok = mnesia:delete({amqqueue, QueueName}), + ok. + +delete_queue(Q = #amqqueue{name = QueueName, durable = Durable}) -> + ok = delete_temp(Q), + if + Durable -> ok; + true -> ok = rabbit_realm:delete_from_all(QueueName) + end. + +on_node_down(Node) -> + rabbit_misc:execute_mnesia_transaction( + fun () -> + qlc:fold( + fun (Q, Acc) -> ok = delete_queue(Q), Acc end, + ok, + qlc:q([Q || Q = #amqqueue{pid = Pid} + <- mnesia:table(amqqueue), + node(Pid) == Node])) + end). + +pseudo_queue(RealmName, NameBin, Pid) -> + #amqqueue{name = rabbit_misc:r(RealmName, queue, NameBin), + durable = false, + auto_delete = false, + arguments = [], + binding_specs = [], + pid = Pid}. diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl new file mode 100644 index 0000000000..7716ef1646 --- /dev/null +++ b/src/rabbit_amqqueue_process.erl @@ -0,0 +1,686 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd., +%% Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd., Cohesive Financial Technologies +%% LLC., and Rabbit Technologies Ltd. are Copyright (C) 2007-2008 +%% LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit +%% Technologies Ltd.; +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_amqqueue_process). +-include("rabbit.hrl"). +-include("rabbit_framing.hrl"). + +-behaviour(gen_server). + +-define(UNSENT_MESSAGE_LIMIT, 100). + +-export([start_link/1]). + +-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]). + +-import(queue). +-import(erlang). +-import(lists). + +% Queue's state +-record(q, {q, + owner, + exclusive_consumer, + has_had_consumers, + next_msg_id, + message_buffer, + round_robin}). + +-record(consumer, {tag, ack_required}). + +-record(tx, {ch_pid, is_persistent, pending_messages, pending_acks}). + +%% These are held in our process dictionary +-record(cr, {consumers, + ch_pid, + monitor_ref, + unacked_messages, + is_overload_protection_active, + unsent_message_count}). + +%%---------------------------------------------------------------------------- + +start_link(Q) -> + gen_server:start_link(?MODULE, Q, []). + +%%---------------------------------------------------------------------------- + +init(Q) -> + ?LOGDEBUG("Queue starting - ~p~n", [Q]), + {ok, #q{q = Q, + owner = none, + exclusive_consumer = none, + has_had_consumers = false, + next_msg_id = 1, + message_buffer = queue:new(), + round_robin = queue:new()}}. + +terminate(_Reason, State) -> + %% FIXME: How do we cancel active subscriptions? + QName = qname(State), + lists:foreach(fun (Txn) -> ok = rollback_work(Txn, QName) end, + all_tx()), + ok = purge_message_buffer(QName, State#q.message_buffer), + ok = rabbit_amqqueue:internal_delete(QName). + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%---------------------------------------------------------------------------- + +lookup_ch(ChPid) -> + case get({ch, ChPid}) of + undefined -> not_found; + C -> C + end. + +ch_record(ChPid) -> + Key = {ch, ChPid}, + case get(Key) of + undefined -> + MonitorRef = erlang:monitor(process, ChPid), + C = #cr{consumers = [], + ch_pid = ChPid, + monitor_ref = MonitorRef, + unacked_messages = dict:new(), + is_overload_protection_active = false, + unsent_message_count = 0}, + put(Key, C), + C; + C = #cr{} -> C + end. + +store_ch_record(C = #cr{ch_pid = ChPid}) -> + put({ch, ChPid}, C). + +all_ch_record() -> + [C || {{ch, _}, C} <- get()]. + +update_store_and_maybe_block_ch( + C = #cr{is_overload_protection_active = Active, + unsent_message_count = Count}) -> + {Result, NewActive} = + if + not(Active) and (Count > ?UNSENT_MESSAGE_LIMIT) -> + {block_ch, true}; + Active and (Count == 0) -> + {unblock_ch, false}; + true -> + {ok, Active} + end, + store_ch_record(C#cr{is_overload_protection_active = NewActive}), + Result. + +deliver_immediately(Message, Delivered, + State = #q{q = #amqqueue{name = QName}, + round_robin = RoundRobin, + next_msg_id = NextId}) -> + ?LOGDEBUG("AMQQUEUE ~p DELIVERY:~n~p~n", [QName, Message]), + case queue:out(RoundRobin) of + {{value, QEntry = {ChPid, #consumer{tag = ConsumerTag, + ack_required = AckRequired}}}, + RoundRobinTail} -> + rabbit_channel:deliver( + ChPid, ConsumerTag, AckRequired, + {QName, self(), NextId, Delivered, Message}), + C = #cr{unsent_message_count = Count, + unacked_messages = UAM} = ch_record(ChPid), + NewUAM = case AckRequired of + true -> dict:store(NextId, Message, UAM); + false -> UAM + end, + NewConsumers = + case update_store_and_maybe_block_ch( + C#cr{unsent_message_count = Count + 1, + unacked_messages = NewUAM}) of + ok -> queue:in(QEntry, RoundRobinTail); + block_ch -> block_consumers(ChPid, RoundRobinTail) + end, + {offered, AckRequired, State#q{round_robin = NewConsumers, + next_msg_id = NextId +1}}; + {empty, _} -> + not_offered + end. + +attempt_delivery(none, Message, State) -> + case deliver_immediately(Message, false, State) of + {offered, false, State1} -> + {true, State1}; + {offered, true, State1} -> + persist_message(none, qname(State), Message), + persist_delivery(qname(State), Message, false), + {true, State1}; + not_offered -> + {false, State} + end; +attempt_delivery(Txn, Message, State) -> + persist_message(Txn, qname(State), Message), + record_pending_message(Txn, Message), + {true, State}. + +deliver_or_enqueue(Txn, Message, State) -> + case attempt_delivery(Txn, Message, State) of + {true, NewState} -> + {true, NewState}; + {false, NewState} -> + persist_message(Txn, qname(State), Message), + NewMB = queue:in({Message, false}, NewState#q.message_buffer), + {false, NewState#q{message_buffer = NewMB}} + end. + +deliver_or_enqueue_n(Messages, State = #q{message_buffer = MessageBuffer}) -> + run_poke_burst(queue:join(MessageBuffer, queue:from_list(Messages)), + State). + +block_consumers(ChPid, RoundRobin) -> + %%?LOGDEBUG("~p Blocking ~p from ~p~n", [self(), ChPid, queue:to_list(RoundRobin)]), + queue:from_list(lists:filter(fun ({CP, _}) -> CP /= ChPid end, + queue:to_list(RoundRobin))). + +unblock_consumers(ChPid, Consumers, RoundRobin) -> + %%?LOGDEBUG("Unblocking ~p ~p ~p~n", [ChPid, Consumers, queue:to_list(RoundRobin)]), + queue:join(RoundRobin, + queue:from_list([{ChPid, Con} || Con <- Consumers])). + +block_consumer(ChPid, ConsumerTag, RoundRobin) -> + %%?LOGDEBUG("~p Blocking ~p from ~p~n", [self(), ConsumerTag, queue:to_list(RoundRobin)]), + queue:from_list(lists:filter( + fun ({CP, #consumer{tag = CT}}) -> + (CP /= ChPid) or (CT /= ConsumerTag) + end, queue:to_list(RoundRobin))). + +possibly_unblock(C = #cr{consumers = Consumers, ch_pid = ChPid}, + State = #q{round_robin = RoundRobin}) -> + case update_store_and_maybe_block_ch(C) of + ok -> + State; + unblock_ch -> + run_poke_burst(State#q{round_robin = + unblock_consumers(ChPid, Consumers, RoundRobin)}) + end. + +check_auto_delete(State = #q{q = #amqqueue{auto_delete = false}}) -> + {continue, State}; +check_auto_delete(State = #q{has_had_consumers = false}) -> + {continue, State}; +check_auto_delete(State = #q{round_robin = RoundRobin}) -> + % The clauses above rule out cases where no-one has consumed from + % this queue yet, and cases where we are not an auto_delete queue + % in any case. Thus it remains to check whether we have any active + % listeners at this point. + case queue:is_empty(RoundRobin) of + true -> + % There are no waiting listeners. It's possible that we're + % completely unused. Check. + case is_unused() of + true -> + % There are no active consumers at this + % point. This is the signal to autodelete. + {stop, State}; + false -> + % There is at least one active consumer, so we + % shouldn't delete ourselves. + {continue, State} + end; + false -> + % There are some waiting listeners, thus we are not + % unused, so can continue life as normal without needing + % to check the process dictionary. + {continue, State} + end. + +handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder, + round_robin = ActiveConsumers}) -> + case lookup_ch(DownPid) of + not_found -> {noreply, State}; + #cr{monitor_ref = MonitorRef, ch_pid = ChPid, unacked_messages = UAM} -> + NewActive = block_consumers(ChPid, ActiveConsumers), + erlang:demonitor(MonitorRef), + erase({ch, ChPid}), + case check_auto_delete( + deliver_or_enqueue_n( + [{Message, true} || + {_Messsage_id, Message} <- dict:to_list(UAM)], + State#q{ + exclusive_consumer = case Holder of + {ChPid, _} -> none; + Other -> Other + end, + round_robin = NewActive})) of + {continue, NewState} -> + {noreply, NewState}; + {stop, NewState} -> + {stop, normal, NewState} + end + end. + +cancel_holder(ChPid, ConsumerTag, {ChPid, ConsumerTag}) -> + none; +cancel_holder(_ChPid, _ConsumerTag, Holder) -> + Holder. + +check_queue_owner(none, _) -> ok; +check_queue_owner({ReaderPid, _}, ReaderPid) -> ok; +check_queue_owner({_, _}, _) -> mismatch. + +check_exclusive_access({_ChPid, _ConsumerTag}, _ExclusiveConsume) -> + in_use; +check_exclusive_access(none, false) -> + ok; +check_exclusive_access(none, true) -> + case is_unused() of + true -> ok; + false -> in_use + end. + +run_poke_burst(State = #q{message_buffer = MessageBuffer}) -> + run_poke_burst(MessageBuffer, State). + +run_poke_burst(MessageBuffer, State) -> + case queue:out(MessageBuffer) of + {{value, {Message, Delivered}}, BufferTail} -> + case deliver_immediately(Message, Delivered, State) of + {offered, true, NewState} -> + persist_delivery(qname(State), Message, Delivered), + run_poke_burst(BufferTail, NewState); + {offered, false, NewState} -> + persist_auto_ack(qname(State), Message), + run_poke_burst(BufferTail, NewState); + not_offered -> + State#q{message_buffer = MessageBuffer} + end; + {empty, _} -> + State#q{message_buffer = MessageBuffer} + end. + +is_unused() -> + is_unused1(get()). + +is_unused1([]) -> + true; +is_unused1([{{ch, _}, #cr{consumers = Consumers}} | _Rest]) + when Consumers /= [] -> + false; +is_unused1([_ | Rest]) -> + is_unused1(Rest). + +maybe_send_reply(_ChPid, undefined) -> ok; +maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg). + +qname(#q{q = #amqqueue{name = QName}}) -> QName. + +persist_message(_Txn, _QName, #basic_message{persistent_key = none}) -> + ok; +persist_message(Txn, QName, Message) -> + M = Message#basic_message{ + %% don't persist any recoverable decoded properties, rebuild from properties_bin on restore + content = rabbit_binary_parser:clear_decoded_content( + Message#basic_message.content)}, + persist_work(Txn, QName, + [{publish, M, {QName, M#basic_message.persistent_key}}]). + +persist_delivery(_QName, _Message, + true) -> + ok; +persist_delivery(_QName, #basic_message{persistent_key = none}, + _Delivered) -> + ok; +persist_delivery(QName, #basic_message{persistent_key = PKey}, + _Delivered) -> + persist_work(none, QName, [{deliver, {QName, PKey}}]). + +persist_acks(Txn, QName, Messages) -> + persist_work(Txn, QName, + [{ack, {QName, PKey}} || + #basic_message{persistent_key = PKey} <- Messages, + PKey =/= none]). + +persist_auto_ack(_QName, #basic_message{persistent_key = none}) -> + ok; +persist_auto_ack(QName, #basic_message{persistent_key = PKey}) -> + %% auto-acks are always non-transactional + rabbit_persister:dirty_work([{ack, {QName, PKey}}]). + +persist_work(_Txn,_QName, []) -> + ok; +persist_work(none, _QName, WorkList) -> + rabbit_persister:dirty_work(WorkList); +persist_work(Txn, QName, WorkList) -> + mark_tx_persistent(Txn), + rabbit_persister:extend_transaction({Txn, QName}, WorkList). + +commit_work(Txn, QName) -> + do_if_persistent(fun rabbit_persister:commit_transaction/1, + Txn, QName). + +rollback_work(Txn, QName) -> + do_if_persistent(fun rabbit_persister:rollback_transaction/1, + Txn, QName). + +%% optimisation: don't do unnecessary work +%% it would be nice if this was handled by the persister +do_if_persistent(F, Txn, QName) -> + case is_tx_persistent(Txn) of + false -> ok; + true -> ok = F({Txn, QName}) + end. + +lookup_tx(Txn) -> + case get({txn, Txn}) of + undefined -> #tx{ch_pid = none, + is_persistent = false, + pending_messages = [], + pending_acks = []}; + V -> V + end. + +store_tx(Txn, Tx) -> + put({txn, Txn}, Tx). + +erase_tx(Txn) -> + erase({txn, Txn}). + +all_tx() -> + [Txn || {{txn, Txn}, _} <- get()]. + +mark_tx_persistent(Txn) -> + Tx = lookup_tx(Txn), + store_tx(Txn, Tx#tx{is_persistent = true}). + +is_tx_persistent(Txn) -> + #tx{is_persistent = Res} = lookup_tx(Txn), + Res. + +record_pending_message(Txn, Message) -> + Tx = #tx{pending_messages = Pending} = lookup_tx(Txn), + store_tx(Txn, Tx#tx{pending_messages = [{Message, false} | Pending]}). + +record_pending_acks(Txn, ChPid, MsgIds) -> + Tx = #tx{pending_acks = Pending} = lookup_tx(Txn), + store_tx(Txn, Tx#tx{pending_acks = [MsgIds | Pending], ch_pid = ChPid}). + +process_pending(Txn, State) -> + #tx{ch_pid = ChPid, + pending_messages = PendingMessages, + pending_acks = PendingAcks} = lookup_tx(Txn), + case lookup_ch(ChPid) of + not_found -> ok; + C = #cr{unacked_messages = UAM} -> + {_Acked, Remaining} = + collect_messages(lists:append(PendingAcks), UAM), + store_ch_record(C#cr{unacked_messages = Remaining}) + end, + deliver_or_enqueue_n(lists:reverse(PendingMessages), State). + +collect_messages(MsgIds, UAM) -> + lists:mapfoldl( + fun (MsgId, D) -> {dict:fetch(MsgId, D), dict:erase(MsgId, D)} end, + UAM, MsgIds). + +purge_message_buffer(QName, MessageBuffer) -> + Messages = + [[Message || {Message, _Delivered} <- + queue:to_list(MessageBuffer)] | + lists:map( + fun (#cr{unacked_messages = UAM}) -> + [Message || {_MessageId, Message} <- dict:to_list(UAM)] + end, + all_ch_record())], + %% the simplest, though certainly not the most obvious or + %% efficient, way to purge messages from the persister is to + %% artifically ack them. + persist_acks(none, QName, lists:append(Messages)). + +%--------------------------------------------------------------------------- + +handle_call({deliver_immediately, Txn, Message}, _From, State) -> + %% Synchronous, "immediate" delivery mode + %% + %% FIXME: Is this correct semantics? + %% + %% I'm worried in particular about the case where an exchange has + %% two queues against a particular routing key, and a message is + %% sent in immediate mode through the binding. In non-immediate + %% mode, both queues get the message, saving it for later if + %% there's noone ready to receive it just now. In immediate mode, + %% should both queues still get the message, somehow, or should + %% just all ready-to-consume queues get the message, with unready + %% queues discarding the message? + %% + {Delivered, NewState} = attempt_delivery(Txn, Message, State), + {reply, Delivered, NewState}; + +handle_call({deliver, Txn, Message}, _From, State) -> + %% Synchronous, "mandatory" delivery mode + {Delivered, NewState} = deliver_or_enqueue(Txn, Message, State), + {reply, Delivered, NewState}; + +handle_call({commit, Txn}, From, State) -> + ok = commit_work(Txn, qname(State)), + %% optimisation: we reply straight away so the sender can continue + gen_server:reply(From, ok), + NewState = process_pending(Txn, State), + erase_tx(Txn), + {noreply, NewState}; + +handle_call({notify_down, ChPid}, From, State) -> + %% optimisation: we reply straight away so the sender can continue + gen_server:reply(From, ok), + handle_ch_down(ChPid, State); + +handle_call({basic_get, ChPid, NoAck}, _From, + State = #q{q = #amqqueue{name = QName}, + next_msg_id = NextId, + message_buffer = MessageBuffer}) -> + case queue:out(MessageBuffer) of + {{value, {Message, Delivered}}, BufferTail} -> + AckRequired = not(NoAck), + case AckRequired of + true -> + persist_delivery(QName, Message, Delivered), + C = #cr{unacked_messages = UAM} = ch_record(ChPid), + NewUAM = dict:store(NextId, Message, UAM), + store_ch_record(C#cr{unacked_messages = NewUAM}); + false -> + persist_auto_ack(QName, Message) + end, + Msg = {QName, self(), NextId, Delivered, Message}, + {reply, {ok, queue:len(BufferTail), Msg}, + State#q{message_buffer = BufferTail, next_msg_id = NextId + 1}}; + {empty, _} -> + {reply, empty, State} + end; + +handle_call({basic_consume, NoAck, ReaderPid, ChPid, ConsumerTag, + ExclusiveConsume, OkMsg}, + _From, State = #q{owner = Owner, + exclusive_consumer = ExistingHolder, + round_robin = RoundRobin}) -> + case check_queue_owner(Owner, ReaderPid) of + mismatch -> + {reply, {error, queue_owned_by_another_connection}, State}; + ok -> + case check_exclusive_access(ExistingHolder, ExclusiveConsume) of + in_use -> + {reply, {error, exclusive_consume_unavailable}, State}; + ok -> + C = #cr{consumers = Consumers} = ch_record(ChPid), + Consumer = #consumer{tag = ConsumerTag, ack_required = not(NoAck)}, + C1 = C#cr{consumers = [Consumer | Consumers]}, + store_ch_record(C1), + State1 = State#q{has_had_consumers = true, + exclusive_consumer = + if + ExclusiveConsume -> {ChPid, ConsumerTag}; + true -> ExistingHolder + end, + round_robin = queue:in({ChPid, Consumer}, RoundRobin)}, + ok = maybe_send_reply(ChPid, OkMsg), + {reply, ok, run_poke_burst(State1)} + end + end; + +handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, + State = #q{exclusive_consumer = Holder, + round_robin = RoundRobin}) -> + case lookup_ch(ChPid) of + not_found -> + ok = maybe_send_reply(ChPid, OkMsg), + {reply, ok, State}; + C = #cr{consumers = Consumers} -> + NewConsumers = lists:filter + (fun (#consumer{tag = CT}) -> CT /= ConsumerTag end, + Consumers), + C1 = C#cr{consumers = NewConsumers}, + store_ch_record(C1), + ok = maybe_send_reply(ChPid, OkMsg), + case check_auto_delete( + State#q{exclusive_consumer = cancel_holder(ChPid, + ConsumerTag, + Holder), + round_robin = block_consumer(ChPid, + ConsumerTag, + RoundRobin)}) of + {continue, State1} -> + {reply, ok, State1}; + {stop, State1} -> + {stop, normal, ok, State1} + end + end; + +handle_call(stat, _From, State = #q{q = #amqqueue{name = Name}, + message_buffer = MessageBuffer, + round_robin = RoundRobin}) -> + {reply, {ok, Name, queue:len(MessageBuffer), queue:len(RoundRobin)}, State}; + +handle_call({delete, IfUnused, IfEmpty}, _From, + State = #q{message_buffer = MessageBuffer}) -> + IsEmpty = queue:is_empty(MessageBuffer), + IsUnused = is_unused(), + if + IfEmpty and not(IsEmpty) -> + {reply, {error, not_empty}, State}; + IfUnused and not(IsUnused) -> + {reply, {error, in_use}, State}; + true -> + {stop, normal, {ok, queue:len(MessageBuffer)}, State} + end; + +handle_call(purge, _From, State = #q{message_buffer = MessageBuffer}) -> + ok = purge_message_buffer(qname(State), MessageBuffer), + {reply, {ok, queue:len(MessageBuffer)}, State#q{message_buffer = queue:new()}}; + +handle_call({claim_queue, ReaderPid}, _From, State = #q{owner = Owner, + exclusive_consumer = Holder}) -> + case Owner of + none -> + case check_exclusive_access(Holder, true) of + in_use -> + %% FIXME: Is this really the right answer? What if + %% an active consumer's reader is actually the + %% claiming pid? Should that be allowed? In order + %% to check, we'd need to hold not just the ch + %% pid for each consumer, but also its reader + %% pid... + {reply, locked, State}; + ok -> + {reply, ok, State#q{owner = {ReaderPid, erlang:monitor(process, ReaderPid)}}} + end; + {ReaderPid, _MonitorRef} -> + {reply, ok, State}; + _ -> + {reply, locked, State} + end. + +handle_cast({deliver, Txn, Message}, State) -> + %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. + {_Delivered, NewState} = deliver_or_enqueue(Txn, Message, State), + {noreply, NewState}; + +handle_cast({ack, Txn, MsgIds, ChPid}, State) -> + case lookup_ch(ChPid) of + not_found -> + {noreply, State}; + C = #cr{unacked_messages = UAM} -> + {Acked, Remaining} = collect_messages(MsgIds, UAM), + persist_acks(Txn, qname(State), Acked), + case Txn of + none -> + store_ch_record(C#cr{unacked_messages = Remaining}); + _ -> + record_pending_acks(Txn, ChPid, MsgIds) + end, + {noreply, State} + end; + +handle_cast({rollback, Txn}, State) -> + ok = rollback_work(Txn, qname(State)), + erase_tx(Txn), + {noreply, State}; + +handle_cast({redeliver, Messages}, State) -> + {noreply, deliver_or_enqueue_n(Messages, State)}; + +handle_cast({requeue, MsgIds, ChPid}, State) -> + case lookup_ch(ChPid) of + not_found -> + rabbit_log:warning("Ignoring requeue from unknown ch: ~p~n", + [ChPid]), + {noreply, State}; + C = #cr{unacked_messages = UAM} -> + {Messages, NewUAM} = collect_messages(MsgIds, UAM), + store_ch_record(C#cr{unacked_messages = NewUAM}), + {noreply, deliver_or_enqueue_n( + [{Message, true} || Message <- Messages], State)} + end; + +handle_cast({notify_sent, ChPid}, State) -> + case lookup_ch(ChPid) of + not_found -> {noreply, State}; + T = #cr{unsent_message_count =Count} -> + {noreply, possibly_unblock( + T#cr{unsent_message_count = Count - 1}, + State)} + end. + +handle_info({'DOWN', MonitorRef, process, DownPid, _Reason}, + State = #q{owner = {DownPid, MonitorRef}}) -> + %% We know here that there are no consumers on this queue that are + %% owned by other pids than the one that just went down, so since + %% exclusive in some sense implies autodelete, we delete the queue + %% here. The other way of implementing the "exclusive implies + %% autodelete" feature is to actually set autodelete when an + %% exclusive declaration is seen, but this has the problem that + %% the python tests rely on the queue not going away after a + %% basic.cancel when the queue was declared exclusive and + %% nonautodelete. + NewState = State#q{owner = none}, + {stop, normal, NewState}; +handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) -> + handle_ch_down(DownPid, State); + +handle_info(Info, State) -> + ?LOGDEBUG("Info in queue: ~p~n", [Info]), + {stop, {unhandled_info, Info}, State}. diff --git a/src/rabbit_amqqueue_sup.erl b/src/rabbit_amqqueue_sup.erl new file mode 100644 index 0000000000..c68e2b3eda --- /dev/null +++ b/src/rabbit_amqqueue_sup.erl @@ -0,0 +1,42 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd., +%% Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd., Cohesive Financial Technologies +%% LLC., and Rabbit Technologies Ltd. are Copyright (C) 2007-2008 +%% LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit +%% Technologies Ltd.; +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_amqqueue_sup). + +-behaviour(supervisor). + +-export([start_link/0]). + +-export([init/1]). + +-define(SERVER, ?MODULE). + +start_link() -> + supervisor:start_link({local, ?SERVER}, ?MODULE, []). + +init([]) -> + {ok, {{simple_one_for_one, 10, 10}, + [{rabbit_amqqueue, {rabbit_amqqueue_process, start_link, []}, + temporary, brutal_kill, worker, [rabbit_amqqueue_process]}]}}. diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl new file mode 100644 index 0000000000..8b16abec70 --- /dev/null +++ b/src/rabbit_binary_generator.erl @@ -0,0 +1,271 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd., +%% Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd., Cohesive Financial Technologies +%% LLC., and Rabbit Technologies Ltd. are Copyright (C) 2007-2008 +%% LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit +%% Technologies Ltd.; +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_binary_generator). +-include("rabbit_framing.hrl"). +-include("rabbit.hrl"). + +% EMPTY_CONTENT_BODY_FRAME_SIZE, 8 = 1 + 2 + 4 + 1 +% - 1 byte of frame type +% - 2 bytes of channel number +% - 4 bytes of frame payload length +% - 1 byte of payload trailer FRAME_END byte +% See definition of check_empty_content_body_frame_size/0, an assertion called at startup. +-define(EMPTY_CONTENT_BODY_FRAME_SIZE, 8). + +-export([build_simple_method_frame/2, + build_simple_content_frames/3, + build_heartbeat_frame/0]). +-export([generate_table/1, encode_properties/2]). +-export([check_empty_content_body_frame_size/0]). + +-import(lists). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-type(frame() :: [binary()]). + +-spec(build_simple_method_frame/2 :: + (channel_number(), amqp_method()) -> frame()). +-spec(build_simple_content_frames/3 :: + (channel_number(), content(), non_neg_integer()) -> [frame()]). +-spec(build_heartbeat_frame/0 :: () -> frame()). +-spec(generate_table/1 :: (amqp_table()) -> binary()). +-spec(encode_properties/2 :: ([amqp_property_type()], [any()]) -> binary()). +-spec(check_empty_content_body_frame_size/0 :: () -> 'ok'). + +-endif. + +%%---------------------------------------------------------------------------- + +build_simple_method_frame(ChannelInt, MethodRecord) -> + MethodFields = rabbit_framing:encode_method_fields(MethodRecord), + MethodName = rabbit_misc:method_record_type(MethodRecord), + {ClassId, MethodId} = rabbit_framing:method_id(MethodName), + create_frame(1, ChannelInt, [<<ClassId:16, MethodId:16>>, MethodFields]). + +build_simple_content_frames(ChannelInt, + #content{class_id = ClassId, + properties = ContentProperties, + properties_bin = ContentPropertiesBin, + payload_fragments_rev = PayloadFragmentsRev}, + FrameMax) -> + {BodySize, ContentFrames} = build_content_frames(PayloadFragmentsRev, FrameMax, ChannelInt), + HeaderFrame = create_frame(2, ChannelInt, + [<<ClassId:16, 0:16, BodySize:64>>, + maybe_encode_properties(ContentProperties, ContentPropertiesBin)]), + [HeaderFrame | ContentFrames]. + +maybe_encode_properties(_ContentProperties, ContentPropertiesBin) + when is_binary(ContentPropertiesBin) -> + ContentPropertiesBin; +maybe_encode_properties(ContentProperties, none) -> + rabbit_framing:encode_properties(ContentProperties). + +build_content_frames(FragmentsRev, FrameMax, ChannelInt) -> + BodyPayloadMax = if + FrameMax == 0 -> + none; + true -> + FrameMax - ?EMPTY_CONTENT_BODY_FRAME_SIZE + end, + build_content_frames(0, [], FragmentsRev, BodyPayloadMax, ChannelInt). + +build_content_frames(SizeAcc, FragmentAcc, [], _BodyPayloadMax, _ChannelInt) -> + {SizeAcc, FragmentAcc}; +build_content_frames(SizeAcc, FragmentAcc, [Fragment | FragmentsRev], + BodyPayloadMax, ChannelInt) + when is_number(BodyPayloadMax) and (size(Fragment) > BodyPayloadMax) -> + <<Head:BodyPayloadMax/binary, Tail/binary>> = Fragment, + build_content_frames(SizeAcc, FragmentAcc, [Tail, Head | FragmentsRev], + BodyPayloadMax, ChannelInt); +build_content_frames(SizeAcc, FragmentAcc, [<<>> | FragmentsRev], + BodyPayloadMax, ChannelInt) -> + build_content_frames(SizeAcc, FragmentAcc, FragmentsRev, BodyPayloadMax, ChannelInt); +build_content_frames(SizeAcc, FragmentAcc, [Fragment | FragmentsRev], + BodyPayloadMax, ChannelInt) -> + build_content_frames(SizeAcc + size(Fragment), + [create_frame(3, ChannelInt, Fragment) | FragmentAcc], + FragmentsRev, + BodyPayloadMax, + ChannelInt). + +build_heartbeat_frame() -> + create_frame(?FRAME_HEARTBEAT, 0, <<>>). + +create_frame(TypeInt, ChannelInt, PayloadBin) when is_binary(PayloadBin) -> + [<<TypeInt:8, ChannelInt:16, (size(PayloadBin)):32>>, PayloadBin, <<?FRAME_END>>]; +create_frame(TypeInt, ChannelInt, Payload) -> + create_frame(TypeInt, ChannelInt, list_to_binary(Payload)). + +%% table_field_to_binary supports the AMQP 0-8/0-9 standard types, S, +%% I, D, T and F, as well as the QPid extensions b, d, f, l, s, t, x, +%% and V. + +table_field_to_binary({FName, longstr, Value}) -> + [short_string_to_binary(FName), "S", long_string_to_binary(Value)]; + +table_field_to_binary({FName, signedint, Value}) -> + [short_string_to_binary(FName), "I", <<Value:32/signed>>]; + +table_field_to_binary({FName, decimal, {Before, After}}) -> + [short_string_to_binary(FName), "D", Before, <<After:32>>]; + +table_field_to_binary({FName, timestamp, Value}) -> + [short_string_to_binary(FName), "T", <<Value:64>>]; + +table_field_to_binary({FName, table, Value}) -> + [short_string_to_binary(FName), "F", table_to_binary(Value)]; + +table_field_to_binary({FName, byte, Value}) -> + [short_string_to_binary(FName), "b", <<Value:8/unsigned>>]; + +table_field_to_binary({FName, double, Value}) -> + [short_string_to_binary(FName), "d", <<Value:64/float>>]; + +table_field_to_binary({FName, float, Value}) -> + [short_string_to_binary(FName), "f", <<Value:32/float>>]; + +table_field_to_binary({FName, long, Value}) -> + [short_string_to_binary(FName), "l", <<Value:64/signed>>]; + +table_field_to_binary({FName, short, Value}) -> + [short_string_to_binary(FName), "s", <<Value:16/signed>>]; + +table_field_to_binary({FName, bool, Value}) -> + [short_string_to_binary(FName), "t", if Value -> 1; true -> 0 end]; + +table_field_to_binary({FName, binary, Value}) -> + [short_string_to_binary(FName), "x", long_string_to_binary(Value)]; + +table_field_to_binary({FName, void, _Value}) -> + [short_string_to_binary(FName), "V"]. + +table_to_binary(Table) when is_list(Table) -> + BinTable = generate_table(Table), + [<<(size(BinTable)):32>>, BinTable]. + +generate_table(Table) when is_list(Table) -> + list_to_binary(lists:map(fun table_field_to_binary/1, Table)). + + +short_string_to_binary(String) when is_binary(String) and (size(String) < 256) -> + [<<(size(String)):8>>, String]; +short_string_to_binary(String) -> + StringLength = length(String), + true = (StringLength < 256), % assertion + [<<StringLength:8>>, String]. + + +long_string_to_binary(String) when is_binary(String) -> + [<<(size(String)):32>>, String]; +long_string_to_binary(String) -> + [<<(length(String)):32>>, String]. + + +encode_properties([], []) -> + <<0, 0>>; +encode_properties(TypeList, ValueList) -> + encode_properties(0, TypeList, ValueList, 0, [], []). + +encode_properties(_Bit, [], [], FirstShortAcc, FlagsAcc, PropsAcc) -> + list_to_binary([lists:reverse(FlagsAcc), <<FirstShortAcc:16>>, lists:reverse(PropsAcc)]); +encode_properties(_Bit, [], _ValueList, _FirstShortAcc, _FlagsAcc, _PropsAcc) -> + exit(content_properties_values_overflow); +encode_properties(15, TypeList, ValueList, FirstShortAcc, FlagsAcc, PropsAcc) -> + NewFlagsShort = FirstShortAcc bor 1, % set the continuation low bit + encode_properties(0, TypeList, ValueList, 0, [<<NewFlagsShort:16>> | FlagsAcc], PropsAcc); +encode_properties(Bit, [bit | TypeList], [Value | ValueList], FirstShortAcc, FlagsAcc, PropsAcc) -> + case Value of + true -> encode_properties(Bit + 1, TypeList, ValueList, + FirstShortAcc bor (1 bsl (15 - Bit)), FlagsAcc, PropsAcc); + false -> encode_properties(Bit + 1, TypeList, ValueList, + FirstShortAcc, FlagsAcc, PropsAcc); + Other -> exit({content_properties_illegal_bit_value, Other}) + end; +encode_properties(Bit, [T | TypeList], [Value | ValueList], FirstShortAcc, FlagsAcc, PropsAcc) -> + case Value of + undefined -> encode_properties(Bit + 1, TypeList, ValueList, + FirstShortAcc, FlagsAcc, PropsAcc); + _ -> encode_properties(Bit + 1, TypeList, ValueList, + FirstShortAcc bor (1 bsl (15 - Bit)), + FlagsAcc, + [encode_property(T, Value) | PropsAcc]) + end. + +encode_property(shortstr, String) -> + Len = size(String), <<Len:8/unsigned, String:Len/binary>>; +encode_property(longstr, String) -> + Len = size(String), <<Len:32/unsigned, String:Len/binary>>; +encode_property(octet, Int) -> + <<Int:8/unsigned>>; +encode_property(shortint, Int) -> + <<Int:16/unsigned>>; +encode_property(longint, Int) -> + <<Int:32/unsigned>>; +encode_property(longlongint, Int) -> + <<Int:64/unsigned>>; +encode_property(timestamp, Int) -> + <<Int:64/unsigned>>; +encode_property(table, Table) -> + encode_table(Table). + + +encode_table(Table) -> + TableBin = list_to_binary(lists:map(fun encode_table_entry/1, Table)), + Len = size(TableBin), + <<Len:32/unsigned, TableBin:Len/binary>>. + + +encode_table_entry({Name, longstr, Value}) -> + NLen = size(Name), + VLen = size(Value), + <<NLen:8/unsigned, Name:NLen/binary, "S", VLen:32/unsigned, Value:VLen/binary>>; +encode_table_entry({Name, signedint, Value}) -> + NLen = size(Name), + <<NLen:8/unsigned, Name:NLen/binary, "I", Value:32/signed>>; +encode_table_entry({Name, decimal, {Before, After}}) -> + NLen = size(Name), + <<NLen:8/unsigned, Name:NLen/binary, "D", Before:8/unsigned, After:32/unsigned>>; +encode_table_entry({Name, timestamp, Value}) -> + NLen = size(Name), + <<NLen:8/unsigned, Name:NLen/binary, "T", Value:64/unsigned>>; +encode_table_entry({Name, table, Value}) -> + NLen = size(Name), + TableBin = encode_table(Value), + <<NLen:8/unsigned, Name:NLen/binary, "F", TableBin/binary>>. + +check_empty_content_body_frame_size() -> + %% Intended to ensure that EMPTY_CONTENT_BODY_FRAME_SIZE is + %% defined correctly. + ComputedSize = size(list_to_binary(create_frame(?FRAME_BODY, 0, <<>>))), + if ComputedSize == ?EMPTY_CONTENT_BODY_FRAME_SIZE -> + ok; + true -> + exit({incorrect_empty_content_body_frame_size, + ComputedSize, ?EMPTY_CONTENT_BODY_FRAME_SIZE}) + end. diff --git a/src/rabbit_binary_parser.erl b/src/rabbit_binary_parser.erl new file mode 100644 index 0000000000..e942521504 --- /dev/null +++ b/src/rabbit_binary_parser.erl @@ -0,0 +1,161 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd., +%% Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd., Cohesive Financial Technologies +%% LLC., and Rabbit Technologies Ltd. are Copyright (C) 2007-2008 +%% LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit +%% Technologies Ltd.; +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_binary_parser). + +-include("rabbit.hrl"). + +-export([parse_table/1, parse_properties/2]). +-export([ensure_content_decoded/1, clear_decoded_content/1]). + +-import(lists). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(parse_table/1 :: (binary()) -> amqp_table()). +-spec(parse_properties/2 :: ([amqp_property_type()], binary()) -> [any()]). +-spec(ensure_content_decoded/1 :: (content()) -> decoded_content()). +-spec(clear_decoded_content/1 :: (content()) -> undecoded_content()). + +-endif. + +%%---------------------------------------------------------------------------- + +%% parse_table supports the AMQP 0-8/0-9 standard types, S, I, D, T +%% and F, as well as the QPid extensions b, d, f, l, s, t, x, and V. + +parse_table(<<>>) -> + []; + +parse_table(<<NLen:8/unsigned, NameString:NLen/binary, "S", VLen:32/unsigned, ValueString:VLen/binary, Rest/binary>>) -> + [{NameString, longstr, ValueString} | parse_table(Rest)]; + +parse_table(<<NLen:8/unsigned, NameString:NLen/binary, "I", Value:32/signed, Rest/binary>>) -> + [{NameString, signedint, Value} | parse_table(Rest)]; + +parse_table(<<NLen:8/unsigned, NameString:NLen/binary, "D", Before:8/unsigned, After:32/unsigned, Rest/binary>>) -> + [{NameString, decimal, {Before, After}} | parse_table(Rest)]; + +parse_table(<<NLen:8/unsigned, NameString:NLen/binary, "T", Value:64/unsigned, Rest/binary>>) -> + [{NameString, timestamp, Value} | parse_table(Rest)]; + +parse_table(<<NLen:8/unsigned, NameString:NLen/binary, "F", VLen:32/unsigned, Table:VLen/binary, Rest/binary>>) -> + [{NameString, table, parse_table(Table)} | parse_table(Rest)]; + +parse_table(<<NLen:8/unsigned, NameString:NLen/binary, "b", Value:8/unsigned, Rest/binary>>) -> + [{NameString, byte, Value} | parse_table(Rest)]; + +parse_table(<<NLen:8/unsigned, NameString:NLen/binary, "d", Value:64/float, Rest/binary>>) -> + [{NameString, double, Value} | parse_table(Rest)]; + +parse_table(<<NLen:8/unsigned, NameString:NLen/binary, "f", Value:32/float, Rest/binary>>) -> + [{NameString, float, Value} | parse_table(Rest)]; + +parse_table(<<NLen:8/unsigned, NameString:NLen/binary, "l", Value:64/signed, Rest/binary>>) -> + [{NameString, long, Value} | parse_table(Rest)]; + +parse_table(<<NLen:8/unsigned, NameString:NLen/binary, "s", Value:16/signed, Rest/binary>>) -> + [{NameString, short, Value} | parse_table(Rest)]; + +parse_table(<<NLen:8/unsigned, NameString:NLen/binary, "t", Value:8/unsigned, Rest/binary>>) -> + [{NameString, bool, (Value /= 0)} | parse_table(Rest)]; + +parse_table(<<NLen:8/unsigned, NameString:NLen/binary, "x", VLen:32/unsigned, ValueString:VLen/binary, Rest/binary>>) -> + [{NameString, binary, ValueString} | parse_table(Rest)]; + +parse_table(<<NLen:8/unsigned, NameString:NLen/binary, "V", Rest/binary>>) -> + [{NameString, void, undefined} | parse_table(Rest)]. + + +parse_properties([], _PropBin) -> + []; +parse_properties(TypeList, PropBin) -> + FlagCount = length(TypeList), + %% round up to the nearest multiple of 15 bits, since the 16th bit + %% in each short is a "continuation" bit. + FlagsLengthBytes = trunc((FlagCount + 14) / 15) * 2, + <<Flags:FlagsLengthBytes/binary, Properties/binary>> = PropBin, + <<FirstShort:16, Remainder/binary>> = Flags, + parse_properties(0, TypeList, [], FirstShort, Remainder, Properties). + +parse_properties(_Bit, [], Acc, _FirstShort, + _Remainder, <<>>) -> + lists:reverse(Acc); +parse_properties(_Bit, [], _Acc, _FirstShort, + _Remainder, _LeftoverBin) -> + exit(content_properties_binary_overflow); +parse_properties(15, TypeList, Acc, _OldFirstShort, + <<NewFirstShort:16,Remainder/binary>>, Properties) -> + parse_properties(0, TypeList, Acc, NewFirstShort, Remainder, Properties); +parse_properties(Bit, [Type | TypeListRest], Acc, FirstShort, + Remainder, Properties) -> + {Value, Rest} = + if (FirstShort band (1 bsl (15 - Bit))) /= 0 -> + parse_property(Type, Properties); + Type == bit -> {false , Properties}; + true -> {undefined, Properties} + end, + parse_properties(Bit + 1, TypeListRest, [Value | Acc], FirstShort, + Remainder, Rest). + +parse_property(shortstr, <<Len:8/unsigned, String:Len/binary, Rest/binary>>) -> + {String, Rest}; +parse_property(longstr, <<Len:32/unsigned, String:Len/binary, Rest/binary>>) -> + {String, Rest}; +parse_property(octet, <<Int:8/unsigned, Rest/binary>>) -> + {Int, Rest}; +parse_property(shortint, <<Int:16/unsigned, Rest/binary>>) -> + {Int, Rest}; +parse_property(longint, <<Int:32/unsigned, Rest/binary>>) -> + {Int, Rest}; +parse_property(longlongint, <<Int:64/unsigned, Rest/binary>>) -> + {Int, Rest}; +parse_property(timestamp, <<Int:64/unsigned, Rest/binary>>) -> + {Int, Rest}; +parse_property(bit, Rest) -> + {true, Rest}; +parse_property(table, <<Len:32/unsigned, Table:Len/binary, Rest/binary>>) -> + {parse_table(Table), Rest}. + + +ensure_content_decoded(Content = #content{properties = Props}) + when Props =/= 'none' -> + Content; +ensure_content_decoded(Content = #content{properties_bin = PropBin}) + when is_binary(PropBin) -> + Content#content{properties = rabbit_framing:decode_properties( + Content#content.class_id, PropBin)}. + +clear_decoded_content(Content = #content{properties = none}) -> + Content; +clear_decoded_content(Content = #content{properties_bin = none}) -> + %% Only clear when we can rebuild the properties later in + %% accordance to the content record definition comment - maximum + %% one of properties and properties_bin can be 'none' + Content; +clear_decoded_content(Content = #content{}) -> + Content#content{properties = none}. diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl new file mode 100644 index 0000000000..ec1d1fbaa4 --- /dev/null +++ b/src/rabbit_channel.erl @@ -0,0 +1,894 @@ +%% The contents of this file are subject to the Mozilla Public Licenses +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd., +%% Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd., Cohesive Financial Technologies +%% LLC., and Rabbit Technologies Ltd. are Copyright (C) 2007-2008 +%% LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit +%% Technologies Ltd.; +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_channel). +-include("rabbit_framing.hrl"). +-include("rabbit.hrl"). + +-export([start_link/4, do/2, do/3, shutdown/1]). +-export([send_command/2, deliver/4]). + +%% callbacks +-export([init/2, handle_message/2]). + +-record(ch, {state, proxy_pid, reader_pid, writer_pid, + transaction_id, tx_participants, next_tag, + uncommitted_ack_q, unacked_message_q, + username, virtual_host, + most_recently_declared_queue, consumer_mapping, next_ticket}). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(start_link/4 :: (pid(), pid(), username(), vhost()) -> pid()). +-spec(do/2 :: (pid(), amqp_method()) -> 'ok'). +-spec(do/3 :: (pid(), amqp_method(), maybe(content())) -> 'ok'). +-spec(shutdown/1 :: (pid()) -> 'ok'). +-spec(send_command/2 :: (pid(), amqp_method()) -> 'ok'). +-spec(deliver/4 :: (pid(), ctag(), bool(), msg()) -> 'ok'). + +-endif. + +%%---------------------------------------------------------------------------- + +start_link(ReaderPid, WriterPid, Username, VHost) -> + buffering_proxy:start_link(?MODULE, [ReaderPid, WriterPid, + Username, VHost]). + +do(Pid, Method) -> + do(Pid, Method, none). + +do(Pid, Method, Content) -> + Pid ! {method, Method, Content}, + ok. + +shutdown(Pid) -> + Pid ! terminate, + ok. + +send_command(Pid, Msg) -> + Pid ! {command, Msg}, + ok. + +deliver(Pid, ConsumerTag, AckRequired, Msg) -> + Pid ! {deliver, ConsumerTag, AckRequired, Msg}, + ok. + +%%--------------------------------------------------------------------------- + +init(ProxyPid, [ReaderPid, WriterPid, Username, VHost]) -> + process_flag(trap_exit, true), + link(WriterPid), + #ch{state = starting, + proxy_pid = ProxyPid, + reader_pid = ReaderPid, + writer_pid = WriterPid, + transaction_id = none, + tx_participants = sets:new(), + next_tag = 1, + uncommitted_ack_q = queue:new(), + unacked_message_q = queue:new(), + username = Username, + virtual_host = VHost, + most_recently_declared_queue = <<>>, + consumer_mapping = dict:new(), + next_ticket = 101}. + +handle_message({method, Method, Content}, State) -> + case (catch handle_method(Method, Content, State)) of + {reply, Reply, NewState} -> + ok = rabbit_writer:send_command(NewState#ch.writer_pid, Reply), + NewState; + {noreply, NewState} -> + NewState; + stop -> + exit(normal); + {'EXIT', {amqp, Error, Explanation, none}} -> + terminate({amqp, Error, Explanation, + rabbit_misc:method_record_type(Method)}, + State); + {'EXIT', Reason} -> + terminate(Reason, State) + end; + +handle_message(terminate, State) -> + terminate(normal, State); + +handle_message({command, Msg}, State = #ch{writer_pid = WriterPid}) -> + ok = rabbit_writer:send_command(WriterPid, Msg), + State; + +handle_message({deliver, ConsumerTag, AckRequired, Msg}, + State = #ch{proxy_pid = ProxyPid, + writer_pid = WriterPid, + next_tag = DeliveryTag}) -> + State1 = lock_message(AckRequired, {DeliveryTag, ConsumerTag, Msg}, State), + ok = internal_deliver(WriterPid, ProxyPid, + true, ConsumerTag, DeliveryTag, Msg), + State1#ch{next_tag = DeliveryTag + 1}; + +handle_message({'EXIT', _Pid, Reason}, State) -> + terminate(Reason, State); + +handle_message(Other, State) -> + terminate({unexpected_channel_message, Other}, State). + +%%--------------------------------------------------------------------------- + +terminate(Reason, State = #ch{writer_pid = WriterPid}) -> + Res = notify_queues(internal_rollback(State)), + ok = rabbit_realm:leave_realms(self()), + case Reason of + normal -> ok = Res; + _ -> ok + end, + rabbit_writer:shutdown(WriterPid), + exit(Reason). + +return_ok(State, true, _Msg) -> {noreply, State}; +return_ok(State, false, Msg) -> {reply, Msg, State}. + +ok_msg(true, _Msg) -> undefined; +ok_msg(false, Msg) -> Msg. + +return_queue_declare_ok(State, NoWait, Q) -> + NewState = State#ch{most_recently_declared_queue = + (Q#amqqueue.name)#resource.name}, + case NoWait of + true -> {noreply, NewState}; + false -> + {ok, ActualName, MessageCount, ConsumerCount} = + rabbit_misc:with_exit_handler( + fun () -> {ok, Q#amqqueue.name, 0, 0} end, + fun () -> rabbit_amqqueue:stat(Q) end), + Reply = #'queue.declare_ok'{queue = ActualName#resource.name, + message_count = MessageCount, + consumer_count = ConsumerCount}, + {reply, Reply, NewState} + end. + +expand_queue_name_shortcut(<<>>, #ch{ most_recently_declared_queue = <<>> }) -> + rabbit_misc:protocol_error( + not_allowed, "no previously declared queue", []); +expand_queue_name_shortcut(<<>>, #ch{ virtual_host = VHostPath, + most_recently_declared_queue = MRDQ }) -> + rabbit_misc:r(VHostPath, queue, MRDQ); +expand_queue_name_shortcut(QueueNameBin, #ch{ virtual_host = VHostPath }) -> + rabbit_misc:r(VHostPath, queue, QueueNameBin). + +expand_routing_key_shortcut(<<>>, <<>>, + #ch{ most_recently_declared_queue = <<>> }) -> + rabbit_misc:protocol_error( + not_allowed, "no previously declared queue", []); +expand_routing_key_shortcut(<<>>, <<>>, + #ch{ most_recently_declared_queue = MRDQ }) -> + MRDQ; +expand_routing_key_shortcut(_QueueNameBin, RoutingKey, _State) -> + RoutingKey. + +die_precondition_failed(Fmt, Params) -> + %% FIXME: 406 should be replaced with precondition_failed when we + %% move to AMQP spec >=8.1 + rabbit_misc:protocol_error({false, 406, <<"PRECONDITION_FAILED">>}, + Fmt, Params). + +check_ticket(TicketNumber, FieldIndex, Name, #ch{ username = Username}) -> + rabbit_ticket:check_ticket(TicketNumber, FieldIndex, Name, Username). + +lookup_ticket(TicketNumber, FieldIndex, + #ch{ username = Username, virtual_host = VHostPath }) -> + rabbit_ticket:lookup_ticket(TicketNumber, FieldIndex, + Username, VHostPath). + +%% check that an exchange/queue name does not contain the reserved +%% "amq." prefix. +%% +%% One, quite reasonable, interpretation of the spec, taken by the +%% QPid M1 Java client, is that the exclusion of "amq." prefixed names +%% only applies on actual creation, and not in the cases where the +%% entity already exists. This is how we use this function in the code +%% below. However, AMQP JIRA 123 changes that in 0-10, and possibly +%% 0-9SP1, making it illegal to attempt to declare an exchange/queue +%% with an amq.* name when passive=false. So this will need +%% revisiting. +%% +%% TODO: enforce other constraints on name. See AMQP JIRA 69. +check_name(Kind, NameBin = <<"amq.", _/binary>>) -> + rabbit_misc:protocol_error( + access_refused, + "~s name '~s' contains reserved prefix 'amq.*'",[Kind, NameBin]); +check_name(_Kind, NameBin) -> + NameBin. + +handle_method(#'channel.open'{}, _, State = #ch{state = starting}) -> + {reply, #'channel.open_ok'{}, State#ch{state = running}}; + +handle_method(#'channel.open'{}, _, _State) -> + rabbit_misc:protocol_error( + command_invalid, "second 'channel.open' seen", []); + +handle_method(_Method, _, #ch{state = starting}) -> + rabbit_misc:protocol_error(channel_error, "expected 'channel.open'", []); + +handle_method(#'channel.close'{}, _, State = #ch{writer_pid = WriterPid}) -> + ok = notify_queues(internal_rollback(State)), + ok = rabbit_realm:leave_realms(self()), + ok = rabbit_writer:send_command(WriterPid, #'channel.close_ok'{}), + ok = rabbit_writer:shutdown(WriterPid), + stop; + +handle_method(#'access.request'{realm = RealmNameBin, + exclusive = Exclusive, + passive = Passive, + active = Active, + write = Write, + read = Read}, + _, State = #ch{username = Username, + virtual_host = VHostPath, + next_ticket = NextTicket}) -> + RealmName = rabbit_misc:r(VHostPath, realm, RealmNameBin), + Ticket = #ticket{realm_name = RealmName, + passive_flag = Passive, + active_flag = Active, + write_flag = Write, + read_flag = Read}, + case rabbit_realm:access_request(Username, Exclusive, Ticket) of + ok -> + rabbit_ticket:record_ticket(NextTicket, Ticket), + NewState = State#ch{next_ticket = NextTicket + 1}, + {reply, #'access.request_ok'{ticket = NextTicket}, NewState}; + {error, not_found} -> + rabbit_misc:protocol_error( + invalid_path, "no ~s", [rabbit_misc:rs(RealmName)]); + {error, bad_realm_path} -> + %% FIXME: spec bug? access_refused is a soft error, spec requires it to be hard + rabbit_misc:protocol_error( + access_refused, "bad path for ~s", [rabbit_misc:rs(RealmName)]); + {error, resource_locked} -> + rabbit_misc:protocol_error( + resource_locked, "~s is locked", [rabbit_misc:rs(RealmName)]); + {error, access_refused} -> + rabbit_misc:protocol_error( + access_refused, + "~w permissions denied for user '~s' attempting to access ~s", + [rabbit_misc:permission_list(Ticket), + Username, rabbit_misc:rs(RealmName)]) + end; + +handle_method(#'basic.publish'{ticket = TicketNumber, + exchange = ExchangeNameBin, + routing_key = RoutingKey, + mandatory = Mandatory, + immediate = Immediate}, + Content, State = #ch{ virtual_host = VHostPath}) -> + ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), + check_ticket(TicketNumber, #ticket.write_flag, ExchangeName, State), + Exchange = rabbit_exchange:lookup_or_die(ExchangeName), + %% We decode the content's properties here because we're almost + %% certain to want to look at delivery-mode and priority. + DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content), + PersistentKey = case is_message_persistent(DecodedContent) of + true -> rabbit_misc:guid(); + false -> none + end, + {noreply, publish(Mandatory, Immediate, + #basic_message{exchange_name = ExchangeName, + routing_key = RoutingKey, + content = DecodedContent, + persistent_key = PersistentKey}, + rabbit_exchange:route(Exchange, RoutingKey), State)}; + +handle_method(#'basic.ack'{delivery_tag = DeliveryTag, + multiple = Multiple}, + _, State = #ch{transaction_id = TxnKey, + next_tag = NextDeliveryTag, + unacked_message_q = UAMQ}) -> + if DeliveryTag >= NextDeliveryTag -> + rabbit_misc:protocol_error( + command_invalid, "unknown delivery tag ~w", [DeliveryTag]); + true -> ok + end, + {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple), + Participants = ack(State#ch.proxy_pid, TxnKey, Acked), + {noreply, case TxnKey of + none -> State#ch{unacked_message_q = Remaining}; + _ -> NewUAQ = queue:join(State#ch.uncommitted_ack_q, + Acked), + add_tx_participants( + Participants, + State#ch{unacked_message_q = Remaining, + uncommitted_ack_q = NewUAQ}) + end}; + +handle_method(#'basic.get'{ticket = TicketNumber, + queue = QueueNameBin, + no_ack = NoAck}, + _, State = #ch{ proxy_pid = ProxyPid, writer_pid = WriterPid, + next_tag = DeliveryTag }) -> + QueueName = expand_queue_name_shortcut(QueueNameBin, State), + check_ticket(TicketNumber, #ticket.read_flag, QueueName, State), + case rabbit_amqqueue:with_or_die( + QueueName, + fun (Q) -> rabbit_amqqueue:basic_get(Q, ProxyPid, NoAck) end) of + {ok, MessageCount, + Msg = {_QName, _QPid, _MsgId, Redelivered, + #basic_message{exchange_name = ExchangeName, + routing_key = RoutingKey, + content = Content}}} -> + State1 = lock_message(not(NoAck), {DeliveryTag, none, Msg}, State), + ok = rabbit_writer:send_command( + WriterPid, + #'basic.get_ok'{delivery_tag = DeliveryTag, + redelivered = Redelivered, + exchange = ExchangeName#resource.name, + routing_key = RoutingKey, + message_count = MessageCount}, + Content), + {noreply, State1#ch{next_tag = DeliveryTag + 1}}; + empty -> + {reply, #'basic.get_empty'{cluster_id = <<>>}, State} + end; + +handle_method(#'basic.consume'{ticket = TicketNumber, + queue = QueueNameBin, + consumer_tag = ConsumerTag, + no_local = _, % FIXME: implement + no_ack = NoAck, + exclusive = ExclusiveConsume, + nowait = NoWait}, + _, State = #ch{ proxy_pid = ProxyPid, + reader_pid = ReaderPid, + consumer_mapping = ConsumerMapping }) -> + case dict:find(ConsumerTag, ConsumerMapping) of + error -> + QueueName = expand_queue_name_shortcut(QueueNameBin, State), + check_ticket(TicketNumber, #ticket.read_flag, QueueName, State), + ActualConsumerTag = + case ConsumerTag of + <<>> -> rabbit_misc:binstring_guid("amq.ctag"); + Other -> Other + end, + + %% In order to ensure that the consume_ok gets sent before + %% any messages are sent to the consumer, we get the queue + %% process to send the consume_ok on our behalf. + case rabbit_amqqueue:with_or_die( + QueueName, + fun (Q) -> + rabbit_amqqueue:basic_consume( + Q, NoAck, ReaderPid, ProxyPid, + ActualConsumerTag, ExclusiveConsume, + ok_msg(NoWait, #'basic.consume_ok'{ + consumer_tag = ActualConsumerTag})) + end) of + ok -> + {noreply, State#ch{consumer_mapping = + dict:store(ActualConsumerTag, + QueueName, + ConsumerMapping)}}; + {error, queue_owned_by_another_connection} -> + %% The spec is silent on which exception to use + %% here. This seems reasonable? + %% FIXME: check this + + rabbit_misc:protocol_error( + resource_locked, "~s owned by another connection", + [rabbit_misc:rs(QueueName)]); + {error, exclusive_consume_unavailable} -> + rabbit_misc:protocol_error( + access_refused, "~s in exclusive use", + [rabbit_misc:rs(QueueName)]) + end; + {ok, _} -> + %% Attempted reuse of consumer tag. + rabbit_misc:protocol_error( + not_allowed, "attempt to reuse consumer tag '~s'", [ConsumerTag]) + end; + +handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, + nowait = NoWait}, + _, State = #ch{ proxy_pid = ProxyPid, + consumer_mapping = ConsumerMapping }) -> + OkMsg = #'basic.cancel_ok'{consumer_tag = ConsumerTag}, + case dict:find(ConsumerTag, ConsumerMapping) of + error -> + %% Spec requires we ignore this situation. + return_ok(State, NoWait, OkMsg); + {ok, QueueName} -> + NewState = State#ch{consumer_mapping = + dict:erase(ConsumerTag, + ConsumerMapping)}, + case rabbit_amqqueue:with( + QueueName, + fun (Q) -> + %% In order to ensure that no more messages + %% are sent to the consumer after the + %% cancel_ok has been sent, we get the + %% queue process to send the cancel_ok on + %% our behalf. If we were sending the + %% cancel_ok ourselves it might overtake a + %% message sent previously by the queue. + rabbit_amqqueue:basic_cancel( + Q, ProxyPid, ConsumerTag, + ok_msg(NoWait, #'basic.cancel_ok'{ + consumer_tag = ConsumerTag})) + end) of + ok -> + {noreply, NewState}; + {error, not_found} -> + %% Spec requires we ignore this situation. + return_ok(NewState, NoWait, OkMsg) + end + end; + +handle_method(#'basic.qos'{}, _, State) -> + %% FIXME: Need to implement QOS + {reply, #'basic.qos_ok'{}, State}; + +handle_method(#'basic.recover'{requeue = true}, + _, State = #ch{ transaction_id = none, + proxy_pid = ProxyPid, + unacked_message_q = UAMQ }) -> + ok = fold_per_queue( + fun (QPid, MsgIds, ok) -> + %% The Qpid python test suite incorrectly assumes + %% that messages will be requeued in their original + %% order. To keep it happy we reverse the id list + %% since we are given them in reverse order. + rabbit_amqqueue:requeue( + QPid, lists:reverse(MsgIds), ProxyPid) + end, ok, UAMQ), + %% No answer required, apparently! + {noreply, State#ch{unacked_message_q = queue:new()}}; + +handle_method(#'basic.recover'{requeue = false}, + _, State = #ch{ transaction_id = none, + proxy_pid = ProxyPid, + writer_pid = WriterPid, + unacked_message_q = UAMQ }) -> + lists:foreach( + fun ({_DeliveryTag, none, _Msg}) -> + %% Was sent as a basic.get_ok. Don't redeliver + %% it. FIXME: appropriate? + ok; + ({DeliveryTag, ConsumerTag, + {QName, QPid, MsgId, _Redelivered, Message}}) -> + %% Was sent as a proper consumer delivery. Resend it as + %% before. + %% + %% FIXME: What should happen if the consumer's been + %% cancelled since? + %% + %% FIXME: should we allocate a fresh DeliveryTag? + ok = internal_deliver( + WriterPid, ProxyPid, + false, ConsumerTag, DeliveryTag, + {QName, QPid, MsgId, true, Message}) + end, queue:to_list(UAMQ)), + %% No answer required, apparently! + {noreply, State}; + +handle_method(#'basic.recover'{}, _, _State) -> + rabbit_misc:protocol_error( + not_allowed, "attempt to recover a transactional channel",[]); + +handle_method(#'exchange.declare'{ticket = TicketNumber, + exchange = ExchangeNameBin, + type = TypeNameBin, + passive = false, + durable = Durable, + auto_delete = AutoDelete, + internal = false, + nowait = NoWait, + arguments = Args}, + _, State = #ch{ virtual_host = VHostPath }) -> + #ticket{realm_name = RealmName} = + lookup_ticket(TicketNumber, #ticket.active_flag, State), + CheckedType = rabbit_exchange:check_type(TypeNameBin), + %% FIXME: clarify spec as per declare wrt differing realms + X = case rabbit_exchange:lookup( + rabbit_misc:r(VHostPath, exchange, ExchangeNameBin)) of + {ok, FoundX} -> FoundX; + {error, not_found} -> + ActualNameBin = check_name('exchange', ExchangeNameBin), + rabbit_exchange:declare(RealmName, + ActualNameBin, + CheckedType, + Durable, + AutoDelete, + Args) + end, + ok = rabbit_exchange:assert_type(X, CheckedType), + return_ok(State, NoWait, #'exchange.declare_ok'{}); + +handle_method(#'exchange.declare'{ticket = TicketNumber, + exchange = ExchangeNameBin, + type = TypeNameBin, + passive = true, + nowait = NoWait}, + _, State = #ch{ virtual_host = VHostPath }) -> + %% FIXME: spec issue: permit active_flag here as well as passive_flag? + #ticket{} = lookup_ticket(TicketNumber, #ticket.passive_flag, State), + ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), + X = rabbit_exchange:lookup_or_die(ExchangeName), + ok = rabbit_exchange:assert_type(X, rabbit_exchange:check_type(TypeNameBin)), + return_ok(State, NoWait, #'exchange.declare_ok'{}); + +handle_method(#'exchange.delete'{ticket = TicketNumber, + exchange = ExchangeNameBin, + if_unused = IfUnused, + nowait = NoWait}, + _, State = #ch { virtual_host = VHostPath }) -> + ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), + check_ticket(TicketNumber, #ticket.active_flag, ExchangeName, State), + case rabbit_exchange:delete(ExchangeName, IfUnused) of + {error, not_found} -> + rabbit_misc:protocol_error( + not_found, "no ~s", [rabbit_misc:rs(ExchangeName)]); + {error, in_use} -> + die_precondition_failed( + "~s in use", [rabbit_misc:rs(ExchangeName)]); + ok -> + return_ok(State, NoWait, #'exchange.delete_ok'{}) + end; + +handle_method(#'queue.declare'{ticket = TicketNumber, + queue = QueueNameBin, + passive = false, + durable = Durable, + exclusive = ExclusiveDeclare, + auto_delete = AutoDelete, + nowait = NoWait, + arguments = Args}, + _, State = #ch { virtual_host = VHostPath, + reader_pid = ReaderPid }) -> + #ticket{realm_name = RealmName} = + lookup_ticket(TicketNumber, #ticket.active_flag, State), + %% FIXME: atomic create&claim + Finish = + fun (Q) -> + if ExclusiveDeclare -> + case rabbit_amqqueue:claim_queue(Q, ReaderPid) of + locked -> + %% AMQP 0-8 doesn't say which + %% exception to use, so we mimic QPid + %% here. + rabbit_misc:protocol_error( + resource_locked, + "cannot obtain exclusive access to locked ~s", + [rabbit_misc:rs(Q#amqqueue.name)]); + ok -> ok + end; + true -> + ok + end, + Q + end, + %% FIXME: clarify spec as per declare wrt differing realms + Q = case rabbit_amqqueue:with( + rabbit_misc:r(VHostPath, queue, QueueNameBin), + Finish) of + {error, not_found} -> + ActualNameBin = + case QueueNameBin of + <<>> -> rabbit_misc:binstring_guid("amq.gen"); + Other -> check_name('queue', Other) + end, + Finish(rabbit_amqqueue:declare(RealmName, + ActualNameBin, + Durable, + AutoDelete, + Args)); + Other -> Other + end, + return_queue_declare_ok(State, NoWait, Q); + +handle_method(#'queue.declare'{ticket = TicketNumber, + queue = QueueNameBin, + passive = true, + nowait = NoWait}, + _, State = #ch{ virtual_host = VHostPath }) -> + #ticket{} = lookup_ticket(TicketNumber, #ticket.passive_flag, State), + QueueName = rabbit_misc:r(VHostPath, queue, QueueNameBin), + Q = rabbit_amqqueue:with_or_die(QueueName, fun (Q) -> Q end), + return_queue_declare_ok(State, NoWait, Q); + +handle_method(#'queue.delete'{ticket = TicketNumber, + queue = QueueNameBin, + if_unused = IfUnused, + if_empty = IfEmpty, + nowait = NoWait + }, + _, State) -> + QueueName = expand_queue_name_shortcut(QueueNameBin, State), + check_ticket(TicketNumber, #ticket.active_flag, QueueName, State), + case rabbit_amqqueue:with_or_die( + QueueName, + fun (Q) -> rabbit_amqqueue:delete(Q, IfUnused, IfEmpty) end) of + {error, in_use} -> + die_precondition_failed( + "~s in use", [rabbit_misc:rs(QueueName)]); + {error, not_empty} -> + die_precondition_failed( + "~s not empty", [rabbit_misc:rs(QueueName)]); + {ok, PurgedMessageCount} -> + return_ok(State, NoWait, + #'queue.delete_ok'{ + message_count = PurgedMessageCount}) + end; + +handle_method(#'queue.bind'{ticket = TicketNumber, + queue = QueueNameBin, + exchange = ExchangeNameBin, + routing_key = RoutingKey, + nowait = NoWait, + arguments = Arguments}, + _, State = #ch{ virtual_host = VHostPath }) -> + %% FIXME: connection exception (!) on failure?? (see rule named "failure" in spec-XML) + %% FIXME: don't allow binding to internal exchanges - including the one named "" ! + QueueName = expand_queue_name_shortcut(QueueNameBin, State), + ActualRoutingKey = expand_routing_key_shortcut(QueueNameBin, RoutingKey, + State), + check_ticket(TicketNumber, #ticket.active_flag, QueueName, State), + ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), + case rabbit_amqqueue:add_binding(QueueName, ExchangeName, + ActualRoutingKey, Arguments) of + {error, queue_not_found} -> + rabbit_misc:protocol_error( + not_found, "no ~s", [rabbit_misc:rs(QueueName)]); + {error, exchange_not_found} -> + rabbit_misc:protocol_error( + not_found, "no ~s", [rabbit_misc:rs(ExchangeName)]); + {error, durability_settings_incompatible} -> + rabbit_misc:protocol_error( + not_allowed, "durability settings of ~s incompatible with ~s", + [rabbit_misc:rs(QueueName), rabbit_misc:rs(ExchangeName)]); + {ok, _BindingCount} -> + return_ok(State, NoWait, #'queue.bind_ok'{}) + end; + +handle_method(#'queue.purge'{ticket = TicketNumber, + queue = QueueNameBin, + nowait = NoWait}, + _, State) -> + QueueName = expand_queue_name_shortcut(QueueNameBin, State), + check_ticket(TicketNumber, #ticket.read_flag, QueueName, State), + {ok, PurgedMessageCount} = rabbit_amqqueue:with_or_die( + QueueName, + fun (Q) -> rabbit_amqqueue:purge(Q) end), + return_ok(State, NoWait, + #'queue.purge_ok'{message_count = PurgedMessageCount}); + +handle_method(#'tx.select'{}, _, State = #ch{transaction_id = none}) -> + {reply, #'tx.select_ok'{}, new_tx(State)}; + +handle_method(#'tx.select'{}, _, State) -> + {reply, #'tx.select_ok'{}, State}; + +handle_method(#'tx.commit'{}, _, #ch{transaction_id = none}) -> + rabbit_misc:protocol_error( + not_allowed, "channel is not transactional", []); + +handle_method(#'tx.commit'{}, _, State) -> + {reply, #'tx.commit_ok'{}, internal_commit(State)}; + +handle_method(#'tx.rollback'{}, _, #ch{transaction_id = none}) -> + rabbit_misc:protocol_error( + not_allowed, "channel is not transactional", []); + +handle_method(#'tx.rollback'{}, _, State) -> + {reply, #'tx.rollback_ok'{}, internal_rollback(State)}; + +handle_method(#'channel.flow'{active = _}, _, State) -> + %% FIXME: implement + {reply, #'channel.flow_ok'{active = true}, State}; + +handle_method(_MethodRecord, _Content, _State) -> + rabbit_misc:protocol_error( + command_invalid, "unimplemented method", []). + +%%---------------------------------------------------------------------------- + +publish(Mandatory, Immediate, Message, QPids, + State = #ch{transaction_id = TxnKey, writer_pid = WriterPid}) -> + Handled = deliver(QPids, Mandatory, Immediate, TxnKey, + Message, WriterPid), + case TxnKey of + none -> State; + _ -> add_tx_participants(Handled, State) + end. + +deliver(QPids, Mandatory, Immediate, Txn, Message, WriterPid) -> + case rabbit_router:deliver(QPids, Mandatory, Immediate, Txn, Message) of + {ok, DeliveredQPids} -> DeliveredQPids; + {error, unroutable} -> + %% FIXME: 312 should be replaced by the ?NO_ROUTE + %% definition, when we move to >=0-9 + ok = basic_return(Message, WriterPid, 312, <<"unroutable">>), + []; + {error, not_delivered} -> + %% FIXME: 313 should be replaced by the ?NO_CONSUMERS + %% definition, when we move to >=0-9 + ok = basic_return(Message, WriterPid, 313, <<"not_delivered">>), + [] + end. + +basic_return(#basic_message{exchange_name = ExchangeName, + routing_key = RoutingKey, + content = Content}, + WriterPid, ReplyCode, ReplyText) -> + ok = rabbit_writer:send_command( + WriterPid, + #'basic.return'{reply_code = ReplyCode, + reply_text = ReplyText, + exchange = ExchangeName#resource.name, + routing_key = RoutingKey}, + Content). + +collect_acks(Q, 0, true) -> + {Q, queue:new()}; +collect_acks(Q, DeliveryTag, Multiple) -> + collect_acks(queue:new(), queue:new(), Q, DeliveryTag, Multiple). + +collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) -> + case queue:out(Q) of + {{value, UnackedMsg = {CurrentDeliveryTag, _ConsumerTag, _Msg}}, + QTail} -> + if CurrentDeliveryTag == DeliveryTag -> + {queue:in(UnackedMsg, ToAcc), queue:join(PrefixAcc, QTail)}; + Multiple -> + collect_acks(queue:in(UnackedMsg, ToAcc), PrefixAcc, + QTail, DeliveryTag, Multiple); + true -> + collect_acks(ToAcc, queue:in(UnackedMsg, PrefixAcc), + QTail, DeliveryTag, Multiple) + end; + {empty, _} -> + {ToAcc, PrefixAcc} + end. + +add_tx_participants(MoreP, State = #ch{tx_participants = Participants}) -> + State#ch{tx_participants = sets:union(Participants, + sets:from_list(MoreP))}. + +ack(ProxyPid, TxnKey, UAQ) -> + fold_per_queue( + fun (QPid, MsgIds, L) -> + ok = rabbit_amqqueue:ack(QPid, TxnKey, MsgIds, ProxyPid), + [QPid | L] + end, [], UAQ). + +make_tx_id() -> rabbit_misc:guid(). + +safe_pmap_set_ok(F, S) -> + case lists:filter(fun (R) -> R =/= ok end, + rabbit_misc:upmap( + fun (V) -> + try F(V) + catch Class:Reason -> {Class, Reason} + end + end, sets:to_list(S))) of + [] -> ok; + Errors -> {error, Errors} + end. + +notify_participants(F, TxnKey, Participants) -> + safe_pmap_set_ok(fun (QPid) -> F(QPid, TxnKey) end, Participants). + +new_tx(State) -> + State#ch{transaction_id = make_tx_id(), + tx_participants = sets:new(), + uncommitted_ack_q = queue:new()}. + +internal_commit(State = #ch{transaction_id = TxnKey, + tx_participants = Participants}) -> + case notify_participants(fun rabbit_amqqueue:commit/2, + TxnKey, Participants) of + ok -> new_tx(State); + {error, Errors} -> exit({commit_failed, Errors}) + end. + +internal_rollback(State = #ch{transaction_id = TxnKey, + tx_participants = Participants, + uncommitted_ack_q = UAQ, + unacked_message_q = UAMQ}) -> + ?LOGDEBUG("rollback ~p~n - ~p acks uncommitted, ~p messages unacked~n", + [self(), + queue:len(UAQ), + queue:len(UAMQ)]), + case notify_participants(fun rabbit_amqqueue:rollback/2, + TxnKey, Participants) of + ok -> NewUAMQ = queue:join(UAQ, UAMQ), + new_tx(State#ch{unacked_message_q = NewUAMQ}); + {error, Errors} -> exit({rollback_failed, Errors}) + end. + +fold_per_queue(F, Acc0, UAQ) -> + D = lists:foldl( + fun ({_DTag, _CTag, + {_QName, QPid, MsgId, _Redelivered, _Message}}, D) -> + %% dict:append would be simpler and avoid the + %% lists:reverse in handle_message({recover, true}, + %% ...). However, it is significantly slower when + %% going beyond a few thousand elements. + dict:update(QPid, + fun (MsgIds) -> [MsgId | MsgIds] end, + [MsgId], + D) + end, dict:new(), queue:to_list(UAQ)), + dict:fold(fun (QPid, MsgIds, Acc) -> F(QPid, MsgIds, Acc) end, + Acc0, D). + +notify_queues(#ch{proxy_pid = ProxyPid, consumer_mapping = Consumers}) -> + safe_pmap_set_ok( + fun (QueueName) -> + case rabbit_amqqueue:with( + QueueName, + fun (Q) -> + rabbit_amqqueue:notify_down(Q, ProxyPid) + end) of + ok -> + ok; + {error, not_found} -> + %% queue has been deleted in the meantime + ok + end + end, + dict:fold(fun (_ConsumerTag, QueueName, S) -> + sets:add_element(QueueName, S) + end, sets:new(), Consumers)). + +is_message_persistent(#content{properties = #'P_basic'{ + delivery_mode = Mode}}) -> + case Mode of + 1 -> false; + 2 -> true; + undefined -> false; + Other -> rabbit_log:warning("Unknown delivery mode ~p - treating as 1, non-persistent~n", + [Other]), + false + end. + +lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) -> + State#ch{unacked_message_q = queue:in(MsgStruct, UAMQ)}; +lock_message(false, _MsgStruct, State) -> + State. + +internal_deliver(WriterPid, ChPid, Notify, ConsumerTag, DeliveryTag, + {_QName, QPid, _MsgId, Redelivered, + #basic_message{exchange_name = ExchangeName, + routing_key = RoutingKey, + content = Content}}) -> + M = #'basic.deliver'{consumer_tag = ConsumerTag, + delivery_tag = DeliveryTag, + redelivered = Redelivered, + exchange = ExchangeName#resource.name, + routing_key = RoutingKey}, + ok = case Notify of + true -> rabbit_writer:send_command_and_notify( + WriterPid, QPid, ChPid, M, Content); + false -> rabbit_writer:send_command(WriterPid, M, Content) + end. diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl new file mode 100644 index 0000000000..ad796b61ae --- /dev/null +++ b/src/rabbit_control.erl @@ -0,0 +1,260 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd., +%% Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd., Cohesive Financial Technologies +%% LLC., and Rabbit Technologies Ltd. are Copyright (C) 2007-2008 +%% LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit +%% Technologies Ltd.; +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_control). +-include("rabbit.hrl"). + +-export([start/0, stop/0, action/3]). + +-define(RPC_TIMEOUT, 30000). + +start() -> + case init:get_plain_arguments() of + [] -> + usage(); + FullCommand -> + {Node, Command, Args} = parse_args(FullCommand), + case catch action(Command, Node, Args) of + ok -> + io:format("done.~n"), + init:stop(); + {'EXIT', {function_clause, [{?MODULE, action, _} | _]}} -> + io:format("Invalid command ~p~n", [FullCommand]), + usage(); + Other -> + io:format("~nrabbit_control action ~p failed:~n~p~n", [Command, Other]), + halt(2) + end + end. + +parse_args(["-n", NodeS, Command | Args]) -> + Node = case lists:member($@, NodeS) of + true -> list_to_atom(NodeS); + false -> rabbit_misc:localnode(list_to_atom(NodeS)) + end, + {Node, list_to_atom(Command), Args}; +parse_args([Command | Args]) -> + {rabbit_misc:localnode(rabbit), list_to_atom(Command), Args}. + +stop() -> + ok. + +usage() -> + io:format("Usage: rabbitmqctl [-n <node>] <command> [<arg> ...] + +Available commands: + + stop - stops the RabbitMQ application and halts the node + stop_app - stops the RabbitMQ application, leaving the node running + start_app - starts the RabbitMQ application on an already-running node + reset - resets node to default configuration, deleting all data + force_reset + cluster <ClusterNode> ... + status + + add_user <UserName> <Password> + delete_user <UserName> + change_password <UserName> <NewPassword> + list_users + + add_vhost <VHostPath> + delete_vhost <VHostPath> + list_vhosts + + map_user_vhost <UserName> <VHostPath> + unmap_user_vhost <UserName> <VHostPath> + list_user_vhosts <UserName> + list_vhost_users <VHostPath> + + add_realm <VHostPath> <RealmName> + delete_realm <VHostPath> <RealmName> + list_realms <VHostPath> + + set_permissions <UserName> <VHostPath> <RealmName> [<Permission> ...] + Permissions management. The available permissions are 'passive', + 'active', 'write' and 'read', corresponding to the permissions + referred to in AMQP's \"access.request\" message, or 'all' as an + abbreviation for all defined permission flags. + list_permissions <UserName> <VHostPath> + +<node> should be the name of the master node of the RabbitMQ cluster. It +defaults to the node named \"rabbit\" on the local host. On a host named +\"server.example.com\", the master node will usually be rabbit@server (unless +NODENAME has been set to some non-default value at broker startup time). The +output of hostname -s is usually the correct suffix to use after the \"@\" sign. + +"), + halt(1). + +action(stop, Node, []) -> + io:format("Stopping and halting node ~p ...", [Node]), + call(Node, {rabbit, stop_and_halt, []}); + +action(stop_app, Node, []) -> + io:format("Stopping node ~p ...", [Node]), + call(Node, {rabbit, stop, []}); + +action(start_app, Node, []) -> + io:format("Starting node ~p ...", [Node]), + call(Node, {rabbit, start, []}); + +action(reset, Node, []) -> + io:format("Resetting node ~p ...", [Node]), + call(Node, {rabbit_mnesia, reset, []}); + +action(force_reset, Node, []) -> + io:format("Forcefully resetting node ~p ...", [Node]), + call(Node, {rabbit_mnesia, force_reset, []}); + +action(cluster, Node, ClusterNodeSs) -> + ClusterNodes = lists:map(fun list_to_atom/1, ClusterNodeSs), + io:format("Clustering node ~p with ~p ...", + [Node, ClusterNodes]), + rpc_call(Node, rabbit_mnesia, cluster, [ClusterNodes]); + +action(status, Node, []) -> + io:format("Status of node ~p ...", [Node]), + Res = call(Node, {rabbit, status, []}), + io:format("~n~p~n", [Res]), + ok; + +action(add_user, Node, Args = [Username, _Password]) -> + io:format("Creating user ~p ...", [Username]), + call(Node, {rabbit_access_control, add_user, Args}); + +action(delete_user, Node, Args = [_Username]) -> + io:format("Deleting user ~p ...", Args), + call(Node, {rabbit_access_control, delete_user, Args}); + +action(change_password, Node, Args = [Username, _Newpassword]) -> + io:format("Changing password for user ~p ...", [Username]), + call(Node, {rabbit_access_control, change_password, Args}); + +action(list_users, Node, []) -> + io:format("Listing users ..."), + display_list(call(Node, {rabbit_access_control, list_users, []})); + +action(add_vhost, Node, Args = [_VHostPath]) -> + io:format("Creating vhost ~p ...", Args), + call(Node, {rabbit_access_control, add_vhost, Args}); + +action(delete_vhost, Node, Args = [_VHostPath]) -> + io:format("Deleting vhost ~p ...", Args), + call(Node, {rabbit_access_control, delete_vhost, Args}); + +action(list_vhosts, Node, []) -> + io:format("Listing vhosts ..."), + display_list(call(Node, {rabbit_access_control, list_vhosts, []})); + +action(map_user_vhost, Node, Args = [_Username, _VHostPath]) -> + io:format("Mapping user ~p to vhost ~p ...", Args), + call(Node, {rabbit_access_control, map_user_vhost, Args}); + +action(unmap_user_vhost, Node, Args = [_Username, _VHostPath]) -> + io:format("Unmapping user ~p from vhost ~p ...", Args), + call(Node, {rabbit_access_control, unmap_user_vhost, Args}); + +action(list_user_vhosts, Node, Args = [_Username]) -> + io:format("Listing vhosts for user ~p...", Args), + display_list(call(Node, {rabbit_access_control, list_user_vhosts, Args})); + +action(list_vhost_users, Node, Args = [_VHostPath]) -> + io:format("Listing users for vhosts ~p...", Args), + display_list(call(Node, {rabbit_access_control, list_vhost_users, Args})); + +action(add_realm, Node, [VHostPath, RealmName]) -> + io:format("Adding realm ~p to vhost ~p ...", [RealmName, VHostPath]), + rpc_call(Node, rabbit_realm, add_realm, + [realm_rsrc(VHostPath, RealmName)]); + +action(delete_realm, Node, [VHostPath, RealmName]) -> + io:format("Deleting realm ~p from vhost ~p ...", [RealmName, VHostPath]), + rpc_call(Node, rabbit_realm, delete_realm, + [realm_rsrc(VHostPath, RealmName)]); + +action(list_realms, Node, Args = [_VHostPath]) -> + io:format("Listing realms for vhost ~p ...", Args), + display_list(call(Node, {rabbit_realm, list_vhost_realms, Args})); + +action(set_permissions, Node, + [Username, VHostPath, RealmName | Permissions]) -> + io:format("Setting permissions for user ~p, vhost ~p, realm ~p ...", + [Username, VHostPath, RealmName]), + CheckedPermissions = check_permissions(Permissions), + Ticket = #ticket{ + realm_name = realm_rsrc(VHostPath, RealmName), + passive_flag = lists:member(passive, CheckedPermissions), + active_flag = lists:member(active, CheckedPermissions), + write_flag = lists:member(write, CheckedPermissions), + read_flag = lists:member(read, CheckedPermissions)}, + rpc_call(Node, rabbit_access_control, map_user_realm, + [list_to_binary(Username), Ticket]); + +action(list_permissions, Node, Args = [_Username, _VHostPath]) -> + io:format("Listing permissions for user ~p in vhost ~p ...", Args), + Perms = call(Node, {rabbit_access_control, list_user_realms, Args}), + if is_list(Perms) -> + lists:foreach( + fun ({RealmName, Pattern}) -> + io:format("~n~s: ~p", + [binary_to_list(RealmName), + rabbit_misc:permission_list(Pattern)]) + end, + lists:sort(Perms)), + io:nl(), + ok; + true -> Perms + end. + +check_permissions([]) -> []; +check_permissions(["all" | R]) -> + [passive, active, write, read | check_permissions(R)]; +check_permissions([P | R]) when (P == "passive") or + (P == "active") or + (P == "write") or + (P == "read") -> + [list_to_atom(P) | check_permissions(R)]; +check_permissions([P | _R]) -> + io:format("~nError: invalid permission flag ~p~n", [P]), + usage(). + +realm_rsrc(VHostPath, RealmName) -> + rabbit_misc:r(list_to_binary(VHostPath), + realm, + list_to_binary(RealmName)). + +display_list(L) when is_list(L) -> + lists:foreach(fun (I) -> + io:format("~n~s", [binary_to_list(I)]) + end, + lists:sort(L)), + io:nl(); +display_list(Other) -> Other. + +call(Node, {Mod, Fun, Args}) -> + rpc_call(Node, Mod, Fun, lists:map(fun list_to_binary/1, Args)). + +rpc_call(Node, Mod, Fun, Args) -> + rpc:call(Node, Mod, Fun, Args, ?RPC_TIMEOUT). diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl new file mode 100644 index 0000000000..0ae116bb2a --- /dev/null +++ b/src/rabbit_error_logger.erl @@ -0,0 +1,77 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd., +%% Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd., Cohesive Financial Technologies +%% LLC., and Rabbit Technologies Ltd. are Copyright (C) 2007-2008 +%% LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit +%% Technologies Ltd.; +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_error_logger). +-include("rabbit.hrl"). + +-define(LOG_EXCH_NAME, <<"amq.rabbitmq.log">>). + +-behaviour(gen_event). + +-export([init/1, terminate/2, code_change/3, handle_call/2, handle_event/2, handle_info/2]). + +init([DefaultVHost]) -> + #exchange{} = rabbit_exchange:declare( + #resource{virtual_host = DefaultVHost, + kind = realm, + name = <<"/admin">>}, + ?LOG_EXCH_NAME, + topic, true, false, []), + {ok, #resource{virtual_host = DefaultVHost, + kind = exchange, + name = ?LOG_EXCH_NAME}}. + +terminate(_Arg, _State) -> + terminated_ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +handle_call(_Request, State) -> + {ok, not_understood, State}. + +handle_event({Kind, _Gleader, {_Pid, Format, Data}}, State) -> + ok = publish(Kind, Format, Data, State), + {ok, State}; +handle_event(_Event, State) -> + {ok, State}. + +handle_info(_Info, State) -> + {ok, State}. + +publish(error, Format, Data, State) -> + publish1(<<"error">>, Format, Data, State); +publish(warning_msg, Format, Data, State) -> + publish1(<<"warning">>, Format, Data, State); +publish(info_msg, Format, Data, State) -> + publish1(<<"info">>, Format, Data, State); +publish(_Other, _Format, _Data, _State) -> + ok. + +publish1(RoutingKey, Format, Data, LogExch) -> + {ok, _QueueNames} = rabbit_exchange:simple_publish( + false, false, LogExch, RoutingKey, <<"text/plain">>, + list_to_binary(io_lib:format(Format, Data))), + ok. diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl new file mode 100644 index 0000000000..113b7878fa --- /dev/null +++ b/src/rabbit_exchange.erl @@ -0,0 +1,380 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd., +%% Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd., Cohesive Financial Technologies +%% LLC., and Rabbit Technologies Ltd. are Copyright (C) 2007-2008 +%% LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit +%% Technologies Ltd.; +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_exchange). +-include_lib("stdlib/include/qlc.hrl"). +-include("rabbit.hrl"). +-include("rabbit_framing.hrl"). + +-export([recover/0, declare/6, lookup/1, lookup_or_die/1, + list_vhost_exchanges/1, list_exchange_bindings/1, + simple_publish/6, simple_publish/3, + route/2]). +-export([add_binding/2, delete_binding/2]). +-export([delete/2]). +-export([check_type/1, assert_type/2, topic_matches/2]). + +-import(mnesia). +-import(sets). +-import(lists). +-import(qlc). +-import(regexp). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-type(publish_res() :: {'ok', [pid()]} | + not_found() | {'error', 'unroutable' | 'not_delivered'}). + +-spec(recover/0 :: () -> 'ok'). +-spec(declare/6 :: (realm_name(), name(), exchange_type(), bool(), bool(), + amqp_table()) -> exchange()). +-spec(check_type/1 :: (binary()) -> atom()). +-spec(assert_type/2 :: (exchange(), atom()) -> 'ok'). +-spec(lookup/1 :: (exchange_name()) -> {'ok', exchange()} | not_found()). +-spec(lookup_or_die/1 :: (exchange_name()) -> exchange()). +-spec(list_vhost_exchanges/1 :: (vhost()) -> [exchange()]). +-spec(list_exchange_bindings/1 :: (exchange_name()) -> + [{queue_name(), routing_key(), amqp_table()}]). +-spec(simple_publish/6 :: + (bool(), bool(), exchange_name(), routing_key(), binary(), binary()) -> + publish_res()). +-spec(simple_publish/3 :: (bool(), bool(), message()) -> publish_res()). +-spec(route/2 :: (exchange(), routing_key()) -> [pid()]). +-spec(add_binding/2 :: (binding_spec(), amqqueue()) -> + 'ok' | not_found() | + {'error', 'durability_settings_incompatible'}). +-spec(delete_binding/2 :: (binding_spec(), amqqueue()) -> + 'ok' | not_found()). +-spec(topic_matches/2 :: (binary(), binary()) -> bool()). +-spec(delete/2 :: (exchange_name(), bool()) -> + 'ok' | not_found() | {'error', 'in_use'}). + +-endif. + +%%---------------------------------------------------------------------------- + +recover() -> + ok = recover_durable_exchanges(), + ok. + +recover_durable_exchanges() -> + rabbit_misc:execute_mnesia_transaction( + fun () -> + mnesia:foldl(fun (Exchange, Acc) -> + ok = mnesia:write(Exchange), + Acc + end, ok, durable_exchanges) + end). + +declare(RealmName, NameBin, Type, Durable, AutoDelete, Args) -> + XName = rabbit_misc:r(RealmName, exchange, NameBin), + Exchange = #exchange{name = XName, + type = Type, + durable = Durable, + auto_delete = AutoDelete, + arguments = Args}, + rabbit_misc:execute_mnesia_transaction( + fun () -> + case mnesia:wread({exchange, XName}) of + [] -> ok = mnesia:write(Exchange), + if Durable -> + ok = mnesia:write( + durable_exchanges, Exchange, write); + true -> ok + end, + ok = rabbit_realm:add(RealmName, XName), + Exchange; + [ExistingX] -> ExistingX + end + end). + +check_type(<<"fanout">>) -> + fanout; +check_type(<<"direct">>) -> + direct; +check_type(<<"topic">>) -> + topic; +check_type(T) -> + rabbit_misc:protocol_error( + command_invalid, "invalid exchange type '~s'", [T]). + +assert_type(#exchange{ type = ActualType }, RequiredType) + when ActualType == RequiredType -> + ok; +assert_type(#exchange{ name = Name, type = ActualType }, RequiredType) -> + rabbit_misc:protocol_error( + not_allowed, "cannot redeclare ~s of type '~s' with type '~s'", + [rabbit_misc:rs(Name), ActualType, RequiredType]). + +lookup(Name) -> + rabbit_misc:dirty_read({exchange, Name}). + +lookup_or_die(Name) -> + case lookup(Name) of + {ok, X} -> X; + {error, not_found} -> + rabbit_misc:protocol_error( + not_found, "no ~s", [rabbit_misc:rs(Name)]) + end. + +list_vhost_exchanges(VHostPath) -> + mnesia:dirty_match_object( + #exchange{name = rabbit_misc:r(VHostPath, exchange), _ = '_'}). + +list_exchange_bindings(Name) -> + [{QueueName, RoutingKey, Arguments} || + #binding{handlers = Handlers} <- bindings_for_exchange(Name), + #handler{binding_spec = #binding_spec{routing_key = RoutingKey, + arguments = Arguments}, + queue = QueueName} <- Handlers]. + +bindings_for_exchange(Name) -> + qlc:e(qlc:q([B || + B = #binding{key = K} <- mnesia:table(binding), + element(1, K) == Name])). + +empty_handlers() -> + []. + +%% Usable by Erlang code that wants to publish messages. +simple_publish(Mandatory, Immediate, ExchangeName, RoutingKeyBin, ContentTypeBin, BodyBin) -> + {ClassId, _MethodId} = rabbit_framing:method_id('basic.publish'), + Content = #content{class_id = ClassId, + properties = #'P_basic'{content_type = ContentTypeBin}, + properties_bin = none, + payload_fragments_rev = [BodyBin]}, + Message = #basic_message{exchange_name = ExchangeName, + routing_key = RoutingKeyBin, + content = Content, + persistent_key = none}, + simple_publish(Mandatory, Immediate, Message). + +%% Usable by Erlang code that wants to publish messages. +simple_publish(Mandatory, Immediate, + Message = #basic_message{exchange_name = ExchangeName, + routing_key = RoutingKey}) -> + case lookup(ExchangeName) of + {ok, Exchange} -> + QPids = route(Exchange, RoutingKey), + rabbit_router:deliver(QPids, Mandatory, Immediate, + none, Message); + {error, Error} -> {error, Error} + end. + +%% return the list of qpids to which a message with a given routing +%% key, sent to a particular exchange, should be delivered. +%% +%% The function ensures that a qpid appears in the return list exactly +%% as many times as a message should be delivered to it. With the +%% current exchange types that is at most once. +route(#exchange{name = Name, type = topic}, RoutingKey) -> + sets:to_list( + sets:union( + mnesia:activity( + async_dirty, + fun () -> + qlc:e(qlc:q([handler_qpids(H) || + #binding{key = {Name1, PatternKey}, + handlers = H} + <- mnesia:table(binding), + Name == Name1, + topic_matches(PatternKey, RoutingKey)])) + end))); + +route(#exchange{name = Name, type = Type}, RoutingKey) -> + BindingKey = delivery_key_for_type(Type, Name, RoutingKey), + case rabbit_misc:dirty_read({binding, BindingKey}) of + {ok, #binding{handlers = H}} -> sets:to_list(handler_qpids(H)); + {error, not_found} -> [] + end. + +delivery_key_for_type(fanout, Name, _RoutingKey) -> + {Name, fanout}; +delivery_key_for_type(_Type, Name, RoutingKey) -> + {Name, RoutingKey}. + +call_with_exchange(Name, Fun) -> + case mnesia:wread({exchange, Name}) of + [] -> {error, not_found}; + [X] -> Fun(X) + end. + +make_handler(BindingSpec, #amqqueue{name = QueueName, pid = QPid}) -> + #handler{binding_spec = BindingSpec, queue = QueueName, qpid = QPid}. + +add_binding(BindingSpec = #binding_spec{exchange_name = ExchangeName, + routing_key = RoutingKey}, Q) -> + call_with_exchange( + ExchangeName, + fun (X) -> if Q#amqqueue.durable and not(X#exchange.durable) -> + {error, durability_settings_incompatible}; + true -> + internal_add_binding( + X, RoutingKey, make_handler(BindingSpec, Q)) + end + end). + +delete_binding(BindingSpec = #binding_spec{exchange_name = ExchangeName, + routing_key = RoutingKey}, Q) -> + call_with_exchange( + ExchangeName, + fun (X) -> ok = internal_delete_binding( + X, RoutingKey, make_handler(BindingSpec, Q)), + maybe_auto_delete(X) + end). + +%% Must run within a transaction. +maybe_auto_delete(#exchange{auto_delete = false}) -> + ok; +maybe_auto_delete(#exchange{name = ExchangeName, auto_delete = true}) -> + case internal_delete(ExchangeName, true) of + {error, in_use} -> ok; + ok -> ok + end. + +handlers_isempty([]) -> true; +handlers_isempty([_|_]) -> false. + +extend_handlers(Handlers, Handler) -> [Handler | Handlers]. + +delete_handler(Handlers, Handler) -> lists:delete(Handler, Handlers). + +handler_qpids(Handlers) -> + sets:from_list([QPid || #handler{qpid = QPid} <- Handlers]). + +%% Must run within a transaction. +internal_add_binding(#exchange{name = ExchangeName, type = Type}, + RoutingKey, Handler) -> + BindingKey = delivery_key_for_type(Type, ExchangeName, RoutingKey), + ok = add_handler_to_binding(BindingKey, Handler). + +%% Must run within a transaction. +internal_delete_binding(#exchange{name = ExchangeName, type = Type}, RoutingKey, Handler) -> + BindingKey = delivery_key_for_type(Type, ExchangeName, RoutingKey), + remove_handler_from_binding(BindingKey, Handler), + ok. + +%% Must run within a transaction. +add_handler_to_binding(BindingKey, Handler) -> + ok = case mnesia:wread({binding, BindingKey}) of + [] -> + ok = mnesia:write( + #binding{key = BindingKey, + handlers = extend_handlers( + empty_handlers(), Handler)}); + [B = #binding{handlers = H}] -> + ok = mnesia:write( + B#binding{handlers = extend_handlers(H, Handler)}) + end. + +%% Must run within a transaction. +remove_handler_from_binding(BindingKey, Handler) -> + case mnesia:wread({binding, BindingKey}) of + [] -> empty; + [B = #binding{handlers = H}] -> + H1 = delete_handler(H, Handler), + case handlers_isempty(H1) of + true -> + ok = mnesia:delete({binding, BindingKey}), + empty; + _ -> + ok = mnesia:write(B#binding{handlers = H1}), + not_empty + end + end. + +split_topic_key(Key) -> + {ok, KeySplit} = regexp:split(binary_to_list(Key), "\\."), + KeySplit. + +topic_matches(PatternKey, RoutingKey) -> + P = split_topic_key(PatternKey), + R = split_topic_key(RoutingKey), + topic_matches1(P, R). + +topic_matches1(["#"], _R) -> + true; +topic_matches1(["#" | PTail], R) -> + last_topic_match(PTail, [], lists:reverse(R)); +topic_matches1([], []) -> + true; +topic_matches1(["*" | PatRest], [_ | ValRest]) -> + topic_matches1(PatRest, ValRest); +topic_matches1([PatElement | PatRest], [ValElement | ValRest]) when PatElement == ValElement -> + topic_matches1(PatRest, ValRest); +topic_matches1(_, _) -> + false. + +last_topic_match(P, R, []) -> + topic_matches1(P, R); +last_topic_match(P, R, [BacktrackNext | BacktrackList]) -> + topic_matches1(P, R) or last_topic_match(P, [BacktrackNext | R], BacktrackList). + +delete(ExchangeName, IfUnused) -> + rabbit_misc:execute_mnesia_transaction( + fun () -> internal_delete(ExchangeName, IfUnused) end). + +internal_delete(ExchangeName, _IfUnused = true) -> + Bindings = bindings_for_exchange(ExchangeName), + case Bindings of + [] -> do_internal_delete(ExchangeName, Bindings); + _ -> + case lists:all(fun (#binding{handlers = H}) -> handlers_isempty(H) end, + Bindings) of + true -> + %% There are no handlers anywhere in any of the + %% bindings for this exchange. + do_internal_delete(ExchangeName, Bindings); + false -> + %% There was at least one real handler + %% present. It's still in use. + {error, in_use} + end + end; +internal_delete(ExchangeName, false) -> + do_internal_delete(ExchangeName, bindings_for_exchange(ExchangeName)). + +forcibly_remove_handlers(Handlers) -> + lists:foreach( + fun (#handler{binding_spec = BindingSpec, queue = QueueName}) -> + ok = rabbit_amqqueue:binding_forcibly_removed( + BindingSpec, QueueName) + end, Handlers), + ok. + +do_internal_delete(ExchangeName, Bindings) -> + case mnesia:wread({exchange, ExchangeName}) of + [] -> {error, not_found}; + _ -> + lists:foreach(fun (#binding{key = K, handlers = H}) -> + ok = forcibly_remove_handlers(H), + ok = mnesia:delete({binding, K}) + end, Bindings), + ok = mnesia:delete({durable_exchanges, ExchangeName}), + ok = mnesia:delete({exchange, ExchangeName}), + ok = rabbit_realm:delete_from_all(ExchangeName) + end. diff --git a/src/rabbit_framing_channel.erl b/src/rabbit_framing_channel.erl new file mode 100644 index 0000000000..76c0461399 --- /dev/null +++ b/src/rabbit_framing_channel.erl @@ -0,0 +1,115 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd., +%% Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd., Cohesive Financial Technologies +%% LLC., and Rabbit Technologies Ltd. are Copyright (C) 2007-2008 +%% LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit +%% Technologies Ltd.; +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_framing_channel). +-include("rabbit.hrl"). + +-export([start_link/2, process/2, shutdown/1]). + +%% internal +-export([mainloop/1]). + +%%-------------------------------------------------------------------- + +start_link(StartFun, StartArgs) -> + spawn_link( + fun () -> + %% we trap exits so that a normal termination of the + %% channel or reader process terminates us too. + process_flag(trap_exit, true), + mainloop(apply(StartFun, StartArgs)) + end). + +process(Pid, Frame) -> + Pid ! {frame, Frame}, + ok. + +shutdown(Pid) -> + Pid ! terminate, + ok. + +%%-------------------------------------------------------------------- + +read_frame(ChannelPid) -> + receive + %% converting the exit signal into one of our own ensures that + %% the reader sees the right pid (i.e. ours) when a channel + %% exits. Similarly in the other direction, though it is not + %% really relevant there since the channel is not specifically + %% watching out for reader exit signals. + {'EXIT', _Pid, Reason} -> exit(Reason); + {frame, Frame} -> Frame; + terminate -> rabbit_channel:shutdown(ChannelPid), + read_frame(ChannelPid); + Msg -> exit({unexpected_message, Msg}) + end. + +mainloop(ChannelPid) -> + {method, MethodName, FieldsBin} = read_frame(ChannelPid), + Method = rabbit_framing:decode_method_fields(MethodName, FieldsBin), + case rabbit_framing:method_has_content(MethodName) of + true -> rabbit_channel:do(ChannelPid, Method, + collect_content(ChannelPid, MethodName)); + false -> rabbit_channel:do(ChannelPid, Method) + end, + ?MODULE:mainloop(ChannelPid). + +collect_content(ChannelPid, MethodName) -> + {ClassId, _MethodId} = rabbit_framing:method_id(MethodName), + case read_frame(ChannelPid) of + {content_header, HeaderClassId, 0, BodySize, PropertiesBin} -> + if HeaderClassId == ClassId -> + Payload = collect_content_payload(ChannelPid, BodySize, []), + #content{class_id = ClassId, + properties = none, + properties_bin = PropertiesBin, + payload_fragments_rev = Payload}; + true -> + rabbit_misc:protocol_error( + command_invalid, + "expected content header for class ~w, got one for class ~w instead", + [ClassId, HeaderClassId]) + end; + _ -> + rabbit_misc:protocol_error( + command_invalid, + "expected content header for class ~w, got non content header frame instead", + [ClassId]) + end. + +collect_content_payload(_ChannelPid, 0, Acc) -> + Acc; +collect_content_payload(ChannelPid, RemainingByteCount, Acc) -> + case read_frame(ChannelPid) of + {content_body, FragmentBin} -> + collect_content_payload(ChannelPid, + RemainingByteCount - size(FragmentBin), + [FragmentBin | Acc]); + _ -> + rabbit_misc:protocol_error( + command_invalid, + "expected content body, got non content body frame instead", + []) + end. diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl new file mode 100644 index 0000000000..6e825578d3 --- /dev/null +++ b/src/rabbit_heartbeat.erl @@ -0,0 +1,94 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd., +%% Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd., Cohesive Financial Technologies +%% LLC., and Rabbit Technologies Ltd. are Copyright (C) 2007-2008 +%% LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit +%% Technologies Ltd.; +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_heartbeat). + +-export([start_heartbeat/2]). + +start_heartbeat(_Sock, 0) -> + none; +start_heartbeat(Sock, TimeoutSec) -> + Parent = self(), + %% we check for incoming data every interval, and time out after + %% two checks with no change. As a result we will time out between + %% 2 and 3 intervals after the last data has been received. + spawn_link(fun () -> heartbeater(Sock, TimeoutSec * 1000, + recv_oct, 1, + fun () -> + Parent ! timeout, + stop + end, + erlang:monitor(process, Parent)) end), + %% the 'div 2' is there so that we don't end up waiting for nearly + %% 2 * TimeoutSec before sending a heartbeat in the boundary case + %% where the last message was sent just after a heartbeat. + spawn_link(fun () -> heartbeater(Sock, TimeoutSec * 1000 div 2, + send_oct, 0, + fun () -> + catch gen_tcp:send(Sock, rabbit_binary_generator:build_heartbeat_frame()), + continue + end, + erlang:monitor(process, Parent)) end), + ok. + +%% Y-combinator, posted by Vladimir Sekissov to the Erlang mailing list +%% http://www.erlang.org/ml-archive/erlang-questions/200301/msg00053.html +y(X) -> + F = fun (P) -> X(fun (A) -> (P(P))(A) end) end, + F(F). + +heartbeater(Sock, TimeoutMillisec, StatName, Threshold, Handler, MonitorRef) -> + Heartbeat = + fun (F) -> + fun ({StatVal, SameCount}) -> + receive + {'DOWN', MonitorRef, process, _Object, _Info} -> ok; + Other -> exit({unexpected_message, Other}) + after TimeoutMillisec -> + case prim_inet:getstat(Sock, [StatName]) of + {ok, [{StatName, NewStatVal}]} -> + if NewStatVal =/= StatVal -> + F({NewStatVal, 0}); + SameCount < Threshold -> + F({NewStatVal, SameCount + 1}); + true -> + case Handler() of + stop -> ok; + continue -> F({NewStatVal, 0}) + end + end; + {error, einval} -> + %% the socket is dead, most + %% likely because the + %% connection is being shut + %% down -> terminate + ok; + {error, Reason} -> + exit({cannot_get_socket_stats, Reason}) + end + end + end + end, + (y(Heartbeat))({0, 0}). diff --git a/src/rabbit_load.erl b/src/rabbit_load.erl new file mode 100644 index 0000000000..8deec8ebee --- /dev/null +++ b/src/rabbit_load.erl @@ -0,0 +1,66 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd., +%% Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd., Cohesive Financial Technologies +%% LLC., and Rabbit Technologies Ltd. are Copyright (C) 2007-2008 +%% LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit +%% Technologies Ltd.; +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_load). + +-export([local_load/0, remote_loads/0, pick/0]). + +-define(FUDGE_FACTOR, 0.98). +-define(TIMEOUT, 100). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-type(node() :: atom()). +-type(load() :: {{non_neg_integer(), float()}, node()}). +-spec(local_load/0 :: () -> load()). +-spec(remote_loads/0 :: () -> [load()]). +-spec(pick/0 :: () -> node()). + +-endif. + +%%---------------------------------------------------------------------------- + +local_load() -> + LoadAvg = case whereis(cpu_sup) of + undefined -> 0.0; + _Other -> cpu_sup:avg1() + end, + {{statistics(run_queue), LoadAvg}, node()}. + +remote_loads() -> + {ResL, _BadNodes} = + rpc:multicall(nodes(), ?MODULE, local_load, [], ?TIMEOUT), + ResL. + +pick() -> + RemoteLoads = remote_loads(), + {{RunQ, LoadAvg}, Node} = local_load(), + %% add bias towards current node + AdjustedLoadAvg = LoadAvg * ?FUDGE_FACTOR, + Loads = [{{RunQ, AdjustedLoadAvg}, Node} | RemoteLoads], + {_, SelectedNode} = lists:min(Loads), + SelectedNode. diff --git a/src/rabbit_log.erl b/src/rabbit_log.erl new file mode 100644 index 0000000000..a8f839f0a6 --- /dev/null +++ b/src/rabbit_log.erl @@ -0,0 +1,132 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd., +%% Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd., Cohesive Financial Technologies +%% LLC., and Rabbit Technologies Ltd. are Copyright (C) 2007-2008 +%% LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit +%% Technologies Ltd.; +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_log). + +-behaviour(gen_server). + +-export([start_link/0]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-export([debug/1, debug/2, info/1, info/2, + warning/1, warning/2, error/1, error/2]). + +-import(io). +-import(error_logger). + +-define(SERVER, ?MODULE). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}). +-spec(debug/1 :: (string()) -> 'ok'). +-spec(debug/2 :: (string(), [any()]) -> 'ok'). +-spec(info/1 :: (string()) -> 'ok'). +-spec(info/2 :: (string(), [any()]) -> 'ok'). +-spec(warning/1 :: (string()) -> 'ok'). +-spec(warning/2 :: (string(), [any()]) -> 'ok'). +-spec(error/1 :: (string()) -> 'ok'). +-spec(error/2 :: (string(), [any()]) -> 'ok'). + +-endif. + +%%---------------------------------------------------------------------------- + +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + +debug(Fmt) -> + gen_server:cast(?SERVER, {debug, Fmt}). + +debug(Fmt, Args) when is_list(Args) -> + gen_server:cast(?SERVER, {debug, Fmt, Args}). + +info(Fmt) -> + gen_server:cast(?SERVER, {info, Fmt}). + +info(Fmt, Args) when is_list(Args) -> + gen_server:cast(?SERVER, {info, Fmt, Args}). + +warning(Fmt) -> + gen_server:cast(?SERVER, {warning, Fmt}). + +warning(Fmt, Args) when is_list(Args) -> + gen_server:cast(?SERVER, {warning, Fmt, Args}). + +error(Fmt) -> + gen_server:cast(?SERVER, {error, Fmt}). + +error(Fmt, Args) when is_list(Args) -> + gen_server:cast(?SERVER, {error, Fmt, Args}). + +%%-------------------------------------------------------------------- + +init([]) -> {ok, none}. + +handle_call(_Request, _From, State) -> + {noreply, State}. + +handle_cast({debug, Fmt}, State) -> + io:format("debug:: "), io:format(Fmt), + error_logger:info_msg("debug:: " ++ Fmt), + {noreply, State}; +handle_cast({debug, Fmt, Args}, State) -> + io:format("debug:: "), io:format(Fmt, Args), + error_logger:info_msg("debug:: " ++ Fmt, Args), + {noreply, State}; +handle_cast({info, Fmt}, State) -> + error_logger:info_msg(Fmt), + {noreply, State}; +handle_cast({info, Fmt, Args}, State) -> + error_logger:info_msg(Fmt, Args), + {noreply, State}; +handle_cast({warning, Fmt}, State) -> + error_logger:warning_msg(Fmt), + {noreply, State}; +handle_cast({warning, Fmt, Args}, State) -> + error_logger:warning_msg(Fmt, Args), + {noreply, State}; +handle_cast({error, Fmt}, State) -> + error_logger:error_msg(Fmt), + {noreply, State}; +handle_cast({error, Fmt, Args}, State) -> + error_logger:error_msg(Fmt, Args), + {noreply, State}; +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl new file mode 100644 index 0000000000..927d7712d7 --- /dev/null +++ b/src/rabbit_misc.erl @@ -0,0 +1,391 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd., +%% Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd., Cohesive Financial Technologies +%% LLC., and Rabbit Technologies Ltd. are Copyright (C) 2007-2008 +%% LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit +%% Technologies Ltd.; +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_misc). +-include("rabbit.hrl"). +-include("rabbit_framing.hrl"). + +-export([method_record_type/1, polite_pause/0, polite_pause/1]). +-export([die/1, frame_error/2, protocol_error/3, protocol_error/4]). +-export([strict_ticket_checking/0]). +-export([get_config/1, get_config/2, set_config/2]). +-export([dirty_read/1]). +-export([r/3, r/2, rs/1]). +-export([permission_list/1]). +-export([enable_cover/0, report_cover/0]). +-export([with_exit_handler/2]). +-export([with_user/2, with_vhost/2, with_realm/2, with_user_and_vhost/3]). +-export([execute_mnesia_transaction/1]). +-export([ensure_ok/2]). +-export([localnode/1, tcp_name/3]). +-export([intersperse/2, upmap/2, map_in_order/2]). +-export([guid/0, string_guid/1, binstring_guid/1]). +-export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]). + +-import(mnesia). +-import(lists). +-import(cover). +-import(disk_log). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-include_lib("kernel/include/inet.hrl"). + +-spec(method_record_type/1 :: (tuple()) -> atom()). +-spec(polite_pause/0 :: () -> 'done'). +-spec(polite_pause/1 :: (non_neg_integer()) -> 'done'). +-spec(die/1 :: (atom()) -> no_return()). +-spec(frame_error/2 :: (atom(), binary()) -> no_return()). +-spec(protocol_error/3 :: + (atom() | amqp_error(), string(), [any()]) -> no_return()). +-spec(protocol_error/4 :: + (atom() | amqp_error(), string(), [any()], atom()) -> no_return()). +-spec(strict_ticket_checking/0 :: () -> bool()). +-spec(get_config/1 :: (atom()) -> {'ok', any()} | not_found()). +-spec(get_config/2 :: (atom(), A) -> A). +-spec(set_config/2 :: (atom(), any()) -> 'ok'). +-spec(dirty_read/1 :: ({atom(), any()}) -> {'ok', any()} | not_found()). +-spec(r/3 :: (realm_name() | vhost(), K, name()) -> + r(K) when is_subtype(K, atom())). +-spec(r/2 :: (vhost(), K) -> #resource{virtual_host :: vhost(), + kind :: K, + name :: '_'} + when is_subtype(K, atom())). +-spec(rs/1 :: (r(atom())) -> string()). +-spec(permission_list/1 :: (ticket()) -> [permission()]). +-spec(enable_cover/0 :: () -> 'ok' | {'error', any()}). +-spec(report_cover/0 :: () -> 'ok'). +-spec(with_exit_handler/2 :: (thunk(A), thunk(A)) -> A). +-spec(with_user/2 :: (username(), thunk(A)) -> A). +-spec(with_vhost/2 :: (vhost(), thunk(A)) -> A). +-spec(with_realm/2 :: (realm_name(), thunk(A)) -> A). +-spec(with_user_and_vhost/3 :: (username(), vhost(), thunk(A)) -> A). +-spec(execute_mnesia_transaction/1 :: (thunk(A)) -> A). +-spec(ensure_ok/2 :: ('ok' | {'error', any()}, atom()) -> 'ok'). +-spec(localnode/1 :: (atom()) -> node()). +-spec(tcp_name/3 :: (atom(), ip_address(), ip_port()) -> atom()). +-spec(intersperse/2 :: (A, [A]) -> [A]). +-spec(upmap/2 :: (fun ((A) -> B), [A]) -> [B]). +-spec(map_in_order/2 :: (fun ((A) -> B), [A]) -> [B]). +-spec(guid/0 :: () -> guid()). +-spec(string_guid/1 :: (any()) -> string()). +-spec(binstring_guid/1 :: (any()) -> binary()). +-spec(dirty_read_all/1 :: (atom()) -> [any()]). +-spec(dirty_foreach_key/2 :: (fun ((any()) -> any()), atom()) -> + 'ok' | 'aborted'). +-spec(dirty_dump_log/1 :: (string()) -> 'ok' | {'error', any()}). + +-endif. + +%%---------------------------------------------------------------------------- + +method_record_type(Record) -> + element(1, Record). + +polite_pause() -> + polite_pause(3000). + +polite_pause(N) -> + receive + after N -> done + end. + +die(Error) -> + protocol_error(Error, "~w", [Error]). + +frame_error(MethodName, BinaryFields) -> + protocol_error(frame_error, "cannot decode ~w", + [BinaryFields], MethodName). + +protocol_error(Error, Explanation, Params) -> + protocol_error(Error, Explanation, Params, none). + +protocol_error(Error, Explanation, Params, Method) -> + CompleteExplanation = lists:flatten(io_lib:format(Explanation, Params)), + exit({amqp, Error, CompleteExplanation, Method}). + +boolean_config_param(Name, TrueValue, FalseValue, DefaultValue) -> + ActualValue = get_config(Name, DefaultValue), + if + ActualValue == TrueValue -> + true; + ActualValue == FalseValue -> + false; + true -> + rabbit_log:error( + "Bad setting for config param '~w': ~p~n" ++ + "legal values are '~w', '~w'; using default value '~w'", + [Name, ActualValue, TrueValue, FalseValue, DefaultValue]), + DefaultValue == TrueValue + end. + +strict_ticket_checking() -> + boolean_config_param(strict_ticket_checking, enabled, disabled, disabled). + +get_config(Key) -> + case dirty_read({rabbit_config, Key}) of + {ok, {rabbit_config, Key, V}} -> {ok, V}; + Other -> Other + end. + +get_config(Key, DefaultValue) -> + case get_config(Key) of + {ok, V} -> V; + {error, not_found} -> DefaultValue + end. + +set_config(Key, Value) -> + ok = mnesia:dirty_write({rabbit_config, Key, Value}). + +dirty_read(ReadSpec) -> + case mnesia:dirty_read(ReadSpec) of + [Result] -> {ok, Result}; + [] -> {error, not_found} + end. + +r(#resource{virtual_host = VHostPath}, Kind, Name) + when is_binary(Name) -> + #resource{virtual_host = VHostPath, kind = Kind, name = Name}; +r(VHostPath, Kind, Name) when is_binary(Name) andalso is_binary(VHostPath) -> + #resource{virtual_host = VHostPath, kind = Kind, name = Name}. + +r(VHostPath, Kind) when is_binary(VHostPath) -> + #resource{virtual_host = VHostPath, kind = Kind, name = '_'}. + +rs(#resource{virtual_host = VHostPath, kind = Kind, name = Name}) -> + lists:flatten(io_lib:format("~s '~s' in vhost '~s'", + [Kind, Name, VHostPath])). + +permission_list(Ticket = #ticket{}) -> + lists:foldr(fun ({Field, Label}, L) -> + case element(Field, Ticket) of + true -> [Label | L]; + false -> L + end + end, + [], + [{#ticket.passive_flag, passive}, + {#ticket.active_flag, active}, + {#ticket.write_flag, write}, + {#ticket.read_flag, read}]). + +enable_cover() -> + case cover:compile_beam_directory("ebin") of + {error,Reason} -> {error,Reason}; + _ -> ok + end. + +report_cover() -> + Dir = "cover/", + ok = filelib:ensure_dir(Dir), + lists:foreach(fun(F) -> file:delete(F) end, + filelib:wildcard(Dir ++ "*.html")), + {ok, SummaryFile} = file:open(Dir ++ "summary.txt", [write]), + {CT, NCT} = + lists:foldl( + fun(M,{CovTot, NotCovTot}) -> + {ok, {M, {Cov, NotCov}}} = cover:analyze(M, module), + ok = report_coverage_percentage(SummaryFile, + Cov, NotCov, M), + {ok,_} = cover:analyze_to_file( + M, + Dir ++ atom_to_list(M) ++ ".html", + [html]), + {CovTot+Cov, NotCovTot+NotCov} + end, + {0, 0}, + lists:sort(cover:modules())), + ok = report_coverage_percentage(SummaryFile, CT, NCT, 'TOTAL'), + ok = file:close(SummaryFile), + ok. + +report_coverage_percentage(File, Cov, NotCov, Mod) -> + io:fwrite(File, "~6.2f ~p~n", + [if + Cov+NotCov > 0 -> 100.0*Cov/(Cov+NotCov); + true -> 100.0 + end, + Mod]). + +with_exit_handler(Handler, Thunk) -> + try + Thunk() + catch + exit:{R, _} when R =:= noproc; R =:= normal -> Handler() + end. + +with_user(Username, Thunk) -> + fun () -> + case mnesia:read({user, Username}) of + [] -> + mnesia:abort({no_such_user, Username}); + [_U] -> + Thunk() + end + end. + +with_vhost(VHostPath, Thunk) -> + fun () -> + case mnesia:read({vhost, VHostPath}) of + [] -> + mnesia:abort({no_such_vhost, VHostPath}); + [_V] -> + Thunk() + end + end. + +with_realm(Name = #resource{virtual_host = VHostPath, kind = realm}, + Thunk) -> + fun () -> + case mnesia:read({realm, Name}) of + [] -> + mnesia:abort({no_such_realm, Name}); + [_R] -> + case mnesia:match_object( + #vhost_realm{virtual_host = VHostPath, + realm = Name}) of + [] -> + %% This should never happen + mnesia:abort({no_such_realm, Name}); + [_VR] -> + Thunk() + end + end + end. + +with_user_and_vhost(Username, VHostPath, Thunk) -> + with_user(Username, with_vhost(VHostPath, Thunk)). + +execute_mnesia_transaction(TxFun) -> + %% Making this a sync_transaction allows us to use dirty_read + %% elsewhere and get a consistent result even when that read + %% executes on a different node. + case mnesia:sync_transaction(TxFun) of + {atomic, Result} -> Result; + {aborted, Reason} -> throw({error, Reason}) + end. + +ensure_ok(ok, _) -> ok; +ensure_ok({error, Reason}, ErrorTag) -> throw({error, {ErrorTag, Reason}}). + +localnode(Name) -> + %% This is horrible, but there doesn't seem to be a way to split a + %% nodename into its constituent parts. + list_to_atom(lists:append(atom_to_list(Name), + lists:dropwhile(fun (E) -> E =/= $@ end, + atom_to_list(node())))). + +tcp_name(Prefix, IPAddress, Port) + when is_atom(Prefix) andalso is_number(Port) -> + list_to_atom( + lists:flatten( + io_lib:format("~w_~s:~w", + [Prefix, inet_parse:ntoa(IPAddress), Port]))). + +intersperse(_, []) -> []; +intersperse(_, [E]) -> [E]; +intersperse(Sep, [E|T]) -> [E, Sep | intersperse(Sep, T)]. + +%% This is a modified version of Luke Gorrie's pmap - +%% http://lukego.livejournal.com/6753.html - that doesn't care about +%% the order in which results are received. +upmap(F, L) -> + Parent = self(), + Ref = make_ref(), + [receive {Ref, Result} -> Result end + || _ <- [spawn(fun() -> Parent ! {Ref, F(X)} end) || X <- L]]. + +map_in_order(F, L) -> + lists:reverse( + lists:foldl(fun (E, Acc) -> [F(E) | Acc] end, [], L)). + +%% generate a guid that is monotonically increasing per process. +%% +%% The id is only unique within a single cluster and as the persistent +%% message store hasn't been deleted. +guid() -> + %% We don't use erlang:now() here because a) it may return + %% duplicates when the system clock has been rewound prior to a + %% restart, or ids were generated at a high rate (which causes + %% now() to move ahead of the system time), and b) it is really + %% slow since it takes a global lock and makes a system call. + %% + %% rabbit_persister:serial/0, in combination with self/0 (which + %% includes the node name) uniquely identifies a process in space + %% and time. We combine that with a process-local counter to give + %% us a GUID that is monotonically increasing per process. + G = case get(guid) of + undefined -> {{rabbit_persister:serial(), self()}, 0}; + {S, I} -> {S, I+1} + end, + put(guid, G), + G. + +%% generate a readable string representation of a guid. Note that any +%% monotonicity of the guid is not preserved in the encoding. +string_guid(Prefix) -> + %% we use the (undocumented) ssl_base64 module here because it is + %% present throughout OTP R11 and R12 whereas base64 only becomes + %% available in R11B-4. + %% + %% TODO: once debian stable and EPEL have moved from R11B-2 to + %% R11B-4 or later we should change this to use base64. + Prefix ++ "-" ++ ssl_base64:encode(erlang:md5(term_to_binary(guid()))). + +binstring_guid(Prefix) -> + list_to_binary(string_guid(Prefix)). + +dirty_read_all(TableName) -> + mnesia:dirty_select(TableName, [{'$1',[],['$1']}]). + +dirty_foreach_key(F, TableName) -> + dirty_foreach_key1(F, TableName, mnesia:dirty_first(TableName)). + +dirty_foreach_key1(_F, _TableName, '$end_of_table') -> + ok; +dirty_foreach_key1(F, TableName, K) -> + case catch mnesia:dirty_next(TableName, K) of + {'EXIT', _} -> + aborted; + NextKey -> + F(K), + dirty_foreach_key1(F, TableName, NextKey) + end. + +dirty_dump_log(FileName) -> + {ok, LH} = disk_log:open([{name, dirty_dump_log}, {mode, read_only}, {file, FileName}]), + dirty_dump_log1(LH, disk_log:chunk(LH, start)), + disk_log:close(LH). + +dirty_dump_log1(_LH, eof) -> + io:format("Done.~n"); +dirty_dump_log1(LH, {K, Terms}) -> + io:format("Chunk: ~p~n", [Terms]), + dirty_dump_log1(LH, disk_log:chunk(LH, K)); +dirty_dump_log1(LH, {K, Terms, BadBytes}) -> + io:format("Bad Chunk, ~p: ~p~n", [BadBytes, Terms]), + dirty_dump_log1(LH, disk_log:chunk(LH, K)). diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl new file mode 100644 index 0000000000..82b80cb491 --- /dev/null +++ b/src/rabbit_mnesia.erl @@ -0,0 +1,399 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd., +%% Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd., Cohesive Financial Technologies +%% LLC., and Rabbit Technologies Ltd. are Copyright (C) 2007-2008 +%% LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit +%% Technologies Ltd.; +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_mnesia). + +-export([ensure_mnesia_dir/0, status/0, init/0, is_db_empty/0, + cluster/1, reset/0, force_reset/0]). + +-export([table_names/0]). + +%% create_tables/0 exported for helping embed RabbitMQ in or alongside +%% other mnesia-using Erlang applications, such as ejabberd +-export([create_tables/0]). + +-include("rabbit.hrl"). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(status/0 :: () -> [{'nodes' | 'running_nodes', [node()]}]). +-spec(ensure_mnesia_dir/0 :: () -> 'ok'). +-spec(init/0 :: () -> 'ok'). +-spec(is_db_empty/0 :: () -> bool()). +-spec(cluster/1 :: ([node()]) -> 'ok'). +-spec(reset/0 :: () -> 'ok'). +-spec(force_reset/0 :: () -> 'ok'). +-spec(create_tables/0 :: () -> 'ok'). + +-endif. + +%%---------------------------------------------------------------------------- + +status() -> + [{nodes, mnesia:system_info(db_nodes)}, + {running_nodes, mnesia:system_info(running_db_nodes)}]. + +init() -> + ok = ensure_mnesia_running(), + ok = ensure_mnesia_dir(), + ok = init_db(read_cluster_nodes_config()), + ok = wait_for_tables(), + ok. + +is_db_empty() -> + lists:all(fun (Tab) -> mnesia:dirty_first(Tab) == '$end_of_table' end, + table_names()). + +%% Alter which disk nodes this node is clustered with. This can be a +%% subset of all the disk nodes in the cluster but can (and should) +%% include the node itself if it is to be a disk rather than a ram +%% node. +cluster(ClusterNodes) -> + ok = ensure_mnesia_not_running(), + ok = ensure_mnesia_dir(), + rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), + try + ok = init_db(ClusterNodes), + ok = wait_for_tables(), + ok = create_cluster_nodes_config(ClusterNodes) + after + mnesia:stop() + end, + ok. + +%% return node to its virgin state, where it is not member of any +%% cluster, has no cluster configuration, no local database, and no +%% persisted messages +reset() -> reset(false). +force_reset() -> reset(true). + +%%-------------------------------------------------------------------- + +table_definitions() -> + [{user, [{disc_copies, [node()]}, + {attributes, record_info(fields, user)}]}, + {user_vhost, [{type, bag}, + {disc_copies, [node()]}, + {attributes, record_info(fields, user_vhost)}, + {index, [virtual_host]}]}, + {vhost, [{disc_copies, [node()]}, + {attributes, record_info(fields, vhost)}]}, + {vhost_realm, [{type, bag}, + {disc_copies, [node()]}, + {attributes, record_info(fields, vhost_realm)}, + {index, [realm]}]}, + {realm, [{disc_copies, [node()]}, + {attributes, record_info(fields, realm)}]}, + {user_realm, [{type, bag}, + {disc_copies, [node()]}, + {attributes, record_info(fields, user_realm)}, + {index, [realm]}]}, + {exclusive_realm_visitor, + [{record_name, realm_visitor}, + {attributes, record_info(fields, realm_visitor)}, + {index, [pid]}]}, + {realm_visitor, [{type, bag}, + {attributes, record_info(fields, realm_visitor)}, + {index, [pid]}]}, + {rabbit_config, [{disc_copies, [node()]}]}, + {listener, [{type, bag}, + {attributes, record_info(fields, listener)}]}, + {binding, [{attributes, record_info(fields, binding)}]}, + {durable_exchanges, [{disc_copies, [node()]}, + {record_name, exchange}, + {attributes, record_info(fields, exchange)}]}, + {exchange, [{attributes, record_info(fields, exchange)}]}, + {durable_queues, [{disc_copies, [node()]}, + {record_name, amqqueue}, + {attributes, record_info(fields, amqqueue)}]}, + {amqqueue, [{attributes, record_info(fields, amqqueue)}, + {index, [pid]}]}]. + +table_names() -> + [Tab || {Tab, _} <- table_definitions()]. + +ensure_mnesia_dir() -> + MnesiaDir = mnesia:system_info(directory) ++ "/", + case filelib:ensure_dir(MnesiaDir) of + {error, Reason} -> + throw({error, {cannot_create_mnesia_dir, MnesiaDir, Reason}}); + ok -> ok + end. + +ensure_mnesia_running() -> + case mnesia:system_info(is_running) of + yes -> ok; + no -> throw({error, mnesia_not_running}) + end. + +ensure_mnesia_not_running() -> + case mnesia:system_info(is_running) of + no -> ok; + yes -> throw({error, mnesia_unexpectedly_running}) + end. + +ensure_schema_integrity() -> + case catch lists:foreach(fun (Tab) -> + mnesia:table_info(Tab, version) + end, + table_names()) of + {'EXIT', Reason} -> throw({error, {schema_integrity_check_failed, + Reason}}); + _ -> ok + end, + %%TODO: more thorough checks + ok. + +%% The cluster node config file contains some or all of the disk nodes +%% that are members of the cluster this node is / should be a part of. +%% +%% If the file is absent, the list is empty, or only contains the +%% current node, then the current node is a standalone (disk) +%% node. Otherwise it is a node that is part of a cluster as either a +%% disk node, if it appears in the cluster node config, or ram node if +%% it doesn't. + +cluster_nodes_config_filename() -> + mnesia:system_info(directory) ++ "/cluster_nodes.config". + +create_cluster_nodes_config(ClusterNodes) -> + FileName = cluster_nodes_config_filename(), + Handle = case file:open(FileName, [write]) of + {ok, Device} -> Device; + {error, Reason} -> + throw({error, {cannot_create_cluster_nodes_config, + FileName, Reason}}) + end, + try + ok = io:write(Handle, ClusterNodes), + ok = io:put_chars(Handle, [$.]) + after + case file:close(Handle) of + ok -> ok; + {error, Reason1} -> + throw({error, {cannot_close_cluster_nodes_config, + FileName, Reason1}}) + end + end, + ok. + +read_cluster_nodes_config() -> + FileName = cluster_nodes_config_filename(), + case file:consult(FileName) of + {ok, [ClusterNodes]} -> ClusterNodes; + {error, enoent} -> + case application:get_env(cluster_config) of + undefined -> []; + {ok, DefaultFileName} -> + case file:consult(DefaultFileName) of + {ok, [ClusterNodes]} -> ClusterNodes; + {error, enoent} -> + error_logger:warning_msg( + "default cluster config file ~p does not exist~n", + [DefaultFileName]), + []; + {error, Reason} -> + throw({error, {cannot_read_cluster_nodes_config, + DefaultFileName, Reason}}) + end + end; + {error, Reason} -> + throw({error, {cannot_read_cluster_nodes_config, + FileName, Reason}}) + end. + +delete_cluster_nodes_config() -> + FileName = cluster_nodes_config_filename(), + case file:delete(FileName) of + ok -> ok; + {error, enoent} -> ok; + {error, Reason} -> + throw({error, {cannot_delete_cluster_nodes_config, + FileName, Reason}}) + end. + +%% Take a cluster node config and create the right kind of node - a +%% standalone disk node, or disk or ram node connected to the +%% specified cluster nodes. +init_db(ClusterNodes) -> + WasDiskNode = mnesia:system_info(use_dir), + IsDiskNode = ClusterNodes == [] orelse + lists:member(node(), ClusterNodes), + case mnesia:change_config(extra_db_nodes, ClusterNodes -- [node()]) of + {ok, []} -> + if WasDiskNode and IsDiskNode -> + ok; + WasDiskNode -> + throw({error, {cannot_convert_disk_node_to_ram_node, + ClusterNodes}}); + IsDiskNode -> + ok = create_schema(); + true -> + throw({error, {unable_to_contact_cluster_nodes, + ClusterNodes}}) + end; + {ok, [_|_]} -> + ok = ensure_schema_integrity(), + ok = wait_for_tables(), + ok = create_local_table_copies( + case IsDiskNode of + true -> disc; + false -> ram + end); + {error, Reason} -> + %% one reason we may end up here is if we try to join + %% nodes together that are currently running standalone or + %% are members of a different cluster + throw({error, {unable_to_join_cluster, + ClusterNodes, Reason}}) + end. + +create_schema() -> + mnesia:stop(), + rabbit_misc:ensure_ok(mnesia:create_schema([node()]), + cannot_create_schema), + rabbit_misc:ensure_ok(mnesia:start(), + cannot_start_mnesia), + create_tables(). + +create_tables() -> + lists:foreach(fun ({Tab, TabArgs}) -> + case mnesia:create_table(Tab, TabArgs) of + {atomic, ok} -> ok; + {aborted, Reason} -> + throw({error, {table_creation_failed, + Tab, TabArgs, Reason}}) + end + end, + table_definitions()), + ok. + +create_local_table_copies(Type) -> + ok = if Type /= ram -> create_local_table_copy(schema, disc_copies); + true -> ok + end, + lists:foreach( + fun({Tab, TabDef}) -> + HasDiscCopies = + lists:keymember(disc_copies, 1, TabDef), + HasDiscOnlyCopies = + lists:keymember(disc_only_copies, 1, TabDef), + StorageType = + case Type of + disc -> + if + HasDiscCopies -> disc_copies; + HasDiscOnlyCopies -> disc_only_copies; + true -> ram_copies + end; +%% unused code - commented out to keep dialyzer happy +%% disc_only -> +%% if +%% HasDiscCopies or HasDiscOnlyCopies -> +%% disc_only_copies; +%% true -> ram_copies +%% end; + ram -> + ram_copies + end, + ok = create_local_table_copy(Tab, StorageType) + end, + table_definitions()), + ok = if Type == ram -> create_local_table_copy(schema, ram_copies); + true -> ok + end, + ok. + +create_local_table_copy(Tab, Type) -> + StorageType = mnesia:table_info(Tab, storage_type), + {atomic, ok} = + if + StorageType == unknown -> + mnesia:add_table_copy(Tab, node(), Type); + StorageType /= Type -> + mnesia:change_table_copy_type(Tab, node(), Type); + true -> {atomic, ok} + end, + ok. + +wait_for_tables() -> + case mnesia:wait_for_tables(table_names(), 30000) of + ok -> ok; + {timeout, BadTabs} -> + throw({error, {timeout_waiting_for_tables, BadTabs}}); + {error, Reason} -> + throw({error, {failed_waiting_for_tables, Reason}}) + end. + +reset(Force) -> + ok = ensure_mnesia_not_running(), + Node = node(), + case Force of + true -> ok; + false -> + ok = ensure_mnesia_dir(), + rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), + {Nodes, RunningNodes} = + try + ok = init(), + {mnesia:system_info(db_nodes) -- [Node], + mnesia:system_info(running_db_nodes) -- [Node]} + after + mnesia:stop() + end, + leave_cluster(Nodes, RunningNodes), + rabbit_misc:ensure_ok(mnesia:delete_schema([Node]), + cannot_delete_schema) + end, + ok = delete_cluster_nodes_config(), + %% remove persistet messages and any other garbage we find + lists:foreach(fun file:delete/1, + filelib:wildcard(mnesia:system_info(directory) ++ "/*")), + ok. + +leave_cluster([], _) -> ok; +leave_cluster(Nodes, RunningNodes) -> + %% find at least one running cluster node and instruct it to + %% remove our schema copy which will in turn result in our node + %% being removed as a cluster node from the schema, with that + %% change being propagated to all nodes + case lists:any( + fun (Node) -> + case rpc:call(Node, mnesia, del_table_copy, + [schema, node()]) of + {atomic, ok} -> true; + {badrpc, nodedown} -> false; + {aborted, Reason} -> + throw({error, {failed_to_leave_cluster, + Nodes, RunningNodes, Reason}}) + end + end, + RunningNodes) of + true -> ok; + false -> throw({error, {no_running_cluster_nodes, + Nodes, RunningNodes}}) + end. diff --git a/src/rabbit_multi.erl b/src/rabbit_multi.erl new file mode 100644 index 0000000000..cd92f1ac4a --- /dev/null +++ b/src/rabbit_multi.erl @@ -0,0 +1,279 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd., +%% Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd., Cohesive Financial Technologies +%% LLC., and Rabbit Technologies Ltd. are Copyright (C) 2007-2008 +%% LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit +%% Technologies Ltd.; +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_multi). +-include("rabbit.hrl"). + +-export([start/0, stop/0]). + +-define(RPC_SLEEP, 500). + +start() -> + RpcTimeout = + case init:get_argument(maxwait) of + {ok,[[N1]]} -> 1000 * list_to_integer(N1); + _ -> 30000 + end, + case init:get_plain_arguments() of + [] -> + usage(); + FullCommand -> + {Command, Args} = parse_args(FullCommand), + case catch action(Command, Args, RpcTimeout) of + ok -> + io:format("done.~n"), + init:stop(); + {'EXIT', {function_clause, [{?MODULE, action, _} | _]}} -> + io:format("Invalid command ~p~n", [FullCommand]), + usage(); + timeout -> + io:format("timeout starting some nodes.~n"), + halt(1); + Other -> + io:format("~nrabbit_multi action ~p failed:~n~p~n", + [Command, Other]), + halt(2) + end + end. + +parse_args([Command | Args]) -> + {list_to_atom(Command), Args}. + +stop() -> + ok. + +usage() -> + io:format("Usage: rabbitmq-multi <command> + +Available commands: + + start_all <NodeCount> - start a local cluster of RabbitMQ nodes. + stop_all - stops all local RabbitMQ nodes. +"), + halt(3). + +action(start_all, [NodeCount], RpcTimeout) -> + io:format("Starting all nodes...~n", []), + N = list_to_integer(NodeCount), + {NodePids, Running} = start_nodes(N, N, [], true, + getenv("NODENAME"), + getenv("NODE_PORT"), + RpcTimeout), + write_pids_file(NodePids), + case Running of + true -> ok; + false -> timeout + end; + +action(stop_all, [], RpcTimeout) -> + io:format("Stopping all nodes...~n", []), + case read_pids_file() of + [] -> throw(no_nodes_running); + NodePids -> stop_nodes(NodePids, RpcTimeout), + delete_pids_file() + end. + +%% PNodePid is the list of PIDs +%% Running is a boolean exhibiting success at some moment +start_nodes(0, _, PNodePid, Running, _, _, _) -> {PNodePid, Running}; + +start_nodes(N, Total, PNodePid, Running, + NodeNameBase, NodePortBase, RpcTimeout) -> + NodeNumber = Total - N, + NodeName = if NodeNumber == 0 -> + %% For compatibility with running a single node + NodeNameBase; + true -> + NodeNameBase ++ "_" ++ integer_to_list(NodeNumber) + end, + {NodePid, Started} = start_node(NodeName, + list_to_integer(NodePortBase) + NodeNumber, + RpcTimeout), + start_nodes(N - 1, Total, [NodePid | PNodePid], + Started and Running, + NodeNameBase, NodePortBase, RpcTimeout). + +start_node(NodeName, NodePort, RpcTimeout) -> + os:putenv("NODENAME", NodeName), + os:putenv("NODE_PORT", integer_to_list(NodePort)), + Node = rabbit_misc:localnode(list_to_atom(NodeName)), + io:format("Starting node ~s...~n", [Node]), + case rpc:call(Node, os, getpid, []) of + {badrpc, _} -> + Port = run_cmd(script_filename()), + Started = wait_for_rabbit_to_start(Node, RpcTimeout, Port), + Pid = case rpc:call(Node, os, getpid, []) of + {badrpc, _} -> throw(cannot_get_pid); + PidS -> list_to_integer(PidS) + end, + io:format("~s~n", [case Started of + true -> "OK"; + false -> "timeout" + end]), + {{Node, Pid}, Started}; + PidS -> + Pid = list_to_integer(PidS), + throw({node_already_running, Node, Pid}) + end. + +wait_for_rabbit_to_start(_ , RpcTimeout, _) when RpcTimeout < 0 -> + false; +wait_for_rabbit_to_start(Node, RpcTimeout, Port) -> + case parse_status(rpc:call(Node, rabbit, status, [])) of + true -> true; + false -> receive + {'EXIT', Port, PosixCode} -> + throw({node_start_failed, PosixCode}) + after ?RPC_SLEEP -> + wait_for_rabbit_to_start( + Node, RpcTimeout - ?RPC_SLEEP, Port) + end + end. + +run_cmd(FullPath) -> + erlang:open_port({spawn, FullPath}, [nouse_stdio]). + +parse_status({badrpc, _}) -> + false; + +parse_status(Status) -> + case lists:keysearch(running_applications, 1, Status) of + {value, {running_applications, Apps}} -> + lists:keymember(rabbit, 1, Apps); + _ -> + false + end. + +with_os(Handlers) -> + {OsFamily, _} = os:type(), + case lists:keysearch(OsFamily, 1, Handlers) of + {value, {_, Handler}} -> Handler(); + false -> throw({unsupported_os, OsFamily}) + end. + +script_filename() -> + ScriptHome = getenv("SCRIPT_HOME"), + ScriptName = with_os( + [{unix , fun () -> "rabbitmq-server" end}, + {win32, fun () -> "rabbitmq-server.bat" end}]), + ScriptHome ++ "/" ++ ScriptName ++ " -noinput". + +pids_file() -> getenv("PIDS_FILE"). + +write_pids_file(Pids) -> + FileName = pids_file(), + Handle = case file:open(FileName, [write]) of + {ok, Device} -> + Device; + {error, Reason} -> + throw({error, {cannot_create_pids_file, + FileName, Reason}}) + end, + try + ok = io:write(Handle, Pids), + ok = io:put_chars(Handle, [$.]) + after + case file:close(Handle) of + ok -> ok; + {error, Reason1} -> + throw({error, {cannot_create_pids_file, + FileName, Reason1}}) + end + end, + ok. + +delete_pids_file() -> + FileName = pids_file(), + case file:delete(FileName) of + ok -> ok; + {error, enoent} -> ok; + {error, Reason} -> throw({error, {cannot_delete_pids_file, + FileName, Reason}}) + end. + +read_pids_file() -> + FileName = pids_file(), + case file:consult(FileName) of + {ok, [Pids]} -> Pids; + {error, enoent} -> []; + {error, Reason} -> throw({error, {cannot_read_pids_file, + FileName, Reason}}) + end. + +stop_nodes([],_) -> ok; + +stop_nodes([NodePid | Rest], RpcTimeout) -> + stop_node(NodePid, RpcTimeout), + stop_nodes(Rest, RpcTimeout). + +stop_node({Node, Pid}, RpcTimeout) -> + io:format("Stopping node ~p~n", [Node]), + rpc:call(Node, rabbit, stop_and_halt, []), + case kill_wait(Pid, RpcTimeout, false) of + false -> kill_wait(Pid, RpcTimeout, true); + true -> ok + end, + io:format("OK~n", []). + +kill_wait(Pid, TimeLeft, Forceful) when TimeLeft < 0 -> + Cmd = with_os([{unix, fun () -> if Forceful -> "kill -9"; + true -> "kill" + end + end}, + %% Kill forcefully always on Windows, since erl.exe + %% seems to completely ignore non-forceful killing + %% even when everything is working + {win32, fun () -> "taskkill /f /pid" end}]), + os:cmd(Cmd ++ " " ++ integer_to_list(Pid)), + false; % Don't assume what we did just worked! + +% Returns true if the process is dead, false otherwise. +kill_wait(Pid, TimeLeft, Forceful) -> + timer:sleep(?RPC_SLEEP), + io:format(".", []), + is_dead(Pid) orelse kill_wait(Pid, TimeLeft - ?RPC_SLEEP, Forceful). + +% Test using some OS clunkiness since we shouldn't trust +% rpc:call(os, getpid, []) at this point +is_dead(Pid) -> + PidS = integer_to_list(Pid), + with_os([{unix, fun () -> + Res = os:cmd("ps --no-headers --pid " ++ PidS), + Res == "" + end}, + {win32, fun () -> + Res = os:cmd("tasklist /nh /fi \"pid eq " ++ + PidS ++ "\""), + case regexp:first_match(Res, "erl.exe") of + {match, _, _} -> false; + _ -> true + end + end}]). + +getenv(Var) -> + case os:getenv(Var) of + false -> throw({missing_env_var, Var}); + Value -> Value + end. diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl new file mode 100644 index 0000000000..79c927cb9e --- /dev/null +++ b/src/rabbit_networking.erl @@ -0,0 +1,151 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd., +%% Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd., Cohesive Financial Technologies +%% LLC., and Rabbit Technologies Ltd. are Copyright (C) 2007-2008 +%% LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit +%% Technologies Ltd.; +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_networking). + +-export([start/0, start_tcp_listener/2, stop_tcp_listener/2, + on_node_down/1, active_listeners/0, node_listeners/1]). +%%used by TCP-based transports, e.g. STOMP adapter +-export([check_tcp_listener_address/3]). + +-export([tcp_listener_started/2, tcp_listener_stopped/2, start_client/1]). + +-include("rabbit.hrl"). +-include_lib("kernel/include/inet.hrl"). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-type(host() :: ip_address() | string() | atom()). + +-spec(start/0 :: () -> 'ok'). +-spec(start_tcp_listener/2 :: (host(), ip_port()) -> 'ok'). +-spec(stop_tcp_listener/2 :: (host(), ip_port()) -> 'ok'). +-spec(active_listeners/0 :: () -> [listener()]). +-spec(node_listeners/1 :: (node()) -> [listener()]). +-spec(on_node_down/1 :: (node()) -> 'ok'). +-spec(check_tcp_listener_address/3 :: (atom(), host(), ip_port()) -> + {ip_address(), atom()}). + +-endif. + +%%---------------------------------------------------------------------------- + +start() -> + {ok,_} = supervisor:start_child( + rabbit_sup, + {rabbit_tcp_client_sup, + {tcp_client_sup, start_link, + [{local, rabbit_tcp_client_sup}, + {rabbit_reader,start_link,[]}]}, + transient, infinity, supervisor, [tcp_client_sup]}), + ok. + +check_tcp_listener_address(NamePrefix, Host, Port) -> + IPAddress = + case inet:getaddr(Host, inet) of + {ok, IPAddress1} -> IPAddress1; + {error, Reason} -> + error_logger:error_msg("invalid host ~p - ~p~n", + [Host, Reason]), + throw({error, {invalid_host, Host, Reason}}) + end, + if is_integer(Port) andalso (Port >= 0) andalso (Port =< 65535) -> ok; + true -> error_logger:error_msg("invalid port ~p - not 0..65535~n", + [Port]), + throw({error, invalid_port, Port}) + end, + Name = rabbit_misc:tcp_name(NamePrefix, IPAddress, Port), + {IPAddress, Name}. + +start_tcp_listener(Host, Port) -> + {IPAddress, Name} = check_tcp_listener_address(rabbit_tcp_listener_sup, Host, Port), + {ok,_} = supervisor:start_child( + rabbit_sup, + {Name, + {tcp_listener_sup, start_link, + [IPAddress, Port, + [binary, + {packet, raw}, % no packaging + {reuseaddr, true}, % allow rebind without waiting + %% {nodelay, true}, % TCP_NODELAY - disable Nagle's alg. + %% {delay_send, true}, + {exit_on_close, false}], + {?MODULE, tcp_listener_started, []}, + {?MODULE, tcp_listener_stopped, []}, + {?MODULE, start_client, []}]}, + transient, infinity, supervisor, [tcp_listener_sup]}), + ok. + +stop_tcp_listener(Host, Port) -> + {ok, IPAddress} = inet:getaddr(Host, inet), + Name = rabbit_misc:tcp_name(rabbit_tcp_listener_sup, IPAddress, Port), + ok = supervisor:terminate_child(rabbit_sup, Name), + ok = supervisor:delete_child(rabbit_sup, Name), + ok. + +tcp_listener_started(IPAddress, Port) -> + ok = mnesia:dirty_write( + #listener{node = node(), + protocol = tcp, + host = tcp_host(IPAddress), + port = Port}). + +tcp_listener_stopped(IPAddress, Port) -> + ok = mnesia:dirty_delete_object( + #listener{node = node(), + protocol = tcp, + host = tcp_host(IPAddress), + port = Port}). + +active_listeners() -> + rabbit_misc:dirty_read_all(listener). + +node_listeners(Node) -> + mnesia:dirty_read(listener, Node). + +on_node_down(Node) -> + ok = mnesia:dirty_delete(listener, Node). + +start_client(Sock) -> + {ok, Child} = supervisor:start_child(rabbit_tcp_client_sup, []), + ok = gen_tcp:controlling_process(Sock, Child), + Child ! {go, Sock}, + Child. + +%%-------------------------------------------------------------------- + +tcp_host({0,0,0,0}) -> + {ok, Hostname} = inet:gethostname(), + case inet:gethostbyname(Hostname) of + {ok, #hostent{h_name = Name}} -> Name; + {error, _Reason} -> Hostname + end; +tcp_host(IPAddress) -> + case inet:gethostbyaddr(IPAddress) of + {ok, #hostent{h_name = Name}} -> Name; + {error, _Reason} -> inet_parse:ntoa(IPAddress) + end. diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl new file mode 100644 index 0000000000..beef528566 --- /dev/null +++ b/src/rabbit_node_monitor.erl @@ -0,0 +1,76 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd., +%% Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd., Cohesive Financial Technologies +%% LLC., and Rabbit Technologies Ltd. are Copyright (C) 2007-2008 +%% LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit +%% Technologies Ltd.; +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_node_monitor). + +-behaviour(gen_server). + +-export([start_link/0]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-define(SERVER, ?MODULE). + +%%-------------------------------------------------------------------- + +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + +%%-------------------------------------------------------------------- + +init([]) -> + ok = net_kernel:monitor_nodes(true), + {ok, no_state}. + +handle_call(_Request, _From, State) -> + {noreply, State}. + +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info({nodeup, Node}, State) -> + rabbit_log:info("node ~p up", [Node]), + {noreply, State}; +handle_info({nodedown, Node}, State) -> + rabbit_log:info("node ~p down", [Node]), + %% TODO: This may turn out to be a performance hog when there are + %% lots of nodes. We really only need to execute this code on + %% *one* node, rather than all of them. + ok = rabbit_networking:on_node_down(Node), + ok = rabbit_realm:on_node_down(Node), + ok = rabbit_amqqueue:on_node_down(Node), + {noreply, State}; +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%-------------------------------------------------------------------- + diff --git a/src/rabbit_persister.erl b/src/rabbit_persister.erl new file mode 100644 index 0000000000..91016d9d63 --- /dev/null +++ b/src/rabbit_persister.erl @@ -0,0 +1,511 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd., +%% Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd., Cohesive Financial Technologies +%% LLC., and Rabbit Technologies Ltd. are Copyright (C) 2007-2008 +%% LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit +%% Technologies Ltd.; +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_persister). + +-behaviour(gen_server). + +-export([start_link/0]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-export([transaction/1, extend_transaction/2, dirty_work/1, + commit_transaction/1, rollback_transaction/1, + force_snapshot/0, serial/0]). + +-include("rabbit.hrl"). + +-define(SERVER, ?MODULE). + +-define(LOG_BUNDLE_DELAY, 5). +-define(COMPLETE_BUNDLE_DELAY, 2). + +-define(MAX_WRAP_ENTRIES, 500). + +-define(PERSISTER_LOG_FORMAT_VERSION, {2, 4}). + +-record(pstate, {log_handle, entry_count, deadline, + pending_logs, pending_replies, + snapshot}). + +%% two tables for efficient persistency +%% one maps a key to a message +%% the other maps a key to one or more queues. +%% The aim is to reduce the overload of storing a message multiple times +%% when it appears in several queues. +-record(psnapshot, {serial, transactions, messages, queues}). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-type(qmsg() :: {amqqueue(), pkey()}). +-type(work_item() :: + {publish, message(), qmsg()} | + {deliver, qmsg()} | + {ack, qmsg()}). + +-spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}). +-spec(transaction/1 :: ([work_item()]) -> 'ok'). +-spec(extend_transaction/2 :: (txn(), [work_item()]) -> 'ok'). +-spec(dirty_work/1 :: ([work_item()]) -> 'ok'). +-spec(commit_transaction/1 :: (txn()) -> 'ok'). +-spec(rollback_transaction/1 :: (txn()) -> 'ok'). +-spec(force_snapshot/0 :: () -> 'ok'). +-spec(serial/0 :: () -> non_neg_integer()). + +-endif. + +%%---------------------------------------------------------------------------- + +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + +transaction(MessageList) -> + ?LOGDEBUG("transaction ~p~n", [MessageList]), + TxnKey = rabbit_misc:guid(), + gen_server:call(?SERVER, {transaction, TxnKey, MessageList}). + +extend_transaction(TxnKey, MessageList) -> + ?LOGDEBUG("extend_transaction ~p ~p~n", [TxnKey, MessageList]), + gen_server:cast(?SERVER, {extend_transaction, TxnKey, MessageList}). + +dirty_work(MessageList) -> + ?LOGDEBUG("dirty_work ~p~n", [MessageList]), + gen_server:cast(?SERVER, {dirty_work, MessageList}). + +commit_transaction(TxnKey) -> + ?LOGDEBUG("commit_transaction ~p~n", [TxnKey]), + gen_server:call(?SERVER, {commit_transaction, TxnKey}). + +rollback_transaction(TxnKey) -> + ?LOGDEBUG("rollback_transaction ~p~n", [TxnKey]), + gen_server:cast(?SERVER, {rollback_transaction, TxnKey}). + +force_snapshot() -> + gen_server:call(?SERVER, force_snapshot). + +serial() -> + gen_server:call(?SERVER, serial). + +%%-------------------------------------------------------------------- + +init(_Args) -> + process_flag(trap_exit, true), + FileName = base_filename(), + ok = filelib:ensure_dir(FileName), + Snapshot = #psnapshot{serial = 0, + transactions = dict:new(), + messages = ets:new(messages, []), + queues = ets:new(queues, [])}, + LogHandle = + case disk_log:open([{name, rabbit_persister}, + {head, current_snapshot(Snapshot)}, + {file, FileName}]) of + {ok, LH} -> LH; + {repaired, LH, {recovered, Recovered}, {badbytes, Bad}} -> + WarningFun = if + Bad > 0 -> fun rabbit_log:warning/2; + true -> fun rabbit_log:info/2 + end, + WarningFun("Repaired persister log - ~p recovered, ~p bad~n", + [Recovered, Bad]), + LH + end, + {Res, LoadedSnapshot} = internal_load_snapshot(LogHandle, Snapshot), + NewSnapshot = LoadedSnapshot#psnapshot{ + serial = LoadedSnapshot#psnapshot.serial + 1}, + case Res of + ok -> + ok = take_snapshot(LogHandle, NewSnapshot); + {error, Reason} -> + rabbit_log:error("Failed to load persister log: ~p~n", [Reason]), + ok = take_snapshot_and_save_old(LogHandle, NewSnapshot) + end, + State = #pstate{log_handle = LogHandle, + entry_count = 0, + deadline = infinity, + pending_logs = [], + pending_replies = [], + snapshot = NewSnapshot}, + {ok, State}. + +handle_call({transaction, Key, MessageList}, From, State) -> + NewState = internal_extend(Key, MessageList, State), + do_noreply(internal_commit(From, Key, NewState)); +handle_call({commit_transaction, TxnKey}, From, State) -> + do_noreply(internal_commit(From, TxnKey, State)); +handle_call(force_snapshot, _From, State = #pstate{log_handle = LH, + snapshot = Snapshot}) -> + ok = take_snapshot(LH, Snapshot), + do_reply(ok, State); +handle_call(serial, _From, + State = #pstate{snapshot = #psnapshot{serial = Serial}}) -> + do_reply(Serial, State); +handle_call(_Request, _From, State) -> + {noreply, State}. + +handle_cast({rollback_transaction, TxnKey}, State) -> + do_noreply(internal_rollback(TxnKey, State)); +handle_cast({dirty_work, MessageList}, State) -> + do_noreply(internal_dirty_work(MessageList, State)); +handle_cast({extend_transaction, TxnKey, MessageList}, State) -> + do_noreply(internal_extend(TxnKey, MessageList, State)); +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info(timeout, State) -> + {noreply, flush(State)}; +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, State = #pstate{log_handle = LogHandle}) -> + flush(State), + disk_log:close(LogHandle), + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, flush(State)}. + +%%-------------------------------------------------------------------- + +internal_extend(Key, MessageList, State) -> + log_work(fun (ML) -> {extend_transaction, Key, ML} end, + MessageList, State). + +internal_dirty_work(MessageList, State) -> + log_work(fun (ML) -> {dirty_work, ML} end, + MessageList, State). + +internal_commit(From, Key, State = #pstate{snapshot = Snapshot}) -> + Unit = {commit_transaction, Key}, + NewSnapshot = internal_integrate1(Unit, Snapshot), + complete(From, Unit, State#pstate{snapshot = NewSnapshot}). + +internal_rollback(Key, State = #pstate{snapshot = Snapshot}) -> + Unit = {rollback_transaction, Key}, + NewSnapshot = internal_integrate1(Unit, Snapshot), + log(State#pstate{snapshot = NewSnapshot}, Unit). + +complete(From, Item, State = #pstate{deadline = ExistingDeadline, + pending_logs = Logs, + pending_replies = Waiting}) -> + State#pstate{deadline = compute_deadline( + ?COMPLETE_BUNDLE_DELAY, ExistingDeadline), + pending_logs = [Item | Logs], + pending_replies = [From | Waiting]}. + +%% This is made to limit disk usage by writing messages only once onto +%% disk. We keep a table associating pkeys to messages, and provided +%% the list of messages to output is left to right, we can guarantee +%% that pkeys will be a backreference to a message in memory when a +%% "tied" is met. +log_work(CreateWorkUnit, MessageList, + State = #pstate{ + snapshot = Snapshot = #psnapshot{ + messages = Messages}}) -> + Unit = CreateWorkUnit( + rabbit_misc:map_in_order( + fun(M = {publish, Message, QK = {_QName, PKey}}) -> + case ets:lookup(Messages, PKey) of + [_] -> {tied, QK}; + [] -> ets:insert(Messages, {PKey, Message}), + M + end; + (M) -> M + end, + MessageList)), + NewSnapshot = internal_integrate1(Unit, Snapshot), + log(State#pstate{snapshot = NewSnapshot}, Unit). + +log(State = #pstate{deadline = ExistingDeadline, pending_logs = Logs}, + Message) -> + State#pstate{deadline = compute_deadline(?LOG_BUNDLE_DELAY, + ExistingDeadline), + pending_logs = [Message | Logs]}. + +base_filename() -> + mnesia:system_info(directory) ++ "/rabbit_persister.LOG". + +take_snapshot(LogHandle, OldFileName, Snapshot) -> + ok = disk_log:sync(LogHandle), + %% current_snapshot is the Head (ie. first thing logged) + ok = disk_log:reopen(LogHandle, OldFileName, current_snapshot(Snapshot)). + +take_snapshot(LogHandle, Snapshot) -> + OldFileName = lists:flatten(base_filename() ++ ".previous"), + file:delete(OldFileName), + rabbit_log:info("Rolling persister log to ~p~n", [OldFileName]), + ok = take_snapshot(LogHandle, OldFileName, Snapshot). + +take_snapshot_and_save_old(LogHandle, Snapshot) -> + {MegaSecs, Secs, MicroSecs} = erlang:now(), + Timestamp = MegaSecs * 1000000 + Secs * 1000 + MicroSecs, + OldFileName = lists:flatten(io_lib:format("~s.saved.~p", + [base_filename(), Timestamp])), + rabbit_log:info("Saving persister log in ~p~n", [OldFileName]), + ok = take_snapshot(LogHandle, OldFileName, Snapshot). + +maybe_take_snapshot(State = #pstate{entry_count = EntryCount, log_handle = LH, + snapshot = Snapshot}) + when EntryCount >= ?MAX_WRAP_ENTRIES -> + ok = take_snapshot(LH, Snapshot), + State#pstate{entry_count = 0}; +maybe_take_snapshot(State) -> + State. + +later_ms(DeltaMilliSec) -> + {MegaSec, Sec, MicroSec} = now(), + %% Note: not normalised. Unimportant for this application. + {MegaSec, Sec, MicroSec + (DeltaMilliSec * 1000)}. + +%% Result = B - A, more or less +time_diff({B1, B2, B3}, {A1, A2, A3}) -> + (B1 - A1) * 1000000 + (B2 - A2) + (B3 - A3) / 1000000.0 . + +compute_deadline(TimerDelay, infinity) -> + later_ms(TimerDelay); +compute_deadline(_TimerDelay, ExistingDeadline) -> + ExistingDeadline. + +compute_timeout(infinity) -> + infinity; +compute_timeout(Deadline) -> + DeltaMilliSec = time_diff(Deadline, now()) * 1000.0, + if + DeltaMilliSec =< 1 -> + 0; + true -> + round(DeltaMilliSec) + end. + +do_noreply(State = #pstate{deadline = Deadline}) -> + {noreply, State, compute_timeout(Deadline)}. + +do_reply(Reply, State = #pstate{deadline = Deadline}) -> + {reply, Reply, State, compute_timeout(Deadline)}. + +flush(State = #pstate{pending_logs = PendingLogs, + pending_replies = Waiting, + log_handle = LogHandle}) -> + State1 = if + PendingLogs /= [] -> + disk_log:alog(LogHandle, lists:reverse(PendingLogs)), + maybe_take_snapshot( + State#pstate{ + entry_count = State#pstate.entry_count + 1}); + true -> + State + end, + if Waiting /= [] -> + ok = disk_log:sync(LogHandle), + lists:foreach(fun (From) -> gen_server:reply(From, ok) end, + Waiting); + true -> + ok + end, + State1#pstate{deadline = infinity, + pending_logs = [], + pending_replies = []}. + +current_snapshot(_Snapshot = #psnapshot{serial = Serial, + transactions= Ts, + messages = Messages, + queues = Queues}) -> + %% Avoid infinite growth of the table by removing messages not + %% bound to a queue anymore + prune_table(Messages, ets:foldl( + fun ({{_QName, PKey}, _Delivered}, S) -> + sets:add_element(PKey, S) + end, sets:new(), Queues)), + InnerSnapshot = {{serial, Serial}, + {txns, Ts}, + {messages, ets:tab2list(Messages)}, + {queues, ets:tab2list(Queues)}}, + ?LOGDEBUG("Inner snapshot: ~p~n", [InnerSnapshot]), + {persist_snapshot, {vsn, ?PERSISTER_LOG_FORMAT_VERSION}, + term_to_binary(InnerSnapshot)}. + +prune_table(Tab, Keys) -> + true = ets:safe_fixtable(Tab, true), + ok = prune_table(Tab, Keys, ets:first(Tab)), + true = ets:safe_fixtable(Tab, false). + +prune_table(_Tab, _Keys, '$end_of_table') -> ok; +prune_table(Tab, Keys, Key) -> + case sets:is_element(Key, Keys) of + true -> ok; + false -> ets:delete(Tab, Key) + end, + prune_table(Tab, Keys, ets:next(Tab, Key)). + +internal_load_snapshot(LogHandle, + Snapshot = #psnapshot{messages = Messages, + queues = Queues}) -> + {K, [Loaded_Snapshot | Items]} = disk_log:chunk(LogHandle, start), + case check_version(Loaded_Snapshot) of + {ok, StateBin} -> + {{serial, Serial}, {txns, Ts}, {messages, Ms}, {queues, Qs}} = + binary_to_term(StateBin), + true = ets:insert(Messages, Ms), + true = ets:insert(Queues, Qs), + Snapshot1 = replay(Items, LogHandle, K, + Snapshot#psnapshot{ + serial = Serial, + transactions = Ts}), + Snapshot2 = requeue_messages(Snapshot1), + %% uncompleted transactions are discarded - this is TRTTD + %% since we only get into this code on node restart, so + %% any uncompleted transactions will have been aborted. + {ok, Snapshot2#psnapshot{transactions = dict:new()}}; + {error, Reason} -> {{error, Reason}, Snapshot} + end. + +check_version({persist_snapshot, {vsn, ?PERSISTER_LOG_FORMAT_VERSION}, + StateBin}) -> + {ok, StateBin}; +check_version({persist_snapshot, {vsn, Vsn}, _StateBin}) -> + {error, {unsupported_persister_log_format, Vsn}}; +check_version(_Other) -> + {error, unrecognised_persister_log_format}. + +requeue_messages(Snapshot = #psnapshot{messages = Messages, + queues = Queues}) -> + Work = ets:foldl(fun accumulate_requeues/2, dict:new(), Queues), + %% unstable parallel map, because order doesn't matter + L = lists:append( + rabbit_misc:upmap( + %% we do as much work as possible in spawned worker + %% processes, but we need to make sure the ets:inserts are + %% performed in self() + fun ({QName, Requeues}) -> + requeue(QName, Requeues, Messages) + end, dict:to_list(Work))), + NewMessages = [{K, M} || {{_Q, K}, M, _D} <- L], + NewQueues = [{QK, D} || {QK, _M, D} <- L], + ets:delete_all_objects(Messages), + ets:delete_all_objects(Queues), + true = ets:insert(Messages, NewMessages), + true = ets:insert(Queues, NewQueues), + %% contains the mutated messages and queues tables + Snapshot. + +accumulate_requeues({{QName, PKey}, Delivered}, Acc) -> + Requeue = {PKey, Delivered}, + dict:update(QName, + fun (Requeues) -> [Requeue | Requeues] end, + [Requeue], + Acc). + +requeue(QName, Requeues, Messages) -> + case rabbit_amqqueue:lookup(QName) of + {ok, #amqqueue{pid = QPid}} -> + RequeueMessages = + [{{QName, PKey}, Message, Delivered} || + {PKey, Delivered} <- Requeues, + {_, Message} <- ets:lookup(Messages, PKey)], + rabbit_amqqueue:redeliver( + QPid, + %% Messages published by the same process receive + %% persistence keys that are monotonically + %% increasing. Since message ordering is defined on a + %% per-channel basis, and channels are bound to specific + %% processes, sorting the list does provide the correct + %% ordering properties. + [{Message, Delivered} || {_, Message, Delivered} <- + lists:sort(RequeueMessages)]), + RequeueMessages; + {error, not_found} -> + [] + end. + +replay([], LogHandle, K, Snapshot) -> + case disk_log:chunk(LogHandle, K) of + {K1, Items} -> + replay(Items, LogHandle, K1, Snapshot); + {K1, Items, Badbytes} -> + rabbit_log:warning("~p bad bytes recovering persister log~n", + [Badbytes]), + replay(Items, LogHandle, K1, Snapshot); + eof -> Snapshot + end; +replay([Item | Items], LogHandle, K, Snapshot) -> + NewSnapshot = internal_integrate_messages(Item, Snapshot), + replay(Items, LogHandle, K, NewSnapshot). + +internal_integrate_messages(Items, Snapshot) -> + lists:foldl(fun (Item, Snap) -> internal_integrate1(Item, Snap) end, + Snapshot, Items). + +internal_integrate1({extend_transaction, Key, MessageList}, + Snapshot = #psnapshot {transactions = Transactions}) -> + NewTransactions = + dict:update(Key, + fun (MessageLists) -> [MessageList | MessageLists] end, + [MessageList], + Transactions), + Snapshot#psnapshot{transactions = NewTransactions}; +internal_integrate1({rollback_transaction, Key}, + Snapshot = #psnapshot{transactions = Transactions}) -> + Snapshot#psnapshot{transactions = dict:erase(Key, Transactions)}; +internal_integrate1({commit_transaction, Key}, + Snapshot = #psnapshot{transactions = Transactions, + messages = Messages, + queues = Queues}) -> + case dict:find(Key, Transactions) of + {ok, MessageLists} -> + ?LOGDEBUG("persist committing txn ~p~n", [Key]), + lists:foreach(fun (ML) -> perform_work(ML, Messages, Queues) end, + lists:reverse(MessageLists)), + Snapshot#psnapshot{transactions = dict:erase(Key, Transactions)}; + error -> + Snapshot + end; +internal_integrate1({dirty_work, MessageList}, + Snapshot = #psnapshot {messages = Messages, + queues = Queues}) -> + perform_work(MessageList, Messages, Queues), + Snapshot. + +perform_work(MessageList, Messages, Queues) -> + lists:foreach( + fun (Item) -> perform_work_item(Item, Messages, Queues) end, + MessageList). + +perform_work_item({publish, Message, QK = {_QName, PKey}}, Messages, Queues) -> + ets:insert(Messages, {PKey, Message}), + ets:insert(Queues, {QK, false}); + +perform_work_item({tied, QK}, _Messages, Queues) -> + ets:insert(Queues, {QK, false}); + +perform_work_item({deliver, QK}, _Messages, Queues) -> + %% from R12B-2 onward we could use ets:update_element/3 here + ets:delete(Queues, QK), + ets:insert(Queues, {QK, true}); + +perform_work_item({ack, QK}, _Messages, Queues) -> + ets:delete(Queues, QK). diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl new file mode 100644 index 0000000000..1d11cbaaea --- /dev/null +++ b/src/rabbit_reader.erl @@ -0,0 +1,693 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd., +%% Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd., Cohesive Financial Technologies +%% LLC., and Rabbit Technologies Ltd. are Copyright (C) 2007-2008 +%% LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit +%% Technologies Ltd.; +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_reader). +-include("rabbit_framing.hrl"). +-include("rabbit.hrl"). + +-export([start_link/0]). + +-export([system_continue/3, system_terminate/4, system_code_change/4]). + +-export([init/1, mainloop/3]). + +-export([analyze_frame/2]). + +-import(gen_tcp). +-import(fprof). +-import(inet). +-import(prim_inet). + +-define(HANDSHAKE_TIMEOUT, 10). +-define(NORMAL_TIMEOUT, 3). +-define(CLOSING_TIMEOUT, 1). +-define(CHANNEL_CLOSING_TIMEOUT, 1). +-define(CHANNEL_TERMINATION_TIMEOUT, 3). + +%--------------------------------------------------------------------------- + +-record(v1, {sock, connection, callback, recv_ref, connection_state}). + +%% connection lifecycle +%% +%% all state transitions and terminations are marked with *...* +%% +%% The lifecycle begins with: start handshake_timeout timer, *pre-init* +%% +%% all states, unless specified otherwise: +%% socket error -> *exit* +%% socket close -> *throw* +%% forced termination -> *exit* +%% handshake_timeout -> *throw* +%% pre-init: +%% receive protocol header -> send connection.start, *starting* +%% starting: +%% receive connection.start_ok -> send connection.tune, *tuning* +%% tuning: +%% receive connection.tune_ok -> start heartbeats, *opening* +%% opening: +%% receive connection.open -> send connection.open_ok, *running* +%% running: +%% receive connection.close -> +%% tell channels to terminate gracefully +%% if no channels then send connection.close_ok, start +%% terminate_connection timer, *closed* +%% else *closing* +%% forced termination +%% -> wait for channels to terminate forcefully, start +%% terminate_connection timer, send close, *exit* +%% channel exit with hard error +%% -> log error, wait for channels to terminate forcefully, start +%% terminate_connection timer, send close, *closed* +%% channel exit with soft error +%% -> log error, start terminate_channel timer, mark channel as +%% closing, *running* +%% terminate_channel timeout -> remove 'closing' mark, *running* +%% handshake_timeout -> ignore, *running* +%% heartbeat timeout -> *throw* +%% closing: +%% socket close -> *terminate* +%% receive frame -> ignore, *closing* +%% terminate_channel timeout -> remove 'closing' mark, *closing* +%% handshake_timeout -> ignore, *closing* +%% heartbeat timeout -> *throw* +%% channel exit -> +%% if abnormal exit then log error +%% if last channel to exit then send connection.close_ok, start +%% terminate_connection timer, *closing* +%% closed: +%% socket close -> *terminate* +%% receive connection.close_ok -> self() ! terminate_connection, +%% *closed* +%% receive frame -> ignore, *closed* +%% terminate_connection timeout -> *terminate* +%% terminate_channel timeout -> remove 'closing' mark, *closed* +%% handshake_timeout -> ignore, *closed* +%% heartbeat timeout -> *throw* +%% channel exit -> log error, *closed* +%% +%% +%% TODO: refactor the code so that the above is obvious + +%%-------------------------------------------------------------------------- + +start_link() -> + {ok, proc_lib:spawn_link(?MODULE, init, [self()])}. + +init(Parent) -> + Deb = sys:debug_options([]), + receive + {go, Sock} -> start_connection(Parent, Deb, Sock) + end. + +system_continue(Parent, Deb, State) -> + ?MODULE:mainloop(Parent, Deb, State). + +system_terminate(Reason, _Parent, _Deb, _State) -> + exit(Reason). + +system_code_change(Misc, _Module, _OldVsn, _Extra) -> + {ok, Misc}. + +setup_profiling() -> + Value = rabbit_misc:get_config(profiling_enabled, false), + case Value of + once -> + rabbit_log:info("Enabling profiling for this connection, and disabling for subsequent.~n"), + rabbit_misc:set_config(profiling_enabled, false), + fprof:trace(start); + true -> + rabbit_log:info("Enabling profiling for this connection.~n"), + fprof:trace(start); + false -> + ok + end, + Value. + +teardown_profiling(Value) -> + case Value of + false -> + ok; + _ -> + rabbit_log:info("Completing profiling for this connection.~n"), + fprof:trace(stop), + fprof:profile(), + fprof:analyse([{dest, []}, {cols, 100}]) + end. + +start_connection(Parent, Deb, ClientSock) -> + ProfilingValue = setup_profiling(), + process_flag(trap_exit, true), + {ok, {PeerAddress, PeerPort}} = inet:peername(ClientSock), + PeerAddressS = inet_parse:ntoa(PeerAddress), + rabbit_log:info("starting TCP connection ~p from ~s:~p~n", + [self(), PeerAddressS, PeerPort]), + try + erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(), + handshake_timeout), + mainloop(Parent, Deb, switch_callback( + #v1{sock = ClientSock, + connection = #connection{ + user = none, + timeout_sec = ?HANDSHAKE_TIMEOUT, + frame_max = ?FRAME_MIN_SIZE, + vhost = none}, + callback = uninitialized_callback, + recv_ref = none, + connection_state = pre_init}, + handshake, 8)) + catch + Ex -> rabbit_log:error("error on TCP connection ~p from ~s:~p~n~p~n", + [self(), PeerAddressS, PeerPort, Ex]) + after + rabbit_log:info("closing TCP connection ~p from ~s:~p~n", + [self(), PeerAddressS, PeerPort]), + %% We don't close the socket explicitly. The reader is the + %% controlling process and hence its termination will close + %% the socket. Furthermore, gen_tcp:close/1 waits for pending + %% output to be sent, which results in unnecessary delays. + %% + %% gen_tcp:close(ClientSock), + teardown_profiling(ProfilingValue) + end, + done. + +mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) -> + %%?LOGDEBUG("Reader mainloop: ~p bytes available, need ~p~n", [HaveBytes, WaitUntilNBytes]), + receive + {inet_async, Sock, Ref, {ok, Data}} -> + {State1, Callback1, Length1} = + handle_input(State#v1.callback, Data, + State#v1{recv_ref = none}), + mainloop(Parent, Deb, + switch_callback(State1, Callback1, Length1)); + {inet_async, Sock, Ref, {error, closed}} -> + if State#v1.connection_state =:= closed -> + State; + true -> + throw(connection_closed_abruptly) + end; + {inet_async, Sock, Ref, {error, Reason}} -> + throw({inet_error, Reason}); + {'EXIT', Parent, Reason} -> + if State#v1.connection_state =:= running -> + send_exception( + State, 0, + {amqp, connection_forced, + io_lib:format( + "broker forced connection closure with reason '~w'", + [Reason]), none}); + true -> ok + end, + %% this is what we are expected to do according to + %% http://www.erlang.org/doc/man/sys.html + %% + %% If we wanted to be *really* nice we should wait for a + %% while for clients to close the socket at their end, + %% just as we do in the ordinary error case. However, + %% since this termination is initiated by our parent it is + %% probably more important to exit quickly. + exit(Reason); + {'EXIT', Pid, Reason} -> + mainloop(Parent, Deb, handle_dependent_exit(Pid, Reason, State)); + {terminate_channel, Channel, Ref1} -> + mainloop(Parent, Deb, terminate_channel(Channel, Ref1, State)); + terminate_connection -> + State; + handshake_timeout -> + if State#v1.connection_state =:= running orelse + State#v1.connection_state =:= closing orelse + State#v1.connection_state =:= closed -> + mainloop(Parent, Deb, State); + true -> + throw({handshake_timeout, State#v1.callback}) + end; + timeout -> + throw({timeout, State#v1.connection_state}); + {system, From, Request} -> + sys:handle_system_msg(Request, From, + Parent, ?MODULE, Deb, State); + Other -> + %% internal error -> something worth dying for + exit({unexpected_message, Other}) + end. + +switch_callback(OldState, NewCallback, Length) -> + {ok, Ref} = prim_inet:async_recv(OldState#v1.sock, Length, -1), + OldState#v1{callback = NewCallback, + recv_ref = Ref}. + +close_connection(State = #v1{connection = #connection{ + timeout_sec = TimeoutSec}}) -> + %% We terminate the connection after the specified interval, but + %% no later than ?CLOSING_TIMEOUT seconds. + TimeoutMillisec = + 1000 * if TimeoutSec > 0 andalso + TimeoutSec < ?CLOSING_TIMEOUT -> TimeoutSec; + true -> ?CLOSING_TIMEOUT + end, + erlang:send_after(TimeoutMillisec, self(), terminate_connection), + State#v1{connection_state = closed}. + +close_channel(Channel, State) -> + Ref = make_ref(), + TRef = erlang:send_after(1000 * ?CHANNEL_CLOSING_TIMEOUT, + self(), + {terminate_channel, Channel, Ref}), + put({closing_channel, Channel}, {Ref, TRef}), + State. + +terminate_channel(Channel, Ref, State) -> + case get({closing_channel, Channel}) of + undefined -> ok; %% got close_ok in the meantime + {Ref, _} -> erase({closing_channel, Channel}), + ok; + {_Ref, _} -> ok %% got close_ok, and have new closing channel + end, + State. + +handle_dependent_exit(Pid, Reason, + State = #v1{connection_state = closing}) -> + case channel_cleanup(Pid) of + undefined -> exit({abnormal_dependent_exit, Pid, Reason}); + Channel -> + case Reason of + normal -> ok; + _ -> log_channel_error(closing, Channel, Reason) + end, + maybe_close(State) + end; +handle_dependent_exit(Pid, normal, State) -> + channel_cleanup(Pid), + State; +handle_dependent_exit(Pid, Reason, State) -> + case channel_cleanup(Pid) of + undefined -> exit({abnormal_dependent_exit, Pid, Reason}); + Channel -> handle_exception(State, Channel, Reason) + end. + +channel_cleanup(Pid) -> + case get({chpid, Pid}) of + undefined -> + case get({closing_chpid, Pid}) of + undefined -> undefined; + {channel, Channel} -> + erase({closing_chpid, Pid}), + Channel + end; + {channel, Channel} -> + erase({channel, Channel}), + erase({chpid, Pid}), + Channel + end. + +all_channels() -> [Pid || {{chpid, Pid},_} <- get()]. + +terminate_channels() -> + NChannels = length([exit(Pid, normal) || Pid <- all_channels()]), + if NChannels > 0 -> + Timeout = 1000 * ?CHANNEL_TERMINATION_TIMEOUT * NChannels, + TimerRef = erlang:send_after(Timeout, self(), cancel_wait), + wait_for_channel_termination(NChannels, TimerRef); + true -> ok + end. + +wait_for_channel_termination(0, TimerRef) -> + case erlang:cancel_timer(TimerRef) of + false -> receive + cancel_wait -> ok + end; + _ -> ok + end; + +wait_for_channel_termination(N, TimerRef) -> + receive + {'EXIT', Pid, Reason} -> + case channel_cleanup(Pid) of + undefined -> + exit({abnormal_dependent_exit, Pid, Reason}); + Channel -> + case Reason of + normal -> ok; + _ -> + rabbit_log:error( + "connection ~p, channel ~p - error while terminating:~n~p~n", + [self(), Channel, Reason]) + end, + wait_for_channel_termination(N-1, TimerRef) + end; + cancel_wait -> + exit(channel_termination_timeout) + end. + +maybe_close(State) -> + case all_channels() of + [] -> ok = send_on_channel0( + State#v1.sock, #'connection.close_ok'{}), + close_connection(State); + _ -> State + end. + +handle_frame(Type, 0, Payload, State = #v1{connection_state = CS}) + when CS =:= closing; CS =:= closed -> + case analyze_frame(Type, Payload) of + {method, MethodName, FieldsBin} -> + handle_method0(MethodName, FieldsBin, State); + _Other -> State + end; +handle_frame(_Type, _Channel, _Payload, State = #v1{connection_state = CS}) + when CS =:= closing; CS =:= closed -> + State; +handle_frame(Type, 0, Payload, State) -> + case analyze_frame(Type, Payload) of + error -> throw({unknown_frame, Type, Payload}); + heartbeat -> State; + trace -> State; + {method, MethodName, FieldsBin} -> + handle_method0(MethodName, FieldsBin, State); + Other -> throw({unexpected_frame_on_channel0, Other}) + end; +handle_frame(Type, Channel, Payload, State) -> + case analyze_frame(Type, Payload) of + error -> throw({unknown_frame, Type, Payload}); + heartbeat -> throw({unexpected_heartbeat_frame, Channel}); + trace -> throw({unexpected_trace_frame, Channel}); + AnalyzedFrame -> + %%?LOGDEBUG("Ch ~p Frame ~p~n", [Channel, AnalyzedFrame]), + case get({channel, Channel}) of + {chpid, ChPid} -> + ok = check_for_close(Channel, ChPid, AnalyzedFrame), + ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame), + State; + undefined -> + case State#v1.connection_state of + running -> send_to_new_channel( + Channel, AnalyzedFrame, State), + State; + Other -> throw({channel_frame_while_starting, + Channel, Other, AnalyzedFrame}) + end + end + end. + +analyze_frame(?FRAME_METHOD, <<ClassId:16, MethodId:16, MethodFields/binary>>) -> + {method, rabbit_framing:lookup_method_name({ClassId, MethodId}), MethodFields}; +analyze_frame(?FRAME_HEADER, <<ClassId:16, Weight:16, BodySize:64, Properties/binary>>) -> + {content_header, ClassId, Weight, BodySize, Properties}; +analyze_frame(?FRAME_BODY, Body) -> + {content_body, Body}; +analyze_frame(?FRAME_TRACE, _Body) -> + trace; +analyze_frame(?FRAME_HEARTBEAT, <<>>) -> + heartbeat; +analyze_frame(_Type, _Body) -> + error. + +handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, State) -> + %%?LOGDEBUG("Got frame header: ~p/~p/~p~n", [Type, Channel, PayloadSize]), + {State, {frame_payload, Type, Channel, PayloadSize}, PayloadSize + 1}; + +handle_input({frame_payload, Type, Channel, PayloadSize}, PayloadAndMarker, State) -> + case PayloadAndMarker of + <<Payload:PayloadSize/binary, ?FRAME_END>> -> + %%?LOGDEBUG("Frame completed: ~p/~p/~p~n", [Type, Channel, Payload]), + NewState = handle_frame(Type, Channel, Payload, State), + {NewState, frame_header, 7}; + _ -> + throw({bad_payload, PayloadAndMarker}) + end; + +handle_input(handshake, <<"AMQP",1,1,ProtocolMajor,ProtocolMinor>>, + State = #v1{sock = Sock, connection = Connection}) -> + case check_version({ProtocolMajor, ProtocolMinor}, + {?PROTOCOL_VERSION_MAJOR, ?PROTOCOL_VERSION_MINOR}) of + true -> + {ok, Product} = application:get_key(id), + {ok, Version} = application:get_key(vsn), + ok = send_on_channel0( + Sock, + #'connection.start'{ + version_major = ?PROTOCOL_VERSION_MAJOR, + version_minor = ?PROTOCOL_VERSION_MINOR, + server_properties = + [{list_to_binary(K), longstr, list_to_binary(V)} || + {K, V} <- + [{"product", Product}, + {"version", Version}, + {"platform", "Erlang/OTP"}, + {"copyright", ?COPYRIGHT_MESSAGE}, + {"information", ?INFORMATION_MESSAGE}]], + mechanisms = <<"PLAIN AMQPLAIN">>, + locales = <<"en_US">> }), + {State#v1{connection = Connection#connection{ + timeout_sec = ?NORMAL_TIMEOUT}, + connection_state = starting}, + frame_header, 7}; + false -> + throw({bad_version, ProtocolMajor, ProtocolMinor}) + end; + +handle_input(handshake, Other, #v1{sock = Sock}) -> + ok = gen_tcp:send(Sock, <<"AMQP",1,1,?PROTOCOL_VERSION_MAJOR,?PROTOCOL_VERSION_MINOR>>), + throw({bad_header, Other}); + +handle_input(Callback, Data, _State) -> + throw({bad_input, Callback, Data}). + +%% the 0-8 spec, confusingly, defines the version as 8-0 +adjust_version({8,0}) -> {0,8}; +adjust_version(Version) -> Version. +check_version(ClientVersion, ServerVersion) -> + {ClientMajor, ClientMinor} = adjust_version(ClientVersion), + {ServerMajor, ServerMinor} = adjust_version(ServerVersion), + ClientMajor > ServerMajor + orelse + (ClientMajor == ServerMajor andalso + ClientMinor >= ServerMinor). + +%%-------------------------------------------------------------------------- + +handle_method0(MethodName, FieldsBin, State) -> + try + handle_method0(rabbit_framing:decode_method_fields( + MethodName, FieldsBin), + State) + catch exit:Reason -> + CompleteReason = + case Reason of + {amqp, Error, Explanation, none} -> + {amqp, Error, Explanation, MethodName}; + OtherReason -> OtherReason + end, + case State#v1.connection_state of + running -> send_exception(State, 0, CompleteReason); + Other -> throw({channel0_error, Other, CompleteReason}) + end + end. +handle_method0(#'connection.start_ok'{mechanism = Mechanism, + response = Response}, + State = #v1{connection_state = starting, + connection = Connection, + sock = Sock}) -> + User = rabbit_access_control:check_login(Mechanism, Response), + ok = send_on_channel0( + Sock, + #'connection.tune'{channel_max = 0, + %% set to zero once QPid fix their negotiation + frame_max = 131072, + heartbeat = 0}), + State#v1{connection_state = tuning, + connection = Connection#connection{user = User}}; +handle_method0(#'connection.tune_ok'{channel_max = _ChannelMax, + frame_max = FrameMax, + heartbeat = ClientHeartbeat}, + State = #v1{connection_state = tuning, + connection = Connection, + sock = Sock}) -> + %% if we have a channel_max limit that the client wishes to + %% exceed, die as per spec. Not currently a problem, so we ignore + %% the client's channel_max parameter. + rabbit_heartbeat:start_heartbeat(Sock, ClientHeartbeat), + State#v1{connection_state = opening, + connection = Connection#connection{ + timeout_sec = ClientHeartbeat, + frame_max = FrameMax}}; +handle_method0(#'connection.open'{virtual_host = VHostPath, + insist = Insist}, + State = #v1{connection_state = opening, + connection = Connection = #connection{ + user = User}, + sock = Sock}) -> + ok = rabbit_access_control:check_vhost_access(User, VHostPath), + NewConnection = Connection#connection{vhost = VHostPath}, + KnownHosts = format_listeners(rabbit_networking:active_listeners()), + Redirects = compute_redirects(Insist), + if Redirects == [] -> + ok = send_on_channel0( + Sock, + #'connection.open_ok'{known_hosts = KnownHosts}), + State#v1{connection_state = running, + connection = NewConnection}; + true -> + %% FIXME: 'host' is supposed to only contain one + %% address; but which one do we pick? This is + %% really a problem with the spec. + Host = format_listeners(Redirects), + rabbit_log:info("connection ~p redirecting to ~p~n", + [self(), Host]), + ok = send_on_channel0( + Sock, + #'connection.redirect'{host = Host, + known_hosts = KnownHosts}), + close_connection(State#v1{connection = NewConnection}) + end; +handle_method0(#'connection.close'{}, + State = #v1{connection_state = running}) -> + lists:foreach(fun rabbit_framing_channel:shutdown/1, all_channels()), + maybe_close(State#v1{connection_state = closing}); +handle_method0(#'connection.close_ok'{}, + State = #v1{connection_state = closed}) -> + self() ! terminate_connection, + State; +handle_method0(_Method, State = #v1{connection_state = CS}) + when CS =:= closing; CS =:= closed -> + State; +handle_method0(_Method, #v1{connection_state = S}) -> + rabbit_misc:protocol_error( + channel_error, "unexpected method in connection state ~w", [S]). + +send_on_channel0(Sock, Method) -> + ok = rabbit_writer:internal_send_command(Sock, 0, Method). + +format_listeners(Listeners) -> + list_to_binary( + rabbit_misc:intersperse( + $,, + [io_lib:format("~s:~w", [Host, Port]) || + #listener{host = Host, port = Port} <- Listeners])). + +compute_redirects(true) -> []; +compute_redirects(false) -> + Node = node(), + LNode = rabbit_load:pick(), + if Node == LNode -> []; + true -> rabbit_networking:node_listeners(LNode) + end. + +%%-------------------------------------------------------------------------- + +send_to_new_channel(Channel, AnalyzedFrame, State) -> + case get({closing_channel, Channel}) of + undefined -> + #v1{sock = Sock, + connection = #connection{ + frame_max = FrameMax, + user = #user{username = Username}, + vhost = VHost}} = State, + WriterPid = rabbit_writer:start(Sock, Channel, FrameMax), + ChPid = rabbit_framing_channel:start_link( + fun rabbit_channel:start_link/4, + [self(), WriterPid, Username, VHost]), + put({channel, Channel}, {chpid, ChPid}), + put({chpid, ChPid}, {channel, Channel}), + ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame); + {_, TRef} -> + %% According to the spec, after sending a channel.close we + %% must ignore all frames except channel.close_ok. + case AnalyzedFrame of + {method, 'channel.close_ok', _} -> + erlang:cancel_timer(TRef), + erase({closing_channel, Channel}), + ok; + _Other -> ok + end + end. + +check_for_close(Channel, ChPid, {method, 'channel.close', _}) -> + channel_cleanup(ChPid), + put({closing_chpid, ChPid}, {channel, Channel}), + ok; +check_for_close(_Channel, _ChPid, _Frame) -> + ok. + +log_channel_error(ConnectionState, Channel, Reason) -> + rabbit_log:error("connection ~p (~p), channel ~p - error:~n~p~n", + [self(), ConnectionState, Channel, Reason]). + +handle_exception(State = #v1{connection_state = closed}, Channel, Reason) -> + log_channel_error(closed, Channel, Reason), + State; +handle_exception(State = #v1{connection_state = CS}, Channel, Reason) -> + log_channel_error(CS, Channel, Reason), + send_exception(State, Channel, Reason). + +send_exception(State, Channel, Reason) -> + {ShouldClose, CloseChannel, CloseMethod} = map_exception(Channel, Reason), + NewState = case ShouldClose of + true -> terminate_channels(), + close_connection(State); + false -> close_channel(Channel, State) + end, + ok = rabbit_writer:internal_send_command( + NewState#v1.sock, CloseChannel, CloseMethod), + NewState. + +map_exception(Channel, Reason) -> + {SuggestedClose, ReplyCode, ReplyText, FailedMethod} = + lookup_amqp_exception(Reason), + ShouldClose = SuggestedClose or (Channel == 0), + {ClassId, MethodId} = case FailedMethod of + {_, _} -> FailedMethod; + none -> {0, 0}; + _ -> rabbit_framing:method_id(FailedMethod) + end, + {CloseChannel, CloseMethod} = + case ShouldClose of + true -> {0, #'connection.close'{reply_code = ReplyCode, + reply_text = ReplyText, + class_id = ClassId, + method_id = MethodId}}; + false -> {Channel, #'channel.close'{reply_code = ReplyCode, + reply_text = ReplyText, + class_id = ClassId, + method_id = MethodId}} + end, + {ShouldClose, CloseChannel, CloseMethod}. + +lookup_amqp_exception({amqp, {ShouldClose, Code, Text}, Expl, Method}) -> + ExplBin = list_to_binary(Expl), + CompleteTextBin = <<Text/binary, " - ", ExplBin/binary>>, + SafeTextBin = if size(CompleteTextBin) > 255 -> + <<CompleteTextBin:252/binary, "...">>; + true -> + CompleteTextBin + end, + {ShouldClose, Code, SafeTextBin, Method}; +lookup_amqp_exception({amqp, ErrorName, Expl, Method}) -> + Details = rabbit_framing:lookup_amqp_exception(ErrorName), + lookup_amqp_exception({amqp, Details, Expl, Method}); +lookup_amqp_exception(Other) -> + rabbit_log:warning("Non-AMQP exit reason '~p'~n", [Other]), + {true, ?INTERNAL_ERROR, <<"INTERNAL_ERROR">>, none}. diff --git a/src/rabbit_realm.erl b/src/rabbit_realm.erl new file mode 100644 index 0000000000..4463954d7e --- /dev/null +++ b/src/rabbit_realm.erl @@ -0,0 +1,316 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd., +%% Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd., Cohesive Financial Technologies +%% LLC., and Rabbit Technologies Ltd. are Copyright (C) 2007-2008 +%% LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit +%% Technologies Ltd.; +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_realm). + +-export([recover/0]). +-export([add_realm/1, delete_realm/1, list_vhost_realms/1]). +-export([add/2, delete/2, check/2, delete_from_all/1]). +-export([access_request/3, enter_realm/3, leave_realms/1]). +-export([on_node_down/1]). + +-include("rabbit.hrl"). +-include_lib("stdlib/include/qlc.hrl"). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-type(e_or_q() :: 'exchange' | 'queue'). + +-spec(recover/0 :: () -> 'ok'). +-spec(add_realm/1 :: (realm_name()) -> 'ok'). +-spec(delete_realm/1 :: (realm_name()) -> 'ok'). +-spec(list_vhost_realms/1 :: (vhost()) -> [name()]). +-spec(add/2 :: (realm_name(), r(e_or_q())) -> 'ok'). +-spec(delete/2 :: (realm_name(), r(e_or_q())) -> 'ok'). +-spec(check/2 :: (realm_name(), r(e_or_q())) -> bool() | not_found()). +-spec(delete_from_all/1 :: (r(e_or_q())) -> 'ok'). +-spec(access_request/3 :: (username(), bool(), ticket()) -> + 'ok' | not_found() | {'error', 'bad_realm_path' | + 'access_refused' | + 'resource_locked'}). +-spec(enter_realm/3 :: (realm_name(), bool(), pid()) -> + 'ok' | {'error', 'resource_locked'}). +-spec(leave_realms/1 :: (pid()) -> 'ok'). +-spec(on_node_down/1 :: (node()) -> 'ok'). + +-endif. + +%%-------------------------------------------------------------------- + +recover() -> + %% preens resource lists, limiting them to currently-extant resources + rabbit_misc:execute_mnesia_transaction( + fun () -> + Realms = mnesia:foldl(fun preen_realm/2, [], realm), + lists:foreach(fun mnesia:write/1, Realms), + ok + end). + +add_realm(Name = #resource{virtual_host = VHostPath, kind = realm}) -> + rabbit_misc:execute_mnesia_transaction( + rabbit_misc:with_vhost( + VHostPath, + fun () -> + case mnesia:read({realm, Name}) of + [] -> + NewRealm = #realm{name = Name, + exchanges = ordsets:new(), + queues = ordsets:new()}, + ok = mnesia:write(NewRealm), + ok = mnesia:write( + #vhost_realm{virtual_host = VHostPath, + realm = Name}), + ok; + [_R] -> + mnesia:abort({realm_already_exists, Name}) + end + end)). + +delete_realm(Name = #resource{virtual_host = VHostPath, kind = realm}) -> + rabbit_misc:execute_mnesia_transaction( + rabbit_misc:with_vhost( + VHostPath, + rabbit_misc:with_realm( + Name, + fun () -> + ok = mnesia:delete({realm, Name}), + ok = mnesia:delete_object( + #vhost_realm{virtual_host = VHostPath, + realm = Name}), + lists:foreach(fun mnesia:delete_object/1, + mnesia:index_read(user_realm, Name, + #user_realm.realm)), + ok + end))). + +list_vhost_realms(VHostPath) -> + [Name || + #vhost_realm{realm = #resource{name = Name}} <- + %% TODO: use dirty ops instead + rabbit_misc:execute_mnesia_transaction( + rabbit_misc:with_vhost( + VHostPath, + fun () -> mnesia:read({vhost_realm, VHostPath}) end))]. + +add(Name = #resource{kind = realm}, Resource) -> + internal_update_realm_byname(Name, Resource, fun ordsets:add_element/2). + +delete(Name = #resource{kind = realm}, Resource) -> + internal_update_realm_byname(Name, Resource, fun ordsets:del_element/2). + +check(Name = #resource{kind = realm}, Resource = #resource{kind = Kind}) -> + case rabbit_misc:dirty_read({realm, Name}) of + {ok, R} -> + case Kind of + exchange -> ordsets:is_element(Resource, R#realm.exchanges); + queue -> ordsets:is_element(Resource, R#realm.queues) + end; + Other -> Other + end. + +% Requires a mnesia transaction. +delete_from_all(Resource = #resource{kind = Kind}) -> + Realms = mnesia:foldl + (fun (Realm = #realm{exchanges = E0, + queues = Q0}, + Acc) -> + IsMember = lists:member(Resource, + case Kind of + exchange -> E0; + queue -> Q0 + end), + if + IsMember -> + [internal_update_realm_record( + Realm, Resource, + fun ordsets:del_element/2) + | Acc]; + true -> + Acc + end + end, [], realm), + lists:foreach(fun mnesia:write/1, Realms), + ok. + +access_request(Username, Exclusive, Ticket = #ticket{realm_name = RealmName}) + when is_binary(Username) -> + %% FIXME: We should do this all in a single tx. Otherwise we may + %% a) get weird answers, b) create inconsistencies in the db + %% (e.g. realm_visitor records referring to non-existing realms). + case check_and_lookup(RealmName) of + {error, Reason} -> + {error, Reason}; + {ok, _Realm} -> + {ok, U} = rabbit_access_control:lookup_user(Username), + case rabbit_access_control:lookup_realm_access(U, RealmName) of + none -> + {error, access_refused}; + TicketPattern -> + case match_ticket(TicketPattern, Ticket) of + no_match -> + {error, access_refused}; + match -> + enter_realm(RealmName, Exclusive, self()) + end + end + end. + +enter_realm(Name = #resource{kind = realm}, IsExclusive, Pid) -> + RealmVisitor = #realm_visitor{realm = Name, pid = Pid}, + rabbit_misc:execute_mnesia_transaction( + fun () -> + case mnesia:read({exclusive_realm_visitor, Name}) of + [] when IsExclusive -> + ok = mnesia:delete_object(RealmVisitor), + %% TODO: find a more efficient way of checking + %% for "no machting results" that doesn't + %% involve retrieving all the records + case mnesia:read({realm_visitor, Name}) of + [] -> + mnesia:write( + exclusive_realm_visitor, RealmVisitor, write), + ok; + [_|_] -> + {error, resource_locked} + end; + [] -> + ok = mnesia:write(RealmVisitor), + ok; + [RealmVisitor] when IsExclusive -> ok; + [RealmVisitor] -> + ok = mnesia:delete({exclusive_realm_visitor, Name}), + ok = mnesia:write(RealmVisitor), + ok; + [_] -> + {error, resource_locked} + end + end). + +leave_realms(Pid) -> + rabbit_misc:execute_mnesia_transaction( + fun () -> + case mnesia:index_read(exclusive_realm_visitor, Pid, + #realm_visitor.pid) of + [] -> ok; + [R] -> + ok = mnesia:delete_object( + exclusive_realm_visitor, R, write) + end, + lists:foreach(fun mnesia:delete_object/1, + mnesia:index_read(realm_visitor, Pid, + #realm_visitor.pid)), + ok + end). + +on_node_down(Node) -> + rabbit_misc:execute_mnesia_transaction( + fun () -> + lists:foreach( + fun (T) -> ok = remove_visitors(Node, T) end, + [exclusive_realm_visitor, realm_visitor]), + ok + end). + +%%-------------------------------------------------------------------- + +preen_realm(Realm = #realm{name = #resource{kind = realm}, + exchanges = E0, + queues = Q0}, + Realms) -> + [Realm#realm{exchanges = filter_out_missing(E0, exchange), + queues = filter_out_missing(Q0, amqqueue)} + | Realms]. + +filter_out_missing(Items, TableName) -> + ordsets:filter(fun (Item) -> + case mnesia:read({TableName, Item}) of + [] -> false; + _ -> true + end + end, Items). + +internal_update_realm_byname(Name, Resource, SetUpdater) -> + rabbit_misc:execute_mnesia_transaction( + fun () -> + case mnesia:read({realm, Name}) of + [] -> + mnesia:abort(not_found); + [R] -> + ok = mnesia:write(internal_update_realm_record + (R, Resource, SetUpdater)) + end + end). + +internal_update_realm_record(R = #realm{exchanges = E0, queues = Q0}, + Resource = #resource{kind = Kind}, + SetUpdater) -> + case Kind of + exchange -> R#realm{exchanges = SetUpdater(Resource, E0)}; + queue -> R#realm{queues = SetUpdater(Resource, Q0)} + end. + +check_and_lookup(RealmName = #resource{kind = realm, + name = <<"/data", _/binary>>}) -> + lookup(RealmName); +check_and_lookup(RealmName = #resource{kind = realm, + name = <<"/admin", _/binary>>}) -> + lookup(RealmName); +check_and_lookup(_) -> + {error, bad_realm_path}. + +lookup(Name = #resource{kind = realm}) -> + rabbit_misc:dirty_read({realm, Name}). + +match_ticket(#ticket{passive_flag = PP, + active_flag = PA, + write_flag = PW, + read_flag = PR}, + #ticket{passive_flag = TP, + active_flag = TA, + write_flag = TW, + read_flag = TR}) -> + if + %% Matches if either we're not requesting passive access, or + %% passive access is permitted, and ... + (not(TP) orelse PP) andalso + (not(TA) orelse PA) andalso + (not(TW) orelse PW) andalso + (not(TR) orelse PR) -> + match; + true -> + no_match + end. + +remove_visitors(Node, T) -> + qlc:fold( + fun (R, Acc) -> + ok = mnesia:delete_object(T, R, write), + Acc + end, + ok, + qlc:q([R || R = #realm_visitor{pid = Pid} <- mnesia:table(T), + node(Pid) == Node])). diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl new file mode 100644 index 0000000000..41a8d64cb3 --- /dev/null +++ b/src/rabbit_router.erl @@ -0,0 +1,166 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd., +%% Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd., Cohesive Financial Technologies +%% LLC., and Rabbit Technologies Ltd. are Copyright (C) 2007-2008 +%% LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit +%% Technologies Ltd.; +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_router). +-include("rabbit.hrl"). + +-behaviour(gen_server). + +-export([start_link/0, + deliver/5]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-define(SERVER, ?MODULE). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}). +-spec(deliver/5 :: ([pid()], bool(), bool(), maybe(txn()), message()) -> + {'ok', [pid()]} | {'error', 'unroutable' | 'not_delivered'}). + +-endif. + +%%---------------------------------------------------------------------------- + +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + +deliver(QPids, Mandatory, Immediate, Txn, Message) -> + %% we reduce inter-node traffic by grouping the qpids by node and + %% only delivering one copy of the message to each node involved, + %% which then in turn delivers it to its queues. + deliver_per_node( + dict:to_list( + lists:foldl( + fun (QPid, D) -> + dict:update(node(QPid), + fun (QPids1) -> [QPid | QPids1] end, + [QPid], D) + end, + dict:new(), QPids)), + Mandatory, Immediate, Txn, Message). + +deliver_per_node([{Node, QPids}], Mandatory, Immediate, + Txn, Message) + when Node == node() -> + %% optimisation + check_delivery(Mandatory, Immediate, + run_bindings(QPids, Mandatory, Immediate, Txn, Message)); +deliver_per_node(NodeQPids, Mandatory = false, Immediate = false, + Txn, Message) -> + %% optimisation: when Mandatory = false and Immediate = false, + %% rabbit_amqqueue:deliver in run_bindings below will deliver the + %% message to the queue process asynchronously, and return true, + %% which means all the QPids will always be returned. It is + %% therefore safe to use a fire-and-forget cast here and return + %% the QPids - the semantics is preserved. This scales much better + %% than the non-immediate case below. + {ok, lists:flatmap( + fun ({Node, QPids}) -> + gen_server:cast( + {?SERVER, Node}, + {deliver, QPids, Mandatory, Immediate, Txn, Message}), + QPids + end, + NodeQPids)}; +deliver_per_node(NodeQPids, Mandatory, Immediate, + Txn, Message) -> + R = rabbit_misc:upmap( + fun ({Node, QPids}) -> + try gen_server:call( + {?SERVER, Node}, + {deliver, QPids, Mandatory, Immediate, Txn, Message}) + catch + _Class:_Reason -> + %% TODO: figure out what to log (and do!) here + {false, []} + end + end, + NodeQPids), + {Routed, Handled} = + lists:foldl(fun ({Routed, Handled}, {RoutedAcc, HandledAcc}) -> + {Routed or RoutedAcc, + %% we do the concatenation below, which + %% should be faster + [Handled | HandledAcc]} + end, + {false, []}, + R), + check_delivery(Mandatory, Immediate, {Routed, lists:append(Handled)}). + +%%-------------------------------------------------------------------- + +init([]) -> + {ok, no_state}. + +handle_call({deliver, QPids, Mandatory, Immediate, Txn, Message}, + From, State) -> + spawn( + fun () -> + R = run_bindings(QPids, Mandatory, Immediate, Txn, Message), + gen_server:reply(From, R) + end), + {noreply, State}. + +handle_cast({deliver, QPids, Mandatory, Immediate, Txn, Message}, + State) -> + %% in order to preserve message ordering we must not spawn here + run_bindings(QPids, Mandatory, Immediate, Txn, Message), + {noreply, State}. + +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%-------------------------------------------------------------------- + +run_bindings(QPids, IsMandatory, IsImmediate, Txn, Message) -> + lists:foldl( + fun (QPid, {Routed, Handled}) -> + case catch rabbit_amqqueue:deliver(IsMandatory, IsImmediate, + Txn, Message, QPid) of + true -> {true, [QPid | Handled]}; + false -> {true, Handled}; + {'EXIT', Reason} -> rabbit_log:warning("delivery to ~p failed:~n~p~n", + [QPid, Reason]), + {Routed, Handled} + end + end, + {false, []}, + QPids). + +%% check_delivery(Mandatory, Immediate, {WasRouted, QPids}) +check_delivery(true, _ , {false, []}) -> {error, unroutable}; +check_delivery(_ , true, {_ , []}) -> {error, not_delivered}; +check_delivery(_ , _ , {_ , Qs}) -> {ok, Qs}. diff --git a/src/rabbit_sup.erl b/src/rabbit_sup.erl new file mode 100644 index 0000000000..ea55869004 --- /dev/null +++ b/src/rabbit_sup.erl @@ -0,0 +1,40 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd., +%% Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd., Cohesive Financial Technologies +%% LLC., and Rabbit Technologies Ltd. are Copyright (C) 2007-2008 +%% LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit +%% Technologies Ltd.; +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_sup). + +-behaviour(supervisor). + +-export([start_link/0]). + +-export([init/1]). + +-define(SERVER, ?MODULE). + +start_link() -> + supervisor:start_link({local, ?SERVER}, ?MODULE, []). + +init([]) -> + {ok, {{one_for_one, 10, 10}, []}}. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl new file mode 100644 index 0000000000..beeb35080a --- /dev/null +++ b/src/rabbit_tests.erl @@ -0,0 +1,393 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd., +%% Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd., Cohesive Financial Technologies +%% LLC., and Rabbit Technologies Ltd. are Copyright (C) 2007-2008 +%% LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit +%% Technologies Ltd.; +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_tests). + +-export([all_tests/0, test_parsing/0]). + +-import(lists). + +test_content_prop_roundtrip(Datum, Binary) -> + Types = [element(1, E) || E <- Datum], + Values = [element(2, E) || E <- Datum], + Values = rabbit_binary_parser:parse_properties(Types, Binary), %% assertion + Binary = rabbit_binary_generator:encode_properties(Types, Values). %% assertion + +all_tests() -> + passed = test_parsing(), + passed = test_topic_matching(), + passed = test_app_management(), + passed = test_cluster_management(), + passed = test_user_management(), + passed. + +test_parsing() -> + passed = test_content_properties(), + passed. + +test_content_properties() -> + test_content_prop_roundtrip([], <<0, 0>>), + test_content_prop_roundtrip([{bit, true}, {bit, false}, {bit, true}, {bit, false}], + <<16#A0, 0>>), + test_content_prop_roundtrip([{bit, true}, {octet, 123}, {bit, true}, {octet, undefined}, + {bit, true}], + <<16#E8,0,123>>), + test_content_prop_roundtrip([{bit, true}, {octet, 123}, {octet, 123}, {bit, true}], + <<16#F0,0,123,123>>), + test_content_prop_roundtrip([{bit, true}, {shortstr, <<"hi">>}, {bit, true}, + {shortint, 54321}, {bit, true}], + <<16#F8,0,2,"hi",16#D4,16#31>>), + test_content_prop_roundtrip([{bit, true}, {shortstr, undefined}, {bit, true}, + {shortint, 54321}, {bit, true}], + <<16#B8,0,16#D4,16#31>>), + test_content_prop_roundtrip([{table, [{<<"a signedint">>, signedint, 12345678}, + {<<"a longstr">>, longstr, <<"yes please">>}, + {<<"a decimal">>, decimal, {123, 12345678}}, + {<<"a timestamp">>, timestamp, 123456789012345}, + {<<"a nested table">>, table, + [{<<"one">>, signedint, 1}, + {<<"two">>, signedint, 2}]}]}], + << + 16#8000:16, % flags + % properties: + + 117:32, % table length in bytes + + 11,"a signedint", % name + "I",12345678:32, % type and value + + 9,"a longstr", + "S",10:32,"yes please", + + 9,"a decimal", + "D",123,12345678:32, + + 11,"a timestamp", + "T", 123456789012345:64, + + 14,"a nested table", + "F", + 18:32, + + 3,"one", + "I",1:32, + + 3,"two", + "I",2:32 >>), + case catch rabbit_binary_parser:parse_properties([bit, bit, bit, bit], <<16#A0,0,1>>) of + {'EXIT', content_properties_binary_overflow} -> passed; + V -> exit({got_success_but_expected_failure, V}) + end. + +test_topic_match(P, R) -> + test_topic_match(P, R, true). + +test_topic_match(P, R, Expected) -> + case rabbit_exchange:topic_matches(list_to_binary(P), list_to_binary(R)) of + Expected -> + passed; + _ -> + {topic_match_failure, P, R} + end. + +test_topic_matching() -> + passed = test_topic_match("#", "test.test"), + passed = test_topic_match("#", ""), + passed = test_topic_match("#.T.R", "T.T.R"), + passed = test_topic_match("#.T.R", "T.R.T.R"), + passed = test_topic_match("#.Y.Z", "X.Y.Z.X.Y.Z"), + passed = test_topic_match("#.test", "test"), + passed = test_topic_match("#.test", "test.test"), + passed = test_topic_match("#.test", "ignored.test"), + passed = test_topic_match("#.test", "more.ignored.test"), + passed = test_topic_match("#.test", "notmatched", false), + passed = test_topic_match("#.z", "one.two.three.four", false), + passed. + +test_app_management() -> + %% starting, stopping, status + ok = control_action(stop_app, []), + ok = control_action(stop_app, []), + ok = control_action(status, []), + ok = control_action(start_app, []), + ok = control_action(start_app, []), + ok = control_action(status, []), + passed. + +test_cluster_management() -> + + %% 'cluster' and 'reset' should only work if the app is stopped + {error, _} = control_action(cluster, []), + {error, _} = control_action(reset, []), + {error, _} = control_action(force_reset, []), + + ok = control_action(stop_app, []), + + %% various ways of creating a standalone node + NodeS = atom_to_list(node()), + ClusteringSequence = [[], + [NodeS], + ["invalid@invalid", NodeS], + [NodeS, "invalid@invalid"]], + + ok = control_action(reset, []), + lists:foreach(fun (Arg) -> + ok = control_action(cluster, Arg), + ok + end, + ClusteringSequence), + lists:foreach(fun (Arg) -> + ok = control_action(reset, []), + ok = control_action(cluster, Arg), + ok + end, + ClusteringSequence), + ok = control_action(reset, []), + lists:foreach(fun (Arg) -> + ok = control_action(cluster, Arg), + ok = control_action(start_app, []), + ok = control_action(stop_app, []), + ok + end, + ClusteringSequence), + lists:foreach(fun (Arg) -> + ok = control_action(reset, []), + ok = control_action(cluster, Arg), + ok = control_action(start_app, []), + ok = control_action(stop_app, []), + ok + end, + ClusteringSequence), + + %% attempt to convert a disk node into a ram node + ok = control_action(reset, []), + ok = control_action(start_app, []), + ok = control_action(stop_app, []), + {error, {cannot_convert_disk_node_to_ram_node, _}} = + control_action(cluster, ["invalid1@invalid", + "invalid2@invalid"]), + + %% attempt to join a non-existing cluster as a ram node + ok = control_action(reset, []), + {error, {unable_to_contact_cluster_nodes, _}} = + control_action(cluster, ["invalid1@invalid", + "invalid2@invalid"]), + + SecondaryNode = rabbit_misc:localnode(hare), + case net_adm:ping(SecondaryNode) of + pong -> passed = test_cluster_management2(SecondaryNode); + pang -> io:format("Skipping clustering tests with node ~p~n", + [SecondaryNode]) + end, + + ok = control_action(start_app, []), + + passed. + +test_cluster_management2(SecondaryNode) -> + NodeS = atom_to_list(node()), + SecondaryNodeS = atom_to_list(SecondaryNode), + + %% attempt to convert a disk node into a ram node + ok = control_action(reset, []), + ok = control_action(cluster, [NodeS]), + {error, {unable_to_join_cluster, _, _}} = + control_action(cluster, [SecondaryNodeS]), + + %% join cluster as a ram node + ok = control_action(reset, []), + ok = control_action(cluster, [SecondaryNodeS, "invalid1@invalid"]), + ok = control_action(start_app, []), + ok = control_action(stop_app, []), + + %% change cluster config while remaining in same cluster + ok = control_action(cluster, ["invalid2@invalid", SecondaryNodeS]), + ok = control_action(start_app, []), + ok = control_action(stop_app, []), + + %% attempt to join non-existing cluster as a ram node + {error, _} = control_action(cluster, ["invalid1@invalid", + "invalid2@invalid"]), + + %% turn ram node into disk node + ok = control_action(cluster, [SecondaryNodeS, NodeS]), + ok = control_action(start_app, []), + ok = control_action(stop_app, []), + + %% attempt to convert a disk node into a ram node + {error, {cannot_convert_disk_node_to_ram_node, _}} = + control_action(cluster, ["invalid1@invalid", + "invalid2@invalid"]), + + %% turn a disk node into a ram node + ok = control_action(cluster, [SecondaryNodeS]), + ok = control_action(start_app, []), + ok = control_action(stop_app, []), + + %% NB: this will log an inconsistent_database error, which is harmless + true = disconnect_node(SecondaryNode), + pong = net_adm:ping(SecondaryNode), + + %% leaving a cluster as a ram node + ok = control_action(reset, []), + %% ...and as a disk node + ok = control_action(cluster, [SecondaryNodeS, NodeS]), + ok = control_action(start_app, []), + ok = control_action(stop_app, []), + ok = control_action(reset, []), + + %% attempt to leave cluster when no other node is alive + ok = control_action(cluster, [SecondaryNodeS, NodeS]), + ok = control_action(start_app, []), + ok = control_action(stop_app, SecondaryNode, []), + ok = control_action(stop_app, []), + {error, {no_running_cluster_nodes, _, _}} = + control_action(reset, []), + ok = control_action(force_reset, []), + + passed. + +test_user_management() -> + + %% lots if stuff that should fail + {error, {no_such_user, _}} = + control_action(delete_user, ["foo"]), + {error, {no_such_user, _}} = + control_action(change_password, ["foo", "baz"]), + {error, {no_such_vhost, _}} = + control_action(delete_vhost, ["/testhost"]), + {error, {no_such_user, _}} = + control_action(map_user_vhost, ["foo", "/"]), + {error, {no_such_user, _}} = + control_action(unmap_user_vhost, ["foo", "/"]), + {error, {no_such_user, _}} = + control_action(list_user_vhosts, ["foo"]), + {error, {no_such_user, _}} = + control_action(set_permissions, ["foo", "/", "/data"]), + {error, {no_such_user, _}} = + control_action(list_permissions, ["foo", "/"]), + {error, {no_such_vhost, _}} = + control_action(map_user_vhost, ["guest", "/testhost"]), + {error, {no_such_vhost, _}} = + control_action(unmap_user_vhost, ["guest", "/testhost"]), + {error, {no_such_vhost, _}} = + control_action(list_vhost_users, ["/testhost"]), + {error, {no_such_vhost, _}} = + control_action(set_permissions, ["guest", "/testhost", "/data"]), + {error, {no_such_vhost, _}} = + control_action(list_permissions, ["guest", "/testhost"]), + {error, {no_such_vhost, _}} = + control_action(add_realm, ["/testhost", "/data/test"]), + {error, {no_such_vhost, _}} = + control_action(delete_realm, ["/testhost", "/data/test"]), + {error, {no_such_vhost, _}} = + control_action(list_realms, ["/testhost"]), + {error, {no_such_realm, _}} = + control_action(set_permissions, ["guest", "/", "/data/test"]), + {error, {no_such_realm, _}} = + control_action(delete_realm, ["/", "/data/test"]), + + %% user creation + ok = control_action(add_user, ["foo", "bar"]), + {error, {user_already_exists, _}} = + control_action(add_user, ["foo", "bar"]), + ok = control_action(change_password, ["foo", "baz"]), + ok = control_action(list_users, []), + + %% vhost creation + ok = control_action(add_vhost, ["/testhost"]), + {error, {vhost_already_exists, _}} = + control_action(add_vhost, ["/testhost"]), + ok = control_action(list_vhosts, []), + + %% user/vhost mapping + ok = control_action(map_user_vhost, ["foo", "/testhost"]), + ok = control_action(map_user_vhost, ["foo", "/testhost"]), + ok = control_action(list_user_vhosts, ["foo"]), + + %% realm creation + ok = control_action(add_realm, ["/testhost", "/data/test"]), + {error, {realm_already_exists, _}} = + control_action(add_realm, ["/testhost", "/data/test"]), + ok = control_action(list_realms, ["/testhost"]), + + %% user permissions + ok = control_action(set_permissions, + ["foo", "/testhost", "/data/test", + "passive", "active", "write", "read"]), + ok = control_action(list_permissions, ["foo", "/testhost"]), + ok = control_action(set_permissions, + ["foo", "/testhost", "/data/test", "all"]), + ok = control_action(set_permissions, + ["foo", "/testhost", "/data/test"]), + {error, not_mapped_to_vhost} = + control_action(set_permissions, + ["guest", "/testhost", "/data/test"]), + {error, not_mapped_to_vhost} = + control_action(list_permissions, ["guest", "/testhost"]), + + %% realm deletion + ok = control_action(delete_realm, ["/testhost", "/data/test"]), + {error, {no_such_realm, _}} = + control_action(delete_realm, ["/testhost", "/data/test"]), + + %% user/vhost unmapping + ok = control_action(unmap_user_vhost, ["foo", "/testhost"]), + ok = control_action(unmap_user_vhost, ["foo", "/testhost"]), + + %% vhost deletion + ok = control_action(delete_vhost, ["/testhost"]), + {error, {no_such_vhost, _}} = + control_action(delete_vhost, ["/testhost"]), + + %% deleting a populated vhost + ok = control_action(add_vhost, ["/testhost"]), + ok = control_action(add_realm, ["/testhost", "/data/test"]), + ok = control_action(map_user_vhost, ["foo", "/testhost"]), + ok = control_action(set_permissions, + ["foo", "/testhost", "/data/test", "all"]), + _ = rabbit_amqqueue:declare( + rabbit_misc:r(<<"/testhost">>, realm, <<"/data/test">>), + <<"bar">>, true, false, []), + ok = control_action(delete_vhost, ["/testhost"]), + + %% user deletion + ok = control_action(delete_user, ["foo"]), + {error, {no_such_user, _}} = + control_action(delete_user, ["foo"]), + + passed. + +control_action(Command, Args) -> control_action(Command, node(), Args). + +control_action(Command, Node, Args) -> + case catch rabbit_control:action(Command, Node, Args) of + ok -> + io:format("done.~n"), + ok; + Other -> + io:format("failed.~n"), + Other + end. diff --git a/src/rabbit_ticket.erl b/src/rabbit_ticket.erl new file mode 100644 index 0000000000..3a608faa91 --- /dev/null +++ b/src/rabbit_ticket.erl @@ -0,0 +1,131 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd., +%% Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd., Cohesive Financial Technologies +%% LLC., and Rabbit Technologies Ltd. are Copyright (C) 2007-2008 +%% LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit +%% Technologies Ltd.; +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_ticket). +-include("rabbit.hrl"). + +-export([record_ticket/2, lookup_ticket/4, check_ticket/4]). + +-import(application). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-type(ticket_number() :: non_neg_integer()). +%% we'd like to write #ticket.passive_flag | #ticket.active_flag | ... +%% but dialyzer doesn't support that. +-type(ticket_field() :: 3..6). + +-spec(record_ticket/2 :: (ticket_number(), ticket()) -> 'ok'). +-spec(lookup_ticket/4 :: + (ticket_number(), ticket_field(), username(), vhost()) -> + ticket()). +-spec(check_ticket/4 :: + (ticket_number(), ticket_field(), r('exchange' | 'queue'), username()) -> + 'ok'). + +-endif. + +%%---------------------------------------------------------------------------- + +record_ticket(TicketNumber, Ticket) -> + put({ticket, TicketNumber}, Ticket), + ok. + +lookup_ticket(TicketNumber, FieldIndex, Username, VHostPath) -> + case get({ticket, TicketNumber}) of + undefined -> + %% Spec: "The server MUST isolate access tickets per + %% channel and treat an attempt by a client to mix these + %% as a connection exception." + rabbit_log:warning("Attempt by client to use invalid ticket ~p~n", [TicketNumber]), + maybe_relax_checks(TicketNumber, Username, VHostPath); + Ticket = #ticket{} -> + case element(FieldIndex, Ticket) of + false -> rabbit_misc:protocol_error( + access_refused, + "ticket ~w has insufficient permissions", + [TicketNumber]); + true -> Ticket + end + end. + +maybe_relax_checks(TicketNumber, Username, VHostPath) -> + case rabbit_misc:strict_ticket_checking() of + true -> + rabbit_misc:protocol_error( + access_refused, "invalid ticket ~w", [TicketNumber]); + false -> + rabbit_log:warning("Lax ticket check mode: fabricating full ticket ~p for user ~p, vhost ~p~n", + [TicketNumber, Username, VHostPath]), + Ticket = rabbit_access_control:full_ticket( + rabbit_misc:r(VHostPath, realm, <<"/data">>)), + case rabbit_realm:access_request(Username, false, Ticket) of + ok -> record_ticket(TicketNumber, Ticket), + Ticket; + {error, Reason} -> + rabbit_misc:protocol_error( + Reason, + "fabrication of ticket ~w for user '~s' in vhost '~s' failed", + [TicketNumber, Username, VHostPath]) + end + end. + +check_ticket(TicketNumber, FieldIndex, + Name = #resource{virtual_host = VHostPath}, Username) -> + #ticket{realm_name = RealmName} = + lookup_ticket(TicketNumber, FieldIndex, Username, VHostPath), + case resource_in_realm(RealmName, Name) of + false -> + case rabbit_misc:strict_ticket_checking() of + true -> + rabbit_misc:protocol_error( + access_refused, + "insufficient permissions in ticket ~w to access ~s in ~s", + [TicketNumber, rabbit_misc:rs(Name), + rabbit_misc:rs(RealmName)]); + false -> + rabbit_log:warning("Lax ticket check mode: ignoring cross-realm access for ticket ~p~n", [TicketNumber]), + ok + end; + true -> + ok + end. + +resource_in_realm(RealmName, ResourceName = #resource{kind = Kind}) -> + CacheKey = {resource_cache, RealmName, Kind}, + case get(CacheKey) of + Name when Name == ResourceName -> + true; + _ -> + case rabbit_realm:check(RealmName, ResourceName) of + true -> + put(CacheKey, ResourceName), + true; + _ -> + false + end + end. diff --git a/src/rabbit_tracer.erl b/src/rabbit_tracer.erl new file mode 100644 index 0000000000..5365b8732f --- /dev/null +++ b/src/rabbit_tracer.erl @@ -0,0 +1,44 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd., +%% Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd., Cohesive Financial Technologies +%% LLC., and Rabbit Technologies Ltd. are Copyright (C) 2007-2008 +%% LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit +%% Technologies Ltd.; +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_tracer). +-export([start/0]). + +-import(erlang). + +start() -> + spawn(fun mainloop/0), + ok. + +mainloop() -> + erlang:trace(new, true, [all]), + mainloop1(). + +mainloop1() -> + receive + Msg -> + rabbit_log:info("TRACE: ~p~n", [Msg]) + end, + mainloop1(). diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl new file mode 100644 index 0000000000..eda871ecc7 --- /dev/null +++ b/src/rabbit_writer.erl @@ -0,0 +1,162 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd., +%% Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd., Cohesive Financial Technologies +%% LLC., and Rabbit Technologies Ltd. are Copyright (C) 2007-2008 +%% LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit +%% Technologies Ltd.; +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_writer). +-include("rabbit.hrl"). +-include("rabbit_framing.hrl"). + +-export([start/3, shutdown/1, mainloop/1]). +-export([send_command/2, send_command/3, + send_command_and_notify/5]). +-export([internal_send_command/3, internal_send_command/5]). + +-import(gen_tcp). + +-record(wstate, {sock, channel, frame_max}). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(start/3 :: (socket(), channel_number(), non_neg_integer()) -> pid()). +-spec(send_command/2 :: (pid(), amqp_method()) -> 'ok'). +-spec(send_command/3 :: (pid(), amqp_method(), content()) -> 'ok'). +-spec(send_command_and_notify/5 :: + (pid(), pid(), pid(), amqp_method(), content()) -> 'ok'). +-spec(internal_send_command/3 :: + (socket(), channel_number(), amqp_method()) -> 'ok'). +-spec(internal_send_command/5 :: + (socket(), channel_number(), amqp_method(), + content(), non_neg_integer()) -> 'ok'). + +-endif. + +%%---------------------------------------------------------------------------- + +start(Sock, Channel, FrameMax) -> + spawn(?MODULE, mainloop, [#wstate{sock = Sock, + channel = Channel, + frame_max = FrameMax}]). + +mainloop(State) -> + receive + Message -> ?MODULE:mainloop(handle_message(Message, State)) + end. + +handle_message({send_command, MethodRecord}, + State = #wstate{sock = Sock, channel = Channel}) -> + ok = internal_send_command_async(Sock, Channel, MethodRecord), + State; +handle_message({send_command, MethodRecord, Content}, + State = #wstate{sock = Sock, + channel = Channel, + frame_max = FrameMax}) -> + ok = internal_send_command_async(Sock, Channel, MethodRecord, + Content, FrameMax), + State; +handle_message({send_command_and_notify, QPid, ChPid, MethodRecord, Content}, + State = #wstate{sock = Sock, + channel = Channel, + frame_max = FrameMax}) -> + ok = internal_send_command_async(Sock, Channel, MethodRecord, + Content, FrameMax), + rabbit_amqqueue:notify_sent(QPid, ChPid), + State; +handle_message({inet_reply, _, ok}, State) -> + State; +handle_message({inet_reply, _, Status}, _State) -> + exit({writer, send_failed, Status}); +handle_message(shutdown, _State) -> + exit(normal); +handle_message(Message, _State) -> + exit({writer, message_not_understood, Message}). + +%--------------------------------------------------------------------------- + +send_command(W, MethodRecord) -> + W ! {send_command, MethodRecord}, + ok. + +send_command(W, MethodRecord, Content) -> + W ! {send_command, MethodRecord, Content}, + ok. + +send_command_and_notify(W, Q, ChPid, MethodRecord, Content) -> + W ! {send_command_and_notify, Q, ChPid, MethodRecord, Content}, + ok. + +shutdown(W) -> + W ! shutdown, + ok. + +%--------------------------------------------------------------------------- + +assemble_frames(Channel, MethodRecord) -> + ?LOGMESSAGE(out, Channel, MethodRecord, none), + rabbit_binary_generator:build_simple_method_frame(Channel, MethodRecord). + +assemble_frames(Channel, MethodRecord, Content, FrameMax) -> + ?LOGMESSAGE(out, Channel, MethodRecord, Content), + MethodName = rabbit_misc:method_record_type(MethodRecord), + true = rabbit_framing:method_has_content(MethodName), % assertion + MethodFrame = rabbit_binary_generator:build_simple_method_frame( + Channel, MethodRecord), + ContentFrames = rabbit_binary_generator:build_simple_content_frames( + Channel, Content, FrameMax), + [MethodFrame | ContentFrames]. + +internal_send_command(Sock, Channel, MethodRecord) -> + ok = gen_tcp:send(Sock, assemble_frames(Channel, MethodRecord)). + +internal_send_command(Sock, Channel, MethodRecord, Content, FrameMax) -> + ok = gen_tcp:send(Sock, assemble_frames(Channel, MethodRecord, + Content, FrameMax)). + +%% gen_tcp:send/2 does a selective receive of {inet_reply, Sock, +%% Status} to obtain the result. That is bad when it is called from +%% the writer since it requires scanning of the writers possibly quite +%% large message queue. +%% +%% So instead we lift the code from prim_inet:send/2, which is what +%% gen_tcp:send/2 calls, do the first half here and then just process +%% the result code in handle_message/2 as and when it arrives. +%% +%% This means we may end up happily sending data down a closed/broken +%% socket, but that's ok since a) data in the buffers will be lost in +%% any case (so qualitatively we are no worse off than if we used +%% gen_tcp:send/2), and b) we do detect the changed socket status +%% eventually, i.e. when we get round to handling the result code. +%% +%% Also note that the port has bounded buffers and port_command blocks +%% when these are full. So the fact that we process the result +%% asynchronously does not impact flow control. +internal_send_command_async(Sock, Channel, MethodRecord) -> + true = erlang:port_command(Sock, assemble_frames(Channel, MethodRecord)), + ok. + +internal_send_command_async(Sock, Channel, MethodRecord, Content, FrameMax) -> + true = erlang:port_command(Sock, assemble_frames(Channel, MethodRecord, + Content, FrameMax)), + ok. diff --git a/src/tcp_acceptor.erl b/src/tcp_acceptor.erl new file mode 100644 index 0000000000..3e4c5ee423 --- /dev/null +++ b/src/tcp_acceptor.erl @@ -0,0 +1,91 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd., +%% Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd., Cohesive Financial Technologies +%% LLC., and Rabbit Technologies Ltd. are Copyright (C) 2007-2008 +%% LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit +%% Technologies Ltd.; +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(tcp_acceptor). + +-behaviour(gen_server). + +-export([start_link/2]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-record(state, {callback, sock, ref}). + +%%-------------------------------------------------------------------- + +start_link(Callback, LSock) -> + gen_server:start_link(?MODULE, {Callback, LSock}, []). + +%%-------------------------------------------------------------------- + +init({Callback, LSock}) -> + case prim_inet:async_accept(LSock, -1) of + {ok, Ref} -> {ok, #state{callback=Callback, sock=LSock, ref=Ref}}; + Error -> {stop, {cannot_accept, Error}} + end. + +handle_call(_Request, _From, State) -> + {noreply, State}. + +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info({inet_async, LSock, Ref, {ok, Sock}}, + State = #state{callback={M,F,A}, sock=LSock, ref=Ref}) -> + + %% patch up the socket so it looks like one we got from + %% gen_tcp:accept/1 + {ok, Mod} = inet_db:lookup_socket(LSock), + inet_db:register_socket(Sock, Mod), + + %% report + {ok, {Address, Port}} = inet:sockname(LSock), + {ok, {PeerAddress, PeerPort}} = inet:peername(Sock), + error_logger:info_msg("accepted TCP connection on ~s:~p from ~s:~p~n", + [inet_parse:ntoa(Address), Port, + inet_parse:ntoa(PeerAddress), PeerPort]), + + %% handle + apply(M, F, A ++ [Sock]), + + %% accept more + case prim_inet:async_accept(LSock, -1) of + {ok, NRef} -> {noreply, State#state{ref=NRef}}; + Error -> {stop, {cannot_accept, Error}, none} + end; +handle_info({inet_async, LSock, Ref, {error, closed}}, + State=#state{sock=LSock, ref=Ref}) -> + %% It would be wrong to attempt to restart the acceptor when we + %% know this will fail. + {stop, normal, State}; +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. diff --git a/src/tcp_acceptor_sup.erl b/src/tcp_acceptor_sup.erl new file mode 100644 index 0000000000..a3144e410c --- /dev/null +++ b/src/tcp_acceptor_sup.erl @@ -0,0 +1,40 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd., +%% Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd., Cohesive Financial Technologies +%% LLC., and Rabbit Technologies Ltd. are Copyright (C) 2007-2008 +%% LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit +%% Technologies Ltd.; +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(tcp_acceptor_sup). + +-behaviour(supervisor). + +-export([start_link/2]). + +-export([init/1]). + +start_link(Name, Callback) -> + supervisor:start_link({local,Name}, ?MODULE, Callback). + +init(Callback) -> + {ok, {{simple_one_for_one, 10, 10}, + [{tcp_acceptor, {tcp_acceptor, start_link, [Callback]}, + transient, brutal_kill, worker, [tcp_acceptor]}]}}. diff --git a/src/tcp_client_sup.erl b/src/tcp_client_sup.erl new file mode 100644 index 0000000000..4e1b9c6b30 --- /dev/null +++ b/src/tcp_client_sup.erl @@ -0,0 +1,43 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd., +%% Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd., Cohesive Financial Technologies +%% LLC., and Rabbit Technologies Ltd. are Copyright (C) 2007-2008 +%% LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit +%% Technologies Ltd.; +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(tcp_client_sup). + +-behaviour(supervisor). + +-export([start_link/1, start_link/2]). + +-export([init/1]). + +start_link(Callback) -> + supervisor:start_link(?MODULE, Callback). + +start_link(SupName, Callback) -> + supervisor:start_link(SupName, ?MODULE, Callback). + +init({M,F,A}) -> + {ok, {{simple_one_for_one, 10, 10}, + [{tcp_client, {M,F,A}, + temporary, brutal_kill, worker, [M]}]}}. diff --git a/src/tcp_listener.erl b/src/tcp_listener.erl new file mode 100644 index 0000000000..3943161a9f --- /dev/null +++ b/src/tcp_listener.erl @@ -0,0 +1,91 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd., +%% Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd., Cohesive Financial Technologies +%% LLC., and Rabbit Technologies Ltd. are Copyright (C) 2007-2008 +%% LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit +%% Technologies Ltd.; +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(tcp_listener). + +-behaviour(gen_server). + +-export([start_link/7]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-record(state, {sock, on_startup, on_shutdown}). + +%%-------------------------------------------------------------------- + +start_link(IPAddress, Port, SocketOpts, + ConcurrentAcceptorCount, AcceptorSup, + OnStartup, OnShutdown) -> + gen_server:start_link( + ?MODULE, {IPAddress, Port, SocketOpts, + ConcurrentAcceptorCount, AcceptorSup, + OnStartup, OnShutdown}, []). + +%%-------------------------------------------------------------------- + +init({IPAddress, Port, SocketOpts, + ConcurrentAcceptorCount, AcceptorSup, + {M,F,A} = OnStartup, OnShutdown}) -> + process_flag(trap_exit, true), + case gen_tcp:listen(Port, SocketOpts ++ [{ip, IPAddress}, + {active, false}]) of + {ok, LSock} -> + lists:foreach(fun (_) -> + {ok, _APid} = supervisor:start_child( + AcceptorSup, [LSock]) + end, + lists:duplicate(ConcurrentAcceptorCount, dummy)), + error_logger:info_msg( + "started TCP listener on ~s:~p~n", + [inet_parse:ntoa(IPAddress), Port]), + apply(M, F, A ++ [IPAddress, Port]), + {ok, #state{sock=LSock, + on_startup = OnStartup, on_shutdown = OnShutdown}}; + {error, Reason} -> + error_logger:error_msg( + "failed to start TCP listener on ~s:~p - ~p~n", + [inet_parse:ntoa(IPAddress), Port, Reason]), + {stop, {cannot_listen, IPAddress, Port, Reason}} + end. + +handle_call(_Request, _From, State) -> + {noreply, State}. + +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, #state{sock=LSock, on_shutdown = {M,F,A}}) -> + {ok, {IPAddress, Port}} = inet:sockname(LSock), + gen_tcp:close(LSock), + error_logger:info_msg("stopped TCP listener on ~s:~p~n", + [inet_parse:ntoa(IPAddress), Port]), + apply(M, F, A ++ [IPAddress, Port]). + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. diff --git a/src/tcp_listener_sup.erl b/src/tcp_listener_sup.erl new file mode 100644 index 0000000000..9347240668 --- /dev/null +++ b/src/tcp_listener_sup.erl @@ -0,0 +1,60 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd., +%% Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd., Cohesive Financial Technologies +%% LLC., and Rabbit Technologies Ltd. are Copyright (C) 2007-2008 +%% LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit +%% Technologies Ltd.; +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(tcp_listener_sup). + +-behaviour(supervisor). + +-export([start_link/6, start_link/7]). + +-export([init/1]). + +start_link(IPAddress, Port, SocketOpts, OnStartup, OnShutdown, + AcceptCallback) -> + start_link(IPAddress, Port, SocketOpts, OnStartup, OnShutdown, + AcceptCallback, 1). + +start_link(IPAddress, Port, SocketOpts, OnStartup, OnShutdown, + AcceptCallback, ConcurrentAcceptorCount) -> + supervisor:start_link( + ?MODULE, {IPAddress, Port, SocketOpts, OnStartup, OnShutdown, + AcceptCallback, ConcurrentAcceptorCount}). + +init({IPAddress, Port, SocketOpts, OnStartup, OnShutdown, + AcceptCallback, ConcurrentAcceptorCount}) -> + %% This is gross. The tcp_listener needs to know about the + %% tcp_acceptor_sup, and the only way I can think of accomplishing + %% that without jumping through hoops is to register the + %% tcp_acceptor_sup. + Name = rabbit_misc:tcp_name(tcp_acceptor_sup, IPAddress, Port), + {ok, {{one_for_all, 10, 10}, + [{tcp_acceptor_sup, {tcp_acceptor_sup, start_link, + [Name, AcceptCallback]}, + transient, infinity, supervisor, [tcp_acceptor_sup]}, + {tcp_listener, {tcp_listener, start_link, + [IPAddress, Port, SocketOpts, + ConcurrentAcceptorCount, Name, + OnStartup, OnShutdown]}, + transient, 100, worker, [tcp_listener]}]}}. |
