diff options
| author | Matthew Sackman <matthew@rabbitmq.com> | 2010-12-16 15:04:19 +0000 |
|---|---|---|
| committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-12-16 15:04:19 +0000 |
| commit | 982603860aedd6fef441aae23d18ba24029734fc (patch) | |
| tree | 613c06829f8f96eee2e0a67e7cd2a05c0f546037 /src | |
| parent | 031508ec16a3a86844433330143bfd88f1f6b855 (diff) | |
| parent | b0961fe47618356c79c63f73fe6e33392feca102 (diff) | |
| download | rabbitmq-server-git-982603860aedd6fef441aae23d18ba24029734fc.tar.gz | |
Merging default into bug23554
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_access_control.erl | 9 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 50 | ||||
| -rw-r--r-- | src/rabbit_auth_mechanism.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_auth_mechanism_external.erl | 12 | ||||
| -rw-r--r-- | src/rabbit_auth_mechanism_plain.erl | 11 | ||||
| -rw-r--r-- | src/rabbit_reader.erl | 29 |
6 files changed, 73 insertions, 40 deletions
diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl index f2d2b016f3..2bc946db5f 100644 --- a/src/rabbit_access_control.erl +++ b/src/rabbit_access_control.erl @@ -59,7 +59,7 @@ -> rabbit_types:user() | rabbit_types:channel_exit()). -spec(check_user_pass_login/2 :: (username(), password()) - -> {'ok', rabbit_types:user()} | {'refused', username()}). + -> {'ok', rabbit_types:user()} | {'refused', string(), [any()]}). -spec(make_salt/0 :: () -> binary()). -spec(check_vhost_access/2 :: (rabbit_types:user(), rabbit_types:vhost()) @@ -102,16 +102,15 @@ user_pass_login(User, Pass) -> ?LOGDEBUG("Login with user ~p pass ~p~n", [User, Pass]), case check_user_pass_login(User, Pass) of - {refused, _} -> + {refused, Msg, Args} -> rabbit_misc:protocol_error( - access_refused, "login refused for user '~s'", [User]); + access_refused, "login refused: ~s", [io_lib:format(Msg, Args)]); {ok, U} -> U end. check_user_pass_login(Username, Pass) -> - Refused = {refused, io_lib:format("user '~s' - invalid credentials", - [Username])}, + Refused = {refused, "user '~s' - invalid credentials", [Username]}, case lookup_user(Username) of {ok, User} -> case check_password(Pass, User#user.password_hash) of diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 6278d5a105..f8ec4ec8b3 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -224,6 +224,8 @@ terminate_shutdown(Fun, State) -> BQ:tx_rollback(Txn, BQSN), BQSN1 end, BQS, all_ch_record()), + [emit_consumer_deleted(Ch, CTag) + || {CTag, Ch, _} <- consumers(State1)], rabbit_event:notify(queue_deleted, [{pid, self()}]), State1#q{backing_queue_state = Fun(BQS1)} end. @@ -570,12 +572,19 @@ remove_consumer(ChPid, ConsumerTag, Queue) -> end, Queue). remove_consumers(ChPid, Queue) -> - queue:filter(fun ({CP, _}) -> CP /= ChPid end, Queue). + {Kept, Removed} = split_by_channel(ChPid, Queue), + [emit_consumer_deleted(Ch, CTag) || + {Ch, #consumer{tag = CTag}} <- queue:to_list(Removed)], + Kept. move_consumers(ChPid, From, To) -> + {Kept, Removed} = split_by_channel(ChPid, From), + {Kept, queue:join(To, Removed)}. + +split_by_channel(ChPid, Queue) -> {Kept, Removed} = lists:partition(fun ({CP, _}) -> CP /= ChPid end, - queue:to_list(From)), - {queue:from_list(Kept), queue:join(To, queue:from_list(Removed))}. + queue:to_list(Queue)), + {queue:from_list(Kept), queue:from_list(Removed)}. possibly_unblock(State, ChPid, Update) -> case lookup_ch(ChPid) of @@ -755,12 +764,34 @@ i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) -> i(Item, _) -> throw({bad_argument, Item}). +consumers(#q{active_consumers = ActiveConsumers, + blocked_consumers = BlockedConsumers}) -> + rabbit_misc:queue_fold( + fun ({ChPid, #consumer{tag = ConsumerTag, + ack_required = AckRequired}}, Acc) -> + [{ChPid, ConsumerTag, AckRequired} | Acc] + end, [], queue:join(ActiveConsumers, BlockedConsumers)). + emit_stats(State) -> emit_stats(State, []). emit_stats(State, Extra) -> rabbit_event:notify(queue_stats, Extra ++ infos(?STATISTICS_KEYS, State)). +emit_consumer_created(ChPid, ConsumerTag, Exclusive, AckRequired) -> + rabbit_event:notify(consumer_created, + [{consumer_tag, ConsumerTag}, + {exclusive, Exclusive}, + {ack_required, AckRequired}, + {channel, ChPid}, + {queue, self()}]). + +emit_consumer_deleted(ChPid, ConsumerTag) -> + rabbit_event:notify(consumer_deleted, + [{consumer_tag, ConsumerTag}, + {channel, ChPid}, + {queue, self()}]). + %--------------------------------------------------------------------------- prioritise_call(Msg, _From, _State) -> @@ -823,14 +854,8 @@ handle_call({info, Items}, _From, State) -> catch Error -> reply({error, Error}, State) end; -handle_call(consumers, _From, - State = #q{active_consumers = ActiveConsumers, - blocked_consumers = BlockedConsumers}) -> - reply(rabbit_misc:queue_fold( - fun ({ChPid, #consumer{tag = ConsumerTag, - ack_required = AckRequired}}, Acc) -> - [{ChPid, ConsumerTag, AckRequired} | Acc] - end, [], queue:join(ActiveConsumers, BlockedConsumers)), State); +handle_call(consumers, _From, State) -> + reply(consumers(State), State); handle_call({deliver_immediately, Delivery = #delivery{message = Message}}, _From, State) -> @@ -933,6 +958,8 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, ChPid, Consumer, State1#q.active_consumers)}) end, + emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume, + not NoAck), reply(ok, State2) end; @@ -951,6 +978,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, C1#cr{limiter_pid = undefined}; _ -> C1 end), + emit_consumer_deleted(ChPid, ConsumerTag), ok = maybe_send_reply(ChPid, OkMsg), NewState = State#q{exclusive_consumer = cancel_holder(ChPid, diff --git a/src/rabbit_auth_mechanism.erl b/src/rabbit_auth_mechanism.erl index 1258cb8d13..ce1b16acd6 100644 --- a/src/rabbit_auth_mechanism.erl +++ b/src/rabbit_auth_mechanism.erl @@ -49,7 +49,7 @@ behaviour_info(callbacks) -> %% Another round is needed. Here's the state I want next time. %% {protocol_error, Msg, Args} %% Client got the protocol wrong. Log and die. - %% {refused, Username} + %% {refused, Msg, Args} %% Client failed authentication. Log and die. {handle_response, 2} ]; diff --git a/src/rabbit_auth_mechanism_external.erl b/src/rabbit_auth_mechanism_external.erl index b21dd31366..6572f78612 100644 --- a/src/rabbit_auth_mechanism_external.erl +++ b/src/rabbit_auth_mechanism_external.erl @@ -62,23 +62,23 @@ init(Sock) -> {ok, C} -> CN = case rabbit_ssl:peer_cert_subject_item( C, ?'id-at-commonName') of - not_found -> {refused, "no CN found"}; + not_found -> {refused, "no CN found", []}; CN0 -> list_to_binary(CN0) end, case config_sane() of true -> CN; - false -> {refused, "configuration unsafe"} + false -> {refused, "configuration unsafe", []} end; {error, no_peercert} -> - {refused, "no peer certificate"}; + {refused, "no peer certificate", []}; nossl -> - {refused, "not SSL connection"} + {refused, "not SSL connection", []} end, #state{username = Username}. handle_response(_Response, #state{username = Username}) -> case Username of - {refused, _} = E -> + {refused, _, _} = E -> E; _ -> case rabbit_access_control:lookup_user(Username) of @@ -87,7 +87,7 @@ handle_response(_Response, #state{username = Username}) -> {error, not_found} -> %% This is not an information leak as we have to %% have validated a client cert to get this far. - {refused, io_lib:format("user '~s' not found", [Username])} + {refused, "user '~s' not found", [Username]} end end. diff --git a/src/rabbit_auth_mechanism_plain.erl b/src/rabbit_auth_mechanism_plain.erl index 8758f85f6c..e5f8f3e6a1 100644 --- a/src/rabbit_auth_mechanism_plain.erl +++ b/src/rabbit_auth_mechanism_plain.erl @@ -56,6 +56,11 @@ init(_Sock) -> []. handle_response(Response, _State) -> - [User, Pass] = [list_to_binary(T) || - T <- string:tokens(binary_to_list(Response), [0])], - rabbit_access_control:check_user_pass_login(User, Pass). + %% The '%%"' at the end of the next line is for Emacs + case re:run(Response, "^\\0([^\\0]*)\\0([^\\0]*)$",%%" + [{capture, all_but_first, binary}]) of + {match, [User, Pass]} -> + rabbit_access_control:check_user_pass_login(User, Pass); + _ -> + {protocol_error, "response ~p invalid", [Response]} + end. diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 15b20bc40c..92a2f4d7fe 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -714,24 +714,24 @@ ensure_stats_timer(State) -> handle_method0(MethodName, FieldsBin, State = #v1{connection = #connection{protocol = Protocol}}) -> - try - handle_method0(Protocol:decode_method_fields(MethodName, FieldsBin), - State) - catch exit:Reason -> - CompleteReason = case Reason of - #amqp_error{method = none} -> - Reason#amqp_error{method = MethodName}; - OtherReason -> OtherReason - end, + HandleException = + fun(R) -> case ?IS_RUNNING(State) of - true -> send_exception(State, 0, CompleteReason); + true -> send_exception(State, 0, R); %% We don't trust the client at this point - force %% them to wait for a bit so they can't DOS us with %% repeated failed logins etc. false -> timer:sleep(?SILENT_CLOSE_DELAY * 1000), - throw({channel0_error, State#v1.connection_state, - CompleteReason}) + throw({channel0_error, State#v1.connection_state, R}) end + end, + try + handle_method0(Protocol:decode_method_fields(MethodName, FieldsBin), + State) + catch exit:#amqp_error{method = none} = Reason -> + HandleException(Reason#amqp_error{method = MethodName}); + Type:Reason -> + HandleException({Type, Reason, MethodName, erlang:get_stacktrace()}) end. handle_method0(#'connection.start_ok'{mechanism = Mechanism, @@ -869,10 +869,11 @@ auth_phase(Response, #connection{protocol = Protocol}, sock = Sock}) -> case AuthMechanism:handle_response(Response, AuthState) of - {refused, Reason} -> + {refused, Msg, Args} -> rabbit_misc:protocol_error( access_refused, "~s login refused: ~s", - [proplists:get_value(name, AuthMechanism:description()), Reason]); + [proplists:get_value(name, AuthMechanism:description()), + io_lib:format(Msg, Args)]); {protocol_error, Msg, Args} -> rabbit_misc:protocol_error(syntax_error, Msg, Args); {challenge, Challenge, AuthState1} -> |
