summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2012-11-05 17:55:50 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2012-11-05 17:55:50 +0000
commit274663ef88a8fe219f59c9746915541b5eecbd05 (patch)
treee54efc5c267c38c71a78215c135e7f4c92d94c12 /src
parentb8de90325428e9278a3b3c64e32b87d88b30b6be (diff)
parent6fbb6dc875e75ed8c797b1a028cbcfb4c60695f9 (diff)
downloadrabbitmq-server-git-274663ef88a8fe219f59c9746915541b5eecbd05.tar.gz
merge bug25266 into bug25273
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_binary_generator.erl57
-rw-r--r--src/rabbit_control_main.erl2
-rw-r--r--src/rabbit_net.erl42
-rw-r--r--src/rabbit_networking.erl2
-rw-r--r--src/rabbit_reader.erl45
-rw-r--r--src/rabbit_tests.erl146
6 files changed, 99 insertions, 195 deletions
diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl
index 2ece86963f..a333c1ce05 100644
--- a/src/rabbit_binary_generator.erl
+++ b/src/rabbit_binary_generator.erl
@@ -21,7 +21,7 @@
-export([build_simple_method_frame/3,
build_simple_content_frames/4,
build_heartbeat_frame/0]).
--export([generate_table/1, encode_properties/2]).
+-export([generate_table/1]).
-export([check_empty_frame_size/0]).
-export([ensure_content_encoded/2, clear_encoded_content/1]).
-export([map_exception/3]).
@@ -42,8 +42,6 @@
-> [frame()]).
-spec(build_heartbeat_frame/0 :: () -> frame()).
-spec(generate_table/1 :: (rabbit_framing:amqp_table()) -> binary()).
--spec(encode_properties/2 ::
- ([rabbit_framing:amqp_property_type()], [any()]) -> binary()).
-spec(check_empty_frame_size/0 :: () -> 'ok').
-spec(ensure_content_encoded/2 ::
(rabbit_types:content(), rabbit_types:protocol()) ->
@@ -168,59 +166,6 @@ long_string_to_binary(String) when is_binary(String) ->
long_string_to_binary(String) ->
[<<(length(String)):32>>, String].
-encode_properties([], []) ->
- <<0, 0>>;
-encode_properties(TypeList, ValueList) ->
- encode_properties(0, TypeList, ValueList, 0, [], []).
-
-encode_properties(_Bit, [], [],
- FirstShortAcc, FlagsAcc, PropsAcc) ->
- list_to_binary([lists:reverse(FlagsAcc),
- <<FirstShortAcc:16>>,
- lists:reverse(PropsAcc)]);
-encode_properties(_Bit, [], _ValueList,
- _FirstShortAcc, _FlagsAcc, _PropsAcc) ->
- exit(content_properties_values_overflow);
-encode_properties(15, TypeList, ValueList,
- FirstShortAcc, FlagsAcc, PropsAcc) ->
- NewFlagsShort = FirstShortAcc bor 1, % set the continuation low bit
- encode_properties(0, TypeList, ValueList,
- 0, [<<NewFlagsShort:16>> | FlagsAcc], PropsAcc);
-encode_properties(Bit, [bit | TypeList], [true | ValueList],
- FirstShortAcc, FlagsAcc, PropsAcc) ->
- encode_properties(Bit + 1, TypeList, ValueList,
- FirstShortAcc bor (1 bsl (15 - Bit)), FlagsAcc, PropsAcc);
-encode_properties(Bit, [bit | TypeList], [false | ValueList],
- FirstShortAcc, FlagsAcc, PropsAcc) ->
- encode_properties(Bit + 1, TypeList, ValueList,
- FirstShortAcc, FlagsAcc, PropsAcc);
-encode_properties(_Bit, [bit | _TypeList], [Other | _ValueList],
- _FirstShortAcc, _FlagsAcc, _PropsAcc) ->
- exit({content_properties_illegal_bit_value, Other});
-encode_properties(Bit, [_Type | TypeList], [undefined | ValueList],
- FirstShortAcc, FlagsAcc, PropsAcc) ->
- encode_properties(Bit + 1, TypeList, ValueList,
- FirstShortAcc, FlagsAcc, PropsAcc);
-encode_properties(Bit, [Type | TypeList], [Value | ValueList],
- FirstShortAcc, FlagsAcc, PropsAcc) ->
- encode_properties(Bit + 1, TypeList, ValueList,
- FirstShortAcc bor (1 bsl (15 - Bit)), FlagsAcc,
- [encode_property(Type, Value) | PropsAcc]).
-
-encode_property(shortstr, String) ->
- Len = size(String),
- if Len < 256 -> <<Len:8, String:Len/binary>>;
- true -> exit(content_properties_shortstr_overflow)
- end;
-encode_property(longstr, String) -> Len = size(String),
- <<Len:32, String:Len/binary>>;
-encode_property(octet, Int) -> <<Int:8/unsigned>>;
-encode_property(short, Int) -> <<Int:16/unsigned>>;
-encode_property(long, Int) -> <<Int:32/unsigned>>;
-encode_property(longlong, Int) -> <<Int:64/unsigned>>;
-encode_property(timestamp, Int) -> <<Int:64/unsigned>>;
-encode_property(table, Table) -> table_to_binary(Table).
-
check_empty_frame_size() ->
%% Intended to ensure that EMPTY_FRAME_SIZE is defined correctly.
case iolist_size(create_frame(?FRAME_BODY, 0, <<>>)) of
diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl
index 1b42a84753..669a078754 100644
--- a/src/rabbit_control_main.erl
+++ b/src/rabbit_control_main.erl
@@ -386,7 +386,7 @@ action(list_bindings, Node, Args, Opts, Inform) ->
action(list_connections, Node, Args, _Opts, Inform) ->
Inform("Listing connections", []),
- ArgAtoms = default_if_empty(Args, [user, peer_address, peer_port, state]),
+ ArgAtoms = default_if_empty(Args, [user, peer_host, peer_port, state]),
display_info_list(rpc_call(Node, rabbit_networking, connection_info_all,
[ArgAtoms]),
ArgAtoms);
diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl
index 038154c36e..562fc1971e 100644
--- a/src/rabbit_net.erl
+++ b/src/rabbit_net.erl
@@ -20,7 +20,7 @@
-export([is_ssl/1, ssl_info/1, controlling_process/2, getstat/2,
recv/1, async_recv/3, port_command/2, getopts/2, setopts/2, send/2,
close/1, fast_close/1, sockname/1, peername/1, peercert/1,
- tune_buffer_size/1, connection_string/2]).
+ tune_buffer_size/1, connection_string/2, socket_ends/2]).
%%---------------------------------------------------------------------------
@@ -36,7 +36,7 @@
-type(socket() :: port() | #ssl_socket{}).
-type(opts() :: [{atom(), any()} |
{raw, non_neg_integer(), non_neg_integer(), binary()}]).
-
+-type(host_or_ip() :: binary() | inet:ip_address()).
-spec(is_ssl/1 :: (socket()) -> boolean()).
-spec(ssl_info/1 :: (socket())
-> 'nossl' | ok_val_or_error(
@@ -72,6 +72,10 @@
-spec(tune_buffer_size/1 :: (socket()) -> ok_or_any_error()).
-spec(connection_string/2 ::
(socket(), 'inbound' | 'outbound') -> ok_val_or_error(string())).
+-spec(socket_ends/2 ::
+ (socket(), 'inbound' | 'outbound')
+ -> ok_val_or_error({host_or_ip(), rabbit_networking:ip_port(),
+ host_or_ip(), rabbit_networking:ip_port()})).
-endif.
@@ -193,17 +197,37 @@ tune_buffer_size(Sock) ->
end.
connection_string(Sock, Direction) ->
- {From, To} = case Direction of
- inbound -> {fun peername/1, fun sockname/1};
- outbound -> {fun sockname/1, fun peername/1}
- end,
+ case socket_ends(Sock, Direction) of
+ {ok, {FromAddress, FromPort, ToAddress, ToPort}} ->
+ {ok, rabbit_misc:format(
+ "~s:~p -> ~s:~p",
+ [maybe_ntoab(FromAddress), FromPort,
+ maybe_ntoab(ToAddress), ToPort])};
+ Error ->
+ Error
+ end.
+
+socket_ends(Sock, Direction) ->
+ {From, To} = sock_funs(Direction),
case {From(Sock), To(Sock)} of
{{ok, {FromAddress, FromPort}}, {ok, {ToAddress, ToPort}}} ->
- {ok, rabbit_misc:format("~s:~p -> ~s:~p",
- [rabbit_misc:ntoab(FromAddress), FromPort,
- rabbit_misc:ntoab(ToAddress), ToPort])};
+ {ok, {rdns(FromAddress), FromPort,
+ rdns(ToAddress), ToPort}};
{{error, _Reason} = Error, _} ->
Error;
{_, {error, _Reason} = Error} ->
Error
end.
+
+maybe_ntoab(Addr) when is_tuple(Addr) -> rabbit_misc:ntoab(Addr);
+maybe_ntoab(Host) -> Host.
+
+rdns(Addr) ->
+ {ok, Lookup} = application:get_env(rabbit, reverse_dns_lookups),
+ case Lookup of
+ true -> list_to_binary(rabbit_networking:tcp_host(Addr));
+ _ -> Addr
+ end.
+
+sock_funs(inbound) -> {fun peername/1, fun sockname/1};
+sock_funs(outbound) -> {fun sockname/1, fun peername/1}.
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 5cf8d1aef6..31eeef730c 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -21,7 +21,7 @@
node_listeners/1, connections/0, connection_info_keys/0,
connection_info/1, connection_info/2,
connection_info_all/0, connection_info_all/1,
- close_connection/2, force_connection_event_refresh/0]).
+ close_connection/2, force_connection_event_refresh/0, tcp_host/1]).
%%used by TCP-based transports, e.g. STOMP adapter
-export([tcp_listener_addresses/1, tcp_listener_spec/6,
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 829e9e52fe..928786e983 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -39,19 +39,19 @@
connection_state, queue_collector, heartbeater, stats_timer,
channel_sup_sup_pid, start_heartbeat_fun, buf, buf_len,
auth_mechanism, auth_state, conserve_resources,
- last_blocked_by, last_blocked_at}).
+ last_blocked_by, last_blocked_at, host, peer_host,
+ port, peer_port}).
-define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt,
send_pend, state, last_blocked_by, last_blocked_age,
channels]).
--define(CREATION_EVENT_KEYS, [pid, name, address, port, peer_address, peer_port,
- ssl, peer_cert_subject, peer_cert_issuer,
- peer_cert_validity, auth_mechanism,
- ssl_protocol, ssl_key_exchange,
- ssl_cipher, ssl_hash,
- protocol, user, vhost, timeout, frame_max,
- client_properties]).
+-define(CREATION_EVENT_KEYS,
+ [pid, name, port, peer_port, host,
+ peer_host, ssl, peer_cert_subject, peer_cert_issuer,
+ peer_cert_validity, auth_mechanism, ssl_protocol,
+ ssl_key_exchange, ssl_cipher, ssl_hash, protocol, user, vhost,
+ timeout, frame_max, client_properties]).
-define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]).
@@ -192,17 +192,20 @@ socket_op(Sock, Fun) ->
name(Sock) ->
socket_op(Sock, fun (S) -> rabbit_net:connection_string(S, inbound) end).
+socket_ends(Sock) ->
+ socket_op(Sock, fun (S) -> rabbit_net:socket_ends(S, inbound) end).
+
start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
Sock, SockTransform) ->
process_flag(trap_exit, true),
Name = name(Sock),
log(info, "accepting AMQP connection ~p (~s)~n", [self(), Name]),
ClientSock = socket_op(Sock, SockTransform),
- erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(),
- handshake_timeout),
+ erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(), handshake_timeout),
+ {PeerHost, PeerPort, Host, Port} = socket_ends(Sock),
State = #v1{parent = Parent,
sock = ClientSock,
- name = Name,
+ name = list_to_binary(Name),
connection = #connection{
protocol = none,
user = none,
@@ -225,7 +228,11 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
auth_state = none,
conserve_resources = false,
last_blocked_by = none,
- last_blocked_at = never},
+ last_blocked_at = never,
+ host = Host,
+ peer_host = PeerHost,
+ port = Port,
+ peer_port = PeerPort},
try
ok = inet_op(fun () -> rabbit_net:tune_buffer_size(ClientSock) end),
recvloop(Deb, switch_callback(rabbit_event:init_stats_timer(
@@ -894,15 +901,11 @@ auth_phase(Response,
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
i(pid, #v1{}) -> self();
-i(name, #v1{name = Name}) -> list_to_binary(Name);
-i(address, S) -> socket_info(fun rabbit_net:sockname/1,
- fun ({A, _}) -> A end, S);
-i(port, S) -> socket_info(fun rabbit_net:sockname/1,
- fun ({_, P}) -> P end, S);
-i(peer_address, S) -> socket_info(fun rabbit_net:peername/1,
- fun ({A, _}) -> A end, S);
-i(peer_port, S) -> socket_info(fun rabbit_net:peername/1,
- fun ({_, P}) -> P end, S);
+i(name, #v1{name = Name}) -> Name;
+i(host, #v1{host = Host}) -> Host;
+i(peer_host, #v1{peer_host = PeerHost}) -> PeerHost;
+i(port, #v1{port = Port}) -> Port;
+i(peer_port, #v1{peer_port = PeerPort}) -> PeerPort;
i(SockStat, S) when SockStat =:= recv_oct;
SockStat =:= recv_cnt;
SockStat =:= send_oct;
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 715aa186ec..f802a5a0d9 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -18,7 +18,7 @@
-compile([export_all]).
--export([all_tests/0, test_parsing/0]).
+-export([all_tests/0]).
-import(rabbit_misc, [pget/2]).
@@ -46,7 +46,7 @@ all_tests() ->
passed = test_pg_local(),
passed = test_unfold(),
passed = test_supervisor_delayed_restart(),
- passed = test_parsing(),
+ passed = test_table_codec(),
passed = test_content_framing(),
passed = test_content_transcoding(),
passed = test_topic_matching(),
@@ -424,113 +424,45 @@ test_unfold() ->
end, 10),
passed.
-test_parsing() ->
- passed = test_content_properties(),
- passed = test_field_values(),
- passed.
-
-test_content_prop_encoding(Datum, Binary) ->
- Types = [element(1, E) || E <- Datum],
- Values = [element(2, E) || E <- Datum],
- Binary = rabbit_binary_generator:encode_properties(Types, Values). %% assertion
-
-test_content_properties() ->
- test_content_prop_encoding([], <<0, 0>>),
- test_content_prop_encoding([{bit, true}, {bit, false}, {bit, true}, {bit, false}],
- <<16#A0, 0>>),
- test_content_prop_encoding([{bit, true}, {octet, 123}, {bit, true}, {octet, undefined},
- {bit, true}],
- <<16#E8,0,123>>),
- test_content_prop_encoding([{bit, true}, {octet, 123}, {octet, 123}, {bit, true}],
- <<16#F0,0,123,123>>),
- test_content_prop_encoding([{bit, true}, {shortstr, <<"hi">>}, {bit, true},
- {short, 54321}, {bit, true}],
- <<16#F8,0,2,"hi",16#D4,16#31>>),
- test_content_prop_encoding([{bit, true}, {shortstr, undefined}, {bit, true},
- {short, 54321}, {bit, true}],
- <<16#B8,0,16#D4,16#31>>),
- test_content_prop_encoding([{table, [{<<"a signedint">>, signedint, 12345678},
- {<<"a longstr">>, longstr, <<"yes please">>},
- {<<"a decimal">>, decimal, {123, 12345678}},
- {<<"a timestamp">>, timestamp, 123456789012345},
- {<<"a nested table">>, table,
- [{<<"one">>, signedint, 1},
- {<<"two">>, signedint, 2}]}]}],
- <<
- %% property-flags
- 16#8000:16,
-
- %% property-list:
-
- %% table
- 117:32, % table length in bytes
-
- 11,"a signedint", % name
- "I",12345678:32, % type and value
-
- 9,"a longstr",
- "S",10:32,"yes please",
-
- 9,"a decimal",
- "D",123,12345678:32,
-
- 11,"a timestamp",
- "T", 123456789012345:64,
-
- 14,"a nested table",
- "F",
- 18:32,
-
- 3,"one",
- "I",1:32,
-
- 3,"two",
- "I",2:32 >>),
- passed.
-
-test_field_values() ->
+test_table_codec() ->
%% FIXME this does not test inexact numbers (double and float) yet,
%% because they won't pass the equality assertions
- test_content_prop_encoding(
- [{table, [{<<"longstr">>, longstr, <<"Here is a long string">>},
- {<<"signedint">>, signedint, 12345},
- {<<"decimal">>, decimal, {3, 123456}},
- {<<"timestamp">>, timestamp, 109876543209876},
- {<<"table">>, table, [{<<"one">>, signedint, 54321},
- {<<"two">>, longstr, <<"A long string">>}]},
- {<<"byte">>, byte, 255},
- {<<"long">>, long, 1234567890},
- {<<"short">>, short, 655},
- {<<"bool">>, bool, true},
- {<<"binary">>, binary, <<"a binary string">>},
- {<<"void">>, void, undefined},
- {<<"array">>, array, [{signedint, 54321},
- {longstr, <<"A long string">>}]}
-
- ]}],
- <<
- %% property-flags
- 16#8000:16,
- %% table length in bytes
- 228:32,
-
- 7,"longstr", "S", 21:32, "Here is a long string", % = 34
- 9,"signedint", "I", 12345:32/signed, % + 15 = 49
- 7,"decimal", "D", 3, 123456:32, % + 14 = 63
- 9,"timestamp", "T", 109876543209876:64, % + 19 = 82
- 5,"table", "F", 31:32, % length of table % + 11 = 93
- 3,"one", "I", 54321:32, % + 9 = 102
- 3,"two", "S", 13:32, "A long string", % + 22 = 124
- 4,"byte", "b", 255:8, % + 7 = 131
- 4,"long", "l", 1234567890:64, % + 14 = 145
- 5,"short", "s", 655:16, % + 9 = 154
- 4,"bool", "t", 1, % + 7 = 161
- 6,"binary", "x", 15:32, "a binary string", % + 27 = 188
- 4,"void", "V", % + 6 = 194
- 5,"array", "A", 23:32, % + 11 = 205
- "I", 54321:32, % + 5 = 210
- "S", 13:32, "A long string" % + 18 = 228
- >>),
+ Table = [{<<"longstr">>, longstr, <<"Here is a long string">>},
+ {<<"signedint">>, signedint, 12345},
+ {<<"decimal">>, decimal, {3, 123456}},
+ {<<"timestamp">>, timestamp, 109876543209876},
+ {<<"table">>, table, [{<<"one">>, signedint, 54321},
+ {<<"two">>, longstr,
+ <<"A long string">>}]},
+ {<<"byte">>, byte, 255},
+ {<<"long">>, long, 1234567890},
+ {<<"short">>, short, 655},
+ {<<"bool">>, bool, true},
+ {<<"binary">>, binary, <<"a binary string">>},
+ {<<"void">>, void, undefined},
+ {<<"array">>, array, [{signedint, 54321},
+ {longstr, <<"A long string">>}]}
+ ],
+ Binary = <<
+ 7,"longstr", "S", 21:32, "Here is a long string",
+ 9,"signedint", "I", 12345:32/signed,
+ 7,"decimal", "D", 3, 123456:32,
+ 9,"timestamp", "T", 109876543209876:64,
+ 5,"table", "F", 31:32, % length of table
+ 3,"one", "I", 54321:32,
+ 3,"two", "S", 13:32, "A long string",
+ 4,"byte", "b", 255:8,
+ 4,"long", "l", 1234567890:64,
+ 5,"short", "s", 655:16,
+ 4,"bool", "t", 1,
+ 6,"binary", "x", 15:32, "a binary string",
+ 4,"void", "V",
+ 5,"array", "A", 23:32,
+ "I", 54321:32,
+ "S", 13:32, "A long string"
+ >>,
+ Binary = rabbit_binary_generator:generate_table(Table),
+ Table = rabbit_binary_parser:parse_table(Binary),
passed.
%% Test that content frames don't exceed frame-max