diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_access_control.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 271 | ||||
| -rw-r--r-- | src/rabbit_exchange.erl | 24 | ||||
| -rw-r--r-- | src/rabbit_limiter.erl | 18 | ||||
| -rw-r--r-- | src/rabbit_log.erl | 12 | ||||
| -rw-r--r-- | src/rabbit_reader.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 4 |
8 files changed, 171 insertions, 168 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index 1ddb515173..196212eaee 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -136,7 +136,7 @@ start(normal, []) -> ok = rabbit_amqqueue:start(), - {ok, MemoryAlarms} = application:get_env(memory_alarms), + {ok, MemoryAlarms} = application:get_env(memory_alarms), ok = rabbit_alarm:start(MemoryAlarms), ok = rabbit_binary_generator: @@ -304,7 +304,7 @@ rotate_logs(File, Suffix, OldHandler, NewHandler) -> log_rotation_result({error, MainLogError}, {error, SaslLogError}) -> {error, {{cannot_rotate_main_logs, MainLogError}, - {cannot_rotate_sasl_logs, SaslLogError}}}; + {cannot_rotate_sasl_logs, SaslLogError}}}; log_rotation_result({error, MainLogError}, ok) -> {error, {cannot_rotate_main_logs, MainLogError}}; log_rotation_result(ok, {error, SaslLogError}) -> diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl index 08bc5fc370..eda747b287 100644 --- a/src/rabbit_access_control.erl +++ b/src/rabbit_access_control.erl @@ -245,8 +245,8 @@ add_vhost(VHostPath) -> [{<<"">>, direct}, {<<"amq.direct">>, direct}, {<<"amq.topic">>, topic}, - {<<"amq.match">>, headers}, %% per 0-9-1 pdf - {<<"amq.headers">>, headers}, %% per 0-9-1 xml + {<<"amq.match">>, headers}, %% per 0-9-1 pdf + {<<"amq.headers">>, headers}, %% per 0-9-1 xml {<<"amq.fanout">>, fanout}]], ok; [_] -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 6027c9c04c..cf0ef44f5c 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -53,14 +53,15 @@ has_had_consumers, next_msg_id, message_buffer, - round_robin}). + active_consumers, + blocked_consumers}). -record(consumer, {tag, ack_required}). -record(tx, {ch_pid, is_persistent, pending_messages, pending_acks}). %% These are held in our process dictionary --record(cr, {consumers, +-record(cr, {consumer_count, ch_pid, limiter_pid, monitor_ref, @@ -99,7 +100,8 @@ init(Q) -> has_had_consumers = false, next_msg_id = 1, message_buffer = queue:new(), - round_robin = queue:new()}, ?HIBERNATE_AFTER}. + active_consumers = queue:new(), + blocked_consumers = queue:new()}, ?HIBERNATE_AFTER}. terminate(_Reason, State) -> %% FIXME: How do we cancel active subscriptions? @@ -129,7 +131,7 @@ ch_record(ChPid) -> case get(Key) of undefined -> MonitorRef = erlang:monitor(process, ChPid), - C = #cr{consumers = [], + C = #cr{consumer_count = 0, ch_pid = ChPid, monitor_ref = MonitorRef, unacked_messages = dict:new(), @@ -148,7 +150,7 @@ all_ch_record() -> [C || {{ch, _}, C} <- get()]. is_ch_blocked(#cr{unsent_message_count = Count, is_limit_active = Limited}) -> - Limited orelse Count > ?UNSENT_MESSAGE_LIMIT. + Limited orelse Count >= ?UNSENT_MESSAGE_LIMIT. ch_record_state_transition(OldCR, NewCR) -> BlockedOld = is_ch_blocked(OldCR), @@ -165,18 +167,18 @@ record_current_channel_tx(ChPid, Txn) -> deliver_immediately(Message, Delivered, State = #q{q = #amqqueue{name = QName}, - round_robin = RoundRobin, + active_consumers = ActiveConsumers, + blocked_consumers = BlockedConsumers, next_msg_id = NextId}) -> ?LOGDEBUG("AMQQUEUE ~p DELIVERY:~n~p~n", [QName, Message]), - case queue:out(RoundRobin) of + case queue:out(ActiveConsumers) of {{value, QEntry = {ChPid, #consumer{tag = ConsumerTag, ack_required = AckRequired}}}, - RoundRobinTail} -> + ActiveConsumersTail} -> C = #cr{limiter_pid = LimiterPid, unsent_message_count = Count, unacked_messages = UAM} = ch_record(ChPid), - case not(AckRequired) orelse rabbit_limiter:can_send( - LimiterPid, self()) of + case rabbit_limiter:can_send(LimiterPid, self(), AckRequired) of true -> rabbit_channel:deliver( ChPid, ConsumerTag, AckRequired, @@ -188,18 +190,32 @@ deliver_immediately(Message, Delivered, NewC = C#cr{unsent_message_count = Count + 1, unacked_messages = NewUAM}, store_ch_record(NewC), - NewConsumers = + {NewActiveConsumers, NewBlockedConsumers} = case ch_record_state_transition(C, NewC) of - ok -> queue:in(QEntry, RoundRobinTail); - block -> block_consumers(ChPid, RoundRobinTail) + ok -> {queue:in(QEntry, ActiveConsumersTail), + BlockedConsumers}; + block -> + {ActiveConsumers1, BlockedConsumers1} = + move_consumers(ChPid, + ActiveConsumersTail, + BlockedConsumers), + {ActiveConsumers1, + queue:in(QEntry, BlockedConsumers1)} end, - {offered, AckRequired, State#q{round_robin = NewConsumers, - next_msg_id = NextId + 1}}; + {offered, AckRequired, + State#q{active_consumers = NewActiveConsumers, + blocked_consumers = NewBlockedConsumers, + next_msg_id = NextId + 1}}; false -> store_ch_record(C#cr{is_limit_active = true}), - NewConsumers = block_consumers(ChPid, RoundRobinTail), - deliver_immediately(Message, Delivered, - State#q{round_robin = NewConsumers}) + {NewActiveConsumers, NewBlockedConsumers} = + move_consumers(ChPid, + ActiveConsumers, + BlockedConsumers), + deliver_immediately( + Message, Delivered, + State#q{active_consumers = NewActiveConsumers, + blocked_consumers = NewBlockedConsumers}) end; {empty, _} -> {not_offered, State} @@ -235,22 +251,24 @@ deliver_or_enqueue_n(Messages, State = #q{message_buffer = MessageBuffer}) -> run_poke_burst(queue:join(MessageBuffer, queue:from_list(Messages)), State). -block_consumers(ChPid, RoundRobin) -> - %%?LOGDEBUG("~p Blocking ~p from ~p~n", [self(), ChPid, queue:to_list(RoundRobin)]), - queue:from_list(lists:filter(fun ({CP, _}) -> CP /= ChPid end, - queue:to_list(RoundRobin))). - -unblock_consumers(ChPid, Consumers, RoundRobin) -> - %%?LOGDEBUG("Unblocking ~p ~p ~p~n", [ChPid, Consumers, queue:to_list(RoundRobin)]), - queue:join(RoundRobin, - queue:from_list([{ChPid, Con} || Con <- Consumers])). +add_consumer(ChPid, Consumer, Queue) -> queue:in({ChPid, Consumer}, Queue). -block_consumer(ChPid, ConsumerTag, RoundRobin) -> - %%?LOGDEBUG("~p Blocking ~p from ~p~n", [self(), ConsumerTag, queue:to_list(RoundRobin)]), +remove_consumer(ChPid, ConsumerTag, Queue) -> + %% TODO: replace this with queue:filter/2 once we move to R12 queue:from_list(lists:filter( fun ({CP, #consumer{tag = CT}}) -> (CP /= ChPid) or (CT /= ConsumerTag) - end, queue:to_list(RoundRobin))). + end, queue:to_list(Queue))). + +remove_consumers(ChPid, Queue) -> + %% TODO: replace this with queue:filter/2 once we move to R12 + queue:from_list(lists:filter(fun ({CP, _}) -> CP /= ChPid end, + queue:to_list(Queue))). + +move_consumers(ChPid, From, To) -> + {Kept, Removed} = lists:partition(fun ({CP, _}) -> CP /= ChPid end, + queue:to_list(From)), + {queue:from_list(Kept), queue:join(To, queue:from_list(Removed))}. possibly_unblock(State, ChPid, Update) -> case lookup_ch(ChPid) of @@ -261,50 +279,25 @@ possibly_unblock(State, ChPid, Update) -> store_ch_record(NewC), case ch_record_state_transition(C, NewC) of ok -> State; - unblock -> NewRR = unblock_consumers(ChPid, - NewC#cr.consumers, - State#q.round_robin), - run_poke_burst(State#q{round_robin = NewRR}) + unblock -> {NewBlockedeConsumers, NewActiveConsumers} = + move_consumers(ChPid, + State#q.blocked_consumers, + State#q.active_consumers), + run_poke_burst( + State#q{active_consumers = NewActiveConsumers, + blocked_consumers = NewBlockedeConsumers}) end end. -check_auto_delete(State = #q{q = #amqqueue{auto_delete = false}}) -> - {continue, State}; -check_auto_delete(State = #q{has_had_consumers = false}) -> - {continue, State}; -check_auto_delete(State = #q{round_robin = RoundRobin}) -> - % The clauses above rule out cases where no-one has consumed from - % this queue yet, and cases where we are not an auto_delete queue - % in any case. Thus it remains to check whether we have any active - % listeners at this point. - case queue:is_empty(RoundRobin) of - true -> - % There are no waiting listeners. It's possible that we're - % completely unused. Check. - case is_unused() of - true -> - % There are no active consumers at this - % point. This is the signal to autodelete. - {stop, State}; - false -> - % There is at least one active consumer, so we - % shouldn't delete ourselves. - {continue, State} - end; - false -> - % There are some waiting listeners, thus we are not - % unused, so can continue life as normal without needing - % to check the process dictionary. - {continue, State} - end. +should_auto_delete(#q{q = #amqqueue{auto_delete = false}}) -> false; +should_auto_delete(#q{has_had_consumers = false}) -> false; +should_auto_delete(State) -> is_unused(State). -handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder, - round_robin = ActiveConsumers}) -> +handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) -> case lookup_ch(DownPid) of not_found -> noreply(State); #cr{monitor_ref = MonitorRef, ch_pid = ChPid, txn = Txn, unacked_messages = UAM} -> - NewActive = block_consumers(ChPid, ActiveConsumers), erlang:demonitor(MonitorRef), erase({ch, ChPid}), case Txn of @@ -312,20 +305,22 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder, _ -> ok = rollback_work(Txn, qname(State)), erase_tx(Txn) end, - case check_auto_delete( - deliver_or_enqueue_n( - [{Message, true} || - {_Messsage_id, Message} <- dict:to_list(UAM)], - State#q{ - exclusive_consumer = case Holder of - {ChPid, _} -> none; - Other -> Other - end, - round_robin = NewActive})) of - {continue, NewState} -> - noreply(NewState); - {stop, NewState} -> - {stop, normal, NewState} + NewState = + deliver_or_enqueue_n( + [{Message, true} || + {_Messsage_id, Message} <- dict:to_list(UAM)], + State#q{ + exclusive_consumer = case Holder of + {ChPid, _} -> none; + Other -> Other + end, + active_consumers = remove_consumers( + ChPid, State#q.active_consumers), + blocked_consumers = remove_consumers( + ChPid, State#q.blocked_consumers)}), + case should_auto_delete(NewState) of + false -> noreply(NewState); + true -> {stop, normal, NewState} end end. @@ -338,12 +333,12 @@ check_queue_owner(none, _) -> ok; check_queue_owner({ReaderPid, _}, ReaderPid) -> ok; check_queue_owner({_, _}, _) -> mismatch. -check_exclusive_access({_ChPid, _ConsumerTag}, _ExclusiveConsume) -> +check_exclusive_access({_ChPid, _ConsumerTag}, _ExclusiveConsume, _State) -> in_use; -check_exclusive_access(none, false) -> +check_exclusive_access(none, false, _State) -> ok; -check_exclusive_access(none, true) -> - case is_unused() of +check_exclusive_access(none, true, State) -> + case is_unused(State) of true -> ok; false -> in_use end. @@ -368,16 +363,8 @@ run_poke_burst(MessageBuffer, State) -> State#q{message_buffer = MessageBuffer} end. -is_unused() -> - is_unused1(get()). - -is_unused1([]) -> - true; -is_unused1([{{ch, _}, #cr{consumers = Consumers}} | _Rest]) - when Consumers /= [] -> - false; -is_unused1([_ | Rest]) -> - is_unused1(Rest). +is_unused(State) -> queue:is_empty(State#q.active_consumers) andalso + queue:is_empty(State#q.blocked_consumers). maybe_send_reply(_ChPid, undefined) -> ok; maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg). @@ -536,9 +523,8 @@ i(messages, State) -> i(acks_uncommitted, _) -> lists:sum([length(Pending) || #tx{pending_acks = Pending} <- all_tx_record()]); -i(consumers, _) -> - lists:sum([length(Consumers) || - #cr{consumers = Consumers} <- all_ch_record()]); +i(consumers, State) -> + queue:len(State#q.active_consumers) + queue:len(State#q.blocked_consumers); i(transactions, _) -> length(all_tx_record()); i(memory, _) -> @@ -620,78 +606,91 @@ handle_call({basic_get, ChPid, NoAck}, _From, handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg}, _From, State = #q{owner = Owner, - exclusive_consumer = ExistingHolder, - round_robin = RoundRobin}) -> + exclusive_consumer = ExistingHolder}) -> case check_queue_owner(Owner, ReaderPid) of mismatch -> reply({error, queue_owned_by_another_connection}, State); ok -> - case check_exclusive_access(ExistingHolder, ExclusiveConsume) of + case check_exclusive_access(ExistingHolder, ExclusiveConsume, + State) of in_use -> reply({error, exclusive_consume_unavailable}, State); ok -> - C = #cr{consumers = Consumers} = ch_record(ChPid), - Consumer = #consumer{tag = ConsumerTag, ack_required = not(NoAck)}, - store_ch_record(C#cr{consumers = [Consumer | Consumers], + C = #cr{consumer_count = ConsumerCount} = ch_record(ChPid), + Consumer = #consumer{tag = ConsumerTag, + ack_required = not(NoAck)}, + store_ch_record(C#cr{consumer_count = ConsumerCount +1, limiter_pid = LimiterPid}), - if Consumers == [] -> + if ConsumerCount == 0 -> ok = rabbit_limiter:register(LimiterPid, self()); true -> ok end, + ExclusiveConsumer = + if ExclusiveConsume -> {ChPid, ConsumerTag}; + true -> ExistingHolder + end, State1 = State#q{has_had_consumers = true, - exclusive_consumer = - if - ExclusiveConsume -> {ChPid, ConsumerTag}; - true -> ExistingHolder - end, - round_robin = queue:in({ChPid, Consumer}, RoundRobin)}, + exclusive_consumer = ExclusiveConsumer}, ok = maybe_send_reply(ChPid, OkMsg), - reply(ok, run_poke_burst(State1)) + State2 = + case is_ch_blocked(C) of + true -> State1#q{ + blocked_consumers = + add_consumer( + ChPid, Consumer, + State1#q.blocked_consumers)}; + false -> run_poke_burst( + State1#q{ + active_consumers = + add_consumer( + ChPid, Consumer, + State1#q.active_consumers)}) + end, + reply(ok, State2) end end; handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, - State = #q{exclusive_consumer = Holder, - round_robin = RoundRobin}) -> + State = #q{exclusive_consumer = Holder}) -> case lookup_ch(ChPid) of not_found -> ok = maybe_send_reply(ChPid, OkMsg), reply(ok, State); - C = #cr{consumers = Consumers, limiter_pid = LimiterPid} -> - NewConsumers = lists:filter - (fun (#consumer{tag = CT}) -> CT /= ConsumerTag end, - Consumers), - store_ch_record(C#cr{consumers = NewConsumers}), - if NewConsumers == [] -> + C = #cr{consumer_count = ConsumerCount, limiter_pid = LimiterPid} -> + store_ch_record(C#cr{consumer_count = ConsumerCount - 1}), + if ConsumerCount == 1 -> ok = rabbit_limiter:unregister(LimiterPid, self()); true -> ok end, ok = maybe_send_reply(ChPid, OkMsg), - case check_auto_delete( - State#q{exclusive_consumer = cancel_holder(ChPid, - ConsumerTag, - Holder), - round_robin = block_consumer(ChPid, - ConsumerTag, - RoundRobin)}) of - {continue, State1} -> - reply(ok, State1); - {stop, State1} -> - {stop, normal, ok, State1} + NewState = + State#q{exclusive_consumer = cancel_holder(ChPid, + ConsumerTag, + Holder), + active_consumers = remove_consumer( + ChPid, ConsumerTag, + State#q.active_consumers), + blocked_consumers = remove_consumer( + ChPid, ConsumerTag, + State#q.blocked_consumers)}, + case should_auto_delete(NewState) of + false -> reply(ok, NewState); + true -> {stop, normal, ok, NewState} end end; handle_call(stat, _From, State = #q{q = #amqqueue{name = Name}, message_buffer = MessageBuffer, - round_robin = RoundRobin}) -> - reply({ok, Name, queue:len(MessageBuffer), queue:len(RoundRobin)}, State); + active_consumers = ActiveConsumers}) -> + reply({ok, Name, queue:len(MessageBuffer), queue:len(ActiveConsumers)}, + State); handle_call({delete, IfUnused, IfEmpty}, _From, State = #q{message_buffer = MessageBuffer}) -> IsEmpty = queue:is_empty(MessageBuffer), - IsUnused = is_unused(), + IsUnused = is_unused(State), if IfEmpty and not(IsEmpty) -> reply({error, not_empty}, State); @@ -710,7 +709,7 @@ handle_call({claim_queue, ReaderPid}, _From, State = #q{owner = Owner, exclusive_consumer = Holder}) -> case Owner of none -> - case check_exclusive_access(Holder, true) of + case check_exclusive_access(Holder, true, State) of in_use -> %% FIXME: Is this really the right answer? What if %% an active consumer's reader is actually the @@ -786,10 +785,10 @@ handle_cast({limit, ChPid, LimiterPid}, State) -> noreply( possibly_unblock( State, ChPid, - fun (C = #cr{consumers = Consumers, + fun (C = #cr{consumer_count = ConsumerCount, limiter_pid = OldLimiterPid, is_limit_active = Limited}) -> - if Consumers =/= [] andalso OldLimiterPid == undefined -> + if ConsumerCount =/= 0 andalso OldLimiterPid == undefined -> ok = rabbit_limiter:register(LimiterPid, self()); true -> ok diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index e9161e8ad4..f17ee2f530 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -232,9 +232,9 @@ route(X = #exchange{type = topic}, RoutingKey, _Content) -> route(X = #exchange{type = headers}, _RoutingKey, Content) -> Headers = case (Content#content.properties)#'P_basic'.headers of - undefined -> []; - H -> sort_arguments(H) - end, + undefined -> []; + H -> sort_arguments(H) + end, match_bindings(X, fun (#binding{args = Spec}) -> headers_match(Spec, Headers) end); @@ -467,14 +467,14 @@ parse_x_match(Other) -> %% headers_match(Pattern, Data) -> MatchKind = case lists:keysearch(<<"x-match">>, 1, Pattern) of - {value, {_, longstr, MK}} -> parse_x_match(MK); - {value, {_, Type, MK}} -> - rabbit_log:warning("Invalid x-match field type ~p " + {value, {_, longstr, MK}} -> parse_x_match(MK); + {value, {_, Type, MK}} -> + rabbit_log:warning("Invalid x-match field type ~p " "(value ~p); expected longstr", - [Type, MK]), - default_headers_match_kind(); - _ -> default_headers_match_kind() - end, + [Type, MK]), + default_headers_match_kind(); + _ -> default_headers_match_kind() + end, headers_match(Pattern, Data, true, false, MatchKind). headers_match([], _Data, AllMatch, _AnyMatch, all) -> @@ -501,8 +501,8 @@ headers_match([{PK, PT, PV} | PRest], [{DK, DT, DV} | DRest], %% the corresponding data field. I've interpreted that to %% mean a type of "void" for the pattern field. PT == void -> {AllMatch, true}; - %% Similarly, it's not specified, but I assume that a - %% mismatched type causes a mismatched value. + %% Similarly, it's not specified, but I assume that a + %% mismatched type causes a mismatched value. PT =/= DT -> {false, AnyMatch}; PV == DV -> {AllMatch, true}; true -> {false, AnyMatch} diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 3f9b6ebb9b..9f3dcbd071 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -36,7 +36,7 @@ -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]). -export([start_link/1, shutdown/1]). --export([limit/2, can_send/2, ack/2, register/2, unregister/2]). +-export([limit/2, can_send/3, ack/2, register/2, unregister/2]). %%---------------------------------------------------------------------------- @@ -47,7 +47,7 @@ -spec(start_link/1 :: (pid()) -> pid()). -spec(shutdown/1 :: (maybe_pid()) -> 'ok'). -spec(limit/2 :: (maybe_pid(), non_neg_integer()) -> 'ok'). --spec(can_send/2 :: (maybe_pid(), pid()) -> bool()). +-spec(can_send/3 :: (maybe_pid(), pid(), bool()) -> bool()). -spec(ack/2 :: (maybe_pid(), non_neg_integer()) -> 'ok'). -spec(register/2 :: (maybe_pid(), pid()) -> 'ok'). -spec(unregister/2 :: (maybe_pid(), pid()) -> 'ok'). @@ -85,12 +85,13 @@ limit(LimiterPid, PrefetchCount) -> %% Ask the limiter whether the queue can deliver a message without %% breaching a limit -can_send(undefined, _QPid) -> +can_send(undefined, _QPid, _AckRequired) -> true; -can_send(LimiterPid, QPid) -> +can_send(LimiterPid, QPid, AckRequired) -> rabbit_misc:with_exit_handler( fun () -> true end, - fun () -> gen_server2:call(LimiterPid, {can_send, QPid}, infinity) end). + fun () -> gen_server2:call(LimiterPid, {can_send, QPid, AckRequired}, + infinity) end). %% Let the limiter know that the channel has received some acks from a %% consumer @@ -110,10 +111,13 @@ unregister(LimiterPid, QPid) -> gen_server2:cast(LimiterPid, {unregister, QPid}) init([ChPid]) -> {ok, #lim{ch_pid = ChPid} }. -handle_call({can_send, QPid}, _From, State = #lim{volume = Volume}) -> +handle_call({can_send, QPid, AckRequired}, _From, + State = #lim{volume = Volume}) -> case limit_reached(State) of true -> {reply, false, limit_queue(QPid, State)}; - false -> {reply, true, State#lim{volume = Volume + 1}} + false -> {reply, true, State#lim{volume = if AckRequired -> Volume + 1; + true -> Volume + end}} end. handle_cast(shutdown, State) -> diff --git a/src/rabbit_log.erl b/src/rabbit_log.erl index f408336e94..dd5b498b07 100644 --- a/src/rabbit_log.erl +++ b/src/rabbit_log.erl @@ -75,7 +75,7 @@ debug(Fmt, Args) when is_list(Args) -> message(Direction, Channel, MethodRecord, Content) -> gen_server:cast(?SERVER, - {message, Direction, Channel, MethodRecord, Content}). + {message, Direction, Channel, MethodRecord, Content}). info(Fmt) -> gen_server:cast(?SERVER, {info, Fmt}). @@ -112,11 +112,11 @@ handle_cast({debug, Fmt, Args}, State) -> {noreply, State}; handle_cast({message, Direction, Channel, MethodRecord, Content}, State) -> io:format("~s ch~p ~p~n", - [case Direction of - in -> "-->"; - out -> "<--" end, - Channel, - {MethodRecord, Content}]), + [case Direction of + in -> "-->"; + out -> "<--" end, + Channel, + {MethodRecord, Content}]), {noreply, State}; handle_cast({info, Fmt}, State) -> error_logger:info_msg(Fmt), diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index f0d9033d07..6a7d68084d 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -231,7 +231,7 @@ start_connection(Parent, Deb, ClientSock) -> connection_state = pre_init}, handshake, 8)) catch - Ex -> (if Ex == connection_closed_abruptly -> + Ex -> (if Ex == connection_closed_abruptly -> fun rabbit_log:warning/2; true -> fun rabbit_log:error/2 diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 8f0a3a8973..01757509ec 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -261,7 +261,7 @@ test_log_management() -> %% original log files are not writable ok = make_files_non_writable([MainLog, SaslLog]), {error, {{cannot_rotate_main_logs, _}, - {cannot_rotate_sasl_logs, _}}} = control_action(rotate_logs, []), + {cannot_rotate_sasl_logs, _}}} = control_action(rotate_logs, []), %% logging directed to tty (handlers were removed in last test) ok = clean_logs([MainLog, SaslLog], Suffix), @@ -280,7 +280,7 @@ test_log_management() -> ok = application:set_env(sasl, sasl_error_logger, {file, SaslLog}), ok = application:set_env(kernel, error_logger, {file, MainLog}), ok = add_log_handlers([{rabbit_error_logger_file_h, MainLog}, - {rabbit_sasl_report_file_h, SaslLog}]), + {rabbit_sasl_report_file_h, SaslLog}]), passed. test_log_management_during_startup() -> |
