summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-12-16 15:04:19 +0000
committerMatthew Sackman <matthew@rabbitmq.com>2010-12-16 15:04:19 +0000
commit982603860aedd6fef441aae23d18ba24029734fc (patch)
tree613c06829f8f96eee2e0a67e7cd2a05c0f546037 /src
parent031508ec16a3a86844433330143bfd88f1f6b855 (diff)
parentb0961fe47618356c79c63f73fe6e33392feca102 (diff)
downloadrabbitmq-server-git-982603860aedd6fef441aae23d18ba24029734fc.tar.gz
Merging default into bug23554
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_access_control.erl9
-rw-r--r--src/rabbit_amqqueue_process.erl50
-rw-r--r--src/rabbit_auth_mechanism.erl2
-rw-r--r--src/rabbit_auth_mechanism_external.erl12
-rw-r--r--src/rabbit_auth_mechanism_plain.erl11
-rw-r--r--src/rabbit_reader.erl29
6 files changed, 73 insertions, 40 deletions
diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl
index f2d2b016f3..2bc946db5f 100644
--- a/src/rabbit_access_control.erl
+++ b/src/rabbit_access_control.erl
@@ -59,7 +59,7 @@
-> rabbit_types:user() | rabbit_types:channel_exit()).
-spec(check_user_pass_login/2 ::
(username(), password())
- -> {'ok', rabbit_types:user()} | {'refused', username()}).
+ -> {'ok', rabbit_types:user()} | {'refused', string(), [any()]}).
-spec(make_salt/0 :: () -> binary()).
-spec(check_vhost_access/2 ::
(rabbit_types:user(), rabbit_types:vhost())
@@ -102,16 +102,15 @@
user_pass_login(User, Pass) ->
?LOGDEBUG("Login with user ~p pass ~p~n", [User, Pass]),
case check_user_pass_login(User, Pass) of
- {refused, _} ->
+ {refused, Msg, Args} ->
rabbit_misc:protocol_error(
- access_refused, "login refused for user '~s'", [User]);
+ access_refused, "login refused: ~s", [io_lib:format(Msg, Args)]);
{ok, U} ->
U
end.
check_user_pass_login(Username, Pass) ->
- Refused = {refused, io_lib:format("user '~s' - invalid credentials",
- [Username])},
+ Refused = {refused, "user '~s' - invalid credentials", [Username]},
case lookup_user(Username) of
{ok, User} ->
case check_password(Pass, User#user.password_hash) of
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 6278d5a105..f8ec4ec8b3 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -224,6 +224,8 @@ terminate_shutdown(Fun, State) ->
BQ:tx_rollback(Txn, BQSN),
BQSN1
end, BQS, all_ch_record()),
+ [emit_consumer_deleted(Ch, CTag)
+ || {CTag, Ch, _} <- consumers(State1)],
rabbit_event:notify(queue_deleted, [{pid, self()}]),
State1#q{backing_queue_state = Fun(BQS1)}
end.
@@ -570,12 +572,19 @@ remove_consumer(ChPid, ConsumerTag, Queue) ->
end, Queue).
remove_consumers(ChPid, Queue) ->
- queue:filter(fun ({CP, _}) -> CP /= ChPid end, Queue).
+ {Kept, Removed} = split_by_channel(ChPid, Queue),
+ [emit_consumer_deleted(Ch, CTag) ||
+ {Ch, #consumer{tag = CTag}} <- queue:to_list(Removed)],
+ Kept.
move_consumers(ChPid, From, To) ->
+ {Kept, Removed} = split_by_channel(ChPid, From),
+ {Kept, queue:join(To, Removed)}.
+
+split_by_channel(ChPid, Queue) ->
{Kept, Removed} = lists:partition(fun ({CP, _}) -> CP /= ChPid end,
- queue:to_list(From)),
- {queue:from_list(Kept), queue:join(To, queue:from_list(Removed))}.
+ queue:to_list(Queue)),
+ {queue:from_list(Kept), queue:from_list(Removed)}.
possibly_unblock(State, ChPid, Update) ->
case lookup_ch(ChPid) of
@@ -755,12 +764,34 @@ i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) ->
i(Item, _) ->
throw({bad_argument, Item}).
+consumers(#q{active_consumers = ActiveConsumers,
+ blocked_consumers = BlockedConsumers}) ->
+ rabbit_misc:queue_fold(
+ fun ({ChPid, #consumer{tag = ConsumerTag,
+ ack_required = AckRequired}}, Acc) ->
+ [{ChPid, ConsumerTag, AckRequired} | Acc]
+ end, [], queue:join(ActiveConsumers, BlockedConsumers)).
+
emit_stats(State) ->
emit_stats(State, []).
emit_stats(State, Extra) ->
rabbit_event:notify(queue_stats, Extra ++ infos(?STATISTICS_KEYS, State)).
+emit_consumer_created(ChPid, ConsumerTag, Exclusive, AckRequired) ->
+ rabbit_event:notify(consumer_created,
+ [{consumer_tag, ConsumerTag},
+ {exclusive, Exclusive},
+ {ack_required, AckRequired},
+ {channel, ChPid},
+ {queue, self()}]).
+
+emit_consumer_deleted(ChPid, ConsumerTag) ->
+ rabbit_event:notify(consumer_deleted,
+ [{consumer_tag, ConsumerTag},
+ {channel, ChPid},
+ {queue, self()}]).
+
%---------------------------------------------------------------------------
prioritise_call(Msg, _From, _State) ->
@@ -823,14 +854,8 @@ handle_call({info, Items}, _From, State) ->
catch Error -> reply({error, Error}, State)
end;
-handle_call(consumers, _From,
- State = #q{active_consumers = ActiveConsumers,
- blocked_consumers = BlockedConsumers}) ->
- reply(rabbit_misc:queue_fold(
- fun ({ChPid, #consumer{tag = ConsumerTag,
- ack_required = AckRequired}}, Acc) ->
- [{ChPid, ConsumerTag, AckRequired} | Acc]
- end, [], queue:join(ActiveConsumers, BlockedConsumers)), State);
+handle_call(consumers, _From, State) ->
+ reply(consumers(State), State);
handle_call({deliver_immediately, Delivery = #delivery{message = Message}},
_From, State) ->
@@ -933,6 +958,8 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid,
ChPid, Consumer,
State1#q.active_consumers)})
end,
+ emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
+ not NoAck),
reply(ok, State2)
end;
@@ -951,6 +978,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
C1#cr{limiter_pid = undefined};
_ -> C1
end),
+ emit_consumer_deleted(ChPid, ConsumerTag),
ok = maybe_send_reply(ChPid, OkMsg),
NewState =
State#q{exclusive_consumer = cancel_holder(ChPid,
diff --git a/src/rabbit_auth_mechanism.erl b/src/rabbit_auth_mechanism.erl
index 1258cb8d13..ce1b16acd6 100644
--- a/src/rabbit_auth_mechanism.erl
+++ b/src/rabbit_auth_mechanism.erl
@@ -49,7 +49,7 @@ behaviour_info(callbacks) ->
%% Another round is needed. Here's the state I want next time.
%% {protocol_error, Msg, Args}
%% Client got the protocol wrong. Log and die.
- %% {refused, Username}
+ %% {refused, Msg, Args}
%% Client failed authentication. Log and die.
{handle_response, 2}
];
diff --git a/src/rabbit_auth_mechanism_external.erl b/src/rabbit_auth_mechanism_external.erl
index b21dd31366..6572f78612 100644
--- a/src/rabbit_auth_mechanism_external.erl
+++ b/src/rabbit_auth_mechanism_external.erl
@@ -62,23 +62,23 @@ init(Sock) ->
{ok, C} ->
CN = case rabbit_ssl:peer_cert_subject_item(
C, ?'id-at-commonName') of
- not_found -> {refused, "no CN found"};
+ not_found -> {refused, "no CN found", []};
CN0 -> list_to_binary(CN0)
end,
case config_sane() of
true -> CN;
- false -> {refused, "configuration unsafe"}
+ false -> {refused, "configuration unsafe", []}
end;
{error, no_peercert} ->
- {refused, "no peer certificate"};
+ {refused, "no peer certificate", []};
nossl ->
- {refused, "not SSL connection"}
+ {refused, "not SSL connection", []}
end,
#state{username = Username}.
handle_response(_Response, #state{username = Username}) ->
case Username of
- {refused, _} = E ->
+ {refused, _, _} = E ->
E;
_ ->
case rabbit_access_control:lookup_user(Username) of
@@ -87,7 +87,7 @@ handle_response(_Response, #state{username = Username}) ->
{error, not_found} ->
%% This is not an information leak as we have to
%% have validated a client cert to get this far.
- {refused, io_lib:format("user '~s' not found", [Username])}
+ {refused, "user '~s' not found", [Username]}
end
end.
diff --git a/src/rabbit_auth_mechanism_plain.erl b/src/rabbit_auth_mechanism_plain.erl
index 8758f85f6c..e5f8f3e6a1 100644
--- a/src/rabbit_auth_mechanism_plain.erl
+++ b/src/rabbit_auth_mechanism_plain.erl
@@ -56,6 +56,11 @@ init(_Sock) ->
[].
handle_response(Response, _State) ->
- [User, Pass] = [list_to_binary(T) ||
- T <- string:tokens(binary_to_list(Response), [0])],
- rabbit_access_control:check_user_pass_login(User, Pass).
+ %% The '%%"' at the end of the next line is for Emacs
+ case re:run(Response, "^\\0([^\\0]*)\\0([^\\0]*)$",%%"
+ [{capture, all_but_first, binary}]) of
+ {match, [User, Pass]} ->
+ rabbit_access_control:check_user_pass_login(User, Pass);
+ _ ->
+ {protocol_error, "response ~p invalid", [Response]}
+ end.
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 15b20bc40c..92a2f4d7fe 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -714,24 +714,24 @@ ensure_stats_timer(State) ->
handle_method0(MethodName, FieldsBin,
State = #v1{connection = #connection{protocol = Protocol}}) ->
- try
- handle_method0(Protocol:decode_method_fields(MethodName, FieldsBin),
- State)
- catch exit:Reason ->
- CompleteReason = case Reason of
- #amqp_error{method = none} ->
- Reason#amqp_error{method = MethodName};
- OtherReason -> OtherReason
- end,
+ HandleException =
+ fun(R) ->
case ?IS_RUNNING(State) of
- true -> send_exception(State, 0, CompleteReason);
+ true -> send_exception(State, 0, R);
%% We don't trust the client at this point - force
%% them to wait for a bit so they can't DOS us with
%% repeated failed logins etc.
false -> timer:sleep(?SILENT_CLOSE_DELAY * 1000),
- throw({channel0_error, State#v1.connection_state,
- CompleteReason})
+ throw({channel0_error, State#v1.connection_state, R})
end
+ end,
+ try
+ handle_method0(Protocol:decode_method_fields(MethodName, FieldsBin),
+ State)
+ catch exit:#amqp_error{method = none} = Reason ->
+ HandleException(Reason#amqp_error{method = MethodName});
+ Type:Reason ->
+ HandleException({Type, Reason, MethodName, erlang:get_stacktrace()})
end.
handle_method0(#'connection.start_ok'{mechanism = Mechanism,
@@ -869,10 +869,11 @@ auth_phase(Response,
#connection{protocol = Protocol},
sock = Sock}) ->
case AuthMechanism:handle_response(Response, AuthState) of
- {refused, Reason} ->
+ {refused, Msg, Args} ->
rabbit_misc:protocol_error(
access_refused, "~s login refused: ~s",
- [proplists:get_value(name, AuthMechanism:description()), Reason]);
+ [proplists:get_value(name, AuthMechanism:description()),
+ io_lib:format(Msg, Args)]);
{protocol_error, Msg, Args} ->
rabbit_misc:protocol_error(syntax_error, Msg, Args);
{challenge, Challenge, AuthState1} ->