summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl6
-rw-r--r--src/rabbit_channel.erl22
-rw-r--r--src/rabbit_channel_sup.erl17
-rw-r--r--src/rabbit_direct.erl21
-rw-r--r--src/rabbit_reader.erl58
-rw-r--r--src/rabbit_tests.erl21
6 files changed, 86 insertions, 59 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 0d8a4c92e9..e794b4aa1e 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -658,13 +658,13 @@ message_properties(#q{ttl=TTL}) ->
#message_properties{expiry = calculate_msg_expiry(TTL)}.
calculate_msg_expiry(undefined) -> undefined;
-calculate_msg_expiry(TTL) -> now_millis() + (TTL * 1000).
+calculate_msg_expiry(TTL) -> now_micros() + (TTL * 1000).
drop_expired_messages(State = #q{ttl = undefined}) ->
State;
drop_expired_messages(State = #q{backing_queue_state = BQS,
backing_queue = BQ}) ->
- Now = now_millis(),
+ Now = now_micros(),
BQS1 = BQ:dropwhile(
fun (#message_properties{expiry = Expiry}) ->
Now > Expiry
@@ -685,7 +685,7 @@ ensure_ttl_timer(State = #q{backing_queue = BQ,
ensure_ttl_timer(State) ->
State.
-now_millis() -> timer:now_diff(now(), {0,0,0}).
+now_micros() -> timer:now_diff(now(), {0,0,0}).
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 783889b60c..cb681661bf 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -20,7 +20,7 @@
-behaviour(gen_server2).
--export([start_link/7, do/2, do/3, flush/1, shutdown/1]).
+-export([start_link/8, do/2, do/3, flush/1, shutdown/1]).
-export([send_command/2, deliver/4, flushed/2, confirm/2]).
-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]).
-export([emit_stats/1]).
@@ -35,7 +35,7 @@
user, virtual_host, most_recently_declared_queue,
consumer_mapping, blocking, queue_collector_pid, stats_timer,
confirm_enabled, publish_seqno, unconfirmed_mq, unconfirmed_qm,
- confirmed}).
+ confirmed, capabilities}).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
@@ -67,9 +67,9 @@
-type(channel_number() :: non_neg_integer()).
--spec(start_link/7 ::
+-spec(start_link/8 ::
(channel_number(), pid(), pid(), rabbit_types:user(),
- rabbit_types:vhost(), pid(),
+ rabbit_types:vhost(), rabbit_framing:amqp_table(), pid(),
fun ((non_neg_integer()) -> rabbit_types:ok(pid()))) ->
rabbit_types:ok_pid_or_error()).
-spec(do/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok').
@@ -95,10 +95,11 @@
%%----------------------------------------------------------------------------
-start_link(Channel, ReaderPid, WriterPid, User, VHost, CollectorPid,
- StartLimiterFun) ->
- gen_server2:start_link(?MODULE, [Channel, ReaderPid, WriterPid, User,
- VHost, CollectorPid, StartLimiterFun], []).
+start_link(Channel, ReaderPid, WriterPid, User, VHost, Capabilities,
+ CollectorPid, StartLimiterFun) ->
+ gen_server2:start_link(?MODULE,
+ [Channel, ReaderPid, WriterPid, User, VHost,
+ Capabilities, CollectorPid, StartLimiterFun], []).
do(Pid, Method) ->
do(Pid, Method, none).
@@ -149,7 +150,7 @@ emit_stats(Pid) ->
%%---------------------------------------------------------------------------
-init([Channel, ReaderPid, WriterPid, User, VHost, CollectorPid,
+init([Channel, ReaderPid, WriterPid, User, VHost, Capabilities, CollectorPid,
StartLimiterFun]) ->
process_flag(trap_exit, true),
ok = pg_local:join(rabbit_channels, self()),
@@ -175,8 +176,9 @@ init([Channel, ReaderPid, WriterPid, User, VHost, CollectorPid,
confirm_enabled = false,
publish_seqno = 1,
unconfirmed_mq = gb_trees:empty(),
+ unconfirmed_qm = gb_trees:empty(),
confirmed = [],
- unconfirmed_qm = gb_trees:empty()},
+ capabilities = Capabilities},
rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)),
rabbit_event:if_enabled(StatsTimer,
fun() -> internal_emit_stats(State) end),
diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl
index d21cfdb7fb..9005819405 100644
--- a/src/rabbit_channel_sup.erl
+++ b/src/rabbit_channel_sup.erl
@@ -33,9 +33,10 @@
-type(start_link_args() ::
{'tcp', rabbit_types:protocol(), rabbit_net:socket(),
rabbit_channel:channel_number(), non_neg_integer(), pid(),
- rabbit_types:user(), rabbit_types:vhost(), pid()} |
+ rabbit_types:user(), rabbit_types:vhost(), rabbit_framing:amqp_table(),
+ pid()} |
{'direct', rabbit_channel:channel_number(), pid(), rabbit_types:user(),
- rabbit_types:vhost(), pid()}).
+ rabbit_types:vhost(), rabbit_framing:amqp_table(), pid()}).
-spec(start_link/1 :: (start_link_args()) -> {'ok', pid(), {pid(), any()}}).
@@ -44,7 +45,7 @@
%%----------------------------------------------------------------------------
start_link({tcp, Protocol, Sock, Channel, FrameMax, ReaderPid, User, VHost,
- Collector}) ->
+ Capabilities, Collector}) ->
{ok, SupPid} = supervisor2:start_link(?MODULE, []),
{ok, WriterPid} =
supervisor2:start_child(
@@ -56,19 +57,21 @@ start_link({tcp, Protocol, Sock, Channel, FrameMax, ReaderPid, User, VHost,
supervisor2:start_child(
SupPid,
{channel, {rabbit_channel, start_link,
- [Channel, ReaderPid, WriterPid, User, VHost,
+ [Channel, ReaderPid, WriterPid, User, VHost, Capabilities,
Collector, start_limiter_fun(SupPid)]},
intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}),
{ok, AState} = rabbit_command_assembler:init(Protocol),
{ok, SupPid, {ChannelPid, AState}};
-start_link({direct, Channel, ClientChannelPid, User, VHost, Collector}) ->
+start_link({direct, Channel, ClientChannelPid, User, VHost, Capabilities,
+ Collector}) ->
{ok, SupPid} = supervisor2:start_link(?MODULE, []),
{ok, ChannelPid} =
supervisor2:start_child(
SupPid,
{channel, {rabbit_channel, start_link,
- [Channel, ClientChannelPid, ClientChannelPid,
- User, VHost, Collector, start_limiter_fun(SupPid)]},
+ [Channel, ClientChannelPid, ClientChannelPid, User,
+ VHost, Capabilities, Collector,
+ start_limiter_fun(SupPid)]},
intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}),
{ok, SupPid, {ChannelPid, none}}.
diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl
index 3b8c9fba39..5c89bf49c9 100644
--- a/src/rabbit_direct.erl
+++ b/src/rabbit_direct.erl
@@ -16,7 +16,7 @@
-module(rabbit_direct).
--export([boot/0, connect/3, start_channel/5]).
+-export([boot/0, connect/4, start_channel/6]).
-include("rabbit.hrl").
@@ -25,12 +25,13 @@
-ifdef(use_specs).
-spec(boot/0 :: () -> 'ok').
--spec(connect/3 :: (binary(), binary(), binary()) ->
+-spec(connect/4 :: (binary(), binary(), binary(), rabbit_types:protocol()) ->
{'ok', {rabbit_types:user(),
rabbit_framing:amqp_table()}}).
--spec(start_channel/5 :: (rabbit_channel:channel_number(), pid(),
- rabbit_types:user(), rabbit_types:vhost(), pid()) ->
- {'ok', pid()}).
+-spec(start_channel/6 ::
+ (rabbit_channel:channel_number(), pid(), rabbit_types:user(),
+ rabbit_types:vhost(), rabbit_framing:amqp_table(), pid()) ->
+ {'ok', pid()}).
-endif.
@@ -49,13 +50,14 @@ boot() ->
%%----------------------------------------------------------------------------
-connect(Username, Password, VHost) ->
+connect(Username, Password, VHost, Protocol) ->
case lists:keymember(rabbit, 1, application:which_applications()) of
true ->
try rabbit_access_control:user_pass_login(Username, Password) of
#user{} = User ->
try rabbit_access_control:check_vhost_access(User, VHost) of
- ok -> {ok, {User, rabbit_reader:server_properties()}}
+ ok -> {ok, {User,
+ rabbit_reader:server_properties(Protocol)}}
catch
exit:#amqp_error{name = access_refused} ->
{error, access_refused}
@@ -67,9 +69,10 @@ connect(Username, Password, VHost) ->
{error, broker_not_found_on_node}
end.
-start_channel(Number, ClientChannelPid, User, VHost, Collector) ->
+start_channel(Number, ClientChannelPid, User, VHost, Capabilities, Collector) ->
{ok, _, {ChannelPid, _}} =
supervisor2:start_child(
rabbit_direct_client_sup,
- [{direct, Number, ClientChannelPid, User, VHost, Collector}]),
+ [{direct, Number, ClientChannelPid, User, VHost, Capabilities,
+ Collector}]),
{ok, ChannelPid}.
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 1781469a13..e9ff97f952 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -24,7 +24,7 @@
-export([init/4, mainloop/2]).
--export([conserve_memory/2, server_properties/0]).
+-export([conserve_memory/2, server_properties/1]).
-export([process_channel_frame/5]). %% used by erlang-client
@@ -160,7 +160,8 @@
-spec(emit_stats/1 :: (pid()) -> 'ok').
-spec(shutdown/2 :: (pid(), string()) -> 'ok').
-spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok').
--spec(server_properties/0 :: () -> rabbit_framing:amqp_table()).
+-spec(server_properties/1 :: (rabbit_types:protocol()) ->
+ rabbit_framing:amqp_table()).
%% These specs only exists to add no_return() to keep dialyzer happy
-spec(init/4 :: (pid(), pid(), pid(), rabbit_heartbeat:start_heartbeat_fun())
@@ -219,7 +220,7 @@ conserve_memory(Pid, Conserve) ->
Pid ! {conserve_memory, Conserve},
ok.
-server_properties() ->
+server_properties(Protocol) ->
{ok, Product} = application:get_key(rabbit, id),
{ok, Version} = application:get_key(rabbit, vsn),
@@ -230,22 +231,30 @@ server_properties() ->
%% Normalize the simplifed (2-tuple) and unsimplified (3-tuple) forms
%% from the config and merge them with the generated built-in properties
NormalizedConfigServerProps =
- [case X of
- {KeyAtom, Value} -> {list_to_binary(atom_to_list(KeyAtom)),
- longstr,
- list_to_binary(Value)};
- {BinKey, Type, Value} -> {BinKey, Type, Value}
- end || X <- RawConfigServerProps ++
- [{product, Product},
- {version, Version},
- {platform, "Erlang/OTP"},
- {copyright, ?COPYRIGHT_MESSAGE},
- {information, ?INFORMATION_MESSAGE}]],
+ [{<<"capabilities">>, table, server_capabilities(Protocol)} |
+ [case X of
+ {KeyAtom, Value} -> {list_to_binary(atom_to_list(KeyAtom)),
+ longstr,
+ list_to_binary(Value)};
+ {BinKey, Type, Value} -> {BinKey, Type, Value}
+ end || X <- RawConfigServerProps ++
+ [{product, Product},
+ {version, Version},
+ {platform, "Erlang/OTP"},
+ {copyright, ?COPYRIGHT_MESSAGE},
+ {information, ?INFORMATION_MESSAGE}]]],
%% Filter duplicated properties in favor of config file provided values
lists:usort(fun ({K1,_,_}, {K2,_,_}) -> K1 =< K2 end,
NormalizedConfigServerProps).
+server_capabilities(rabbit_framing_amqp_0_9_1) ->
+ [{<<"publisher_confirms">>, bool, true},
+ {<<"exchange_exchange_bindings">>, bool, true},
+ {<<"basic.nack">>, bool, true}];
+server_capabilities(_) ->
+ [].
+
inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F).
socket_op(Sock, Fun) ->
@@ -655,7 +664,7 @@ start_connection({ProtocolMajor, ProtocolMinor, _ProtocolRevision},
Start = #'connection.start'{
version_major = ProtocolMajor,
version_minor = ProtocolMinor,
- server_properties = server_properties(),
+ server_properties = server_properties(Protocol),
mechanisms = auth_mechanisms_binary(),
locales = <<"en_US">> },
ok = send_on_channel0(Sock, Start, Protocol),
@@ -709,12 +718,18 @@ handle_method0(#'connection.start_ok'{mechanism = Mechanism,
connection = Connection,
sock = Sock}) ->
AuthMechanism = auth_mechanism_to_module(Mechanism),
+ Capabilities =
+ case rabbit_misc:table_lookup(ClientProperties, <<"capabilities">>) of
+ {table, Capabilities1} -> Capabilities1;
+ _ -> []
+ end,
State = State0#v1{auth_mechanism = AuthMechanism,
auth_state = AuthMechanism:init(Sock),
connection_state = securing,
connection =
Connection#connection{
- client_properties = ClientProperties}},
+ client_properties = ClientProperties,
+ capabilities = Capabilities}},
auth_phase(Response, State);
handle_method0(#'connection.secure_ok'{response = Response},
@@ -947,14 +962,15 @@ cert_info(F, Sock) ->
send_to_new_channel(Channel, AnalyzedFrame, State) ->
#v1{sock = Sock, queue_collector = Collector,
channel_sup_sup_pid = ChanSupSup,
- connection = #connection{protocol = Protocol,
- frame_max = FrameMax,
- user = User,
- vhost = VHost}} = State,
+ connection = #connection{protocol = Protocol,
+ frame_max = FrameMax,
+ user = User,
+ vhost = VHost,
+ capabilities = Capabilities}} = State,
{ok, _ChSupPid, {ChPid, AState}} =
rabbit_channel_sup_sup:start_channel(
ChanSupSup, {tcp, Protocol, Sock, Channel, FrameMax, self(), User,
- VHost, Collector}),
+ VHost, Capabilities, Collector}),
erlang:monitor(process, ChPid),
NewAState = process_channel_frame(AnalyzedFrame, self(),
Channel, ChPid, AState),
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 58c369b5a3..59862821f7 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1020,7 +1020,7 @@ test_server_status() ->
%% create a few things so there is some useful information to list
Writer = spawn(fun () -> receive shutdown -> ok end end),
{ok, Ch} = rabbit_channel:start_link(1, self(), Writer,
- user(<<"user">>), <<"/">>, self(),
+ user(<<"user">>), <<"/">>, [], self(),
fun (_) -> {ok, self()} end),
[Q, Q2] = [Queue || Name <- [<<"foo">>, <<"bar">>],
{new, Queue = #amqqueue{}} <-
@@ -1079,8 +1079,8 @@ test_server_status() ->
test_spawn(Receiver) ->
Me = self(),
Writer = spawn(fun () -> Receiver(Me) end),
- {ok, Ch} = rabbit_channel:start_link(1, Me, Writer,
- user(<<"guest">>), <<"/">>, self(),
+ {ok, Ch} = rabbit_channel:start_link(1, Me, Writer, user(<<"guest">>),
+ <<"/">>, [], self(),
fun (_) -> {ok, self()} end),
ok = rabbit_channel:do(Ch, #'channel.open'{}),
receive #'channel.open_ok'{} -> ok
@@ -2201,9 +2201,11 @@ test_configurable_server_properties() ->
BuiltInPropNames = [<<"product">>, <<"version">>, <<"platform">>,
<<"copyright">>, <<"information">>],
+ Protocol = rabbit_framing_amqp_0_9_1,
+
%% Verify that the built-in properties are initially present
- ActualPropNames = [Key ||
- {Key, longstr, _} <- rabbit_reader:server_properties()],
+ ActualPropNames = [Key || {Key, longstr, _} <-
+ rabbit_reader:server_properties(Protocol)],
true = lists:all(fun (X) -> lists:member(X, ActualPropNames) end,
BuiltInPropNames),
@@ -2214,9 +2216,10 @@ test_configurable_server_properties() ->
ConsProp = fun (X) -> application:set_env(rabbit,
server_properties,
[X | ServerProperties]) end,
- IsPropPresent = fun (X) -> lists:member(X,
- rabbit_reader:server_properties())
- end,
+ IsPropPresent =
+ fun (X) ->
+ lists:member(X, rabbit_reader:server_properties(Protocol))
+ end,
%% Add a wholly new property of the simplified {KeyAtom, StringValue} form
NewSimplifiedProperty = {NewHareKey, NewHareVal} = {hare, "soup"},
@@ -2239,7 +2242,7 @@ test_configurable_server_properties() ->
{BinNewVerKey, BinNewVerVal} = {list_to_binary(atom_to_list(NewVerKey)),
list_to_binary(NewVerVal)},
ConsProp(NewVersion),
- ClobberedServerProps = rabbit_reader:server_properties(),
+ ClobberedServerProps = rabbit_reader:server_properties(Protocol),
%% Is the clobbering insert present?
true = IsPropPresent({BinNewVerKey, longstr, BinNewVerVal}),
%% Is the clobbering insert the only thing with the clobbering key?