diff options
| author | Essien Ita Essien <essiene@gmail.com> | 2009-02-19 08:46:15 +0100 |
|---|---|---|
| committer | Essien Ita Essien <essiene@gmail.com> | 2009-02-19 08:46:15 +0100 |
| commit | 9b35a0aed208f16f9cfa1fb63c6e5c2b75f02a08 (patch) | |
| tree | 688f577439c883a99eab30bb33ed543fd8e8ce0a /src | |
| parent | a7c825b0dd8ef6828cca881dd6cfc6bfc2c6ecda (diff) | |
| parent | 3f5c0fce99c119abbe27ca1dd05e4bf57f27db3e (diff) | |
| download | rabbitmq-server-git-9b35a0aed208f16f9cfa1fb63c6e5c2b75f02a08.tar.gz | |
Merge with upstream default
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit.erl | 11 | ||||
| -rw-r--r-- | src/rabbit_access_control.erl | 36 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 67 | ||||
| -rw-r--r-- | src/rabbit_control.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_guid.erl | 126 | ||||
| -rw-r--r-- | src/rabbit_misc.erl | 40 | ||||
| -rw-r--r-- | src/rabbit_mnesia.erl | 3 | ||||
| -rw-r--r-- | src/rabbit_persister.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_reader.erl | 22 | ||||
| -rw-r--r-- | src/rabbit_router.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 12 |
11 files changed, 222 insertions, 109 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index a1c9441ace..0d1ce689ea 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -155,6 +155,10 @@ start(normal, []) -> fun () -> ok = start_child(rabbit_persister) end}, + {"guid generator", + fun () -> + ok = start_child(rabbit_guid) + end}, {"builtin applications", fun () -> {ok, DefaultVHost} = application:get_env(default_vhost), @@ -280,13 +284,14 @@ insert_default_data() -> {ok, DefaultUser} = application:get_env(default_user), {ok, DefaultPass} = application:get_env(default_pass), {ok, DefaultVHost} = application:get_env(default_vhost), - {ok, [DefaultConfigurationPerm, DefaultMessagingPerm]} = + {ok, [DefaultConfigurePerm, DefaultWritePerm, DefaultReadPerm]} = application:get_env(default_permissions), ok = rabbit_access_control:add_vhost(DefaultVHost), ok = rabbit_access_control:add_user(DefaultUser, DefaultPass), ok = rabbit_access_control:set_permissions(DefaultUser, DefaultVHost, - DefaultConfigurationPerm, - DefaultMessagingPerm), + DefaultConfigurePerm, + DefaultWritePerm, + DefaultReadPerm), ok. start_builtin_amq_applications() -> diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl index 394eb2b124..da0ab9cf7a 100644 --- a/src/rabbit_access_control.erl +++ b/src/rabbit_access_control.erl @@ -38,7 +38,7 @@ -export([add_user/2, delete_user/1, change_password/2, list_users/0, lookup_user/1]). -export([add_vhost/1, delete_vhost/1, list_vhosts/0]). --export([set_permissions/4, clear_permissions/2, +-export([set_permissions/5, clear_permissions/2, list_vhost_permissions/1, list_user_permissions/1]). %%---------------------------------------------------------------------------- @@ -58,12 +58,13 @@ -spec(add_vhost/1 :: (vhost()) -> 'ok'). -spec(delete_vhost/1 :: (vhost()) -> 'ok'). -spec(list_vhosts/0 :: () -> [vhost()]). --spec(set_permissions/4 :: (username(), vhost(), regexp(), regexp()) -> 'ok'). +-spec(set_permissions/5 :: + (username(), vhost(), regexp(), regexp(), regexp()) -> 'ok'). -spec(clear_permissions/2 :: (username(), vhost()) -> 'ok'). -spec(list_vhost_permissions/1 :: - (vhost()) -> [{username(), regexp(), regexp()}]). + (vhost()) -> [{username(), regexp(), regexp(), regexp()}]). -spec(list_user_permissions/1 :: - (username()) -> [{vhost(), regexp(), regexp()}]). + (username()) -> [{vhost(), regexp(), regexp(), regexp()}]). -endif. @@ -272,7 +273,7 @@ internal_delete_vhost(VHostPath) -> ok = rabbit_exchange:delete(Name, false) end, rabbit_exchange:list(VHostPath)), - lists:foreach(fun ({Username, _, _}) -> + lists:foreach(fun ({Username, _, _, _}) -> ok = clear_permissions(Username, VHostPath) end, list_vhost_permissions(VHostPath)), @@ -289,9 +290,8 @@ validate_regexp(RegexpBin) -> {error, Reason} -> throw({error, {invalid_regexp, Regexp, Reason}}) end. -set_permissions(Username, VHostPath, ConfigurationPerm, MessagingPerm) -> - validate_regexp(ConfigurationPerm), - validate_regexp(MessagingPerm), +set_permissions(Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm) -> + lists:map(fun validate_regexp/1, [ConfigurePerm, WritePerm, ReadPerm]), rabbit_misc:execute_mnesia_transaction( rabbit_misc:with_user_and_vhost( Username, VHostPath, @@ -301,8 +301,9 @@ set_permissions(Username, VHostPath, ConfigurationPerm, MessagingPerm) -> username = Username, virtual_host = VHostPath}, permission = #permission{ - configuration = ConfigurationPerm, - messaging = MessagingPerm}}, + configure = ConfigurePerm, + write = WritePerm, + read = ReadPerm}}, write) end)). @@ -317,24 +318,25 @@ clear_permissions(Username, VHostPath) -> end)). list_vhost_permissions(VHostPath) -> - [{Username, ConfigurationPerm, MessagingPerm} || - {Username, _, ConfigurationPerm, MessagingPerm} <- + [{Username, ConfigurePerm, WritePerm, ReadPerm} || + {Username, _, ConfigurePerm, WritePerm, ReadPerm} <- list_permissions(rabbit_misc:with_vhost( VHostPath, match_user_vhost('_', VHostPath)))]. list_user_permissions(Username) -> - [{VHostPath, ConfigurationPerm, MessagingPerm} || - {_, VHostPath, ConfigurationPerm, MessagingPerm} <- + [{VHostPath, ConfigurePerm, WritePerm, ReadPerm} || + {_, VHostPath, ConfigurePerm, WritePerm, ReadPerm} <- list_permissions(rabbit_misc:with_user( Username, match_user_vhost(Username, '_')))]. list_permissions(QueryThunk) -> - [{Username, VHostPath, ConfigurationPerm, MessagingPerm} || + [{Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm} || #user_permission{user_vhost = #user_vhost{username = Username, virtual_host = VHostPath}, permission = #permission{ - configuration = ConfigurationPerm, - messaging = MessagingPerm}} <- + configure = ConfigurePerm, + write = WritePerm, + read = ReadPerm}} <- %% TODO: use dirty ops instead rabbit_misc:execute_mnesia_transaction(QueryThunk)]. diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 5a1c095266..7574cd673a 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -35,12 +35,12 @@ -behaviour(gen_server2). --export([start_link/4, do/2, do/3, shutdown/1]). +-export([start_link/5, do/2, do/3, shutdown/1]). -export([send_command/2, deliver/4, conserve_memory/2]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]). --record(ch, {state, reader_pid, writer_pid, limiter_pid, +-record(ch, {state, channel, reader_pid, writer_pid, limiter_pid, transaction_id, tx_participants, next_tag, uncommitted_ack_q, unacked_message_q, username, virtual_host, @@ -54,7 +54,8 @@ -ifdef(use_specs). --spec(start_link/4 :: (pid(), pid(), username(), vhost()) -> pid()). +-spec(start_link/5 :: + (channel_number(), pid(), pid(), username(), vhost()) -> pid()). -spec(do/2 :: (pid(), amqp_method()) -> 'ok'). -spec(do/3 :: (pid(), amqp_method(), maybe(content())) -> 'ok'). -spec(shutdown/1 :: (pid()) -> 'ok'). @@ -66,9 +67,10 @@ %%---------------------------------------------------------------------------- -start_link(ReaderPid, WriterPid, Username, VHost) -> +start_link(Channel, ReaderPid, WriterPid, Username, VHost) -> {ok, Pid} = gen_server2:start_link( - ?MODULE, [ReaderPid, WriterPid, Username, VHost], []), + ?MODULE, [Channel, ReaderPid, WriterPid, + Username, VHost], []), Pid. do(Pid, Method) -> @@ -91,11 +93,12 @@ conserve_memory(Pid, Conserve) -> %%--------------------------------------------------------------------------- -init([ReaderPid, WriterPid, Username, VHost]) -> +init([Channel, ReaderPid, WriterPid, Username, VHost]) -> process_flag(trap_exit, true), link(WriterPid), rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}), {ok, #ch{state = starting, + channel = Channel, reader_pid = ReaderPid, writer_pid = WriterPid, limiter_pid = undefined, @@ -123,8 +126,11 @@ handle_cast({method, Method, Content}, State) -> {stop, normal, State#ch{state = terminating}} catch exit:{amqp, Error, Explanation, none} -> - {stop, {amqp, Error, Explanation, - rabbit_misc:method_record_type(Method)}, State}; + ok = notify_queues(internal_rollback(State)), + Reason = {amqp, Error, Explanation, + rabbit_misc:method_record_type(Method)}, + State#ch.reader_pid ! {channel_exit, State#ch.channel, Reason}, + {stop, normal, State#ch{state = terminating}}; exit:normal -> {stop, normal, State}; _:Reason -> @@ -224,11 +230,14 @@ clear_permission_cache() -> erase(permission_cache), ok. -check_configuration_permitted(Resource, #ch{ username = Username}) -> - check_resource_access(Username, Resource, #permission.configuration). +check_configure_permitted(Resource, #ch{ username = Username}) -> + check_resource_access(Username, Resource, #permission.configure). -check_messaging_permitted(Resource, #ch{ username = Username}) -> - check_resource_access(Username, Resource, #permission.messaging). +check_write_permitted(Resource, #ch{ username = Username}) -> + check_resource_access(Username, Resource, #permission.write). + +check_read_permitted(Resource, #ch{ username = Username}) -> + check_resource_access(Username, Resource, #permission.read). expand_queue_name_shortcut(<<>>, #ch{ most_recently_declared_queue = <<>> }) -> rabbit_misc:protocol_error( @@ -299,13 +308,13 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, immediate = Immediate}, Content, State = #ch{ virtual_host = VHostPath}) -> ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), - check_messaging_permitted(ExchangeName, State), + check_write_permitted(ExchangeName, State), Exchange = rabbit_exchange:lookup_or_die(ExchangeName), %% We decode the content's properties here because we're almost %% certain to want to look at delivery-mode and priority. DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content), PersistentKey = case is_message_persistent(DecodedContent) of - true -> rabbit_misc:guid(); + true -> rabbit_guid:guid(); false -> none end, {noreply, publish(Mandatory, Immediate, @@ -343,7 +352,7 @@ handle_method(#'basic.get'{queue = QueueNameBin, _, State = #ch{ writer_pid = WriterPid, next_tag = DeliveryTag }) -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), - check_messaging_permitted(QueueName, State), + check_read_permitted(QueueName, State), case rabbit_amqqueue:with_or_die( QueueName, fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of @@ -378,10 +387,10 @@ handle_method(#'basic.consume'{queue = QueueNameBin, case dict:find(ConsumerTag, ConsumerMapping) of error -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), - check_messaging_permitted(QueueName, State), + check_read_permitted(QueueName, State), ActualConsumerTag = case ConsumerTag of - <<>> -> rabbit_misc:binstring_guid("amq.ctag"); + <<>> -> rabbit_guid:binstring_guid("amq.ctag"); Other -> Other end, @@ -537,7 +546,7 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin, _, State = #ch{ virtual_host = VHostPath }) -> CheckedType = rabbit_exchange:check_type(TypeNameBin), ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), - check_configuration_permitted(ExchangeName, State), + check_configure_permitted(ExchangeName, State), X = case rabbit_exchange:lookup(ExchangeName) of {ok, FoundX} -> FoundX; {error, not_found} -> @@ -557,7 +566,7 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin, nowait = NoWait}, _, State = #ch{ virtual_host = VHostPath }) -> ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), - check_configuration_permitted(ExchangeName, State), + check_configure_permitted(ExchangeName, State), X = rabbit_exchange:lookup_or_die(ExchangeName), ok = rabbit_exchange:assert_type(X, rabbit_exchange:check_type(TypeNameBin)), return_ok(State, NoWait, #'exchange.declare_ok'{}); @@ -567,7 +576,7 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin, nowait = NoWait}, _, State = #ch { virtual_host = VHostPath }) -> ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), - check_configuration_permitted(ExchangeName, State), + check_configure_permitted(ExchangeName, State), case rabbit_exchange:delete(ExchangeName, IfUnused) of {error, not_found} -> rabbit_misc:protocol_error( @@ -614,15 +623,15 @@ handle_method(#'queue.declare'{queue = QueueNameBin, {error, not_found} -> ActualNameBin = case QueueNameBin of - <<>> -> rabbit_misc:binstring_guid("amq.gen"); + <<>> -> rabbit_guid:binstring_guid("amq.gen"); Other -> check_name('queue', Other) end, QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin), - check_configuration_permitted(QueueName, State), + check_configure_permitted(QueueName, State), Finish(rabbit_amqqueue:declare(QueueName, Durable, AutoDelete, Args)); Other = #amqqueue{name = QueueName} -> - check_configuration_permitted(QueueName, State), + check_configure_permitted(QueueName, State), Other end, return_queue_declare_ok(State, NoWait, Q); @@ -632,7 +641,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin, nowait = NoWait}, _, State = #ch{ virtual_host = VHostPath }) -> QueueName = rabbit_misc:r(VHostPath, queue, QueueNameBin), - check_configuration_permitted(QueueName, State), + check_configure_permitted(QueueName, State), Q = rabbit_amqqueue:with_or_die(QueueName, fun (Q) -> Q end), return_queue_declare_ok(State, NoWait, Q); @@ -643,7 +652,7 @@ handle_method(#'queue.delete'{queue = QueueNameBin, }, _, State) -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), - check_configuration_permitted(QueueName, State), + check_configure_permitted(QueueName, State), case rabbit_amqqueue:with_or_die( QueueName, fun (Q) -> rabbit_amqqueue:delete(Q, IfUnused, IfEmpty) end) of @@ -680,7 +689,7 @@ handle_method(#'queue.purge'{queue = QueueNameBin, nowait = NoWait}, _, State) -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), - check_messaging_permitted(QueueName, State), + check_read_permitted(QueueName, State), {ok, PurgedMessageCount} = rabbit_amqqueue:with_or_die( QueueName, fun (Q) -> rabbit_amqqueue:purge(Q) end), @@ -730,11 +739,11 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, %% FIXME: don't allow binding to internal exchanges - %% including the one named "" ! QueueName = expand_queue_name_shortcut(QueueNameBin, State), - check_configuration_permitted(QueueName, State), + check_write_permitted(QueueName, State), ActualRoutingKey = expand_routing_key_shortcut(QueueNameBin, RoutingKey, State), ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), - check_configuration_permitted(ExchangeName, State), + check_read_permitted(ExchangeName, State), case Fun(ExchangeName, QueueName, ActualRoutingKey, Arguments) of {error, queue_not_found} -> rabbit_misc:protocol_error( @@ -823,7 +832,7 @@ ack(TxnKey, UAQ) -> [QPid | L] end, [], UAQ). -make_tx_id() -> rabbit_misc:guid(). +make_tx_id() -> rabbit_guid:guid(). new_tx(State) -> State#ch{transaction_id = make_tx_id(), diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 293cd79751..e6717d689f 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -114,7 +114,7 @@ Available commands: delete_vhost <VHostPath> list_vhosts - set_permissions [-p <VHostPath>] <UserName> <Regexp> <Regexp> + set_permissions [-p <VHostPath>] <UserName> <Regexp> <Regexp> <Regexp> clear_permissions [-p <VHostPath>] <UserName> list_permissions [-p <VHostPath>] list_user_permissions <UserName> @@ -267,10 +267,10 @@ action(Command, Node, Args, Inform) -> {VHost, RemainingArgs} = parse_vhost_flag(Args), action(Command, Node, VHost, RemainingArgs, Inform). -action(set_permissions, Node, VHost, [Username, CPerm, MPerm], Inform) -> +action(set_permissions, Node, VHost, [Username, CPerm, WPerm, RPerm], Inform) -> Inform("Setting permissions for user ~p in vhost ~p", [Username, VHost]), call(Node, {rabbit_access_control, set_permissions, - [Username, VHost, CPerm, MPerm]}); + [Username, VHost, CPerm, WPerm, RPerm]}); action(clear_permissions, Node, VHost, [Username], Inform) -> Inform("Clearing permissions for user ~p in vhost ~p", [Username, VHost]), diff --git a/src/rabbit_guid.erl b/src/rabbit_guid.erl new file mode 100644 index 0000000000..51c1665bbb --- /dev/null +++ b/src/rabbit_guid.erl @@ -0,0 +1,126 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_guid). + +-include("rabbit.hrl"). + +-behaviour(gen_server). + +-export([start_link/0]). +-export([guid/0, string_guid/1, binstring_guid/1]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-define(SERVER, ?MODULE). + +-record(state, {serial}). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}). +-spec(guid/0 :: () -> guid()). +-spec(string_guid/1 :: (any()) -> string()). +-spec(binstring_guid/1 :: (any()) -> binary()). + +-endif. + +%%---------------------------------------------------------------------------- + +start_link() -> + %% The persister can get heavily loaded, and we don't want that to + %% impact guid generation. We therefore keep the serial in a + %% separate process rather than calling rabbit_persister:serial/0 + %% directly in the functions below. + gen_server:start_link({local, ?SERVER}, ?MODULE, + [rabbit_persister:serial()], []). + +%% generate a guid that is monotonically increasing per process. +%% +%% The id is only unique within a single cluster and as long as the +%% persistent message store hasn't been deleted. +guid() -> + %% We don't use erlang:now() here because a) it may return + %% duplicates when the system clock has been rewound prior to a + %% restart, or ids were generated at a high rate (which causes + %% now() to move ahead of the system time), and b) it is really + %% slow since it takes a global lock and makes a system call. + %% + %% rabbit_persister:serial/0, in combination with self/0 (which + %% includes the node name) uniquely identifies a process in space + %% and time. We combine that with a process-local counter to give + %% us a GUID that is monotonically increasing per process. + G = case get(guid) of + undefined -> {{gen_server:call(?SERVER, serial), self()}, 0}; + {S, I} -> {S, I+1} + end, + put(guid, G), + G. + +%% generate a readable string representation of a guid. Note that any +%% monotonicity of the guid is not preserved in the encoding. +string_guid(Prefix) -> + %% we use the (undocumented) ssl_base64 module here because it is + %% present throughout OTP R11 and R12 whereas base64 only becomes + %% available in R11B-4. + %% + %% TODO: once debian stable and EPEL have moved from R11B-2 to + %% R11B-4 or later we should change this to use base64. + Prefix ++ "-" ++ ssl_base64:encode(erlang:md5(term_to_binary(guid()))). + +binstring_guid(Prefix) -> + list_to_binary(string_guid(Prefix)). + +%%---------------------------------------------------------------------------- + +init([Serial]) -> + {ok, #state{serial = Serial}}. + +handle_call(serial, _From, State = #state{serial = Serial}) -> + {reply, Serial, State}; + +handle_call(_Request, _From, State) -> + {noreply, State}. + +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 214c952834..5d176f8fac 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -46,7 +46,6 @@ -export([ensure_ok/2]). -export([localnode/1, tcp_name/3]). -export([intersperse/2, upmap/2, map_in_order/2]). --export([guid/0, string_guid/1, binstring_guid/1]). -export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]). -export([append_file/2, ensure_parent_dirs_exist/1]). -export([format_stderr/2]). @@ -99,9 +98,6 @@ -spec(intersperse/2 :: (A, [A]) -> [A]). -spec(upmap/2 :: (fun ((A) -> B), [A]) -> [B]). -spec(map_in_order/2 :: (fun ((A) -> B), [A]) -> [B]). --spec(guid/0 :: () -> guid()). --spec(string_guid/1 :: (any()) -> string()). --spec(binstring_guid/1 :: (any()) -> binary()). -spec(dirty_read_all/1 :: (atom()) -> [any()]). -spec(dirty_foreach_key/2 :: (fun ((any()) -> any()), atom()) -> 'ok' | 'aborted'). @@ -302,42 +298,6 @@ map_in_order(F, L) -> lists:reverse( lists:foldl(fun (E, Acc) -> [F(E) | Acc] end, [], L)). -%% generate a guid that is monotonically increasing per process. -%% -%% The id is only unique within a single cluster and as the persistent -%% message store hasn't been deleted. -guid() -> - %% We don't use erlang:now() here because a) it may return - %% duplicates when the system clock has been rewound prior to a - %% restart, or ids were generated at a high rate (which causes - %% now() to move ahead of the system time), and b) it is really - %% slow since it takes a global lock and makes a system call. - %% - %% rabbit_persister:serial/0, in combination with self/0 (which - %% includes the node name) uniquely identifies a process in space - %% and time. We combine that with a process-local counter to give - %% us a GUID that is monotonically increasing per process. - G = case get(guid) of - undefined -> {{rabbit_persister:serial(), self()}, 0}; - {S, I} -> {S, I+1} - end, - put(guid, G), - G. - -%% generate a readable string representation of a guid. Note that any -%% monotonicity of the guid is not preserved in the encoding. -string_guid(Prefix) -> - %% we use the (undocumented) ssl_base64 module here because it is - %% present throughout OTP R11 and R12 whereas base64 only becomes - %% available in R11B-4. - %% - %% TODO: once debian stable and EPEL have moved from R11B-2 to - %% R11B-4 or later we should change this to use base64. - Prefix ++ "-" ++ ssl_base64:encode(erlang:md5(term_to_binary(guid()))). - -binstring_guid(Prefix) -> - list_to_binary(string_guid(Prefix)). - dirty_read_all(TableName) -> mnesia:dirty_select(TableName, [{'$1',[],['$1']}]). diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index c0bf3d254e..15213861bd 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -143,8 +143,7 @@ table_definitions() -> {disc_copies, [node()]}]}, {rabbit_queue, [{record_name, amqqueue}, - {attributes, record_info(fields, amqqueue)}, - {index, [pid]}]}]. + {attributes, record_info(fields, amqqueue)}]}]. table_names() -> [Tab || {Tab, _} <- table_definitions()]. diff --git a/src/rabbit_persister.erl b/src/rabbit_persister.erl index c34ad85100..94033a4f3d 100644 --- a/src/rabbit_persister.erl +++ b/src/rabbit_persister.erl @@ -92,7 +92,7 @@ start_link() -> transaction(MessageList) -> ?LOGDEBUG("transaction ~p~n", [MessageList]), - TxnKey = rabbit_misc:guid(), + TxnKey = rabbit_guid:guid(), gen_server:call(?SERVER, {transaction, TxnKey, MessageList}). extend_transaction(TxnKey, MessageList) -> diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index a6094ebc58..dbb9358314 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -231,8 +231,12 @@ start_connection(Parent, Deb, ClientSock) -> connection_state = pre_init}, handshake, 8)) catch - Ex -> rabbit_log:error("error on TCP connection ~p from ~s:~p~n~p~n", - [self(), PeerAddressS, PeerPort, Ex]) + Ex -> (if Ex == connection_closed_abruptly -> + fun rabbit_log:warning/2; + true -> + fun rabbit_log:error/2 + end)("exception on TCP connection ~p from ~s:~p~n~p~n", + [self(), PeerAddressS, PeerPort, Ex]) after rabbit_log:info("closing TCP connection ~p from ~s:~p~n", [self(), PeerAddressS, PeerPort]), @@ -284,6 +288,8 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) -> exit(Reason); {'EXIT', _Pid, E = {writer, send_failed, _Error}} -> throw(E); + {channel_exit, Channel, Reason} -> + mainloop(Parent, Deb, handle_channel_exit(Channel, Reason, State)); {'EXIT', Pid, Reason} -> mainloop(Parent, Deb, handle_dependent_exit(Pid, Reason, State)); {terminate_channel, Channel, Ref1} -> @@ -351,6 +357,14 @@ terminate_channel(Channel, Ref, State) -> end, State. +handle_channel_exit(Channel, Reason, State) -> + %% We remove the channel from the inbound map only. That allows + %% the channel to be re-opened, but also means the remaining + %% cleanup, including possibly closing the connection, is deferred + %% until we get the (normal) exit signal. + erase({channel, Channel}), + handle_exception(State, Channel, Reason). + handle_dependent_exit(Pid, normal, State) -> channel_cleanup(Pid), maybe_close(State); @@ -711,8 +725,8 @@ send_to_new_channel(Channel, AnalyzedFrame, State) -> vhost = VHost}} = State, WriterPid = rabbit_writer:start(Sock, Channel, FrameMax), ChPid = rabbit_framing_channel:start_link( - fun rabbit_channel:start_link/4, - [self(), WriterPid, Username, VHost]), + fun rabbit_channel:start_link/5, + [Channel, self(), WriterPid, Username, VHost]), put({channel, Channel}, {chpid, ChPid}), put({chpid, ChPid}, {channel, Channel}), ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame); diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index 26d857bef0..ff42ea0460 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -32,7 +32,7 @@ -module(rabbit_router). -include("rabbit.hrl"). --behaviour(gen_server). +-behaviour(gen_server2). -export([start_link/0, deliver/5]). @@ -58,7 +58,7 @@ %%---------------------------------------------------------------------------- start_link() -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + gen_server2:start_link({local, ?SERVER}, ?MODULE, [], []). -ifdef(BUG19758). @@ -143,7 +143,7 @@ handle_call({deliver, QPids, Mandatory, Immediate, Txn, Message}, spawn( fun () -> R = run_bindings(QPids, Mandatory, Immediate, Txn, Message), - gen_server:reply(From, R) + gen_server2:reply(From, R) end), {noreply, State}. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index ef390e4de6..6312e8e364 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -444,7 +444,7 @@ test_user_management() -> {error, {no_such_vhost, _}} = control_action(delete_vhost, ["/testhost"]), {error, {no_such_user, _}} = - control_action(set_permissions, ["foo", ".*", ".*"]), + control_action(set_permissions, ["foo", ".*", ".*", ".*"]), {error, {no_such_user, _}} = control_action(clear_permissions, ["foo"]), {error, {no_such_user, _}} = @@ -452,9 +452,7 @@ test_user_management() -> {error, {no_such_vhost, _}} = control_action(list_permissions, ["-p", "/testhost"]), {error, {invalid_regexp, _, _}} = - control_action(set_permissions, ["guest", "+foo", ".*"]), - {error, {invalid_regexp, _, _}} = - control_action(set_permissions, ["guest", ".*", "+foo"]), + control_action(set_permissions, ["guest", "+foo", ".*", ".*"]), %% user creation ok = control_action(add_user, ["foo", "bar"]), @@ -471,9 +469,9 @@ test_user_management() -> %% user/vhost mapping ok = control_action(set_permissions, ["-p", "/testhost", - "foo", ".*", ".*"]), + "foo", ".*", ".*", ".*"]), ok = control_action(set_permissions, ["-p", "/testhost", - "foo", ".*", ".*"]), + "foo", ".*", ".*", ".*"]), ok = control_action(list_permissions, ["-p", "/testhost"]), ok = control_action(list_user_permissions, ["foo"]), @@ -489,7 +487,7 @@ test_user_management() -> %% deleting a populated vhost ok = control_action(add_vhost, ["/testhost"]), ok = control_action(set_permissions, ["-p", "/testhost", - "foo", ".*", ".*"]), + "foo", ".*", ".*", ".*"]), ok = control_action(delete_vhost, ["/testhost"]), %% user deletion |
