summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2010-07-09 12:41:39 +0100
committerSimon MacMullen <simon@rabbitmq.com>2010-07-09 12:41:39 +0100
commit4133883b371e2aa03a2980a1157511bec5e47ce9 (patch)
tree82a5fa8f5015d34fdbceccbb70a31f364ded353d /src
parent367e8b451e056c8425058886a3bf945a74b3e518 (diff)
parent279c8ed54b8ef1eee5cba54fb90692722e923768 (diff)
downloadrabbitmq-server-git-4133883b371e2aa03a2980a1157511bec5e47ce9.tar.gz
Merge default into bug22889
Diffstat (limited to 'src')
-rw-r--r--src/delegate.erl5
-rw-r--r--src/delegate_sup.erl2
-rw-r--r--src/file_handle_cache.erl14
-rw-r--r--src/gen_server2.erl2
-rw-r--r--src/pg_local.erl8
-rw-r--r--src/rabbit.erl16
-rw-r--r--src/rabbit_access_control.erl36
-rw-r--r--src/rabbit_amqqueue.erl113
-rw-r--r--src/rabbit_amqqueue_process.erl2
-rw-r--r--src/rabbit_basic.erl51
-rw-r--r--src/rabbit_binary_generator.erl21
-rw-r--r--src/rabbit_binary_parser.erl12
-rw-r--r--src/rabbit_channel.erl39
-rw-r--r--src/rabbit_control.erl2
-rw-r--r--src/rabbit_dialyzer.erl10
-rw-r--r--src/rabbit_error_logger.erl3
-rw-r--r--src/rabbit_error_logger_file_h.erl3
-rw-r--r--src/rabbit_exchange.erl151
-rw-r--r--src/rabbit_exchange_type_fanout.erl4
-rw-r--r--src/rabbit_exchange_type_headers.erl7
-rw-r--r--src/rabbit_exchange_type_registry.erl13
-rw-r--r--src/rabbit_exchange_type_topic.erl6
-rw-r--r--src/rabbit_guid.erl12
-rw-r--r--src/rabbit_invariable_queue.erl9
-rw-r--r--src/rabbit_load.erl5
-rw-r--r--src/rabbit_log.erl2
-rw-r--r--src/rabbit_memory_monitor.erl6
-rw-r--r--src/rabbit_misc.erl116
-rw-r--r--src/rabbit_mnesia.erl12
-rw-r--r--src/rabbit_net.erl37
-rw-r--r--src/rabbit_networking.erl36
-rw-r--r--src/rabbit_persister.erl24
-rw-r--r--src/rabbit_queue_collector.erl (renamed from src/rabbit_reader_queue_collector.erl)32
-rw-r--r--src/rabbit_reader.erl14
-rw-r--r--src/rabbit_router.erl8
-rw-r--r--src/rabbit_sasl_report_file_h.erl3
-rw-r--r--src/rabbit_types.erl147
-rw-r--r--src/rabbit_writer.erl36
-rw-r--r--src/supervisor2.erl8
-rw-r--r--src/vm_memory_monitor.erl6
-rw-r--r--src/worker_pool.erl2
-rw-r--r--src/worker_pool_sup.erl6
-rw-r--r--src/worker_pool_worker.erl3
43 files changed, 700 insertions, 344 deletions
diff --git a/src/delegate.erl b/src/delegate.erl
index 8af2812781..3f57953bf7 100644
--- a/src/delegate.erl
+++ b/src/delegate.erl
@@ -44,8 +44,9 @@
-ifdef(use_specs).
--spec(start_link/1 :: (non_neg_integer()) -> {'ok', pid()}).
--spec(invoke_no_result/2 :: (pid() | [pid()], fun ((pid()) -> any())) -> 'ok').
+-spec(start_link/1 :: (non_neg_integer()) -> rabbit_types:ok(pid())).
+-spec(invoke_no_result/2 ::
+ (pid() | [pid()], fun ((pid()) -> any())) -> 'ok').
-spec(invoke/2 :: (pid() | [pid()], fun ((pid()) -> A)) -> A).
-spec(process_count/0 :: () -> non_neg_integer()).
diff --git a/src/delegate_sup.erl b/src/delegate_sup.erl
index 1c1d62a95d..39ef3f85b8 100644
--- a/src/delegate_sup.erl
+++ b/src/delegate_sup.erl
@@ -43,7 +43,7 @@
-ifdef(use_specs).
--spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}).
+-spec(start_link/0 :: () -> rabbit_types:ok_or_error2(pid(), any()) | 'ignore').
-endif.
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index 0f648dcd2b..e209ee6be4 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -182,18 +182,18 @@
-ifdef(use_specs).
-type(ref() :: any()).
--type(error() :: {'error', any()}).
--type(ok_or_error() :: ('ok' | error())).
--type(val_or_error(T) :: ({'ok', T} | error())).
+-type(ok_or_error() :: rabbit_types:ok_or_error(any())).
+-type(val_or_error(T) :: rabbit_types:ok_or_error2(T, any())).
-type(position() :: ('bof' | 'eof' | non_neg_integer() |
- {('bof' |'eof'), non_neg_integer()} | {'cur', integer()})).
+ {('bof' |'eof'), non_neg_integer()} |
+ {'cur', integer()})).
-type(offset() :: non_neg_integer()).
-spec(register_callback/3 :: (atom(), atom(), [any()]) -> 'ok').
-spec(open/3 ::
- (string(), [any()],
- [{'write_buffer', (non_neg_integer() | 'infinity' | 'unbuffered')}]) ->
- val_or_error(ref())).
+ (string(), [any()],
+ [{'write_buffer', (non_neg_integer() | 'infinity' | 'unbuffered')}])
+ -> val_or_error(ref())).
-spec(close/1 :: (ref()) -> ok_or_error()).
-spec(read/2 :: (ref(), non_neg_integer()) ->
val_or_error([char()] | binary()) | 'eof').
diff --git a/src/gen_server2.erl b/src/gen_server2.erl
index 547f0a42e2..49ae63c1d5 100644
--- a/src/gen_server2.erl
+++ b/src/gen_server2.erl
@@ -186,7 +186,7 @@
-ifdef(use_specs).
-spec(handle_common_termination/6 ::
- (any(), any(), any(), atom(), any(), any()) -> no_return()).
+ (any(), any(), any(), atom(), any(), any()) -> no_return()).
-spec(hibernate/7 ::
(pid(), any(), any(), atom(), any(), queue(), any()) -> no_return()).
diff --git a/src/pg_local.erl b/src/pg_local.erl
index 1501331d6b..f5ded123d7 100644
--- a/src/pg_local.erl
+++ b/src/pg_local.erl
@@ -36,8 +36,8 @@
-export([join/2, leave/2, get_members/1]).
-export([sync/0]). %% intended for testing only; not part of official API
--export([start/0,start_link/0,init/1,handle_call/3,handle_cast/2,handle_info/2,
- terminate/2]).
+-export([start/0, start_link/0, init/1, handle_call/3, handle_cast/2,
+ handle_info/2, terminate/2]).
%%----------------------------------------------------------------------------
@@ -45,8 +45,8 @@
-type(name() :: term()).
--spec(start_link/0 :: () -> {'ok', pid()} | {'error', term()}).
--spec(start/0 :: () -> {'ok', pid()} | {'error', term()}).
+-spec(start_link/0 :: () -> rabbit_types:ok_or_error2(pid(), term())).
+-spec(start/0 :: () -> rabbit_types:ok_or_error2(pid(), term())).
-spec(join/2 :: (name(), pid()) -> 'ok').
-spec(leave/2 :: (name(), pid()) -> 'ok').
-spec(get_members/1 :: (name()) -> [pid()]).
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 7df3998773..ada2c38e49 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -33,7 +33,8 @@
-behaviour(application).
--export([prepare/0, start/0, stop/0, stop_and_halt/0, status/0, rotate_logs/1]).
+-export([prepare/0, start/0, stop/0, stop_and_halt/0, status/0,
+ rotate_logs/1]).
-export([start/2, stop/1]).
@@ -183,18 +184,19 @@
-ifdef(use_specs).
--type(log_location() :: 'tty' | 'undefined' | string()).
-type(file_suffix() :: binary()).
+%% this really should be an abstract type
+-type(log_location() :: 'tty' | 'undefined' | file:filename()).
-spec(prepare/0 :: () -> 'ok').
-spec(start/0 :: () -> 'ok').
-spec(stop/0 :: () -> 'ok').
-spec(stop_and_halt/0 :: () -> 'ok').
--spec(rotate_logs/1 :: (file_suffix()) -> 'ok' | {'error', any()}).
--spec(status/0 :: () ->
- [{running_applications, [{atom(), string(), string()}]} |
- {nodes, [{node_type(), [erlang_node()]}]} |
- {running_nodes, [erlang_node()]}]).
+-spec(rotate_logs/1 :: (file_suffix()) -> rabbit_types:ok_or_error(any())).
+-spec(status/0 ::
+ () -> [{running_applications, [{atom(), string(), string()}]} |
+ {nodes, [{rabbit_mnesia:node_type(), [node()]}]} |
+ {running_nodes, [node()]}]).
-spec(log_location/1 :: ('sasl' | 'kernel') -> log_location()).
-endif.
diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl
index a445f44197..7d1839bb6e 100644
--- a/src/rabbit_access_control.erl
+++ b/src/rabbit_access_control.erl
@@ -45,28 +45,38 @@
-ifdef(use_specs).
+-export_type([username/0, password/0]).
+
-type(permission_atom() :: 'configure' | 'read' | 'write').
+-type(username() :: binary()).
+-type(password() :: binary()).
+-type(regexp() :: binary()).
--spec(check_login/2 :: (binary(), binary()) -> user()).
--spec(user_pass_login/2 :: (username(), password()) -> user()).
--spec(check_vhost_access/2 :: (user(), vhost()) -> 'ok').
+-spec(check_login/2 :: (binary(), binary()) -> rabbit_types:user()).
+-spec(user_pass_login/2 :: (username(), password()) -> rabbit_types:user()).
+-spec(check_vhost_access/2 ::
+ (rabbit_types:user(), rabbit_types:vhost()) -> 'ok').
-spec(check_resource_access/3 ::
- (username(), r(atom()), permission_atom()) -> 'ok').
+ (username(), rabbit_types:r(atom()), permission_atom()) -> 'ok').
-spec(add_user/2 :: (username(), password()) -> 'ok').
-spec(delete_user/1 :: (username()) -> 'ok').
-spec(change_password/2 :: (username(), password()) -> 'ok').
-spec(list_users/0 :: () -> [username()]).
--spec(lookup_user/1 :: (username()) -> {'ok', user()} | not_found()).
--spec(add_vhost/1 :: (vhost()) -> 'ok').
--spec(delete_vhost/1 :: (vhost()) -> 'ok').
--spec(list_vhosts/0 :: () -> [vhost()]).
--spec(set_permissions/5 ::
- (username(), vhost(), regexp(), regexp(), regexp()) -> 'ok').
--spec(clear_permissions/2 :: (username(), vhost()) -> 'ok').
+-spec(lookup_user/1 ::
+ (username()) -> rabbit_types:ok(rabbit_types:user())
+ | rabbit_types:error('not_found')).
+-spec(add_vhost/1 :: (rabbit_types:vhost()) -> 'ok').
+-spec(delete_vhost/1 :: (rabbit_types:vhost()) -> 'ok').
+-spec(list_vhosts/0 :: () -> [rabbit_types:vhost()]).
+-spec(set_permissions/5 ::(username(), rabbit_types:vhost(), regexp(),
+ regexp(), regexp()) -> 'ok').
+-spec(clear_permissions/2 :: (username(), rabbit_types:vhost()) -> 'ok').
-spec(list_vhost_permissions/1 ::
- (vhost()) -> [{username(), regexp(), regexp(), regexp()}]).
+ (rabbit_types:vhost())
+ -> [{username(), regexp(), regexp(), regexp()}]).
-spec(list_user_permissions/1 ::
- (username()) -> [{vhost(), regexp(), regexp(), regexp()}]).
+ (username())
+ -> [{rabbit_types:vhost(), regexp(), regexp(), regexp()}]).
-endif.
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 7b3d793b19..f1b527681c 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -59,65 +59,94 @@
-ifdef(use_specs).
--type(qlen() :: {'ok', non_neg_integer()}).
--type(qfun(A) :: fun ((amqqueue()) -> A)).
+-export_type([name/0, qmsg/0]).
+
+-type(name() :: rabbit_types:r('queue')).
+
+-type(qlen() :: rabbit_types:ok(non_neg_integer())).
+-type(qfun(A) :: fun ((rabbit_types:amqqueue()) -> A)).
+-type(qmsg() :: {name(), pid(), msg_id(), boolean(), rabbit_types:message()}).
+-type(msg_id() :: non_neg_integer()).
-type(ok_or_errors() ::
'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}).
-spec(start/0 :: () -> 'ok').
--spec(declare/5 :: (queue_name(), boolean(), boolean(), amqp_table(),
- maybe(pid())) -> {'new' | 'existing', amqqueue()}).
--spec(lookup/1 :: (queue_name()) -> {'ok', amqqueue()} | not_found()).
--spec(with/2 :: (queue_name(), qfun(A)) -> A | not_found()).
--spec(with_or_die/2 :: (queue_name(), qfun(A)) -> A).
--spec(assert_equivalence/5 :: (amqqueue(), boolean(), boolean(), amqp_table(),
- maybe(pid)) -> ok).
--spec(check_exclusive_access/2 :: (amqqueue(), pid()) -> 'ok').
--spec(with_exclusive_access_or_die/3 :: (queue_name(), pid(), qfun(A)) -> A).
--spec(list/1 :: (vhost()) -> [amqqueue()]).
--spec(info_keys/0 :: () -> [info_key()]).
--spec(info/1 :: (amqqueue()) -> [info()]).
--spec(info/2 :: (amqqueue(), [info_key()]) -> [info()]).
--spec(info_all/1 :: (vhost()) -> [[info()]]).
--spec(info_all/2 :: (vhost(), [info_key()]) -> [[info()]]).
--spec(consumers/1 :: (amqqueue()) -> [{pid(), ctag(), boolean()}]).
+-spec(declare/5 ::
+ (name(), boolean(), boolean(),
+ rabbit_framing:amqp_table(), rabbit_types:maybe(pid()))
+ -> {'new' | 'existing', rabbit_types:amqqueue()}).
+-spec(lookup/1 ::
+ (name()) -> rabbit_types:ok(rabbit_types:amqqueue()) |
+ rabbit_types:error('not_found')).
+-spec(with/2 :: (name(), qfun(A)) -> A | rabbit_types:error('not_found')).
+-spec(with_or_die/2 :: (name(), qfun(A)) -> A).
+-spec(assert_equivalence/5 ::
+ (rabbit_types:amqqueue(), boolean(), boolean(),
+ rabbit_framing:amqp_table(), rabbit_types:maybe(pid))
+ -> ok).
+-spec(check_exclusive_access/2 :: (rabbit_types:amqqueue(), pid()) -> 'ok').
+-spec(with_exclusive_access_or_die/3 :: (name(), pid(), qfun(A)) -> A).
+-spec(list/1 :: (rabbit_types:vhost()) -> [rabbit_types:amqqueue()]).
+-spec(info_keys/0 :: () -> [rabbit_types:info_key()]).
+-spec(info/1 :: (rabbit_types:amqqueue()) -> [rabbit_types:info()]).
+-spec(info/2 ::
+ (rabbit_types:amqqueue(), [rabbit_types:info_key()])
+ -> [rabbit_types:info()]).
+-spec(info_all/1 :: (rabbit_types:vhost()) -> [[rabbit_types:info()]]).
+-spec(info_all/2 :: (rabbit_types:vhost(), [rabbit_types:info_key()])
+ -> [[rabbit_types:info()]]).
+-spec(consumers/1 ::
+ (rabbit_types:amqqueue())
+ -> [{pid(), rabbit_types:ctag(), boolean()}]).
-spec(consumers_all/1 ::
- (vhost()) -> [{queue_name(), pid(), ctag(), boolean()}]).
+ (rabbit_types:vhost())
+ -> [{name(), pid(), rabbit_types:ctag(), boolean()}]).
-spec(stat/1 ::
- (amqqueue()) -> {'ok', non_neg_integer(), non_neg_integer()}).
+ (rabbit_types:amqqueue())
+ -> {'ok', non_neg_integer(), non_neg_integer()}).
-spec(delete/3 ::
- (amqqueue(), 'false', 'false') -> qlen();
- (amqqueue(), 'true' , 'false') -> qlen() | {'error', 'in_use'};
- (amqqueue(), 'false', 'true' ) -> qlen() | {'error', 'not_empty'};
- (amqqueue(), 'true' , 'true' ) -> qlen() |
- {'error', 'in_use'} |
- {'error', 'not_empty'}).
--spec(purge/1 :: (amqqueue()) -> qlen()).
--spec(deliver/2 :: (pid(), delivery()) -> boolean()).
+ (rabbit_types:amqqueue(), 'false', 'false')
+ -> qlen();
+ (rabbit_types:amqqueue(), 'true' , 'false')
+ -> qlen() | rabbit_types:error('in_use');
+ (rabbit_types:amqqueue(), 'false', 'true' )
+ -> qlen() | rabbit_types:error('not_empty');
+ (rabbit_types:amqqueue(), 'true' , 'true' )
+ -> qlen() |
+ rabbit_types:error('in_use') |
+ rabbit_types:error('not_empty')).
+-spec(purge/1 :: (rabbit_types:amqqueue()) -> qlen()).
+-spec(deliver/2 :: (pid(), rabbit_types:delivery()) -> boolean()).
-spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok').
--spec(ack/4 :: (pid(), maybe(txn()), [msg_id()], pid()) -> 'ok').
--spec(commit_all/3 :: ([pid()], txn(), pid()) -> ok_or_errors()).
--spec(rollback_all/3 :: ([pid()], txn(), pid()) -> 'ok').
+-spec(ack/4 ::
+ (pid(), rabbit_types:maybe(rabbit_types:txn()), [msg_id()], pid())
+ -> 'ok').
+-spec(commit_all/3 :: ([pid()], rabbit_types:txn(), pid()) -> ok_or_errors()).
+-spec(rollback_all/3 :: ([pid()], rabbit_types:txn(), pid()) -> 'ok').
-spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()).
-spec(limit_all/3 :: ([pid()], pid(), pid() | 'undefined') -> ok_or_errors()).
--spec(basic_get/3 :: (amqqueue(), pid(), boolean()) ->
+-spec(basic_get/3 :: (rabbit_types:amqqueue(), pid(), boolean()) ->
{'ok', non_neg_integer(), qmsg()} | 'empty').
-spec(basic_consume/7 ::
- (amqqueue(), boolean(), pid(), pid() | 'undefined', ctag(),
- boolean(), any()) ->
- 'ok' | {'error', 'exclusive_consume_unavailable'}).
--spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok').
+ (rabbit_types:amqqueue(), boolean(), pid(), pid() | 'undefined',
+ rabbit_types:ctag(), boolean(), any())
+ -> rabbit_types:ok_or_error('exclusive_consume_unavailable')).
+-spec(basic_cancel/4 ::
+ (rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), any()) -> 'ok').
-spec(notify_sent/2 :: (pid(), pid()) -> 'ok').
-spec(unblock/2 :: (pid(), pid()) -> 'ok').
-spec(flush_all/2 :: ([pid()], pid()) -> 'ok').
--spec(internal_declare/2 :: (amqqueue(), boolean()) -> amqqueue() | 'not_found').
--spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()).
--spec(maybe_run_queue_via_backing_queue/2 :: (pid(), (fun ((A) -> A))) -> 'ok').
+-spec(internal_declare/2 ::
+ (rabbit_types:amqqueue(), boolean())
+ -> rabbit_types:amqqueue() | 'not_found').
+-spec(internal_delete/1 :: (name()) -> rabbit_types:ok_or_error('not_found')).
+-spec(maybe_run_queue_via_backing_queue/2 ::
+ (pid(), (fun ((A) -> A))) -> 'ok').
-spec(update_ram_duration/1 :: (pid()) -> 'ok').
-spec(set_ram_duration_target/2 :: (pid(), number() | 'infinity') -> 'ok').
-spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok').
--spec(on_node_down/1 :: (erlang_node()) -> 'ok').
--spec(pseudo_queue/2 :: (binary(), pid()) -> amqqueue()).
+-spec(on_node_down/1 :: (node()) -> 'ok').
+-spec(pseudo_queue/2 :: (binary(), pid()) -> rabbit_types:amqqueue()).
-endif.
@@ -223,7 +252,7 @@ assert_equivalence(#amqqueue{durable = Durable, auto_delete = AutoDelete} = Q,
assert_equivalence(#amqqueue{name = QueueName},
_Durable, _AutoDelete, _Args, _Owner) ->
rabbit_misc:protocol_error(
- precondition_failed, "parameters for ~s not equivalent",
+ not_allowed, "parameters for ~s not equivalent",
[rabbit_misc:rs(QueueName)]).
check_exclusive_access(Q, Owner) -> check_exclusive_access(Q, Owner, lax).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 3bf48b4cf8..2fb60e9675 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -35,7 +35,7 @@
-behaviour(gen_server2).
--define(UNSENT_MESSAGE_LIMIT, 100).
+-define(UNSENT_MESSAGE_LIMIT, 100).
-define(SYNC_INTERVAL, 5). %% milliseconds
-define(RAM_DURATION_UPDATE_INTERVAL, 5000).
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index 6e8481ad51..c76c01ac52 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -42,24 +42,41 @@
-ifdef(use_specs).
--type(properties_input() :: (amqp_properties() | [{atom(), any()}])).
--type(publish_result() :: ({ok, routing_result(), [pid()]} | not_found())).
-
--spec(publish/1 :: (delivery()) -> publish_result()).
--spec(delivery/4 :: (boolean(), boolean(), maybe(txn()), message()) ->
- delivery()).
--spec(message/4 :: (exchange_name(), routing_key(), properties_input(),
- binary()) -> (message() | {'error', any()})).
--spec(properties/1 :: (properties_input()) -> amqp_properties()).
--spec(publish/4 :: (exchange_name(), routing_key(), properties_input(),
- binary()) -> publish_result()).
--spec(publish/7 :: (exchange_name(), routing_key(), boolean(), boolean(),
- maybe(txn()), properties_input(), binary()) ->
- publish_result()).
--spec(build_content/2 :: (amqp_properties(), binary()) -> content()).
--spec(from_content/1 :: (content()) -> {amqp_properties(), binary()}).
+-type(properties_input() ::
+ (rabbit_framing:amqp_property_record() | [{atom(), any()}])).
+-type(publish_result() ::
+ ({ok, rabbit_router:routing_result(), [pid()]}
+ | rabbit_types:error('not_found'))).
+
+-spec(publish/1 :: (rabbit_types:delivery()) -> publish_result()).
+-spec(delivery/4 ::
+ (boolean(), boolean(), rabbit_types:maybe(rabbit_types:txn()),
+ rabbit_types:message())
+ -> rabbit_types:delivery()).
+-spec(message/4 ::
+ (rabbit_exchange:name(), rabbit_router:routing_key(),
+ properties_input(), binary())
+ -> (rabbit_types:message() | rabbit_types:error(any()))).
+-spec(properties/1 ::
+ (properties_input()) -> rabbit_framing:amqp_property_record()).
+-spec(publish/4 ::
+ (rabbit_exchange:name(), rabbit_router:routing_key(),
+ properties_input(), binary())
+ -> publish_result()).
+-spec(publish/7 ::
+ (rabbit_exchange:name(), rabbit_router:routing_key(),
+ boolean(), boolean(), rabbit_types:maybe(rabbit_types:txn()),
+ properties_input(), binary())
+ -> publish_result()).
+-spec(build_content/2 ::
+ (rabbit_framing:amqp_property_record(), binary())
+ -> rabbit_types:content()).
+-spec(from_content/1 ::
+ (rabbit_types:content())
+ -> {rabbit_framing:amqp_property_record(), binary()}).
-spec(is_message_persistent/1 ::
- (decoded_content()) -> (boolean() | {'invalid', non_neg_integer()})).
+ (rabbit_types:decoded_content())
+ -> (boolean() | {'invalid', non_neg_integer()})).
-endif.
diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl
index 200bf2cd44..e2c567106f 100644
--- a/src/rabbit_binary_generator.erl
+++ b/src/rabbit_binary_generator.erl
@@ -57,16 +57,23 @@
-type(frame() :: [binary()]).
-spec(build_simple_method_frame/3 ::
- (channel_number(), amqp_method_record(), protocol()) -> frame()).
+ (rabbit_channel:channel_number(), rabbit_framing:amqp_method_record(),
+ rabbit_types:protocol())
+ -> frame()).
-spec(build_simple_content_frames/4 ::
- (channel_number(), content(), non_neg_integer(), protocol()) ->
- [frame()]).
+ (rabbit_channel:channel_number(), rabbit_types:content(),
+ non_neg_integer(), rabbit_types:protocol())
+ -> [frame()]).
-spec(build_heartbeat_frame/0 :: () -> frame()).
--spec(generate_table/1 :: (amqp_table()) -> binary()).
--spec(encode_properties/2 :: ([amqp_property_type()], [any()]) -> binary()).
+-spec(generate_table/1 :: (rabbit_framing:amqp_table()) -> binary()).
+-spec(encode_properties/2 ::
+ ([rabbit_framing:amqp_property_type()], [any()]) -> binary()).
-spec(check_empty_content_body_frame_size/0 :: () -> 'ok').
--spec(ensure_content_encoded/2 :: (content(), protocol()) -> encoded_content()).
--spec(clear_encoded_content/1 :: (content()) -> unencoded_content()).
+-spec(ensure_content_encoded/2 ::
+ (rabbit_types:content(), rabbit_types:protocol()) ->
+ rabbit_types:encoded_content()).
+-spec(clear_encoded_content/1 ::
+ (rabbit_types:content()) -> rabbit_types:unencoded_content()).
-endif.
diff --git a/src/rabbit_binary_parser.erl b/src/rabbit_binary_parser.erl
index 633be6f0ca..1d0a62afe8 100644
--- a/src/rabbit_binary_parser.erl
+++ b/src/rabbit_binary_parser.erl
@@ -42,10 +42,14 @@
-ifdef(use_specs).
--spec(parse_table/1 :: (binary()) -> amqp_table()).
--spec(parse_properties/2 :: ([amqp_property_type()], binary()) -> [any()]).
--spec(ensure_content_decoded/2 :: (content(), protocol()) -> decoded_content()).
--spec(clear_decoded_content/1 :: (content()) -> undecoded_content()).
+-spec(parse_table/1 :: (binary()) -> rabbit_framing:amqp_table()).
+-spec(parse_properties/2 ::
+ ([rabbit_framing:amqp_property_type()], binary()) -> [any()]).
+-spec(ensure_content_decoded/2 ::
+ (rabbit_types:content(), rabbit_types:protocol())
+ -> rabbit_types:decoded_content()).
+-spec(clear_decoded_content/1 ::
+ (rabbit_types:content()) -> rabbit_types:undecoded_content()).
-endif.
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 213b6624d5..911c535246 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -41,8 +41,8 @@
-export([flow_timeout/2]).
--export([init/1, terminate/2, code_change/3,
- handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1]).
+-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
+ handle_info/2, handle_pre_hibernate/1]).
-record(ch, {state, channel, reader_pid, writer_pid, limiter_pid,
transaction_id, tx_participants, next_tag,
@@ -71,24 +71,31 @@
-ifdef(use_specs).
+-export_type([channel_number/0]).
+
-type(ref() :: any()).
+-type(channel_number() :: non_neg_integer()).
-spec(start_link/6 ::
- (channel_number(), pid(), pid(), username(), vhost(), pid()) -> pid()).
--spec(do/2 :: (pid(), amqp_method_record()) -> 'ok').
--spec(do/3 :: (pid(), amqp_method_record(), maybe(content())) -> 'ok').
+ (channel_number(), pid(), pid(), rabbit_access_control:username(),
+ rabbit_types:vhost(), pid()) -> pid()).
+-spec(do/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok').
+-spec(do/3 :: (pid(), rabbit_framing:amqp_method_record(),
+ rabbit_types:maybe(rabbit_types:content())) -> 'ok').
-spec(shutdown/1 :: (pid()) -> 'ok').
--spec(send_command/2 :: (pid(), amqp_method()) -> 'ok').
--spec(deliver/4 :: (pid(), ctag(), boolean(), qmsg()) -> 'ok').
+-spec(send_command/2 :: (pid(), rabbit_framing:amqp_method()) -> 'ok').
+-spec(deliver/4 ::
+ (pid(), rabbit_types:ctag(), boolean(), rabbit_amqqueue:qmsg())
+ -> 'ok').
-spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok').
-spec(flushed/2 :: (pid(), pid()) -> 'ok').
-spec(flow_timeout/2 :: (pid(), ref()) -> 'ok').
-spec(list/0 :: () -> [pid()]).
--spec(info_keys/0 :: () -> [info_key()]).
--spec(info/1 :: (pid()) -> [info()]).
--spec(info/2 :: (pid(), [info_key()]) -> [info()]).
--spec(info_all/0 :: () -> [[info()]]).
--spec(info_all/1 :: ([info_key()]) -> [[info()]]).
+-spec(info_keys/0 :: () -> [rabbit_types:info_key()]).
+-spec(info/1 :: (pid()) -> [rabbit_types:info()]).
+-spec(info/2 :: (pid(), [rabbit_types:info_key()]) -> [rabbit_types:info()]).
+-spec(info_all/0 :: () -> [[rabbit_types:info()]]).
+-spec(info_all/1 :: ([rabbit_types:info_key()]) -> [[rabbit_types:info()]]).
-endif.
@@ -729,7 +736,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
%% the connection shuts down.
ok = case Owner of
none -> ok;
- _ -> rabbit_reader_queue_collector:register_exclusive_queue(CollectorPid, Q)
+ _ -> rabbit_queue_collector:register(CollectorPid, Q)
end,
return_queue_declare_ok(QueueName, NoWait, 0, 0, State);
{existing, _Q} ->
@@ -905,7 +912,9 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
check_read_permitted(ExchangeName, State),
case Fun(ExchangeName, QueueName, ActualRoutingKey, Arguments,
fun (_X, Q) ->
- rabbit_amqqueue:check_exclusive_access(Q, ReaderPid)
+ try rabbit_amqqueue:check_exclusive_access(Q, ReaderPid)
+ catch exit:Reason -> {error, Reason}
+ end
end) of
{error, exchange_not_found} ->
rabbit_misc:not_found(ExchangeName);
@@ -920,6 +929,8 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
not_found, "no binding ~s between ~s and ~s",
[RoutingKey, rabbit_misc:rs(ExchangeName),
rabbit_misc:rs(QueueName)]);
+ {error, #amqp_error{} = Error} ->
+ rabbit_misc:protocol_error(Error);
ok -> return_ok(State, NoWait, ReturnMethod)
end.
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 323d4d2fd1..95a49f8679 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -44,7 +44,7 @@
-spec(start/0 :: () -> no_return()).
-spec(stop/0 :: () -> 'ok').
--spec(action/4 :: (atom(), erlang_node(), [string()],
+-spec(action/4 :: (atom(), node(), [string()],
fun ((string(), [any()]) -> 'ok')) -> 'ok').
-spec(usage/0 :: () -> no_return()).
diff --git a/src/rabbit_dialyzer.erl b/src/rabbit_dialyzer.erl
index f19e8d025a..0ec6beb676 100644
--- a/src/rabbit_dialyzer.erl
+++ b/src/rabbit_dialyzer.erl
@@ -30,17 +30,17 @@
%%
-module(rabbit_dialyzer).
--include("rabbit.hrl").
--export([create_basic_plt/1, add_to_plt/2, dialyze_files/2, halt_with_code/1]).
+-export([create_basic_plt/1, add_to_plt/2, dialyze_files/2,
+ halt_with_code/1]).
%%----------------------------------------------------------------------------
-ifdef(use_specs).
--spec(create_basic_plt/1 :: (file_path()) -> 'ok').
--spec(add_to_plt/2 :: (file_path(), string()) -> 'ok').
--spec(dialyze_files/2 :: (file_path(), string()) -> 'ok').
+-spec(create_basic_plt/1 :: (file:filename()) -> 'ok').
+-spec(add_to_plt/2 :: (file:filename(), string()) -> 'ok').
+-spec(dialyze_files/2 :: (file:filename(), string()) -> 'ok').
-spec(halt_with_code/1 :: (atom()) -> no_return()).
-endif.
diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl
index e9baf2c480..42861f8603 100644
--- a/src/rabbit_error_logger.erl
+++ b/src/rabbit_error_logger.erl
@@ -39,7 +39,8 @@
-export([boot/0]).
--export([init/1, terminate/2, code_change/3, handle_call/2, handle_event/2, handle_info/2]).
+-export([init/1, terminate/2, code_change/3, handle_call/2, handle_event/2,
+ handle_info/2]).
boot() ->
{ok, DefaultVHost} = application:get_env(default_vhost),
diff --git a/src/rabbit_error_logger_file_h.erl b/src/rabbit_error_logger_file_h.erl
index 45b66712b8..875d680f86 100644
--- a/src/rabbit_error_logger_file_h.erl
+++ b/src/rabbit_error_logger_file_h.erl
@@ -33,7 +33,8 @@
-behaviour(gen_event).
--export([init/1, handle_event/2, handle_call/2, handle_info/2, terminate/2, code_change/3]).
+-export([init/1, handle_event/2, handle_call/2, handle_info/2, terminate/2,
+ code_change/3]).
%% rabbit_error_logger_file_h is a wrapper around the error_logger_file_h
%% module because the original's init/1 does not match properly
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index d77bf833ea..d91ebe9ba9 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -33,9 +33,8 @@
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
--export([recover/0, declare/5, lookup/1, lookup_or_die/1,
- list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2,
- publish/2]).
+-export([recover/0, declare/5, lookup/1, lookup_or_die/1, list/1, info_keys/0,
+ info/1, info/2, info_all/1, info_all/2, publish/2]).
-export([add_binding/5, delete_binding/5, list_bindings/1]).
-export([delete/2]).
-export([delete_queue_bindings/1, delete_transient_queue_bindings/1]).
@@ -56,45 +55,72 @@
-ifdef(use_specs).
--type(bind_res() :: 'ok' | {'error',
- 'queue_not_found' |
- 'exchange_not_found' |
- 'exchange_and_queue_not_found'}).
--type(inner_fun() :: fun((exchange(), queue()) -> any())).
+-export_type([name/0, type/0, binding_key/0]).
+
+-type(name() :: rabbit_types:r('exchange')).
+-type(type() :: atom()).
+-type(binding_key() :: binary()).
+
+-type(bind_res() :: rabbit_types:ok_or_error('queue_not_found' |
+ 'exchange_not_found' |
+ 'exchange_and_queue_not_found')).
+-type(inner_fun() ::
+ fun((rabbit_types:exchange(), queue()) ->
+ rabbit_types:ok_or_error(rabbit_types:amqp_error()))).
-spec(recover/0 :: () -> 'ok').
--spec(declare/5 :: (exchange_name(), exchange_type(), boolean(), boolean(),
- amqp_table()) -> exchange()).
+-spec(declare/5 ::
+ (name(), type(), boolean(), boolean(), rabbit_framing:amqp_table())
+ -> rabbit_types:exchange()).
-spec(check_type/1 :: (binary()) -> atom()).
--spec(assert_equivalence/5 :: (exchange(), atom(), boolean(), boolean(),
- amqp_table()) -> 'ok').
--spec(assert_args_equivalence/2 :: (exchange(), amqp_table()) -> 'ok').
--spec(lookup/1 :: (exchange_name()) -> {'ok', exchange()} | not_found()).
--spec(lookup_or_die/1 :: (exchange_name()) -> exchange()).
--spec(list/1 :: (vhost()) -> [exchange()]).
--spec(info_keys/0 :: () -> [info_key()]).
--spec(info/1 :: (exchange()) -> [info()]).
--spec(info/2 :: (exchange(), [info_key()]) -> [info()]).
--spec(info_all/1 :: (vhost()) -> [[info()]]).
--spec(info_all/2 :: (vhost(), [info_key()]) -> [[info()]]).
--spec(publish/2 :: (exchange(), delivery()) -> {routing_result(), [pid()]}).
+-spec(assert_equivalence/5 ::
+ (rabbit_types:exchange(), atom(), boolean(), boolean(),
+ rabbit_framing:amqp_table())
+ -> 'ok').
+-spec(assert_args_equivalence/2 ::
+ (rabbit_types:exchange(), rabbit_framing:amqp_table()) -> 'ok').
+-spec(lookup/1 ::
+ (name()) -> rabbit_types:ok(rabbit_types:exchange()) |
+ rabbit_types:error('not_found')).
+-spec(lookup_or_die/1 :: (name()) -> rabbit_types:exchange()).
+-spec(list/1 :: (rabbit_types:vhost()) -> [rabbit_types:exchange()]).
+-spec(info_keys/0 :: () -> [rabbit_types:info_key()]).
+-spec(info/1 :: (rabbit_types:exchange()) -> [rabbit_types:info()]).
+-spec(info/2 ::
+ (rabbit_types:exchange(), [rabbit_types:info_key()])
+ -> [rabbit_types:info()]).
+-spec(info_all/1 :: (rabbit_types:vhost()) -> [[rabbit_types:info()]]).
+-spec(info_all/2 ::(rabbit_types:vhost(), [rabbit_types:info_key()])
+ -> [[rabbit_types:info()]]).
+-spec(publish/2 :: (rabbit_types:exchange(), rabbit_types:delivery())
+ -> {rabbit_router:routing_result(), [pid()]}).
-spec(add_binding/5 ::
- (exchange_name(), queue_name(), routing_key(), amqp_table(), inner_fun()) ->
- bind_res()).
+ (name(), rabbit_amqqueue:name(), rabbit_router:routing_key(),
+ rabbit_framing:amqp_table(), inner_fun())
+ -> bind_res()).
-spec(delete_binding/5 ::
- (exchange_name(), queue_name(), routing_key(), amqp_table(), inner_fun()) ->
- bind_res() | {'error', 'binding_not_found'}).
--spec(list_bindings/1 :: (vhost()) ->
- [{exchange_name(), queue_name(), routing_key(), amqp_table()}]).
--spec(delete_queue_bindings/1 :: (queue_name()) -> fun (() -> none())).
--spec(delete_transient_queue_bindings/1 :: (queue_name()) ->
- fun (() -> none())).
--spec(delete/2 :: (exchange_name(), boolean()) ->
- 'ok' | not_found() | {'error', 'in_use'}).
--spec(list_queue_bindings/1 :: (queue_name()) ->
- [{exchange_name(), routing_key(), amqp_table()}]).
--spec(list_exchange_bindings/1 :: (exchange_name()) ->
- [{queue_name(), routing_key(), amqp_table()}]).
+ (name(), rabbit_amqqueue:name(), rabbit_router:routing_key(),
+ rabbit_framing:amqp_table(), inner_fun())
+ -> bind_res() | rabbit_types:error('binding_not_found')).
+-spec(list_bindings/1 ::
+ (rabbit_types:vhost())
+ -> [{name(), rabbit_amqqueue:name(), rabbit_router:routing_key(),
+ rabbit_framing:amqp_table()}]).
+-spec(delete_queue_bindings/1 ::
+ (rabbit_amqqueue:name()) -> fun (() -> none())).
+-spec(delete_transient_queue_bindings/1 ::
+ (rabbit_amqqueue:name()) -> fun (() -> none())).
+-spec(delete/2 ::
+ (name(), boolean())-> 'ok' |
+ rabbit_types:error('not_found') |
+ rabbit_types:error('in_use')).
+-spec(list_queue_bindings/1 ::
+ (rabbit_amqqueue:name())
+ -> [{name(), rabbit_router:routing_key(),
+ rabbit_framing:amqp_table()}]).
+-spec(list_exchange_bindings/1 ::
+ (name()) -> [{rabbit_amqqueue:name(), rabbit_router:routing_key(),
+ rabbit_framing:amqp_table()}]).
-endif.
@@ -198,7 +224,7 @@ assert_equivalence(X = #exchange{ durable = Durable,
assert_equivalence(#exchange{ name = Name }, _Type, _Durable, _AutoDelete,
_Args) ->
rabbit_misc:protocol_error(
- precondition_failed,
+ not_allowed,
"cannot redeclare ~s with different type, durable or autodelete value",
[rabbit_misc:rs(Name)]).
@@ -215,7 +241,7 @@ assert_args_equivalence(#exchange{ name = Name,
Ae2 = alternate_exchange_value(Args),
if Ae1==Ae2 -> ok;
true -> rabbit_misc:protocol_error(
- precondition_failed,
+ not_allowed,
"cannot redeclare ~s with inequivalent args",
[rabbit_misc:rs(Name)])
end.
@@ -403,23 +429,27 @@ add_binding(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) ->
%% this argument is used to check queue exclusivity;
%% in general, we want to fail on that in preference to
%% anything else
- InnerFun(X, Q),
- case mnesia:read({rabbit_route, B}) of
- [] ->
- sync_binding(B,
- X#exchange.durable andalso
- Q#amqqueue.durable,
- fun mnesia:write/3),
- {new, X, B};
- [_R] ->
- {existing, X, B}
+ case InnerFun(X, Q) of
+ ok ->
+ case mnesia:read({rabbit_route, B}) of
+ [] ->
+ ok = sync_binding(B,
+ X#exchange.durable andalso
+ Q#amqqueue.durable,
+ fun mnesia:write/3),
+ {new, X, B};
+ [_R] ->
+ {existing, X, B}
+ end;
+ {error, _} = E ->
+ E
end
end) of
{new, Exchange = #exchange{ type = Type }, Binding} ->
(type_to_module(Type)):add_binding(Exchange, Binding);
{existing, _, _} ->
ok;
- Err = {error, _} ->
+ {error, _} = Err ->
Err
end.
@@ -429,14 +459,23 @@ delete_binding(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) ->
fun (X, Q, B) ->
case mnesia:match_object(rabbit_route, #route{binding = B},
write) of
- [] -> {error, binding_not_found};
- _ -> InnerFun(X, Q),
- ok = sync_binding(B, Q#amqqueue.durable,
- fun mnesia:delete_object/3),
- {maybe_auto_delete(X), B}
+ [] ->
+ {error, binding_not_found};
+ _ ->
+ case InnerFun(X, Q) of
+ ok ->
+ ok =
+ sync_binding(B,
+ X#exchange.durable andalso
+ Q#amqqueue.durable,
+ fun mnesia:delete_object/3),
+ {maybe_auto_delete(X), B};
+ {error, _} = E ->
+ E
+ end
end
end) of
- Err = {error, _} ->
+ {error, _} = Err ->
Err;
{{IsDeleted, X = #exchange{ type = Type }}, B} ->
Module = type_to_module(Type),
diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl
index 4f9712b14e..94798c78fe 100644
--- a/src/rabbit_exchange_type_fanout.erl
+++ b/src/rabbit_exchange_type_fanout.erl
@@ -35,8 +35,8 @@
-behaviour(rabbit_exchange_type).
-export([description/0, publish/2]).
--export([validate/1, create/1, recover/2, delete/2,
- add_binding/2, remove_bindings/2, assert_args_equivalence/2]).
+-export([validate/1, create/1, recover/2, delete/2, add_binding/2,
+ remove_bindings/2, assert_args_equivalence/2]).
-include("rabbit_exchange_type_spec.hrl").
-rabbit_boot_step({?MODULE,
diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl
index 315e800021..44607398cb 100644
--- a/src/rabbit_exchange_type_headers.erl
+++ b/src/rabbit_exchange_type_headers.erl
@@ -36,8 +36,8 @@
-behaviour(rabbit_exchange_type).
-export([description/0, publish/2]).
--export([validate/1, create/1, recover/2, delete/2,
- add_binding/2, remove_bindings/2, assert_args_equivalence/2]).
+-export([validate/1, create/1, recover/2, delete/2, add_binding/2,
+ remove_bindings/2, assert_args_equivalence/2]).
-include("rabbit_exchange_type_spec.hrl").
-rabbit_boot_step({?MODULE,
@@ -48,7 +48,8 @@
{enables, kernel_ready}]}).
-ifdef(use_specs).
--spec(headers_match/2 :: (amqp_table(), amqp_table()) -> boolean()).
+-spec(headers_match/2 :: (rabbit_framing:amqp_table(),
+ rabbit_framing:amqp_table()) -> boolean()).
-endif.
description() ->
diff --git a/src/rabbit_exchange_type_registry.erl b/src/rabbit_exchange_type_registry.erl
index 33ea0e926b..7906fbee72 100644
--- a/src/rabbit_exchange_type_registry.erl
+++ b/src/rabbit_exchange_type_registry.erl
@@ -35,8 +35,8 @@
-export([start_link/0]).
--export([init/1, handle_call/3, handle_cast/2, handle_info/2,
- terminate/2, code_change/3]).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+ code_change/3]).
-export([register/2, binary_to_type/1, lookup_module/1]).
@@ -45,10 +45,13 @@
-ifdef(use_specs).
--spec(start_link/0 :: () -> 'ignore' | {'error', term()} | {'ok', pid()}).
+-spec(start_link/0 ::
+ () -> 'ignore' | rabbit_types:ok_or_error2(pid(), term())).
-spec(register/2 :: (binary(), atom()) -> 'ok').
--spec(binary_to_type/1 :: (binary()) -> atom() | {'error', 'not_found'}).
--spec(lookup_module/1 :: (atom()) -> {'ok', atom()} | {'error', 'not_found'}).
+-spec(binary_to_type/1 ::
+ (binary()) -> atom() | rabbit_types:error('not_found')).
+-spec(lookup_module/1 ::
+ (atom()) -> rabbit_types:ok_or_error2(atom(), 'not_found')).
-endif.
diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl
index 0e22d5458e..a374cfee7f 100644
--- a/src/rabbit_exchange_type_topic.erl
+++ b/src/rabbit_exchange_type_topic.erl
@@ -35,8 +35,8 @@
-behaviour(rabbit_exchange_type).
-export([description/0, publish/2]).
--export([validate/1, create/1, recover/2, delete/2,
- add_binding/2, remove_bindings/2, assert_args_equivalence/2]).
+-export([validate/1, create/1, recover/2, delete/2, add_binding/2,
+ remove_bindings/2, assert_args_equivalence/2]).
-include("rabbit_exchange_type_spec.hrl").
-rabbit_boot_step({?MODULE,
@@ -49,7 +49,9 @@
-export([topic_matches/2]).
-ifdef(use_specs).
+
-spec(topic_matches/2 :: (binary(), binary()) -> boolean()).
+
-endif.
description() ->
diff --git a/src/rabbit_guid.erl b/src/rabbit_guid.erl
index 1ae8f7dac4..af1c629f41 100644
--- a/src/rabbit_guid.erl
+++ b/src/rabbit_guid.erl
@@ -31,15 +31,13 @@
-module(rabbit_guid).
--include("rabbit.hrl").
-
-behaviour(gen_server).
-export([start_link/0]).
-export([guid/0, string_guid/1, binstring_guid/1]).
--export([init/1, handle_call/3, handle_cast/2, handle_info/2,
- terminate/2, code_change/3]).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+ code_change/3]).
-define(SERVER, ?MODULE).
-define(SERIAL_FILENAME, "rabbit_serial").
@@ -50,7 +48,11 @@
-ifdef(use_specs).
--spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}).
+-export_type([guid/0]).
+
+-type(guid() :: binary()).
+
+-spec(start_link/0 :: () -> 'ignore' | rabbit_types:ok_or_error2(pid(), any())).
-spec(guid/0 :: () -> guid()).
-spec(string_guid/1 :: (any()) -> string()).
-spec(binstring_guid/1 :: (any()) -> binary()).
diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl
index a7ca20c80b..8214b976c4 100644
--- a/src/rabbit_invariable_queue.erl
+++ b/src/rabbit_invariable_queue.erl
@@ -48,11 +48,11 @@
-ifdef(use_specs).
--type(ack() :: guid() | 'blank_ack').
+-type(ack() :: rabbit_guid:guid() | 'blank_ack').
-type(state() :: #iv_state { queue :: queue(),
- qname :: queue_name(),
+ qname :: rabbit_amqqueue:name(),
len :: non_neg_integer(),
- pending_ack :: dict()
+ pending_ack :: dict:dictionary()
}).
-include("rabbit_backing_queue_spec.hrl").
@@ -242,8 +242,7 @@ do_if_persistent(F, Txn, QName) ->
persist_message(QName, true, Txn, Msg = #basic_message {
is_persistent = true }) ->
Msg1 = Msg #basic_message {
- %% don't persist any recoverable decoded properties,
- %% rebuild from properties_bin on restore
+ %% don't persist any recoverable decoded properties
content = rabbit_binary_parser:clear_decoded_content(
Msg #basic_message.content)},
persist_work(Txn, QName,
diff --git a/src/rabbit_load.erl b/src/rabbit_load.erl
index 4f467162e4..e0457b1e43 100644
--- a/src/rabbit_load.erl
+++ b/src/rabbit_load.erl
@@ -40,11 +40,10 @@
-ifdef(use_specs).
--type(erlang_node() :: atom()).
--type(load() :: {{non_neg_integer(), integer() | 'unknown'}, erlang_node()}).
+-type(load() :: {{non_neg_integer(), integer() | 'unknown'}, node()}).
-spec(local_load/0 :: () -> load()).
-spec(remote_loads/0 :: () -> [load()]).
--spec(pick/0 :: () -> erlang_node()).
+-spec(pick/0 :: () -> node()).
-endif.
diff --git a/src/rabbit_log.erl b/src/rabbit_log.erl
index cc80e360ae..85bcbca04a 100644
--- a/src/rabbit_log.erl
+++ b/src/rabbit_log.erl
@@ -50,7 +50,7 @@
-ifdef(use_specs).
--spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}).
+-spec(start_link/0 :: () -> 'ignore' | rabbit_types:ok_or_error2(pid(), any())).
-spec(debug/1 :: (string()) -> 'ok').
-spec(debug/2 :: (string(), [any()]) -> 'ok').
-spec(info/1 :: (string()) -> 'ok').
diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl
index e78b59f16e..bdf3807531 100644
--- a/src/rabbit_memory_monitor.erl
+++ b/src/rabbit_memory_monitor.erl
@@ -86,12 +86,12 @@
-ifdef(use_specs).
--spec(start_link/0 :: () -> 'ignore' | {'error', _} | {'ok', pid()}).
+-spec(start_link/0 :: () -> 'ignore' | rabbit_types:ok_or_error2(pid(), any())).
-spec(update/0 :: () -> 'ok').
-spec(register/2 :: (pid(), {atom(),atom(),[any()]}) -> 'ok').
-spec(deregister/1 :: (pid()) -> 'ok').
--spec(report_ram_duration/2 :: (pid(), float() | 'infinity') ->
- number() | 'infinity').
+-spec(report_ram_duration/2 ::
+ (pid(), float() | 'infinity') -> number() | 'infinity').
-spec(stop/0 :: () -> 'ok').
-endif.
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 35739dcbdf..fcc9fc7e54 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -32,11 +32,12 @@
-module(rabbit_misc).
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
+
-include_lib("kernel/include/file.hrl").
-export([method_record_type/1, polite_pause/0, polite_pause/1]).
-export([die/1, frame_error/2, amqp_error/4,
- protocol_error/3, protocol_error/4]).
+ protocol_error/3, protocol_error/4, protocol_error/1]).
-export([not_found/1]).
-export([get_config/1, get_config/2, set_config/2]).
-export([dirty_read/1]).
@@ -71,61 +72,84 @@
-ifdef(use_specs).
--include_lib("kernel/include/inet.hrl").
+-export_type([resource_name/0]).
--type(ok_or_error() :: 'ok' | {'error', any()}).
+-type(ok_or_error() :: rabbit_types:ok_or_error(any())).
+-type(thunk(T) :: fun(() -> T)).
+-type(resource_name() :: binary()).
--spec(method_record_type/1 :: (tuple()) -> atom()).
+-spec(method_record_type/1 :: (rabbit_framing:amqp_method_record())
+ -> rabbit_framing:amqp_method_name()).
-spec(polite_pause/0 :: () -> 'done').
-spec(polite_pause/1 :: (non_neg_integer()) -> 'done').
--spec(die/1 :: (atom()) -> no_return()).
--spec(frame_error/2 :: (atom(), binary()) -> no_return()).
--spec(amqp_error/4 :: (atom(), string(), [any()], atom()) -> amqp_error()).
--spec(protocol_error/3 :: (atom(), string(), [any()]) -> no_return()).
--spec(protocol_error/4 :: (atom(), string(), [any()], atom()) -> no_return()).
--spec(not_found/1 :: (r(atom())) -> no_return()).
--spec(get_config/1 :: (atom()) -> {'ok', any()} | not_found()).
+-spec(die/1 :: (rabbit_framing:amqp_exception()) -> no_return()).
+-spec(frame_error/2 :: (rabbit_framing:amqp_method_name(), binary())
+ -> no_return()).
+-spec(amqp_error/4 ::
+ (rabbit_framing:amqp_exception(), string(), [any()],
+ rabbit_framing:amqp_method_name())
+ -> rabbit_types:amqp_error()).
+-spec(protocol_error/3 :: (rabbit_framing:amqp_exception(), string(), [any()])
+ -> no_return()).
+-spec(protocol_error/4 ::
+ (rabbit_framing:amqp_exception(), string(), [any()],
+ rabbit_framing:amqp_method_name())
+ -> no_return()).
+-spec(protocol_error/1 :: (rabbit_types:amqp_error()) -> no_return()).
+-spec(not_found/1 :: (rabbit_types:r(atom())) -> no_return()).
+-spec(get_config/1 ::
+ (atom()) -> rabbit_types:ok_or_error2(any(), 'not_found')).
-spec(get_config/2 :: (atom(), A) -> A).
-spec(set_config/2 :: (atom(), any()) -> 'ok').
--spec(dirty_read/1 :: ({atom(), any()}) -> {'ok', any()} | not_found()).
--spec(r/3 :: (vhost() | r(atom()), K, resource_name()) ->
- r(K) when is_subtype(K, atom())).
--spec(r/2 :: (vhost(), K) -> #resource{virtual_host :: vhost(),
- kind :: K,
- name :: '_'}
- when is_subtype(K, atom())).
--spec(r_arg/4 :: (vhost() | r(atom()), K, amqp_table(), binary()) ->
- undefined | r(K) when is_subtype(K, atom())).
--spec(rs/1 :: (r(atom())) -> string()).
+-spec(dirty_read/1 ::
+ ({atom(), any()}) -> rabbit_types:ok_or_error2(any(), 'not_found')).
+-spec(r/2 :: (rabbit_types:vhost(), K)
+ -> rabbit_types:r3(rabbit_types:vhost(), K, '_')
+ when is_subtype(K, atom())).
+-spec(r/3 ::
+ (rabbit_types:vhost() | rabbit_types:r(atom()), K, resource_name())
+ -> rabbit_types:r3(rabbit_types:vhost(), K, resource_name())
+ when is_subtype(K, atom())).
+-spec(r_arg/4 ::
+ (rabbit_types:vhost() | rabbit_types:r(atom()), K,
+ rabbit_framing:amqp_table(), binary())
+ -> undefined | rabbit_types:r(K)
+ when is_subtype(K, atom())).
+-spec(rs/1 :: (rabbit_types:r(atom())) -> string()).
-spec(enable_cover/0 :: () -> ok_or_error()).
-spec(start_cover/1 :: ([{string(), string()} | string()]) -> 'ok').
-spec(report_cover/0 :: () -> 'ok').
--spec(enable_cover/1 :: (file_path()) -> ok_or_error()).
--spec(report_cover/1 :: (file_path()) -> 'ok').
+-spec(enable_cover/1 :: (file:filename()) -> ok_or_error()).
+-spec(report_cover/1 :: (file:filename()) -> 'ok').
-spec(throw_on_error/2 ::
- (atom(), thunk({error, any()} | {ok, A} | A)) -> A).
+ (atom(), thunk(rabbit_types:error(any()) | {ok, A} | A)) -> A).
-spec(with_exit_handler/2 :: (thunk(A), thunk(A)) -> A).
-spec(filter_exit_map/2 :: (fun ((A) -> B), [A]) -> [B]).
--spec(with_user/2 :: (username(), thunk(A)) -> A).
--spec(with_vhost/2 :: (vhost(), thunk(A)) -> A).
--spec(with_user_and_vhost/3 :: (username(), vhost(), thunk(A)) -> A).
+-spec(with_user/2 :: (rabbit_access_control:username(), thunk(A)) -> A).
+-spec(with_vhost/2 :: (rabbit_types:vhost(), thunk(A)) -> A).
+-spec(with_user_and_vhost/3 ::
+ (rabbit_access_control:username(), rabbit_types:vhost(), thunk(A))
+ -> A).
-spec(execute_mnesia_transaction/1 :: (thunk(A)) -> A).
-spec(ensure_ok/2 :: (ok_or_error(), atom()) -> 'ok').
--spec(makenode/1 :: ({string(), string()} | string()) -> erlang_node()).
--spec(nodeparts/1 :: (erlang_node() | string()) -> {string(), string()}).
+-spec(makenode/1 :: ({string(), string()} | string()) -> node()).
+-spec(nodeparts/1 :: (node() | string()) -> {string(), string()}).
-spec(cookie_hash/0 :: () -> string()).
--spec(tcp_name/3 :: (atom(), ip_address(), ip_port()) -> atom()).
+-spec(tcp_name/3 ::
+ (atom(), inet:ip_address(), rabbit_networking:ip_port())
+ -> atom()).
-spec(intersperse/2 :: (A, [A]) -> [A]).
-spec(upmap/2 :: (fun ((A) -> B), [A]) -> [B]).
-spec(map_in_order/2 :: (fun ((A) -> B), [A]) -> [B]).
-spec(table_fold/3 :: (fun ((any(), A) -> A), A, atom()) -> A).
-spec(dirty_read_all/1 :: (atom()) -> [any()]).
--spec(dirty_foreach_key/2 :: (fun ((any()) -> any()), atom()) ->
- 'ok' | 'aborted').
--spec(dirty_dump_log/1 :: (file_path()) -> ok_or_error()).
--spec(read_term_file/1 :: (file_path()) -> {'ok', [any()]} | {'error', any()}).
--spec(write_term_file/2 :: (file_path(), [any()]) -> ok_or_error()).
--spec(append_file/2 :: (file_path(), string()) -> ok_or_error()).
+-spec(dirty_foreach_key/2 :: (fun ((any()) -> any()), atom())
+ -> 'ok' | 'aborted').
+-spec(dirty_dump_log/1 :: (file:filename()) -> ok_or_error()).
+-spec(read_term_file/1 ::
+ (file:filename()) -> {'ok', [any()]} | rabbit_types:error(any())).
+-spec(write_term_file/2 :: (file:filename(), [any()]) -> ok_or_error()).
+-spec(append_file/2 :: (file:filename(), string()) -> ok_or_error()).
-spec(ensure_parent_dirs_exist/1 :: (string()) -> 'ok').
-spec(format_stderr/2 :: (string(), [any()]) -> 'ok').
-spec(start_applications/1 :: ([atom()]) -> 'ok').
@@ -133,15 +157,18 @@
-spec(unfold/2 :: (fun ((A) -> ({'true', B, A} | 'false')), A) -> {[B], A}).
-spec(ceil/1 :: (number()) -> integer()).
-spec(queue_fold/3 :: (fun ((any(), B) -> B), B, queue()) -> B).
--spec(sort_field_table/1 :: (amqp_table()) -> amqp_table()).
+-spec(sort_field_table/1 ::
+ (rabbit_framing:amqp_table()) -> rabbit_framing:amqp_table()).
-spec(pid_to_string/1 :: (pid()) -> string()).
-spec(string_to_pid/1 :: (string()) -> pid()).
-spec(version_compare/2 :: (string(), string()) -> 'lt' | 'eq' | 'gt').
--spec(version_compare/3 :: (string(), string(),
- ('lt' | 'lte' | 'eq' | 'gte' | 'gt')) -> boolean()).
--spec(recursive_delete/1 :: ([file_path()]) ->
- 'ok' | {'error', {file_path(), any()}}).
--spec(dict_cons/3 :: (any(), any(), dict()) -> dict()).
+-spec(version_compare/3 ::
+ (string(), string(), ('lt' | 'lte' | 'eq' | 'gte' | 'gt'))
+ -> boolean()).
+-spec(recursive_delete/1 ::
+ ([file:filename()])
+ -> rabbit_types:ok_or_error({file:filename(), any()})).
+-spec(dict_cons/3 :: (any(), any(), dict:dictionary()) -> dict:dictionary()).
-spec(unlink_and_capture_exit/1 :: (pid()) -> 'ok').
-endif.
@@ -173,7 +200,10 @@ protocol_error(Name, ExplanationFormat, Params) ->
protocol_error(Name, ExplanationFormat, Params, none).
protocol_error(Name, ExplanationFormat, Params, Method) ->
- exit(amqp_error(Name, ExplanationFormat, Params, Method)).
+ protocol_error(amqp_error(Name, ExplanationFormat, Params, Method)).
+
+protocol_error(#amqp_error{} = Error) ->
+ exit(Error).
not_found(R) -> protocol_error(not_found, "no ~s", [rs(R)]).
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index d4b299435d..5c14ba7b16 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -29,6 +29,7 @@
%% Contributor(s): ______________________________________.
%%
+
-module(rabbit_mnesia).
-export([ensure_mnesia_dir/0, dir/0, status/0, init/0, is_db_empty/0,
@@ -47,13 +48,16 @@
-ifdef(use_specs).
--spec(status/0 :: () -> [{'nodes', [{node_type(), [erlang_node()]}]} |
- {'running_nodes', [erlang_node()]}]).
--spec(dir/0 :: () -> file_path()).
+-export_type([node_type/0]).
+
+-type(node_type() :: disc_only | disc | ram | unknown).
+-spec(status/0 :: () -> [{'nodes', [{node_type(), [node()]}]} |
+ {'running_nodes', [node()]}]).
+-spec(dir/0 :: () -> file:filename()).
-spec(ensure_mnesia_dir/0 :: () -> 'ok').
-spec(init/0 :: () -> 'ok').
-spec(is_db_empty/0 :: () -> boolean()).
--spec(cluster/1 :: ([erlang_node()]) -> 'ok').
+-spec(cluster/1 :: ([node()]) -> 'ok').
-spec(reset/0 :: () -> 'ok').
-spec(force_reset/0 :: () -> 'ok').
-spec(is_clustered/0 :: () -> boolean()).
diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl
index 975954fcd2..6baa4b8864 100644
--- a/src/rabbit_net.erl
+++ b/src/rabbit_net.erl
@@ -31,31 +31,42 @@
-module(rabbit_net).
-include("rabbit.hrl").
--include_lib("kernel/include/inet.hrl").
-export([async_recv/3, close/1, controlling_process/2,
getstat/2, peername/1, port_command/2,
send/2, sockname/1]).
+
%%---------------------------------------------------------------------------
-ifdef(use_specs).
+-export_type([socket/0]).
+
-type(stat_option() ::
'recv_cnt' | 'recv_max' | 'recv_avg' | 'recv_oct' | 'recv_dvi' |
'send_cnt' | 'send_max' | 'send_avg' | 'send_oct' | 'send_pend').
--type(error() :: {'error', any()}).
-
--spec(async_recv/3 :: (socket(), integer(), timeout()) -> {'ok', any()}).
--spec(close/1 :: (socket()) -> 'ok' | error()).
--spec(controlling_process/2 :: (socket(), pid()) -> 'ok' | error()).
+-type(error() :: rabbit_types:error(any())).
+-type(socket() :: rabbit_networking:ip_port() | rabbit_types:ssl_socket()).
+
+-spec(async_recv/3 ::
+ (socket(), integer(), timeout()) -> rabbit_types:ok(any())).
+-spec(close/1 :: (socket()) -> rabbit_types:ok_or_error(any())).
+-spec(controlling_process/2 ::
+ (socket(), pid()) -> rabbit_types:ok_or_error(any())).
-spec(port_command/2 :: (socket(), iolist()) -> 'true').
--spec(send/2 :: (socket(), binary() | iolist()) -> 'ok' | error()).
--spec(peername/1 :: (socket()) ->
- {'ok', {ip_address(), non_neg_integer()}} | error()).
--spec(sockname/1 :: (socket()) ->
- {'ok', {ip_address(), non_neg_integer()}} | error()).
--spec(getstat/2 :: (socket(), [stat_option()]) ->
- {'ok', [{stat_option(), integer()}]} | error()).
+-spec(send/2 ::
+ (socket(), binary() | iolist()) -> rabbit_types:ok_or_error(any())).
+-spec(peername/1 ::
+ (socket())
+ -> rabbit_types:ok({inet:ip_address(), rabbit_networking:ip_port()}) |
+ error()).
+-spec(sockname/1 ::
+ (socket())
+ -> rabbit_types:ok({inet:ip_address(), rabbit_networking:ip_port()}) |
+ error()).
+-spec(getstat/2 ::
+ (socket(), [stat_option()])
+ -> rabbit_types:ok([{stat_option(), integer()}]) | error()).
-endif.
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 68ffc98ad7..3a3357ba9d 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -63,25 +63,29 @@
-ifdef(use_specs).
--type(host() :: ip_address() | string() | atom()).
--type(connection() :: pid()).
+-export_type([ip_port/0, hostname/0]).
-spec(start/0 :: () -> 'ok').
--spec(start_tcp_listener/2 :: (host(), ip_port()) -> 'ok').
--spec(start_ssl_listener/3 :: (host(), ip_port(), [info()]) -> 'ok').
--spec(stop_tcp_listener/2 :: (host(), ip_port()) -> 'ok').
--spec(active_listeners/0 :: () -> [listener()]).
--spec(node_listeners/1 :: (erlang_node()) -> [listener()]).
--spec(connections/0 :: () -> [connection()]).
--spec(connection_info_keys/0 :: () -> [info_key()]).
--spec(connection_info/1 :: (connection()) -> [info()]).
--spec(connection_info/2 :: (connection(), [info_key()]) -> [info()]).
--spec(connection_info_all/0 :: () -> [[info()]]).
--spec(connection_info_all/1 :: ([info_key()]) -> [[info()]]).
+-spec(start_tcp_listener/2 :: (hostname(), ip_port()) -> 'ok').
+-spec(start_ssl_listener/3 :: (hostname(), ip_port(), [rabbit_types:info()])
+ -> 'ok').
+-spec(stop_tcp_listener/2 :: (hostname(), ip_port()) -> 'ok').
+-spec(active_listeners/0 :: () -> [rabbit_types:listener()]).
+-spec(node_listeners/1 :: (node()) -> [rabbit_types:listener()]).
+-spec(connections/0 :: () -> [rabbit_types:connection()]).
+-spec(connection_info_keys/0 :: () -> [rabbit_types:info_key()]).
+-spec(connection_info/1 ::
+ (rabbit_types:connection()) -> [rabbit_types:info()]).
+-spec(connection_info/2 ::
+ (rabbit_types:connection(), [rabbit_types:info_key()])
+ -> [rabbit_types:info()]).
+-spec(connection_info_all/0 :: () -> [[rabbit_types:info()]]).
+-spec(connection_info_all/1 ::
+ ([rabbit_types:info_key()]) -> [[rabbit_types:info()]]).
-spec(close_connection/2 :: (pid(), string()) -> 'ok').
--spec(on_node_down/1 :: (erlang_node()) -> 'ok').
--spec(check_tcp_listener_address/3 :: (atom(), host(), ip_port()) ->
- {ip_address(), atom()}).
+-spec(on_node_down/1 :: (node()) -> 'ok').
+-spec(check_tcp_listener_address/3 ::
+ (atom(), hostname(), ip_port()) -> {inet:ip_address(), atom()}).
-endif.
diff --git a/src/rabbit_persister.erl b/src/rabbit_persister.erl
index 8d3c2dc082..a427b13548 100644
--- a/src/rabbit_persister.erl
+++ b/src/rabbit_persister.erl
@@ -65,21 +65,29 @@
-ifdef(use_specs).
--type(pmsg() :: {queue_name(), pkey()}).
+-type(pkey() :: rabbit_guid:guid()).
+-type(pmsg() :: {rabbit_amqqueue:name(), pkey()}).
+
-type(work_item() ::
- {publish, message(), pmsg()} |
+ {publish, rabbit_types:message(), pmsg()} |
{deliver, pmsg()} |
{ack, pmsg()}).
--spec(start_link/1 :: ([queue_name()]) ->
- {'ok', pid()} | 'ignore' | {'error', any()}).
+-spec(start_link/1 ::
+ ([rabbit_amqqueue:name()])
+ -> 'ignore' | rabbit_types:ok_or_error2(pid(), any())).
-spec(transaction/1 :: ([work_item()]) -> 'ok').
--spec(extend_transaction/2 :: ({txn(), queue_name()}, [work_item()]) -> 'ok').
+-spec(extend_transaction/2 ::
+ ({rabbit_types:txn(), rabbit_amqqueue:name()}, [work_item()])
+ -> 'ok').
-spec(dirty_work/1 :: ([work_item()]) -> 'ok').
--spec(commit_transaction/1 :: ({txn(), queue_name()}) -> 'ok').
--spec(rollback_transaction/1 :: ({txn(), queue_name()}) -> 'ok').
+-spec(commit_transaction/1 ::
+ ({rabbit_types:txn(), rabbit_amqqueue:name()}) -> 'ok').
+-spec(rollback_transaction/1 ::
+ ({rabbit_types:txn(), rabbit_amqqueue:name()}) -> 'ok').
-spec(force_snapshot/0 :: () -> 'ok').
--spec(queue_content/1 :: (queue_name()) -> [{message(), boolean()}]).
+-spec(queue_content/1 ::
+ (rabbit_amqqueue:name()) -> [{rabbit_types:message(), boolean()}]).
-endif.
diff --git a/src/rabbit_reader_queue_collector.erl b/src/rabbit_queue_collector.erl
index 8d4e8fdb42..ea3768d4b4 100644
--- a/src/rabbit_reader_queue_collector.erl
+++ b/src/rabbit_queue_collector.erl
@@ -29,16 +29,16 @@
%% Contributor(s): ______________________________________.
%%
--module(rabbit_reader_queue_collector).
+-module(rabbit_queue_collector).
-behaviour(gen_server).
--export([start_link/0, register_exclusive_queue/2, delete_all/1, shutdown/1]).
+-export([start_link/0, register/2, delete_all/1, shutdown/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
--record(state, {exclusive_queues}).
+-record(state, {queues}).
-include("rabbit.hrl").
@@ -46,8 +46,8 @@
-ifdef(use_specs).
--spec(start_link/0 :: () -> {'ok', pid()}).
--spec(register_exclusive_queue/2 :: (pid(), amqqueue()) -> 'ok').
+-spec(start_link/0 :: () -> rabbit_types:ok(pid())).
+-spec(register/2 :: (pid(), rabbit_types:amqqueue()) -> 'ok').
-spec(delete_all/1 :: (pid()) -> 'ok').
-endif.
@@ -57,8 +57,8 @@
start_link() ->
gen_server:start_link(?MODULE, [], []).
-register_exclusive_queue(CollectorPid, Q) ->
- gen_server:call(CollectorPid, {register_exclusive_queue, Q}, infinity).
+register(CollectorPid, Q) ->
+ gen_server:call(CollectorPid, {register, Q}, infinity).
delete_all(CollectorPid) ->
gen_server:call(CollectorPid, delete_all, infinity).
@@ -69,25 +69,24 @@ shutdown(CollectorPid) ->
%%----------------------------------------------------------------------------
init([]) ->
- {ok, #state{exclusive_queues = dict:new()}}.
+ {ok, #state{queues = dict:new()}}.
%%--------------------------------------------------------------------------
-handle_call({register_exclusive_queue, Q}, _From,
- State = #state{exclusive_queues = Queues}) ->
+handle_call({register, Q}, _From,
+ State = #state{queues = Queues}) ->
MonitorRef = erlang:monitor(process, Q#amqqueue.pid),
{reply, ok,
- State#state{exclusive_queues = dict:store(MonitorRef, Q, Queues)}};
+ State#state{queues = dict:store(MonitorRef, Q, Queues)}};
-handle_call(delete_all, _From,
- State = #state{exclusive_queues = ExclusiveQueues}) ->
+handle_call(delete_all, _From, State = #state{queues = Queues}) ->
[rabbit_misc:with_exit_handler(
fun () -> ok end,
fun () ->
erlang:demonitor(MonitorRef),
rabbit_amqqueue:delete(Q, false, false)
end)
- || {MonitorRef, Q} <- dict:to_list(ExclusiveQueues)],
+ || {MonitorRef, Q} <- dict:to_list(Queues)],
{reply, ok, State};
handle_call(shutdown, _From, State) ->
@@ -97,9 +96,8 @@ handle_cast(_Msg, State) ->
{noreply, State}.
handle_info({'DOWN', MonitorRef, process, _DownPid, _Reason},
- State = #state{exclusive_queues = ExclusiveQueues}) ->
- {noreply, State#state{exclusive_queues =
- dict:erase(MonitorRef, ExclusiveQueues)}}.
+ State = #state{queues = Queues}) ->
+ {noreply, State#state{queues = dict:erase(MonitorRef, Queues)}}.
terminate(_Reason, _State) ->
ok.
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index bac67b1d55..0b5d5458aa 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -138,11 +138,11 @@
-ifdef(use_specs).
--spec(info_keys/0 :: () -> [info_key()]).
--spec(info/1 :: (pid()) -> [info()]).
--spec(info/2 :: (pid(), [info_key()]) -> [info()]).
+-spec(info_keys/0 :: () -> [rabbit_types:info_key()]).
+-spec(info/1 :: (pid()) -> [rabbit_types:info()]).
+-spec(info/2 :: (pid(), [rabbit_types:info_key()]) -> [rabbit_types:info()]).
-spec(shutdown/2 :: (pid(), string()) -> 'ok').
--spec(server_properties/0 :: () -> amqp_table()).
+-spec(server_properties/0 :: () -> rabbit_framing:amqp_table()).
-endif.
@@ -240,7 +240,7 @@ start_connection(Parent, Deb, Sock, SockTransform) ->
erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(),
handshake_timeout),
ProfilingValue = setup_profiling(),
- {ok, Collector} = rabbit_reader_queue_collector:start_link(),
+ {ok, Collector} = rabbit_queue_collector:start_link(),
try
mainloop(Parent, Deb, switch_callback(
#v1{sock = ClientSock,
@@ -273,7 +273,7 @@ start_connection(Parent, Deb, Sock, SockTransform) ->
%%
%% gen_tcp:close(ClientSock),
teardown_profiling(ProfilingValue),
- rabbit_reader_queue_collector:shutdown(Collector),
+ rabbit_queue_collector:shutdown(Collector),
rabbit_misc:unlink_and_capture_exit(Collector)
end,
done.
@@ -447,7 +447,7 @@ maybe_close(State = #v1{connection_state = closing,
%% connection, and are deleted when that connection closes."
%% This does not strictly imply synchrony, but in practice it seems
%% to be what people assume.
- rabbit_reader_queue_collector:delete_all(Collector),
+ rabbit_queue_collector:delete_all(Collector),
ok = send_on_channel0(Sock, #'connection.close_ok'{}, Protocol),
close_connection(State);
_ -> State
diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl
index 75196bc0d4..d50b9f3126 100644
--- a/src/rabbit_router.erl
+++ b/src/rabbit_router.erl
@@ -41,7 +41,13 @@
-ifdef(use_specs).
--spec(deliver/2 :: ([pid()], delivery()) -> {routing_result(), [pid()]}).
+-export_type([routing_key/0, routing_result/0]).
+
+-type(routing_key() :: binary()).
+-type(routing_result() :: 'routed' | 'unroutable' | 'not_delivered').
+
+-spec(deliver/2 ::
+ ([pid()], rabbit_types:delivery()) -> {routing_result(), [pid()]}).
-endif.
diff --git a/src/rabbit_sasl_report_file_h.erl b/src/rabbit_sasl_report_file_h.erl
index 434cdae050..eb2037c21b 100644
--- a/src/rabbit_sasl_report_file_h.erl
+++ b/src/rabbit_sasl_report_file_h.erl
@@ -33,7 +33,8 @@
-behaviour(gen_event).
--export([init/1, handle_event/2, handle_call/2, handle_info/2, terminate/2, code_change/3]).
+-export([init/1, handle_event/2, handle_call/2, handle_info/2, terminate/2,
+ code_change/3]).
%% rabbit_sasl_report_file_h is a wrapper around the sasl_report_file_h
%% module because the original's init/1 does not match properly
diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl
new file mode 100644
index 0000000000..402ab1e894
--- /dev/null
+++ b/src/rabbit_types.erl
@@ -0,0 +1,147 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_types).
+
+-include("rabbit.hrl").
+
+-ifdef(use_specs).
+
+-export_type([txn/0, maybe/1, info/0, info_key/0, message/0, basic_message/0,
+ delivery/0, content/0, decoded_content/0, undecoded_content/0,
+ unencoded_content/0, encoded_content/0, vhost/0, ctag/0,
+ amqp_error/0, r/1, r2/2, r3/3, ssl_socket/0, listener/0,
+ binding/0, amqqueue/0, exchange/0, connection/0, user/0,
+ error/1, ok_or_error/1, ok_or_error2/2, ok/1]).
+
+-type(maybe(T) :: T | 'none').
+-type(vhost() :: binary()).
+-type(ctag() :: binary()).
+
+%% TODO: make this more precise by tying specific class_ids to
+%% specific properties
+-type(undecoded_content() ::
+ #content{class_id :: rabbit_framing:amqp_class_id(),
+ properties :: 'none',
+ properties_bin :: binary(),
+ payload_fragments_rev :: [binary()]} |
+ #content{class_id :: rabbit_framing:amqp_class_id(),
+ properties :: rabbit_framing:amqp_property_record(),
+ properties_bin :: 'none',
+ payload_fragments_rev :: [binary()]}).
+-type(unencoded_content() :: undecoded_content()).
+-type(decoded_content() ::
+ #content{class_id :: rabbit_framing:amqp_class_id(),
+ properties :: rabbit_framing:amqp_property_record(),
+ properties_bin :: maybe(binary()),
+ payload_fragments_rev :: [binary()]}).
+-type(encoded_content() ::
+ #content{class_id :: rabbit_framing:amqp_class_id(),
+ properties :: maybe(rabbit_framing:amqp_property_record()),
+ properties_bin :: binary(),
+ payload_fragments_rev :: [binary()]}).
+-type(content() :: undecoded_content() | decoded_content()).
+-type(basic_message() ::
+ #basic_message{exchange_name :: rabbit_exchange:name(),
+ routing_key :: rabbit_router:routing_key(),
+ content :: content(),
+ guid :: rabbit_guid:guid(),
+ is_persistent :: boolean()}).
+-type(message() :: basic_message()).
+-type(delivery() ::
+ #delivery{mandatory :: boolean(),
+ immediate :: boolean(),
+ txn :: maybe(txn()),
+ sender :: pid(),
+ message :: message()}).
+
+%% this is really an abstract type, but dialyzer does not support them
+-type(txn() :: rabbit_guid:guid()).
+
+-type(info_key() :: atom()).
+-type(info() :: {info_key(), any()}).
+
+-type(amqp_error() ::
+ #amqp_error{name :: rabbit_framing:amqp_exception(),
+ explanation :: string(),
+ method :: rabbit_framing:amqp_method_name()}).
+
+-type(r(Kind) ::
+ r2(vhost(), Kind)).
+-type(r2(VirtualHost, Kind) ::
+ r3(VirtualHost, Kind, rabbit_misc:resource_name())).
+-type(r3(VirtualHost, Kind, Name) ::
+ #resource{virtual_host :: VirtualHost,
+ kind :: Kind,
+ name :: Name}).
+
+-type(ssl_socket() :: #ssl_socket{}).
+
+-type(listener() ::
+ #listener{node :: node(),
+ protocol :: atom(),
+ host :: rabbit_networking:hostname(),
+ port :: rabbit_networking:ip_port()}).
+
+-type(binding() ::
+ #binding{exchange_name :: rabbit_exchange:name(),
+ queue_name :: rabbit_amqqueue:name(),
+ key :: rabbit_exchange:binding_key()}).
+
+-type(amqqueue() ::
+ #amqqueue{name :: rabbit_amqqueue:name(),
+ durable :: boolean(),
+ auto_delete :: boolean(),
+ exclusive_owner :: rabbit_types:maybe(pid()),
+ arguments :: rabbit_framing:amqp_table(),
+ pid :: rabbit_types:maybe(pid())}).
+
+-type(exchange() ::
+ #exchange{name :: rabbit_exchange:name(),
+ type :: rabbit_exchange:type(),
+ durable :: boolean(),
+ auto_delete :: boolean(),
+ arguments :: rabbit_framing:amqp_table()}).
+
+-type(connection() :: pid()).
+
+-type(protocol() :: atom()).
+
+-type(user() ::
+ #user{username :: rabbit_access_control:username(),
+ password :: rabbit_access_control:password()}).
+
+-type(ok(A) :: {'ok', A}).
+-type(error(A) :: {'error', A}).
+-type(ok_or_error(A) :: 'ok' | error(A)).
+-type(ok_or_error2(A, B) :: ok(A) | error(B)).
+
+-endif. % use_specs
diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl
index 6bdc174228..4bdaddeaa9 100644
--- a/src/rabbit_writer.erl
+++ b/src/rabbit_writer.erl
@@ -49,21 +49,36 @@
-ifdef(use_specs).
-spec(start/4 ::
- (socket(), channel_number(), non_neg_integer(), protocol()) -> pid()).
+ (rabbit_net:socket(), rabbit_channel:channel_number(),
+ non_neg_integer(), rabbit_types:protocol())
+ -> pid()).
-spec(start_link/4 ::
- (socket(), channel_number(), non_neg_integer(), protocol()) -> pid()).
--spec(send_command/2 :: (pid(), amqp_method_record()) -> 'ok').
--spec(send_command/3 :: (pid(), amqp_method_record(), content()) -> 'ok').
--spec(send_command_and_signal_back/3 :: (pid(), amqp_method(), pid()) -> 'ok').
+ (rabbit_net:socket(), rabbit_channel:channel_number(),
+ non_neg_integer(), rabbit_types:protocol())
+ -> pid()).
+-spec(send_command/2 ::
+ (pid(), rabbit_framing:amqp_method_record()) -> 'ok').
+-spec(send_command/3 ::
+ (pid(), rabbit_framing:amqp_method_record(), rabbit_types:content())
+ -> 'ok').
+-spec(send_command_and_signal_back/3 ::
+ (pid(), rabbit_framing:amqp_method(), pid()) -> 'ok').
-spec(send_command_and_signal_back/4 ::
- (pid(), amqp_method(), content(), pid()) -> 'ok').
+ (pid(), rabbit_framing:amqp_method(), rabbit_types:content(), pid())
+ -> 'ok').
-spec(send_command_and_notify/5 ::
- (pid(), pid(), pid(), amqp_method_record(), content()) -> 'ok').
+ (pid(), pid(), pid(), rabbit_framing:amqp_method_record(),
+ rabbit_types:content())
+ -> 'ok').
-spec(internal_send_command/4 ::
- (socket(), channel_number(), amqp_method_record(), protocol()) -> 'ok').
+ (rabbit_net:socket(), rabbit_channel:channel_number(),
+ rabbit_framing:amqp_method_record(), rabbit_types:protocol())
+ -> 'ok').
-spec(internal_send_command/6 ::
- (socket(), channel_number(), amqp_method_record(),
- content(), non_neg_integer(), protocol()) -> 'ok').
+ (rabbit_net:socket(), rabbit_channel:channel_number(),
+ rabbit_framing:amqp_method_record(), rabbit_types:content(),
+ non_neg_integer(), rabbit_types:protocol())
+ -> 'ok').
-endif.
@@ -158,6 +173,7 @@ send_command_and_notify(W, Q, ChPid, MethodRecord, Content) ->
shutdown(W) ->
W ! shutdown,
+ rabbit_misc:unlink_and_capture_exit(W),
ok.
%---------------------------------------------------------------------------
diff --git a/src/supervisor2.erl b/src/supervisor2.erl
index 0b1d726562..03dc0f990f 100644
--- a/src/supervisor2.erl
+++ b/src/supervisor2.erl
@@ -11,20 +11,20 @@
%% All modifications are (C) 2010 LShift Ltd.
%%
%% %CopyrightBegin%
-%%
+%%
%% Copyright Ericsson AB 1996-2009. All Rights Reserved.
-%%
+%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
%% compliance with the License. You should have received a copy of the
%% Erlang Public License along with this software. If not, it can be
%% retrieved online at http://www.erlang.org/.
-%%
+%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and limitations
%% under the License.
-%%
+%%
%% %CopyrightEnd%
%%
-module(supervisor2).
diff --git a/src/vm_memory_monitor.erl b/src/vm_memory_monitor.erl
index cd03fcc6e6..bbc3a8c017 100644
--- a/src/vm_memory_monitor.erl
+++ b/src/vm_memory_monitor.erl
@@ -72,8 +72,10 @@
-ifdef(use_specs).
--spec(start_link/1 :: (float()) ->
- ('ignore' | {'error', any()} | {'ok', pid()})).
+-spec(start_link/1 ::
+ (float()) -> 'ignore' |
+ rabbit_types:error(any()) |
+ rabbit_types:ok(pid())).
-spec(update/0 :: () -> 'ok').
-spec(get_total_memory/0 :: () -> (non_neg_integer() | 'unknown')).
-spec(get_vm_limit/0 :: () -> (non_neg_integer() | 'unknown')).
diff --git a/src/worker_pool.erl b/src/worker_pool.erl
index 97e075459f..01ce3535d8 100644
--- a/src/worker_pool.erl
+++ b/src/worker_pool.erl
@@ -52,7 +52,7 @@
-ifdef(use_specs).
--spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}).
+-spec(start_link/0 :: () -> 'ignore' | rabbit_types:ok_or_error2(pid(), any())).
-spec(submit/1 :: (fun (() -> A) | {atom(), atom(), [any()]}) -> A).
-spec(submit_async/1 ::
(fun (() -> any()) | {atom(), atom(), [any()]}) -> 'ok').
diff --git a/src/worker_pool_sup.erl b/src/worker_pool_sup.erl
index 4ded63a8db..afa21164be 100644
--- a/src/worker_pool_sup.erl
+++ b/src/worker_pool_sup.erl
@@ -41,9 +41,9 @@
-ifdef(use_specs).
--spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}).
--spec(start_link/1 ::
- (non_neg_integer()) -> {'ok', pid()} | 'ignore' | {'error', any()}).
+-spec(start_link/0 :: () -> 'ignore' | rabbit_types:ok_or_error2(pid(), any())).
+-spec(start_link/1 :: (non_neg_integer()) ->
+ 'ignore' | rabbit_types:ok_or_error2(pid(), any())).
-endif.
diff --git a/src/worker_pool_worker.erl b/src/worker_pool_worker.erl
index 57901fd5cf..a61e4cc372 100644
--- a/src/worker_pool_worker.erl
+++ b/src/worker_pool_worker.erl
@@ -44,7 +44,8 @@
-ifdef(use_specs).
--spec(start_link/1 :: (any()) -> {'ok', pid()} | 'ignore' | {'error', any()}).
+-spec(start_link/1 ::
+ (any()) -> {'ok', pid()} | 'ignore' | rabbit_types:error(any())).
-spec(submit/2 :: (pid(), fun (() -> A) | {atom(), atom(), [any()]}) -> A).
-spec(submit_async/2 ::
(pid(), fun (() -> any()) | {atom(), atom(), [any()]}) -> 'ok').